当前位置:首页 > 科技  > 软件

高效定时任务处理:深入学习Python中APScheduler库的奥秘

来源: 责编: 时间:2023-09-28 10:08:53 422观看
导读APScheduler是Python中一个强大的第三方库,用于在后台执行定时任务。它允许我们根据设定的时间间隔、日期规则或特定时间来执行任务,适用于定时执行脚本、定时发送邮件、定时处理数据等场景。APScheduler的功能使得在Py

APScheduler是Python中一个强大的第三方库,用于在后台执行定时任务。它允许我们根据设定的时间间隔、日期规则或特定时间来执行任务,适用于定时执行脚本、定时发送邮件、定时处理数据等场景。APScheduler的功能使得在Python中实现定时任务变得非常简单和高效。本文将从入门到精通地介绍APScheduler库的使用方法,带你掌握在Python中实现定时任务的技巧。saz28资讯网——每日最新资讯28at.com

saz28资讯网——每日最新资讯28at.com

1. 安装和导入

首先,我们需要安装APScheduler库。可以使用pip命令进行安装:saz28资讯网——每日最新资讯28at.com

pip install apscheduler

安装完成后,我们可以在Python代码中导入APScheduler:saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundScheduler

2. 创建定时任务

APScheduler提供了BackgroundScheduler和BlockingScheduler两种类型的调度器,用于创建定时任务。BackgroundScheduler在后台运行,不会阻塞主线程;而BlockingScheduler会阻塞主线程直到所有任务完成。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():    print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们首先创建了一个后台调度器scheduler,然后定义了一个名为job的任务函数,在其中打印当前时间。使用scheduler.add_job()添加了一个定时任务,设置为每隔5秒执行一次。然后,我们启动了调度器scheduler,让定时任务在后台执行。主线程等待20秒后结束,并调用scheduler.shutdown()关闭调度器。saz28资讯网——每日最新资讯28at.com

3. 定时任务触发器

APScheduler提供了多种触发器类型,用于设置定时任务的触发条件。 interval触发器: 按照设定的时间间隔来触发任务。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():    print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们使用'interval'触发器,设置任务每隔5秒执行一次。 cron触发器: 使用类似于Linux中cron表达式的规则来触发任务,可以精确到秒。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():    print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,每天的13点30分触发任务scheduler.add_job(job, 'cron', hour=13, minute=30)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(60)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们使用'cron'触发器,设置任务每天的13点30分触发。 date触发器: 在指定的时间点触发任务。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():    print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,设置任务在2023年7月31日10点30分触发scheduler.add_job(job, 'date', run_date='2023-07-31 10:30:00')# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(60)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们使用'date'触发器,设置任务在2023年7月31日10点30分触发。saz28资讯网——每日最新资讯28at.com

4. 任务存储

APScheduler支持将任务存储在不同的后端存储中,如内存、数据库等。默认情况下,任务是存储在内存中的。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():    print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们使用默认的内存存储来存储任务。 如果需要将任务存储在数据库中,可以使用jobstores参数来设置。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStoreimport time# 创建后台调度器scheduler = BackgroundScheduler()# 创建数据库存储jobstores = {    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}# 定义任务函数def job():    print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们使用了SQLAlchemyJobStore来将任务存储在SQLite数据库中。saz28资讯网——每日最新资讯28at.com

5. 并发执行

默认情况下,APScheduler会将任务串行执行,也就是说一个任务结束后才会执行下一个任务。如果希望并发执行多个任务,可以使用max_instances参数来设置。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job(index):    print(f"定时任务{index}执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,每隔5秒执行一次,最多并发3个任务scheduler.add_job(job, 'interval', seconds=5, args=[1], max_instances=3)scheduler.add_job(job, 'interval', seconds=5, args=[2], max_instances=3)scheduler.add_job(job, 'interval', seconds=5, args=[3], max_instances=3)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们使用了args参数传递参数给任务函数,并使用max_instances参数设置最多并发3个任务。saz28资讯网——每日最新资讯28at.com

6. 阻塞和非阻塞

APScheduler提供了阻塞和非阻塞两种调度器类型。 阻塞调度器: 在调度器启动后,会阻塞主线程直到所有任务完成。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.blocking import BlockingSchedulerimport time# 创建阻塞调度器scheduler = BlockingScheduler()# 定义任务函数def job():    print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()print("主线程结束")

非阻塞调度器: 在调度器启动后,不会阻塞主线程。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():    print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们分别使用BlockingScheduler和BackgroundScheduler创建了阻塞和非阻塞调度器。saz28资讯网——每日最新资讯28at.com

7. 错误处理

在任务执行过程中,可能会出现异常。APScheduler提供了异常处理机制,我们可以通过try...except...捕获任务函数中的异常,并进行相应的处理。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():    try:        print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))        # 抛出一个异常        raise ValueError("任务出现异常")    except Exception as e:        print("任务执行过程中发生异常:", str(e))        # 添加定时任务,每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们在任务函数中抛出了一个ValueError异常,并通过try...except...捕获并输出了异常信息。saz28资讯网——每日最新资讯28at.com

8. 立即执行任务

有时候我们可能需要立即执行一个任务,而不是等到下次触发时间。APScheduler提供了run_job方法来立即执行任务。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():    print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()# 立即执行任务scheduler.run_job(job)# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们使用scheduler.run_job(job)方法立即执行了任务。saz28资讯网——每日最新资讯28at.com

9. 调度器持久化

