第一个Celery程序
目录
启动消息中间件(Redis)
Redis配置和启动
按Ctrl+ C 退出 Redis 启动界面,
[root@localhost src]# ps -ef |grep redis
root 17931 1 0 09:51 ? 00:00:12 ./redis-server *:6379 root 18679 18288 0 11:49 pts/2 00:00:00 grep --color=auto redis
[root@localhost src]# kill -9 17931
然后开始修改Redis配置文件redis.conf,修改bind = 127.0.0.0.1为bind =0.0.0.0,意思是允许远程访问Redis数据库。
redis-4.0.11/redis.conf
bind 0.0.0.0
接下来启动Redis
cd src
./redis-server ../redis.conf
python配置redis模块
[root@localhost op]# pip3 install redis
[root@localhost op]# python3
Python 3.7.5 (default, May 8 2020, 10:08:52) [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import redis >>> r = redis.Redis(host='localhost',port=6379,db=0) >>> r.set('foo','bar') True >>> r.get('foo') b'bar' >>> exit()
启动任务执行模块
第一个Celery程序
my_first_celery.py:
#encoding=utf-8 from celery import Celery import time import socket app = Celery('tasks', broker='redis://127.0.0.1:6379/0',backend ='redis://127.0.0.1:6379/0' ) def get_host_ip(): """ 查询本机ip地址 :return: ip """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] finally: s.close() return ip @app.task def add(x, y): time.sleep(3) # 模拟耗时操作 s = x + y print("主机IP {}: x + y = {}".format(get_host_ip(),s)) return s
代码说明:第7行指定了中间人broker为本机的Redis数据库0,结果后端同样使用Redis;第9~20行定义了一个获取本机IP地址的函数,为后序分布式队列做铺垫;第23行到最后定义了一个模拟虚耗时的任务函数,使用app.task来装饰。
运行这个python程序,
celery -A my_first_celery worker --loglevel=info
/usr/local/Python3/lib/python3.7/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is absolutely not recommended! Please specify a different user using the --uid option. User information: uid=0 euid=0 gid=0 egid=0 uid=uid, euid=euid, gid=gid, egid=egid, -------------- celery@localhost.localdomain v4.4.2 (cliffs) --- ***** ----- -- ******* ---- Linux-3.10.0-1062.el7.x86_64-x86_64-with-centos-7.7.1908-Core 2020-05-31 11:17:26 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7f920e95a1d0 - ** ---------- .> transport: redis://127.0.0.1:6379/0 - ** ---------- .> results: redis://127.0.0.1:6379/0 - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . my_first_celery.add [2020-05-31 11:17:26,645: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0 [2020-05-31 11:17:26,650: INFO/MainProcess] mingle: searching for neighbors [2020-05-31 11:17:27,665: INFO/MainProcess] mingle: all alone [2020-05-31 11:17:27,677: INFO/MainProcess] celery@localhost.localdomain ready.
调用任务函数
start_task.py
from my_first_celery import add import time result = add.delay(12,12) #异步调用,这一步不会阻塞,程序会立即往下运行 while not result.ready(): #循环检查任务是否执行完毕 print(time.strftime("%H:%M:%S")) time.sleep(1) print(result.get())#获取任务的返回结果 print(result.successful())#判断任务是否成功执行
注意:调用任务函数时需保证前面的Worker函数已开启。
[root@localhost op]# python3 start_task.py
12:08:38 12:08:39 12:08:40 24 True
等待几秒钟,在my_first_celery界面出现了如下输出:
[2020-05-31 12:08:41,313: WARNING/ForkPoolWorker-2] 主机IP 10.0.0.30: x + y = 24 [2020-05-31 12:08:41,315: INFO/ForkPoolWorker-2] Task my_first_celery.add[f9410076-d86f-49b4-9ad2-0724a58dfc37] succeeded in 3.006598425010452s: 24
查看任务存储结果
其中f9410076-d86f-49b4-9ad2-0724a58dfc37是taskid,只要指定了backend,根据这个taskid就可以随时去backend查找运行结果。使用方法如下:
[root@localhost op]# python3
Python 3.7.5 (default, May 8 2020, 10:08:52) [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux Type "help", "copyright", "credits" or "license" for more information. >>> from my_first_celery import add >>> taskid='f9410076-d86f-49b4-9ad2-0724a58dfc37' >>> add.AsyncResult(taskid).get() 24
或者
>>> from celery.result import AsyncResult >>> AsyncResult(taskid).get() 24