您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“Python定時(shí)任務(wù)觸發(fā)的方法是什么”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
APScheduler
APScheduler 四個(gè)組件分別為:
調(diào)度器(scheduler)、觸發(fā)器(trigger),作業(yè)存儲(chǔ)(job store),執(zhí)行器(executor)
安裝命令:
pip install setuptools pip install --ignore-installed apscheduler
BlockingScheduler : 調(diào)度器在當(dāng)前進(jìn)程的主線程中運(yùn)行,也就是會(huì)阻塞當(dāng)前線程
BackgroundScheduler : 調(diào)度器在后臺(tái)線程中運(yùn)行,不會(huì)阻塞當(dāng)前線程
import datetime as dt from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler()
① date 觸發(fā)器:(指定時(shí)間點(diǎn)觸發(fā)),參數(shù)如下:
run_date(datetime或str):任務(wù)運(yùn)行的日期或時(shí)間
timezone(datetime.tzinfo或str):指定時(shí)區(qū)
# 例1:在 2020-9-24 時(shí)刻運(yùn)行一次 func 方法 scheduler.add_job(func, 'date', run_date = dt.date(2020, 9, 24)) # 例2: 在 2020-9-24 15:10:00 時(shí)刻運(yùn)行一次 func 方法 scheduler.add_job(func, 'date', run_date = dt.datetime(2020, 9, 24, 15, 10, 0)) # 例3: 在 2020-9-24 15:11:00 時(shí)刻運(yùn)行一次 func 方法 scheduler.add_job(func, 'date', run_date = '2020-9-24 15:11:00')
② interval 觸發(fā)器: (固定時(shí)間間隔觸發(fā)),參數(shù)如下:
weeks(int):間隔幾周
days(int):間隔幾天
hours(int):間隔幾小時(shí)
minutes(int):間隔幾分鐘
seconds(int):間隔幾秒鐘
start_date(datetime或str):開(kāi)始時(shí)間
end_date(datetime或str):結(jié)束時(shí)間
timezone(datetime.tzinfo或str):時(shí)區(qū)
# 例1:每隔兩分鐘執(zhí)行一次 func 方法 scheduler.add_job(func, 'interval', minutes = 2) # 例2:在 2020-9-24 15:15:00 ~ 2020-9-24 15:20:00 之間, 每隔兩分鐘執(zhí)行一次 func 方法 scheduler.add_job(func, 'interval', minutes = 2, start_date = '2020-9-24 15:15:00' , end_date = '2020-9-24 15:20:00')
③ cron 觸發(fā)器:(在指定時(shí)間周期性地觸發(fā)),參數(shù)如下:
year(int 或 str):年
month(int 或 str):月
day(int 或 str):日
week(int 或 str):周(1-53)
day_of_week(int 或 str):星期幾(0-6)
hour(int 或 str):時(shí)
minute(int 或 str):分
second(int 或 str):秒
start_date(datetime或str):最早開(kāi)始時(shí)間(包含)
end_date(datetime或str):最晚結(jié)束時(shí)間(包含)
timezone(datetime.tzinfo或str):指定時(shí)區(qū)
字符 :
1. * 每一(每一分)
2. ? 表示不關(guān)心,任意
3. - 范圍 (小時(shí):1-12,1到12點(diǎn)運(yùn)行)
4. , 標(biāo)示多個(gè)值 (小時(shí) 1,2,3 1點(diǎn)2點(diǎn)3點(diǎn)運(yùn)行)
5. / 遞增觸發(fā)(0/15,從0開(kāi)始每15秒運(yùn)行一次)
6. L 最后(日L,當(dāng)月最后一天,周L周六)
7. W 指定日期最近的工作日(周一到周五)
8. # 序號(hào)(表示每月的第幾個(gè)周幾)
# 例:在每年 1-3、7-9 月份中的每個(gè)星期一、二中的 00:00, 01:00, 02:00 和 03:00 執(zhí)行 func 任務(wù) scheduler.add_job(func, 'cron', month = '1-3,7-9',day='0, tue', hour='0-3')
scheduler.start()
3.1 測(cè)試時(shí)間
def forecast_adjust(): now_temp = datetime.now() print('執(zhí)行方案一', now_temp, '時(shí)間間隔: ', now_temp-t0) def for2(): now_temp = datetime.now() print('執(zhí)行方案二', now_temp, '時(shí)間間隔: ', now_temp-t0) def fortime3(): now_temp = datetime.now() print('執(zhí)行方案三', now_temp, '時(shí)間間隔: ', now_temp-t0) return '9999999999999' def a__(): b = scheduler.add_job(fortime3, 'cron', hour='15', minute = '18') c = scheduler.add_job(fortime3, 'cron', hour='15', minute = '30') d = scheduler.add_job(fortime3, 'cron', hour='15', minute = '45') print(b) print(c) print(c) return 'kkkqq' t0 = datetime.now() scheduler = BlockingScheduler() # 采用阻塞的方式 scheduler.add_job(func=forecast_adjust, trigger=CronTrigger(minute="*/1", second=20, timezone=tz_now), args=[]) scheduler.add_job(func=for2, trigger=CronTrigger(minute="*/5", second=10, timezone=tz_now), args=[]) k = a__() print(k) scheduler.start()
APScheduler 定點(diǎn)、定時(shí):
四個(gè)組件分別為:觸發(fā)器(trigger),作業(yè)存儲(chǔ)器(job store),執(zhí)行器(executor),調(diào)度器(scheduler)
(1)job stores:對(duì)調(diào)度任務(wù)的管理:
① 添加job:
# add_job():可以改變或者移除 job scheduler.add_job(func, 'interval', minutes = 2) # scheduled_job():只適用于應(yīng)用運(yùn)行期間不會(huì)改變的 job scheduler.scheduled_job(func, 'interval', minutes = 2)
②移除job:
# remove_job() :根據(jù) job 的 id 來(lái)移除,所以要在 job 創(chuàng)建的時(shí)候指定一個(gè) id scheduler.add_job(func, 'interval', minutes = 2, id = 'job_one') scheduler.remove_job(job_one) # job.remove() :對(duì) job 執(zhí)行 remove 方法 job = add_job(func, 'interval', minutes = 2, id = 'job_one') job.remvoe()
③ 暫停job:
apscheduler.job.Job.pause() apscheduler.schedulers.base.BaseScheduler.pause_job()
④ 恢復(fù)job:
apscheduler.job.Job.resume() apscheduler.schedulers.base.BaseScheduler.resume_job()
⑤ 修改job:
# modify_job() scheduler.modify_job('job_one', minutes = 5) # job.modify() job = scheduler.add_job(func, 'interval', minutes = 2) job.modify(minutes = 5)
⑥ 關(guān)閉job:
scheduler.shutdown() scheduler.shutdown(wait=false)
(2)executors:執(zhí)行調(diào)度任務(wù)的模塊,常用的 executor 有兩種:
ProcessPoolExecutor ThreadPoolExecutor
“Python定時(shí)任務(wù)觸發(fā)的方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。