第一个Celery工程
上节的第一个Celery程序非常简单,实际的项目开发应该是模块化的,程序的功能分散在多个文件中,Celery也不例外。下面扩展第一个Celery程序。
新建myCeleryProj目录,并在myCeleryProj目录中新建__init__.py、app.py、settings.py、tasks.py文件。其中__init__.py文件保持为空即可,其作用是把目录myCeleryProj作为一个包让Python程序导入。
第一个工程项目
(目录myCeleryProj)。
app.py
app.py是celery worker的入口,如下所示。
from __future__ import absolute_import from celery import Celery app = Celery("myCeleryProj", include=["myCeleryProj.tasks"]) #app = Celery("myCeleryProj", include=["tasks"]) app.config_from_object("myCeleryProj.settings") if __name__ == "__main__": app.start()
tasks.py
task.py主要存放具体执行的任务,如下所示:
import os from myCeleryProj.app import app import time import socket def get_host_ip(): """ 查询本机ip地址 :return: ip """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] finally: s.close() return ip @app.task def add(x, y): time.sleep(3) # 模拟耗时操作 s = x + y print("主机IP {}: x + y = {}".format(get_host_ip(), s)) return s @app.task def taskA(): print("taskA") time.sleep(3) @app.task def taskB(): print("taskB") time.sleep(3)
settings.py
settings.py存放配置信息,如下所示。
BROKER_URL = 'redis://127.0.0.1:6379/0'#使用redis 作为消息代理 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 任务结果存在Redis CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
运行工程项目
下面运行工程项目,在myCeleryProj的同级目录下执行如下命令。
celery -A myCeleryProj.app worker -c 3 --loglevel=info
-c 3表示启用三个子进程执行该队列中的任务。
[tasks] . myCeleryProj.tasks.add . myCeleryProj.tasks.taskA . myCeleryProj.tasks.taskB [2020-05-31 12:33:41,965: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0 [2020-05-31 12:33:41,985: INFO/MainProcess] mingle: searching for neighbors [2020-05-31 12:33:43,032: INFO/MainProcess] mingle: all alone [2020-05-31 12:33:43,053: INFO/MainProcess] celery@localhost.localdomain ready.
异步调用
现在我们已经启动了worker,从运行的打印输出可以看到有三个任务:myCeleryProj.tasks.add、myCeleryProj.tasks.taskA和myCeleryProj.tasks.taskB。接下来手动执行异步调用。
[root@localhost op]# python3
Python 3.7.5 (default, May 8 2020, 10:08:52) [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux Type "help", "copyright", "credits" or "license" for more information. >>> from myCeleryProj.tasks import * >>> add.delay(5,6);taskA.delay();taskB.delay() <AsyncResult: 428341b4-a97e-44b4-9d23-699f7258d56c> <AsyncResult: 34c65940-81a5-4e29-a3f2-d37c871c5bfd> <AsyncResult: 26019bf6-b3e5-4024-8e27-f004b74a9a89>
这里dd.delay(5,6);taskA.delay();taskB.delay()写在一行是在于同时发出异步执行的命令。worker界面新增的信息如下:
[2020-06-01 11:15:35,487: INFO/MainProcess] Received task: myCeleryProj.tasks.add[efe9a3ed-ed1f-49cf-b8bd-b8a3d6f5bc35] [2020-06-01 11:15:35,492: INFO/MainProcess] Received task: myCeleryProj.tasks.taskA[e9b3cbac-d4f7-41ce-b496-cda06a25bff8] [2020-06-01 11:15:35,493: INFO/MainProcess] Received task: myCeleryProj.tasks.taskB[e924b8b8-e9f7-4650-91ee-c1dfec6cef6b] [2020-06-01 11:15:35,495: WARNING/ForkPoolWorker-1] taskA [2020-06-01 11:15:35,495: WARNING/ForkPoolWorker-3] taskB [2020-06-01 11:15:38,494: WARNING/ForkPoolWorker-2] 主机IP 10.0.0.30: x + y = 11 [2020-06-01 11:15:38,503: INFO/ForkPoolWorker-1] Task myCeleryProj.tasks.taskA[e9b3cbac-d4f7-41ce-b496-cda06a25bff8] succeeded in 3.008365719986614s: None [2020-06-01 11:15:38,503: INFO/ForkPoolWorker-2] Task myCeleryProj.tasks.add[efe9a3ed-ed1f-49cf-b8bd-b8a3d6f5bc35] succeeded in 3.0143131380027626s: 11 [2020-06-01 11:15:38,506: INFO/ForkPoolWorker-3] Task myCeleryProj.tasks.taskB[e924b8b8-e9f7-4650-91ee-c1dfec6cef6b] succeeded in 3.011205949005671s: None
从worker界面新增的信息中可以看出,worker在20:53:58同时收到了三个任务,由于并发数是3,且三个任务都执行了等待3秒的模拟耗时操作,因此它们都在20:54:01打印了相应的信息并退出。读者可以将并发数设置为1再试验一下运行结果。
调用task的方法有以下三种。