First Celery Program


Starting the Message Broker (Redis)

Redis configuration and startup

Press Ctrl+C to exit the Redis startup screen, then locate and stop the running Redis process:

[root@localhost src]# ps -ef |grep redis

root      17931      1  0 09:51 ?        00:00:12 ./redis-server *:6379
root      18679  18288  0 11:49 pts/2    00:00:00 grep --color=auto redis

[root@localhost src]# kill -9 17931

Next, edit the Redis configuration file redis.conf and change bind 127.0.0.1 to bind 0.0.0.0, which allows remote access to the Redis database.

redis-4.0.11/redis.conf

bind 0.0.0.0

Then start Redis:

cd src

./redis-server ../redis.conf
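
To confirm that the server is up and accepting connections, a quick check with redis-cli (assuming it was built alongside redis-server in the same src directory):

./redis-cli ping
PONG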


Installing the Python redis module

[root@localhost op]# pip3 install redis

[root@localhost op]# python3

Python 3.7.5 (default, May  8 2020, 10:08:52)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import redis
>>> r = redis.Redis(host='localhost',port=6379,db=0)
>>> r.set('foo','bar')
True
>>> r.get('foo')
b'bar'
>>> exit()
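
Since bind is now 0.0.0.0, the same test should also work from another machine; a sketch, with <server-ip> as a placeholder for the address of the host running Redis:

import redis

# <server-ip> is a placeholder; replace it with the Redis server's actual address
r = redis.Redis(host='<server-ip>', port=6379, db=0)
print(r.ping())  # True if the connection succeeds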

Starting the Task Execution Module

The first Celery program

my_first_celery.py:

#encoding=utf-8

from celery import Celery
import time
import socket

app = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')

def get_host_ip():
    """
    Look up this machine's IP address.
    :return: ip
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip


@app.task
def add(x, y):
    time.sleep(3)  # simulate a time-consuming operation
    s = x + y
    print("Host IP {}: x + y = {}".format(get_host_ip(), s))
    return s



Code notes: the Celery() call names the application tasks and points both the broker and the result backend at database 0 of the local Redis server; get_host_ip() returns this machine's IP address, which will be useful later when the queue is distributed across hosts; add(), decorated with @app.task, is a task function that simulates a time-consuming operation.
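
As a side note, the same broker and backend settings can also be applied through the app's configuration object after it is created; a minimal sketch, assuming the lowercase setting names used by Celery 4.x:

from celery import Celery

# equivalent to passing broker= and backend= to the Celery() constructor
app = Celery('tasks')
app.conf.update(
    broker_url='redis://127.0.0.1:6379/0',      # message broker
    result_backend='redis://127.0.0.1:6379/0',  # where task results are stored
)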


Run this program with a Celery worker:

celery -A my_first_celery worker --loglevel=info

/usr/local/Python3/lib/python3.7/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@localhost.localdomain v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Linux-3.10.0-1062.el7.x86_64-x86_64-with-centos-7.7.1908-Core 2020-05-31 11:17:26
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f920e95a1d0
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . my_first_celery.add

[2020-05-31 11:17:26,645: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2020-05-31 11:17:26,650: INFO/MainProcess] mingle: searching for neighbors
[2020-05-31 11:17:27,665: INFO/MainProcess] mingle: all alone
[2020-05-31 11:17:27,677: INFO/MainProcess] celery@localhost.localdomain ready.
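
The RuntimeWarning at the top of this output is printed because the worker runs as root. For anything beyond a local test, the worker can be started under an unprivileged account via the --uid option mentioned in the warning; a sketch, where celery_user is a hypothetical account name:

celery -A my_first_celery worker --loglevel=info --uid=celery_user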

Calling the Task Function

start_task.py

from my_first_celery import add
import time

result = add.delay(12, 12)  # asynchronous call; it does not block, the program continues immediately

while not result.ready():  # poll until the task has finished
    print(time.strftime("%H:%M:%S"))
    time.sleep(1)

print(result.get())         # fetch the task's return value
print(result.successful())  # check whether the task completed successfully

Note: the worker started above must be running before the task function is called.

[root@localhost op]# python3 start_task.py

12:08:38
12:08:39
12:08:40
24
True

After a few seconds, the following output appears in the my_first_celery worker terminal:

[2020-05-31 12:08:41,313: WARNING/ForkPoolWorker-2] Host IP 10.0.0.30: x + y = 24
[2020-05-31 12:08:41,315: INFO/ForkPoolWorker-2] Task my_first_celery.add[f9410076-d86f-49b4-9ad2-0724a58dfc37] succeeded in 3.006598425010452s: 24
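
Instead of polling result.ready() in a loop, the caller can also block on the result directly; a minimal alternative sketch of start_task.py:

from my_first_celery import add

result = add.delay(12, 12)
print(result.get(timeout=10))   # block for up to 10 seconds, then raise if the task has not finished
print(result.successful())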

Viewing the Stored Task Result

Here f9410076-d86f-49b4-9ad2-0724a58dfc37 is the task id. Since a backend was specified, this task id can be used at any time to look up the result in the backend, as follows:

[root@localhost op]# python3

Python 3.7.5 (default, May  8 2020, 10:08:52)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from my_first_celery import add
>>> taskid='f9410076-d86f-49b4-9ad2-0724a58dfc37'
>>> add.AsyncResult(taskid).get()
24

Or:

>>> from celery.result import AsyncResult
>>> AsyncResult(taskid).get()
24
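
The task state can also be inspected without fetching the return value; in the same interpreter session, assuming the task above has completed:

>>> add.AsyncResult(taskid).state
'SUCCESS'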