第一个Celery工程

来自CloudWiki
跳转至: 导航搜索

上节的第一个Celery程序非常简单,实际的项目开发应该是模块化的,程序的功能分散在多个文件中,Celery也不例外。下面扩展第一个Celery程序。

新建myCeleryProj目录,并在myCeleryProj目录中新建__init__.py、app.py、settings.py、tasks.py文件。其中__init__.py文件保持为空即可,其作用是把目录myCeleryProj作为一个包让Python程序导入。

第一个工程项目

(目录myCeleryProj)。

app.py

app.py是celery worker的入口,如下所示。

  from __future__ import absolute_import

from celery import Celery

app = Celery("myCeleryProj", include=["myCeleryProj.tasks"])
#app = Celery("myCeleryProj", include=["tasks"])
app.config_from_object("myCeleryProj.settings")


if __name__ == "__main__":
   app.start()

tasks.py

task.py主要存放具体执行的任务,如下所示:

import os
from myCeleryProj.app import app
import time
import socket

def get_host_ip():
    """
    查询本机ip地址
    :return: ip
    """
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip


@app.task
def add(x, y):
    time.sleep(3)  # 模拟耗时操作
    s = x + y
    print("主机IP {}: x + y = {}".format(get_host_ip(), s))
    return s


@app.task
def taskA():
    print("taskA")
    time.sleep(3)


@app.task
def taskB():
    print("taskB")
    time.sleep(3)


settings.py

settings.py存放配置信息,如下所示。

BROKER_URL = 'redis://127.0.0.1:6379/0'#使用redis 作为消息代理

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 任务结果存在Redis

CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显


运行工程项目

下面运行工程项目,在myCeleryProj的同级目录下执行如下命令。

celery -A myCeleryProj.app worker -c 3 --loglevel=info

-c 3表示启用三个子进程执行该队列中的任务。

[tasks]
  . myCeleryProj.tasks.add
  . myCeleryProj.tasks.taskA
  . myCeleryProj.tasks.taskB

[2020-05-31 12:33:41,965: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2020-05-31 12:33:41,985: INFO/MainProcess] mingle: searching for neighbors
[2020-05-31 12:33:43,032: INFO/MainProcess] mingle: all alone
[2020-05-31 12:33:43,053: INFO/MainProcess] celery@localhost.localdomain ready.

异步调用

现在我们已经启动了worker,从运行的打印输出可以看到有三个任务:myCeleryProj.tasks.add、myCeleryProj.tasks.taskA和myCeleryProj.tasks.taskB。接下来手动执行异步调用。


[root@localhost op]# python3

Python 3.7.5 (default, May  8 2020, 10:08:52)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from myCeleryProj.tasks import *
>>> add.delay(5,6);taskA.delay();taskB.delay()
<AsyncResult: 428341b4-a97e-44b4-9d23-699f7258d56c>
<AsyncResult: 34c65940-81a5-4e29-a3f2-d37c871c5bfd>
<AsyncResult: 26019bf6-b3e5-4024-8e27-f004b74a9a89>

这里dd.delay(5,6);taskA.delay();taskB.delay()写在一行是在于同时发出异步执行的命令。worker界面新增的信息如下:

[2020-06-01 11:15:35,487: INFO/MainProcess] Received task: myCeleryProj.tasks.add[efe9a3ed-ed1f-49cf-b8bd-b8a3d6f5bc35]
[2020-06-01 11:15:35,492: INFO/MainProcess] Received task: myCeleryProj.tasks.taskA[e9b3cbac-d4f7-41ce-b496-cda06a25bff8]
[2020-06-01 11:15:35,493: INFO/MainProcess] Received task: myCeleryProj.tasks.taskB[e924b8b8-e9f7-4650-91ee-c1dfec6cef6b]
[2020-06-01 11:15:35,495: WARNING/ForkPoolWorker-1] taskA
[2020-06-01 11:15:35,495: WARNING/ForkPoolWorker-3] taskB
[2020-06-01 11:15:38,494: WARNING/ForkPoolWorker-2] 主机IP 10.0.0.30: x + y = 11
[2020-06-01 11:15:38,503: INFO/ForkPoolWorker-1] Task myCeleryProj.tasks.taskA[e9b3cbac-d4f7-41ce-b496-cda06a25bff8] succeeded in 3.008365719986614s: None
[2020-06-01 11:15:38,503: INFO/ForkPoolWorker-2] Task myCeleryProj.tasks.add[efe9a3ed-ed1f-49cf-b8bd-b8a3d6f5bc35] succeeded in 3.0143131380027626s: 11
[2020-06-01 11:15:38,506: INFO/ForkPoolWorker-3] Task myCeleryProj.tasks.taskB[e924b8b8-e9f7-4650-91ee-c1dfec6cef6b] succeeded in 3.011205949005671s: None

从worker界面新增的信息中可以看出,worker在20:53:58同时收到了三个任务,由于并发数是3,且三个任务都执行了等待3秒的模拟耗时操作,因此它们都在20:54:01打印了相应的信息并退出。读者可以将并发数设置为1再试验一下运行结果。

调用task的方法有以下三种。