“第一个Celery程序”的版本间的差异
来自CloudWiki
第108行: | 第108行: | ||
[2020-05-31 11:17:27,677: INFO/MainProcess] celery@localhost.localdomain ready. | [2020-05-31 11:17:27,677: INFO/MainProcess] celery@localhost.localdomain ready. | ||
</nowiki> | </nowiki> | ||
+ | |||
+ | ===调用任务函数=== | ||
+ | <nowiki> | ||
+ | from my_first_celery import add | ||
+ | import time | ||
+ | result = add.delay(12,12) #异步调用,这一步不会阻塞,程序会立即往下运行 | ||
+ | |||
+ | while not result.ready(): #循环检查任务是否执行完毕 | ||
+ | print(time.strftime("%H:%M:%S")) | ||
+ | time.sleep(1) | ||
+ | |||
+ | print(result.get())#获取任务的返回结果 | ||
+ | print(result.successful())#判断任务是否成功执行 | ||
+ | </nowiki> | ||
+ | |||
+ | [root@localhost op]# python3 start_task.py | ||
+ | |||
+ | <nowiki>12:08:38 | ||
+ | 12:08:39 | ||
+ | 12:08:40 | ||
+ | 24 | ||
+ | True</nowiki> |
2020年5月31日 (日) 04:11的版本
Redis配置和启动
我们以Redis为例,首先修改Redis配置文件redis.conf,修改bind = 127.0.0.0.1为bind =0.0.0.0,意思是允许远程访问Redis数据库。
redis-4.0.11/redis.conf
bind 0.0.0.0
接下来启动Redis
cd src
./redis-server ../redis.conf
[root@localhost src]# ps -ef |grep redis
root 84370 80953 0 12:07 pts/2 00:00:00 ./redis-server 0.0.0.0:6379 root 84388 84206 0 12:11 pts/3 00:00:00 grep --color=auto redis
python配置redis模块
[root@localhost op]# pip3 install redis
[root@localhost op]# python3
Python 3.7.5 (default, May 8 2020, 10:08:52) [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import redis >>> r = redis.Redis(host='localhost',port=6379,db=0) >>> r.set('foo','bar') True >>> r.get('foo') b'bar' >>> exit()
第一个Celery程序
#encoding=utf-8 from celery import Celery import time import socket app = Celery('tasks', broker='redis://127.0.0.1:6379/0',backend ='redis://127.0.0.1:6379/0' ) def get_host_ip(): """ 查询本机ip地址 :return: ip """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] finally: s.close() return ip @app.task def add(x, y): time.sleep(3) # 模拟耗时操作 s = x + y print("主机IP {}: x + y = {}".format(get_host_ip(),s)) return s
代码说明:第7行指定了中间人broker为本机的Redis数据库0,结果后端同样使用Redis;第9~20行定义了一个获取本机IP地址的函数,为后序分布式队列做铺垫;第23行到最后定义了一个模拟虚耗时的任务函数,使用app.task来装饰。
运行这个python程序,
celery -A my_first_celery worker --loglevel=info
/usr/local/Python3/lib/python3.7/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is absolutely not recommended! Please specify a different user using the --uid option. User information: uid=0 euid=0 gid=0 egid=0 uid=uid, euid=euid, gid=gid, egid=egid, -------------- celery@localhost.localdomain v4.4.2 (cliffs) --- ***** ----- -- ******* ---- Linux-3.10.0-1062.el7.x86_64-x86_64-with-centos-7.7.1908-Core 2020-05-31 11:17:26 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7f920e95a1d0 - ** ---------- .> transport: redis://127.0.0.1:6379/0 - ** ---------- .> results: redis://127.0.0.1:6379/0 - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . my_first_celery.add [2020-05-31 11:17:26,645: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0 [2020-05-31 11:17:26,650: INFO/MainProcess] mingle: searching for neighbors [2020-05-31 11:17:27,665: INFO/MainProcess] mingle: all alone [2020-05-31 11:17:27,677: INFO/MainProcess] celery@localhost.localdomain ready.
调用任务函数
from my_first_celery import add import time result = add.delay(12,12) #异步调用,这一步不会阻塞,程序会立即往下运行 while not result.ready(): #循环检查任务是否执行完毕 print(time.strftime("%H:%M:%S")) time.sleep(1) print(result.get())#获取任务的返回结果 print(result.successful())#判断任务是否成功执行
[root@localhost op]# python3 start_task.py
12:08:38 12:08:39 12:08:40 24 True