在实际应用中,我们可能需要将调度器的配置保存到文件中,以便在下次启动时恢复。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStoreimport time# 创建数据库存储jobstores = {    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}# 创建后台调度器,并指定jobstores参数scheduler = BackgroundScheduler(jobstores=jobstores)# 定义任务函数def job():    print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们创建了一个数据库存储jobstores,并在创建后台调度器时指定了jobstores参数。这样,在调度器运行过程中,任务的配置将会被持久化到数据库中。saz28资讯网——每日最新资讯28at.com

10. 任务监听器

APScheduler提供了任务监听器,用于监听任务的状态变化。我们可以通过add_listener方法添加监听器,并在任务状态发生变化时进行相应的处理。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():    print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,每隔5秒执行一次scheduler.add_job(job, 'interval', seconds=5)# 定义任务监听器def my_listener(event):    if event.exception:        print("任务执行过程中发生异常:", str(event.exception))    else:        print("任务执行成功")        # 添加任务监听器scheduler.add_listener(my_listener, mask='all')# 启动调度器scheduler.start()# 主线程等待一段时间后结束time.sleep(20)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们创建了一个任务监听器my_listener,并在任务执行过程中通过if...else...判断是否出现异常。然后通过scheduler.add_listener(my_listener, mask='all')方法添加了监听器。saz28资讯网——每日最新资讯28at.com

11. 移除定时任务

如果我们希望在调度器运行过程中移除某个定时任务,可以使用scheduler.remove_job(job_id)方法。saz28资讯网——每日最新资讯28at.com

from apscheduler.schedulers.background import BackgroundSchedulerimport time# 创建后台调度器scheduler = BackgroundScheduler()# 定义任务函数def job():    print("定时任务执行:", time.strftime("%Y-%m-%d %H:%M:%S"))    # 添加定时任务,每隔5秒执行一次,并获取任务IDjob_id = scheduler.add_job(job, 'interval', seconds=5).id# 启动调度器scheduler.start()# 主线程等待一段时间后移除定时任务time.sleep(10)scheduler.remove_job(job_id)# 主线程等待一段时间后结束time.sleep(10)# 关闭调度器scheduler.shutdown()print("主线程结束")

在上述代码中,我们通过scheduler.add_job(job, 'interval', seconds=5).id获取了定时任务的ID,并使用scheduler.remove_job(job_id)移除了定时任务。saz28资讯网——每日最新资讯28at.com

总结

通过本文的介绍,我们学习了APScheduler库的基本用法,包括创建定时任务、定时任务触发器、任务存储、并发执行、阻塞和非阻塞调度器、错误处理、立即执行任务、调度器持久化、任务监听器和移除定时任务等。APScheduler为Python开发者提供了一个强大的定时任务调度框架,使得在Python中实现定时任务变得非常简单和高效。掌握APScheduler的使用将为我们的项目和程序带来很大的便利。saz28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-11877-0.html高效定时任务处理:深入学习Python中APScheduler库的奥秘

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: C++循环优化:提升性能的关键技巧

下一篇: 为什么写代码注释应该是注释 Why,而不是 How 和什么 What

标签:
  • 热门焦点
  • 一加Ace2 Pro官宣:普及16G内存 引领24G

    一加官方今天继续为本月发布的新机一加Ace2 Pro带来预热,公布了内存方面的信息。“淘汰 8GB ,12GB 起步,16GB 普及,24GB 引领,还有呢?#一加Ace2Pro#,2023 年 8 月,敬请期待。”同时
  • 对标苹果的灵动岛 华为带来实况窗功能

    继苹果的灵动岛之后,华为也在今天正式推出了“实况窗”功能。据今天鸿蒙OS 4.0的现场演示显示,华为的实况窗可以更高效的展现出实时通知,比如锁屏上就能看到外卖、打车、银行
  • 5月安卓手机好评榜:魅族20 Pro夺冠

    性能榜和性价比榜之后,我们来看最后的安卓手机好评榜,数据来源安兔兔评测,收集时间2023年5月1日至5月31日,仅限国内市场。第一名:魅族20 Pro好评率:97.50%不得不感慨魅族老品牌还
  • 服务存储设计模式:Cache-Aside模式

    Cache-Aside模式一种常用的缓存方式,通常是把数据从主存储加载到KV缓存中,加速后续的访问。在存在重复度的场景,Cache-Aside可以提升服务性能,降低底层存储的压力,缺点是缓存和底
  • 重估百度丨“晚熟”的百度云,能等到春天吗?

    ©自象限原创作者|程心排版|王喻可2016年7月13日,百度云计算战略发布会在北京举行,宣告着百度智能云的正式启程。彼时的会场座无虚席,甚至排队排到了门外,在场的所有人几乎都
  • 腾讯盖楼,字节拆墙

    来源 | 光子星球撰文 | 吴坤谚编辑 | 吴先之“想重温暴刷深渊、30+技能搭配暴搓到爽的游戏体验吗?一起上晶核,即刻暴打!”曾凭借直播腾讯旗下代理格斗游戏《DNF》一
  • 新电商三兄弟,“抖快红”成团!

    来源:价值研究所作 者:Hernanderz 随着内容电商的概念兴起,抖音、快手、小红书组成的“新电商三兄弟”成为业内一股不可忽视的势力,给阿里、京东、拼多多带去了巨大压
  • 三星显示已开始为AR设备研发硅基LED微显示屏

    7月18日消息,据外媒报道,随着苹果首款头显产品Vision Pro在6月份正式推出,AR/VR/MR等头显产品也就将成为各大公司下一个重要的竞争领域,对显示屏这一关
  • 与兆芯合作 联想推出全新旗舰版笔记本电脑开天N7系列

    联想与兆芯合作推出全新联想旗舰版笔记本电脑开天 N7系列。这个系列采用兆芯KX-6640MA处理器平台,KX-6640MA 处理器是采用了陆家嘴架构,16nm 工艺,4 核 4 线
Top