您好,登錄后才能下訂單哦!
這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)Python項(xiàng)目中如何安裝和使用APScheduler定時(shí)任務(wù),文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
1、簡介
APScheduler是一個 Python 定時(shí)任務(wù)框架,使用起來十分方便。提供了基于日期、固定時(shí)間間隔以及 crontab 類型的任務(wù),并且可以持久化任務(wù)、并以 daemon 方式運(yùn)行應(yīng)用。
2、APScheduler四個組件
APScheduler 四個組件分別為:觸發(fā)器(trigger),作業(yè)存儲(job store),執(zhí)行器(executor),調(diào)度器(scheduler)。
觸發(fā)器(trigger)
包含調(diào)度邏輯,每一個作業(yè)有它自己的觸發(fā)器,用于決定接下來哪一個作業(yè)會運(yùn)行。除了他們自己初始配置意外,觸發(fā)器完全是無狀態(tài)的
APScheduler 有三種內(nèi)建的 trigger:
作業(yè)存儲(job store)
存儲被調(diào)度的作業(yè),默認(rèn)的作業(yè)存儲是簡單地把作業(yè)保存在內(nèi)存中,其他的作業(yè)存儲是將作業(yè)保存在數(shù)據(jù)庫中。一個作業(yè)的數(shù)據(jù)講在保存在持久化作業(yè)存儲時(shí)被序列化,并在加載時(shí)被反序列化。調(diào)度器不能分享同一個作業(yè)存儲。
APScheduler 默認(rèn)使用 MemoryJobStore,可以修改使用 DB 存儲方案
執(zhí)行器(executor)
處理作業(yè)的運(yùn)行,他們通常通過在作業(yè)中提交制定的可調(diào)用對象到一個線程或者進(jìn)城池來進(jìn)行。當(dāng)作業(yè)完成時(shí),執(zhí)行器將會通知調(diào)度器。
最常用的 executor 有兩種:
調(diào)度器(scheduler)
通常在應(yīng)用中只有一個調(diào)度器,應(yīng)用的開發(fā)者通常不會直接處理作業(yè)存儲、調(diào)度器和觸發(fā)器,相反,調(diào)度器提供了處理這些的合適的接口。配置作業(yè)存儲和執(zhí)行器可以在調(diào)度器中完成,例如添加、修改和移除作業(yè)
2、安裝
$ pip install apscheduler
接下來我們看下簡單的幾個示例:
===============interval: 固定時(shí)間間隔觸發(fā)=============== from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime def job(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 定義BlockingScheduler sched = BlockingScheduler() sched.add_job(job, 'interval', seconds=5) sched.start()
===============cron: 特定時(shí)間周期性地觸發(fā)=============== import time from apscheduler.schedulers.blocking import BlockingScheduler def job(text): t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) print('{} --- {}'.format(text, t)) scheduler = BlockingScheduler() # 在每天22點(diǎn),每隔 1分鐘 運(yùn)行一次 job 方法 scheduler.add_job(job, 'cron', hour=17, minute='*/1', args=['job1']) # 在每天22和23點(diǎn)的25分,運(yùn)行一次 job 方法 scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2']) scheduler.start()
通過裝飾器scheduled_job()添加方法
import time from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler() @scheduler.scheduled_job('interval', seconds=5) def job1(): t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) print('job1 --- {}'.format(t)) @scheduler.scheduled_job('cron', second='*/7') def job2(): t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) print('job2 --- {}'.format(t)) scheduler.start()
上述就是小編為大家分享的Python項(xiàng)目中如何安裝和使用APScheduler定時(shí)任務(wù)了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。