Flask+Celery
前言
最近在用自己之前写的那个pdscan的时候,感觉到略有不适,时不时会出现比较诡异的bug,同时也缺少一堆自己想要的功能,于是考虑花几天修一修bug,加一点功能
当时写pdscan的时候没学过什么叫消息任务队列,也懒得搞,所以就自己用@app.before_first_request
和多线程弄了个后台任务,现在来看有点简陋了
打算这次加功能的时候,顺道把自己写的这东西换掉,别放在github上丢人了
什么是celery
Celery是一个基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)
具体来说,就是当程序产生异步任务需求时,通过Celery将异步任务发送至broker暂存,同时worker从broker中领取属于自己的任务并进行处理和结果存储,从而避免时间过长的异步任务对主任务产生影响
用于暂存任务队列的broker可以使RabbitMQ,也可以是redis
安装和示例
Redis安装略
安装Celery和Redis:
1 | pip install celery redis |
启动redis
1 | redis-server |
以下是一个例子
1 | # test.py |
启动Celery worker
1 | python3 -m celery -A test worker --loglevel=info |
这将启动Celery worker,使其准备好接收和执行任务。
向broker添加任务,可以看到直接就返回了结果,没有等待sleep和输出
在看celery的页面
状态
Celery 可以跟踪任务当前的状态信息。状态信息包含成功任务的结果,或执行失败任务的异常信息。
在任务的执行周期中,可能会有几种状态的变换,每次变换都会附加当前状态信息。当任务进入下一个状态时,上一个状态的信息会被移除,但可以进行推断任务的状态信息(例如,一个任务处于执行 FAILED
状态,则表示在某个时刻是处于 STARTED
状态的)。
如果需要跟踪任务信息、状态或返回值,需要提供一个 Celery 存储的结果后端,便于检索。有几个内置的结果后端可以考虑使用:SQLAlchemy/Django ORM、Memcached、RabbitMQ/QPid(rpc) 和 Redis,也可以自定义后端。
内置状态
PENDING
- 任务正在等待执行或未知。任何未知的任务 ID 都默认处于挂起状态。
STARTED
- 任务已经开始。默认情况下不会记录,需要启用,请参阅
app.Task.track_started.
。 - meta-data:正在执行任务的职程(Worker) pid 和主机名。
- 任务已经开始。默认情况下不会记录,需要启用,请参阅
SUCCESS
- 任务执行成功。
- meta-data:任务结果返回值 propagates:Yes ready: Yes
FAILURE
- 任务执行失败。
- meta-data:执行异常时的任务信息,其中 traceback 包含引发错误的堆栈信息。 propagates:Yes
RETRY
- 任务处于重试状态。
- meta-data:结果信息包含导致重试的异常信息,traceback 包含引发异常时堆栈的回溯。 propagates:No
REVOKED
- 任务被撤销。
- propagates:Yes
示例
还是用上面的那个源码,但是在创建celery实例时,要开启存储结果后端
1 | # 创建 Celery 实例 |
使用delay()方法会返回任务对象,调用任务对象的.state
属性即可看到当前任务的任务状态
状态控制
终止未开始的特定任务:
1 | from celery.result import AsyncResult |
终止已经开始的任务,但不会结束其启动的进程
1 | from celery.result import AsyncResult |
例如:
1 |
|
终止指令下达后,还是能看到sleep的进程
定时任务
celery beat
是一个调度程序;它定期启动任务,然后由集群中的可用节点执行任务。
默认情况下会从配置中的 beat_schedule
项中获取条目(entries),但是也可以使用自定义存储,例如将entries存储在SQL数据库中。
应确保一次只运行一个调度程序来执行一个调度程序,否则最终将导致重复的任务。使用集中式方法意味着时间表不必同步,并且该服务可以在不使用锁的情况下运行。
在pdscan中,需要使用定时任务来检查是否有新扫描任务下达,例如,每10秒检查一次数据库中是否存在waiting的任务
1 | from celery import Celery |