这篇教程Python中实现定时任务详解写得很实用,希望能帮到您。 在项目中,我们可能遇到有定时任务的需求。 - 其一:每隔一个时间段就执行任务。
比如:压测中每隔45分钟调整温箱的温度。 - 其二:定时执行任务。
例如每天早上 8 点定时推送早报。
今天,我跟大家分享下 Python 定时任务的实现方法。
固定时间间隔执行任务import timeimport logginglogging.basicConfig( level=logging.debug, format="%(asctime)s.%(msecs)d | %(threadName)s | %(levelname)s - %(message)s")def task(): logging.info("Task Start.") time.sleep(1) logging.info("Task Done.")
time.sleep第一种办法是最简单又最暴力。 那就是在一个死循环中,使用线程睡眠函数 sleep()。 while True: task() time.sleep(5) 上述的方法有几个问题: - 阻塞主进程,这个也好解决,可以放到线程中去执行
- 一次只有一个task,这个也有解决办法,可以多启几个线程
threading.Timer既然第一种方法暴力,那么有没有比较优雅点的方法? Python 标准库 threading 中有个 Timer 类。 它会新启动一个线程来执行定时任务,所以它是非阻塞函式。 原理:线程中预置一个finished的事件,通过finished.wait等待固定时间间隔。 超时则执行任务。如果需要取消任务,可以调用Timer.cancel来取消任务。
from threading import Timert = Timer(task, 5)t.start() 优点: 不阻塞主进程,task在线程中执行 缺点: 一个Timer只能执行一次task就结束了。 如果想循环,需要改造一下task函数。 from threading import Timerdef repeat_task(): t = Timer(5, repeat_task) # 开始任务的位置决定了是任务之间等待固定间隔时间 # 还是每个任务的开始等待固定间隔时间 t.start() task() 这样可以循环执行,但是仍然只能一个线程一个任务。 Q:如何跳出循环 A:加入一个标识符: sleep 可以用一个布尔值来控制Timer 可以用一个Event来控制
固定时间点执行任务以上是用内置的方法中比较简单的实现方式。简单的功能可以实现。 前面执行的都是指定间隔时间的定时任务,那怎么执行指定时间点的任务呢? 上面的方法是可以做到的。有两种思路, - 前面我们指定了间隔时间,那指定时间点,就先计算当前时间到指定时间点的间隔时间
- 不管间隔时间,而是记录任务时间点,然后实时去检查是否到达指定时间点
思路有了,但是对于固定时间点执行任务的场景以及后面更复杂的场景,自己实现可能就变得复杂。 下面再介绍几个进阶的定时任务的实现方式,可以适应更复杂的业务场景。 sched第三种方式是使用标准库中sched 模块。sched 是事件调度器, 它通过 scheduler 类来调度事件,从而达到定时执行任务的效果。 简单示例如下: import schedschedule = sched.scheduler() # 初始化 sched 模块的 scheduler 类schedule.enter(10, 1, task) # 增加调度任务schedule.run() # 开始调度任务 scheduler 提供了两个添加调度任务的函数: enter(delay, priority, action, argument=(), kwargs={}) 该函数可以延迟一定时间执行任务。delay 表示延迟多长时间执行任务,单位是秒。 priority为优先级,越小优先级越大。两个任务指定相同的延迟时间,优先级大的任务会向被执行。 action 即需要执行的函数,argument 和 kwargs 分别是函数的位置和关键字参数。
scheduler.enterabs(time, priority, action, argument=(), kwargs={}) 添加一项任务,但这个任务会在 time 这时刻执行。因此,time 是绝对时间。其他参数用法与 enter() 中的参数用法是一致。
优点: - 执行时间间隔和时间点执行任务
- 可以添加不同的任务
- 任务可以设置优先级
缺点: scheduler 中的每个调度任务只会工作一次,不会无限循环被调用。如果想重复执行同一任务, 需要重复添加调度任务即可。 import schedimport threadingfrom functools import wrapss = sched.scheduler()STOP_FLAG = threading.Event()def repeat(interval): def wrapper(func): @wraps(func) def inner(): if not STOP_FLAG.is_set(): s.enter(interval, 0, inner) func() return inner return wrapper@repeat(5)def new_task(): return task()new_task()t = threading.Thread(target=s.run)t.start() 实现原理当然我们仅仅学会怎么用还是不够的,不能知其然而不知其所以然。 介绍了那么多方法,那么多库,但是底层的实现逻辑都是差不多的。 一个完整的定时任务系统,有三个部分: - 任务队列(task queue)
根据执行时间和优先级进行排序 排序sorted(): 任务一多,整个队列重排,性能堪忧 heapq.pop():堆排序,只获取最高优先级的任务,不重复排序 - 调度器(scheduler)
- 执行器(executor)
从前到后,实现的内容越来越多,控制的精细度也就越来越高,实现的功能也就越来越丰富。
import schedimport timeimport threadingfrom functools import wrapsfrom task import taskfrom typing import CallableSTOP_FLAG = threading.Event()def timeloop(func: Callable, interval: int): while True: func() time.sleep(interval)def repeat_task(func: Callable, interval: int): # 新建任务的前后,决定了是在任务结束后等待时间,还是固定间隔 if not STOP_FLAG.is_set(): t = threading.Timer(interval, repeat_task, args=(func, interval)) t.start() func()def timer_schedule(func: Callable, interval: int): repeat_task(func, interval)s = sched.scheduler()def repeat(interval): def wrapper(func): @wraps(func) def inner(): if not STOP_FLAG.is_set(): s.enter(interval, 0, inner) func() return inner return wrapper@repeat(5)def new_task(): return task()def sched_schedule(func: Callable, interval: int): func() t = threading.Thread(target=s.run) t.start() return tdef main(schedule): schedule_thread = schedule(new_task, 5) while True: try: time.sleep(1) except KeyboardInterrupt: STOP_FLAG.set() schedule_thread.join() breakif __name__ == "__main__": # main(timeloop) # main(timer_schedule) main(sched_schedule) 到此这篇关于Python中实现定时任务详解的文章就介绍到这了,更多相关Python中实现定时任务内容请搜索wanshiok.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持wanshiok.com! Tkinter使用Progressbar创建和管理进度条的操作代码 linux环境部署清华大学大模型最新版 chatglm2-6b 图文教程 |