“APScheduler安装及基本概念”的版本间的差异
(→实例) |
(→配置调度器) |
||
第100行: | 第100行: | ||
(2)提交给执行器按时间点运行。 | (2)提交给执行器按时间点运行。 | ||
+ | |||
+ | 在配置调度器前,首先需要选取适合应用环境场景的调度器、存储器和执行器。下面是各调度器的适用场景: | ||
+ | |||
+ | *BlockingScheduler:适用于调度程序,是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。 | ||
+ | *BackgroundScheduler:适用于调度程序,在应用程序的后台运行,调用start后主线程不会阻塞。 | ||
+ | *AsyncIOScheduler:适用于使用了asyncio模块的应用程序。 | ||
+ | *GeventScheduler:适用于使用了gevent模块的应用程序。 | ||
+ | *TwistedScheduler:适用于构建Twisted的应用程序。 | ||
+ | *QtScheduler:适用于构建Qt的应用程序。 | ||
==启动调度器== | ==启动调度器== |
2020年3月7日 (六) 10:30的版本
APScheduler使用起来十分方便,其提供了基于日期、固定时间间隔及crontab类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业。如果把作业存储在数据库中,那么作业的状态会被保存,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。APScheduler可以当作一个跨平台的调度工具来使用,可以作为Linux系统crontab工具或Windows计划任务程序的替换
注意,APScheduler不是一个守护进程或服务,其自身不带有任何命令行工具。它主要是在现有的应用程序中运行,也就是说,APScheduler为我们提供了构建专用调度器或调度服务的基础模块。基于这些功能,我们可以很方便地实现本章开始提到的运维需求。
APScheduler的安装
pip3 install apscheduler
几个基本概念
触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,有按日期、按时间间隔、按cronjob描述式三种触发方式。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
作业存储器(job stores):作业存储器指定了作业被存放的位置,默认的作业存储器是内存,也可以将作业保存在各种数据库中。当作业被存放在数据库中时,它会被序列化;当被重新加载时,会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
执行器(executors):执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
调度器(schedulers):任务调度器,控制器角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员不需要直接处理作业存储器、执行器或触发
实例
一个简单的间隔任务实例(ex_interval.py):
# -*- coding: utf-8 -*- # Time: 2018/10/13 19:01:30 # File Name: ex_interval.py from datetime import datetime#导入datetime模块, import os from apscheduler.schedulers.blocking import BlockingScheduler '''导入调度器模块BlockingScheduler,这是比较简单的调度器, 调用start方法后不再返回。如果希望将apscheduler用于独立的调度器,如守护进程, 那么BlockingScheduler非常有用''' #定义一个作业tick,这个任务打印出当前的时间 def tick(): print('Tick! The time is: %s' % datetime.now()) #定义主函数入口 if __name__ == '__main__': scheduler = BlockingScheduler() #第10行实例化一个BlockingScheduler类, #不带参数表明使用默认的作业存储器-内存。 #默认的执行器是线程池执行器,最大线程数为10个(另一个是进程池执行器) #添加一个作业tick,触发器为interval,每隔3秒执行一次,另外的触发器为date,cron。 #date特定时间点触发,cron则按固定的时间间隔触发。 scheduler.add_job(tick, 'interval', seconds=3) print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) #打印退出方法信息 try:#try关键字,表明尝试执行下面的代码 scheduler.start()#启用调度器BlockingScheduler。 except (KeyboardInterrupt, SystemExit):#捕捉用户中断执行和解释器退出异常 pass #为pass关键字,表示什么也不做
【示例7-2】一个简单的间隔任务实例(ex_cron.py):
# -*- coding: utf-8 -*- # Time: 2018/10/13 19:21:09 # File Name: ex_cron.py from datetime import datetime import os from apscheduler.schedulers.blocking import BlockingScheduler def tick(): print('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__': scheduler = BlockingScheduler() scheduler.add_job(tick, 'cron', hour=19,minute=24) print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C ')) try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass
定时cron任务也非常简单,直接给触发器trigger传入'cron'即可。hour =19,minute =23表示每天的19时23分执行任务,这里可以填写数字,也可以填写字符串。
our =19 , minute =23 hour ='19', minute ='23' minute = '*/3' 表示每 5 分钟执行一次 hour ='19-21', minute= '23' 表示 19:23、 20:23、 21:23 各执行一次任务
练习:自己练一练以上用法
配置调度器
调度器的主循环其实就是反复检查是否有到期需要执行的任务,分以下两步进行。
(1)询问自己的每一个作业存储器,有没有到期需要执行的任务。如果有需要执行的任务,就计算这些作业中每个作业需要运行的时间
(2)提交给执行器按时间点运行。
在配置调度器前,首先需要选取适合应用环境场景的调度器、存储器和执行器。下面是各调度器的适用场景:
- BlockingScheduler:适用于调度程序,是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
- BackgroundScheduler:适用于调度程序,在应用程序的后台运行,调用start后主线程不会阻塞。
- AsyncIOScheduler:适用于使用了asyncio模块的应用程序。
- GeventScheduler:适用于使用了gevent模块的应用程序。
- TwistedScheduler:适用于构建Twisted的应用程序。
- QtScheduler:适用于构建Qt的应用程序。
启动调度器
启动调度器前需要先添加作业,有两种方法可以向调度器添加作业:一是通过接口add_job();二是通过使用函数装饰器,其中add_job()返回一个apscheduler.job.Job类的实例,用于后续修改或删除作业。
# -*- coding: utf-8 -*- # !/usr/local/bin/python # Time: 2018/10/13 20:27:58 # Description: # File Name: start_schduler.py from apscheduler.schedulers.blocking import BlockingScheduler import datetime from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor def my_job(id='my_job'): print (id,'-->',datetime.datetime.now()) jobstores = { 'default': MemoryJobStore() } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(10) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval', seconds=5,replace_existing=True) scheduler.add_job(my_job, args=['job_cron',],id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11', second='*/10',\ end_date='2018-05-30') scheduler.add_job(my_job, args=['job_once_now',],id='job_once_now') scheduler.add_job(my_job, args=['job_date_once',],id='job_date_once',trigger='date',run_date='2018-04-05 07:48:05') try: scheduler.start() except SystemExit: print('exit') exit()