“APScheduler安装及基本概念”的版本间的差异
第23行: | 第23行: | ||
一个简单的间隔任务实例(ex_interval.py): | 一个简单的间隔任务实例(ex_interval.py): | ||
− | <nowiki># -*- coding: utf-8 -*- | + | <nowiki> # -*- coding: utf-8 -*- |
# Time: 2018/10/13 19:01:30 | # Time: 2018/10/13 19:01:30 | ||
# File Name: ex_interval.py | # File Name: ex_interval.py | ||
− | from datetime import datetime | + | from datetime import datetime#导入datetime模块, |
import os | import os | ||
from apscheduler.schedulers.blocking import BlockingScheduler | from apscheduler.schedulers.blocking import BlockingScheduler | ||
+ | '''导入调度器模块BlockingScheduler,这是比较简单的调度器, | ||
+ | 调用start方法后不再返回。如果希望将apscheduler用于独立的调度器,如守护进程, | ||
+ | 那么BlockingScheduler非常有用''' | ||
+ | #定义一个作业tick,这个任务打印出当前的时间 | ||
def tick(): | def tick(): | ||
print('Tick! The time is: %s' % datetime.now()) | print('Tick! The time is: %s' % datetime.now()) | ||
+ | #定义主函数入口 | ||
if __name__ == '__main__': | if __name__ == '__main__': | ||
scheduler = BlockingScheduler() | scheduler = BlockingScheduler() | ||
+ | #第10行实例化一个BlockingScheduler类, | ||
+ | #不带参数表明使用默认的作业存储器-内存。 | ||
+ | #默认的执行器是线程池执行器,最大线程数为10个(另一个是进程池执行器) | ||
+ | |||
+ | #添加一个作业tick,触发器为interval,每隔3秒执行一次,另外的触发器为date,cron。 | ||
+ | #date特定时间点触发,cron则按固定的时间间隔触发。 | ||
scheduler.add_job(tick, 'interval', seconds=3) | scheduler.add_job(tick, 'interval', seconds=3) | ||
− | print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C | + | print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) |
− | + | #打印退出方法信息 | |
− | try: | + | try:#try关键字,表明尝试执行下面的代码 |
− | scheduler.start() | + | scheduler.start()#启用调度器BlockingScheduler。 |
− | except (KeyboardInterrupt, SystemExit): | + | except (KeyboardInterrupt, SystemExit):#捕捉用户中断执行和解释器退出异常 |
− | pass | + | pass #为pass关键字,表示什么也不做 |
2020年3月7日 (六) 10:16的版本
APScheduler使用起来十分方便,其提供了基于日期、固定时间间隔及crontab类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业。如果把作业存储在数据库中,那么作业的状态会被保存,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。APScheduler可以当作一个跨平台的调度工具来使用,可以作为Linux系统crontab工具或Windows计划任务程序的替换
注意,APScheduler不是一个守护进程或服务,其自身不带有任何命令行工具。它主要是在现有的应用程序中运行,也就是说,APScheduler为我们提供了构建专用调度器或调度服务的基础模块。基于这些功能,我们可以很方便地实现本章开始提到的运维需求。
APScheduler的安装
pip3 install apscheduler
几个基本概念
触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,有按日期、按时间间隔、按cronjob描述式三种触发方式。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
作业存储器(job stores):作业存储器指定了作业被存放的位置,默认的作业存储器是内存,也可以将作业保存在各种数据库中。当作业被存放在数据库中时,它会被序列化;当被重新加载时,会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
执行器(executors):执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
调度器(schedulers):任务调度器,控制器角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员不需要直接处理作业存储器、执行器或触发
实例
一个简单的间隔任务实例(ex_interval.py):
# -*- coding: utf-8 -*- # Time: 2018/10/13 19:01:30 # File Name: ex_interval.py from datetime import datetime#导入datetime模块, import os from apscheduler.schedulers.blocking import BlockingScheduler '''导入调度器模块BlockingScheduler,这是比较简单的调度器, 调用start方法后不再返回。如果希望将apscheduler用于独立的调度器,如守护进程, 那么BlockingScheduler非常有用''' #定义一个作业tick,这个任务打印出当前的时间 def tick(): print('Tick! The time is: %s' % datetime.now()) #定义主函数入口 if __name__ == '__main__': scheduler = BlockingScheduler() #第10行实例化一个BlockingScheduler类, #不带参数表明使用默认的作业存储器-内存。 #默认的执行器是线程池执行器,最大线程数为10个(另一个是进程池执行器) #添加一个作业tick,触发器为interval,每隔3秒执行一次,另外的触发器为date,cron。 #date特定时间点触发,cron则按固定的时间间隔触发。 scheduler.add_job(tick, 'interval', seconds=3) print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) #打印退出方法信息 try:#try关键字,表明尝试执行下面的代码 scheduler.start()#启用调度器BlockingScheduler。 except (KeyboardInterrupt, SystemExit):#捕捉用户中断执行和解释器退出异常 pass #为pass关键字,表示什么也不做
【示例7-2】一个简单的间隔任务实例(ex_cron.py):
# -*- coding: utf-8 -*- # Time: 2018/10/13 19:21:09 # File Name: ex_cron.py from datetime import datetime import os from apscheduler.schedulers.blocking import BlockingScheduler def tick(): print('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__': scheduler = BlockingScheduler() scheduler.add_job(tick, 'cron', hour=19,minute=24) print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C ')) try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass
配置调度器
调度器的主循环其实就是反复检查是否有到期需要执行的任务,分以下两步进行。
(1)询问自己的每一个作业存储器,有没有到期需要执行的任务。如果有需要执行的任务,就计算这些作业中每个作业需要运行的时间
(2)提交给执行器按时间点运行。
启动调度器
启动调度器前需要先添加作业,有两种方法可以向调度器添加作业:一是通过接口add_job();二是通过使用函数装饰器,其中add_job()返回一个apscheduler.job.Job类的实例,用于后续修改或删除作业。
# -*- coding: utf-8 -*- # !/usr/local/bin/python # Time: 2018/10/13 20:27:58 # Description: # File Name: start_schduler.py from apscheduler.schedulers.blocking import BlockingScheduler import datetime from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor def my_job(id='my_job'): print (id,'-->',datetime.datetime.now()) jobstores = { 'default': MemoryJobStore() } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(10) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval', seconds=5,replace_existing=True) scheduler.add_job(my_job, args=['job_cron',],id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11', second='*/10',\ end_date='2018-05-30') scheduler.add_job(my_job, args=['job_once_now',],id='job_once_now') scheduler.add_job(my_job, args=['job_date_once',],id='job_date_once',trigger='date',run_date='2018-04-05 07:48:05') try: scheduler.start() except SystemExit: print('exit') exit()