Celery队列
来自CloudWiki
Celery 队列
Celery非常容易设置和运行,它通常会使用默认名为Celery的队列(可以通过CELERY_DEFAULT_QUEUE修改)来存放任务。Celery支持同时运行多个队列,还可以使用优先级不同的队列来确保高优先级的任务不需要等待就立即得到响应。
基于9.5节使用的工程源代码,我们来实现不同的队列来执行不同的任务:使任务add在队列default中运行;taskA在队列task_A中运行;taskB在队列task_B中运行。
【示例9-4】定义三个队列,并将任务自动的分配到相应的队列中(myCeleryProj_2)。
定义任务队列
settings.py:
from kombu import Queue from kombu import Queue CELERY_QUEUES = ( # 定义任务队列 Queue("default", routing_key="task.#"), # 路由键以“task.”开头的消息都进default队列 Queue("tasks_A", routing_key="A.#"), # 路由键以“A.”开头的消息都进tasks_A队列 Queue("tasks_B", routing_key="B.#"), # 路由键以“B.”开头的消息都进tasks_B队列 ) CELERY_ROUTES = ( [ ("myCeleryProj.tasks.add", {"queue": "default"}), # 将add任务分配至队列 default ("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),# 将taskA任务分配至队列 tasks_A ("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),# 将taskB任务分配至队列 tasks_B ], ) BROKER_URL = 'redis://127.0.0.1:6379/0'#使用redis 作为消息代理 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 任务结果存在Redis CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
然后开启三个终端窗口,分别启动三个队列的worker,执行以下命令。
myCeleryProj.app worker -Q default --loglevel=info celery -A myCeleryProj.app worker -Q tasks_A --loglevel=info celery -A myCeleryProj.app worker -Q tasks_B --loglevel=info
最后 开启一个窗口来调用task:
>>> taskA.apply_async( queue='task_A') <AsyncResult: 106d4cc7-9e55-4b14-b172-8fc4db202c50> >>> taskB.apply_async( queue='task_B') <AsyncResult: 24684cb9-424f-451b-86ce-cb1527315778> >>> add.apply_async((2, 2), queue='default') <AsyncRe
三个队列的输出:
[2020-06-01 12:32:06,696: WARNING/ForkPoolWorker-2] 主机IP 10.0.0.30: x + y = 4 [2020-06-01 12:32:06,697: INFO/ForkPoolWorker-2] Task myCeleryProj.tasks.add[9e0fd7ab-2b80-46cf-8ba9-1c68aee74044] succeeded in 3.0028064170037396s: 4
[2020-06-01 12:28:52,994: WARNING/ForkPoolWorker-2] taskA [2020-06-01 12:28:56,002: INFO/ForkPoolWorker-2] Task myCeleryProj.tasks.taskA[a9921170-57a5-48f5-9d3b-5e21458ad57d] succeeded in 3.008730553003261s: None [2020-06-01 12:31:23,276: INFO/MainProcess] Received task: myCeleryProj.tasks.taskA[106d4cc7-9e55-4b14-b172-8fc4db202c50]
[2020-06-01 12:31:36,647: WARNING/ForkPoolWorker-2] taskB [2020-06-01 12:31:39,653: INFO/ForkPoolWorker-2] Task myCeleryProj.tasks.taskB[24684cb9-424f-451b-86ce-cb1527315778] succeeded in 3.006150394998258s: None