前言

最近在用自己之前写的那个pdscan的时候,感觉到略有不适,时不时会出现比较诡异的bug,同时也缺少一堆自己想要的功能,于是考虑花几天修一修bug,加一点功能

当时写pdscan的时候没学过什么叫消息任务队列,也懒得搞,所以就自己用@app.before_first_request和多线程弄了个后台任务,现在来看有点简陋了

打算这次加功能的时候,顺道把自己写的这东西换掉,别放在github上丢人了

什么是celery

Celery是一个基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)

具体来说,就是当程序产生异步任务需求时,通过Celery将异步任务发送至broker暂存,同时worker从broker中领取属于自己的任务并进行处理和结果存储,从而避免时间过长的异步任务对主任务产生影响

用于暂存任务队列的broker可以使RabbitMQ,也可以是redis

这里写图片描述

安装和示例

Redis安装略

安装Celery和Redis:

1
pip install celery redis

启动redis

1
redis-server

以下是一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# test.py

from celery import Celery
from datetime import datetime, timedelta
import time
from hashlib import md5

# 创建 Celery 实例
app = Celery('testTask', broker='redis://localhost:6379/0')

# 定义任务
@app.task
def my_task1():
# 生成任务md5
task_id = md5(str(time.time()).encode('utf-8')).hexdigest()

# 创建task_id为名称的文件,内容为当前时间
print(f'定时任务1:{task_id} 执行于:', datetime.now())
time.sleep(7)
print(f'定时任务1:{task_id} 结束于:', datetime.now())

# 定义任务
@app.task
def my_task2():
# 生成任务md5
task_id = md5(str(time.time()).encode('utf-8')).hexdigest()

# 在此处定义你的任务逻辑
print(f'定时任务2:{task_id} 执行于:', datetime.now())
time.sleep(9)
print(f'定时任务2:{task_id} 结束于:', datetime.now())

启动Celery worker

1
python3 -m celery -A test worker --loglevel=info

这将启动Celery worker,使其准备好接收和执行任务。

image-20230522195950762

向broker添加任务,可以看到直接就返回了结果,没有等待sleep和输出

image-20230522200726716

在看celery的页面

image-20230522200822739

状态

Celery 可以跟踪任务当前的状态信息。状态信息包含成功任务的结果,或执行失败任务的异常信息。

在任务的执行周期中,可能会有几种状态的变换,每次变换都会附加当前状态信息。当任务进入下一个状态时,上一个状态的信息会被移除,但可以进行推断任务的状态信息(例如,一个任务处于执行 FAILED 状态,则表示在某个时刻是处于 STARTED 状态的)。

如果需要跟踪任务信息、状态或返回值,需要提供一个 Celery 存储的结果后端,便于检索。有几个内置的结果后端可以考虑使用:SQLAlchemy/Django ORM、Memcached、RabbitMQ/QPid(rpc) 和 Redis,也可以自定义后端。

内置状态

  • PENDING

    • 任务正在等待执行或未知。任何未知的任务 ID 都默认处于挂起状态。
  • STARTED

    • 任务已经开始。默认情况下不会记录,需要启用,请参阅 app.Task.track_started.
    • meta-data:正在执行任务的职程(Worker) pid 和主机名。
  • SUCCESS

    • 任务执行成功。
    • meta-data:任务结果返回值 propagates:Yes ready: Yes
  • FAILURE

    • 任务执行失败。
    • meta-data:执行异常时的任务信息,其中 traceback 包含引发错误的堆栈信息。 propagates:Yes
  • RETRY

    • 任务处于重试状态。
    • meta-data:结果信息包含导致重试的异常信息,traceback 包含引发异常时堆栈的回溯。 propagates:No
  • REVOKED

    • 任务被撤销。
    • propagates:Yes

示例

还是用上面的那个源码,但是在创建celery实例时,要开启存储结果后端

1
2
# 创建 Celery 实例
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

使用delay()方法会返回任务对象,调用任务对象的.state属性即可看到当前任务的任务状态

image-20230522202227723

状态控制

终止未开始的特定任务:

1
2
from celery.result import AsyncResult
AsyncResult(c.task_id).revoke()

终止已经开始的任务,但不会结束其启动的进程

1
2
from celery.result import AsyncResult
AsyncResult(c.task_id).revoke(terminate=True)

例如:

1
2
3
4
5
6
7
8
9
@app.task
def my_task1():
# 生成任务md5
task_id = md5(str(time.time()).encode('utf-8')).hexdigest()

# 创建task_id为名称的文件,内容为当前时间
print(f'定时任务1:{task_id}执行于:', datetime.now())
os.system("sleep 10")
print(f'定时任务1:{task_id}结束于:', datetime.now())

终止指令下达后,还是能看到sleep的进程

定时任务

celery beat 是一个调度程序;它定期启动任务,然后由集群中的可用节点执行任务。

默认情况下会从配置中的 beat_schedule 项中获取条目(entries),但是也可以使用自定义存储,例如将entries存储在SQL数据库中。

应确保一次只运行一个调度程序来执行一个调度程序,否则最终将导致重复的任务。使用集中式方法意味着时间表不必同步,并且该服务可以在不使用锁的情况下运行。

在pdscan中,需要使用定时任务来检查是否有新扫描任务下达,例如,每10秒检查一次数据库中是否存在waiting的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from celery import Celery
from datetime import datetime, timedelta
import time
from hashlib import md5

# 创建 Celery 实例
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

# 定义任务
@app.task
def my_task1():
# 生成任务md5
task_id = md5(str(time.time()).encode('utf-8')).hexdigest()

# 创建task_id为名称的文件,内容为当前时间
print(f'定时任务1:{task_id}执行于:', datetime.now())
time.sleep(7)
print(f'定时任务1:{task_id}结束于:', datetime.now())

# 定义任务
@app.task
def my_task2():
# 生成任务md5
task_id = md5(str(time.time()).encode('utf-8')).hexdigest()

# 在此处定义你的任务逻辑
print(f'定时任务2:{task_id}执行于:', datetime.now())
time.sleep(9)
print(f'定时任务2:{task_id}结束于:', datetime.now())

# 定义定时任务
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# 每隔 5 秒执行一次任务
sender.add_periodic_task(5.0, my_task1.s(), name='任务1')
# 每隔 5 秒执行一次任务
sender.add_periodic_task(5.0, my_task2.s(), name='任务2')


# 启动 Celery 定时任务
if __name__ == '__main__':
app.start()