Celery队列

来自CloudWiki
跳转至: 导航搜索

Celery 队列

Celery非常容易设置和运行,它通常会使用默认名为Celery的队列(可以通过CELERY_DEFAULT_QUEUE修改)来存放任务。Celery支持同时运行多个队列,还可以使用优先级不同的队列来确保高优先级的任务不需要等待就立即得到响应。

基于9.5节使用的工程源代码,我们来实现不同的队列来执行不同的任务:使任务add在队列default中运行;taskA在队列task_A中运行;taskB在队列task_B中运行。

【示例9-4】定义三个队列,并将任务自动的分配到相应的队列中(myCeleryProj_2)。

定义任务队列

settings.py:

from kombu import Queue
from kombu import Queue

CELERY_QUEUES = (  # 定义任务队列
    Queue("default", routing_key="task.#"),  # 路由键以“task.”开头的消息都进default队列
    Queue("tasks_A", routing_key="A.#"),  # 路由键以“A.”开头的消息都进tasks_A队列
    Queue("tasks_B", routing_key="B.#"),  # 路由键以“B.”开头的消息都进tasks_B队列
)

CELERY_ROUTES = (
     [
         ("myCeleryProj.tasks.add", {"queue": "default"}), # 将add任务分配至队列 default
         ("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),# 将taskA任务分配至队列 tasks_A
         ("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),# 将taskB任务分配至队列 tasks_B
     ],
 )



BROKER_URL = 'redis://127.0.0.1:6379/0'#使用redis 作为消息代理

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 任务结果存在Redis

CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显

然后开启三个终端窗口,分别启动三个队列的worker,执行以下命令。

 myCeleryProj.app worker -Q default  --loglevel=info
celery -A myCeleryProj.app worker -Q tasks_A  --loglevel=info
celery -A myCeleryProj.app worker -Q tasks_B  --loglevel=info

最后 开启一个窗口来调用task:

>>> taskA.apply_async( queue='task_A')
<AsyncResult: 106d4cc7-9e55-4b14-b172-8fc4db202c50>
>>> taskB.apply_async( queue='task_B')
<AsyncResult: 24684cb9-424f-451b-86ce-cb1527315778>
>>> add.apply_async((2, 2), queue='default')
<AsyncRe

三个队列的输出:

[2020-06-01 12:32:06,696: WARNING/ForkPoolWorker-2] 主机IP 10.0.0.30: x + y = 4
[2020-06-01 12:32:06,697: INFO/ForkPoolWorker-2] Task myCeleryProj.tasks.add[9e0fd7ab-2b80-46cf-8ba9-1c68aee74044] succeeded in 3.0028064170037396s: 4

[2020-06-01 12:28:52,994: WARNING/ForkPoolWorker-2] taskA
[2020-06-01 12:28:56,002: INFO/ForkPoolWorker-2] Task myCeleryProj.tasks.taskA[a9921170-57a5-48f5-9d3b-5e21458ad57d] succeeded in 3.008730553003261s: None
[2020-06-01 12:31:23,276: INFO/MainProcess] Received task: myCeleryProj.tasks.taskA[106d4cc7-9e55-4b14-b172-8fc4db202c50]

[2020-06-01 12:31:36,647: WARNING/ForkPoolWorker-2] taskB
[2020-06-01 12:31:39,653: INFO/ForkPoolWorker-2] Task myCeleryProj.tasks.taskB[24684cb9-424f-451b-86ce-cb1527315778] succeeded in 3.006150394998258s: None