APScheduler安装及基本概念

来自CloudWiki
Cloud17讨论 | 贡献2020年3月7日 (六) 10:40的版本 配置调度器
跳转至: 导航搜索

APScheduler使用起来十分方便,其提供了基于日期、固定时间间隔及crontab类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业。如果把作业存储在数据库中,那么作业的状态会被保存,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。APScheduler可以当作一个跨平台的调度工具来使用,可以作为Linux系统crontab工具或Windows计划任务程序的替换

注意,APScheduler不是一个守护进程或服务,其自身不带有任何命令行工具。它主要是在现有的应用程序中运行,也就是说,APScheduler为我们提供了构建专用调度器或调度服务的基础模块。基于这些功能,我们可以很方便地实现本章开始提到的运维需求。


APScheduler的安装

pip3 install apscheduler

几个基本概念

触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,有按日期、按时间间隔、按cronjob描述式三种触发方式。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。

作业存储器(job stores):作业存储器指定了作业被存放的位置,默认的作业存储器是内存,也可以将作业保存在各种数据库中。当作业被存放在数据库中时,它会被序列化;当被重新加载时,会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。

执行器(executors):执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。

调度器(schedulers):任务调度器,控制器角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员不需要直接处理作业存储器、执行器或触发

Auto8-1.png

实例

一个简单的间隔任务实例(ex_interval.py):

  # -*- coding: utf-8 -*-
# Time: 2018/10/13 19:01:30
# File Name: ex_interval.py

from datetime import datetime#导入datetime模块,
import os
from apscheduler.schedulers.blocking import BlockingScheduler
'''导入调度器模块BlockingScheduler,这是比较简单的调度器,
调用start方法后不再返回。如果希望将apscheduler用于独立的调度器,如守护进程,
那么BlockingScheduler非常有用'''

#定义一个作业tick,这个任务打印出当前的时间
def tick():
    print('Tick! The time is: %s' % datetime.now())

#定义主函数入口
if __name__ == '__main__':
    scheduler = BlockingScheduler()
    #第10行实例化一个BlockingScheduler类,
    #不带参数表明使用默认的作业存储器-内存。
    #默认的执行器是线程池执行器,最大线程数为10个(另一个是进程池执行器)

    #添加一个作业tick,触发器为interval,每隔3秒执行一次,另外的触发器为date,cron。
    #date特定时间点触发,cron则按固定的时间间隔触发。
    scheduler.add_job(tick, 'interval', seconds=3)
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    #打印退出方法信息
    try:#try关键字,表明尝试执行下面的代码
        scheduler.start()#启用调度器BlockingScheduler。
    except (KeyboardInterrupt, SystemExit):#捕捉用户中断执行和解释器退出异常
        pass #为pass关键字,表示什么也不做





【示例7-2】一个简单的间隔任务实例(ex_cron.py):

# -*- coding: utf-8 -*-
# Time: 2018/10/13 19:21:09
# File Name: ex_cron.py


from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_job(tick, 'cron', hour=19,minute=24)
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C    '))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

定时cron任务也非常简单,直接给触发器trigger传入'cron'即可。hour =19,minute =23表示每天的19时23分执行任务,这里可以填写数字,也可以填写字符串。

our =19 , minute =23
hour ='19', minute ='23'
minute = '*/3' 表示每 5 分钟执行一次
hour ='19-21', minute= '23' 表示 19:23、 20:23、 21:23 各执行一次任务

练习:自己练一练以上用法

配置调度器

调度器的主循环其实就是反复检查是否有到期需要执行的任务,分以下两步进行。

(1)询问自己的每一个作业存储器,有没有到期需要执行的任务。如果有需要执行的任务,就计算这些作业中每个作业需要运行的时间

(2)提交给执行器按时间点运行。

在配置调度器前,首先需要选取适合应用环境场景的调度器、存储器和执行器。下面是各调度器的适用场景:

  • BlockingScheduler:适用于调度程序,是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
  • BackgroundScheduler:适用于调度程序,在应用程序的后台运行,调用start后主线程不会阻塞。
  • AsyncIOScheduler:适用于使用了asyncio模块的应用程序。
  • GeventScheduler:适用于使用了gevent模块的应用程序。
  • TwistedScheduler:适用于构建Twisted的应用程序。
  • QtScheduler:适用于构建Qt的应用程序。

作业存储器的选择

作业存储器的选择有两种:一是内存,也是默认的配置;二是数据库。具体选哪种要看我们的应用程序在崩溃时是否重启整个应用程序,如果重启整个应用程序,作业就会被重新添加到调度器中,此时选取内存作为作业存储器既简单又高效。但是,如果调度器重启或应用程序崩溃,就需要作业从中断时恢复正常运行,我们通常选择将作业存储在数据库中,使用哪种数据库取决于在编程环境中使用了什么数据库。我们可以自由选择,PostgreSQL是推荐的选择,因为它具有强大的数据完整性保护。

执行器的选择

同样的,执行器的选择也取决于应用场景。通常默认的ThreadPoolExecutor已经足够好。如果作业负载涉及CPU密集型操作,那么应该考虑使用ProcessPoolExecutor,甚至可以同时使用这两种执行器,将ProcessPoolExecutor执行器添加为二级执行器。

如果调度程序在应用程序的后台运行,则选择BackgroundScheduler,并使用默认的jobstore和executor。

from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()

配置调度器:

jobstores = {
    'default': MemoryJobStore()

}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(10)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

启动调度器

启动调度器前需要先添加作业,有两种方法可以向调度器添加作业:一是通过接口add_job();二是通过使用函数装饰器,其中add_job()返回一个apscheduler.job.Job类的实例,用于后续修改或删除作业。

# -*- coding: utf-8 -*-
# !/usr/local/bin/python
# Time: 2018/10/13 20:27:58
# Description:
# File Name: start_schduler.py

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

def my_job(id='my_job'):
    print (id,'-->',datetime.datetime.now())
jobstores = {
    'default': MemoryJobStore()

}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(10)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval', seconds=5,replace_existing=True)
scheduler.add_job(my_job, args=['job_cron',],id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11', second='*/10',\
                  end_date='2018-05-30')
scheduler.add_job(my_job, args=['job_once_now',],id='job_once_now')
scheduler.add_job(my_job, args=['job_date_once',],id='job_date_once',trigger='date',run_date='2018-04-05 07:48:05')
try:
    scheduler.start()
except SystemExit:
    print('exit')
    exit()