您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“Python的周期任務(wù)調(diào)度工具是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“Python的周期任務(wù)調(diào)度工具是什么”吧!
如果你想周期性地執(zhí)行某個 Python 腳本,最出名的選擇應(yīng)該是 Crontab 腳本,但是 Crontab 具有以下缺點:
1.不方便執(zhí)行秒級任務(wù)。
2.當(dāng)需要執(zhí)行的定時任務(wù)有上百個的時候,Crontab 的管理就會特別不方便。
還有一個選擇是 Celery,但是 Celery 的配置比較麻煩,如果你只是需要一個輕量級的調(diào)度工具,Celery 不會是一個好選擇。
在你想要使用一個輕量級的任務(wù)調(diào)度工具,而且希望它盡量簡單、容易使用、不需要外部依賴,最好能夠容納 Crontab 的所有基本功能,那么 Schedule 模塊是你的不二之選。
使用它來調(diào)度任務(wù)可能只需要幾行代碼,感受一下:
# Python 實用寶典 import schedule import time def job(): print("I'm working...") schedule.every(10).minutes.do(job) while True: schedule.run_pending() time.sleep(1)
上面的代碼表示每10分鐘執(zhí)行一次 job 函數(shù),非常簡單方便。你只需要引入 schedule 模塊,通過調(diào)用 scedule.every(時間數(shù)).時間類型.do(job) 發(fā)布周期任務(wù)。
發(fā)布后的周期任務(wù)需要用 run_pending 函數(shù)來檢測是否執(zhí)行,因此需要一個 While 循環(huán)不斷地輪詢這個函數(shù)。
下面具體講講Schedule模塊的安裝和初級、進階使用方法。
開始之前,你要確保Python和pip已經(jīng)成功安裝在電腦上,如果沒有,可以訪問這篇文章:超詳細(xì)Python安裝指南 進行安裝。
(可選1) 如果你用Python的目的是數(shù)據(jù)分析,可以直接安裝Anaconda:Python數(shù)據(jù)分析與挖掘好幫手—Anaconda,它內(nèi)置了Python和pip.
(可選2) 此外,推薦大家用VSCode編輯器,它有許多的優(yōu)點:Python 編程的最好搭檔—VSCode 詳細(xì)指南。
請選擇以下任一種方式輸入命令安裝依賴:
1. Windows 環(huán)境 打開 Cmd (開始-運行-CMD)。
2. MacOS 環(huán)境 打開 Terminal (command+空格輸入Terminal)。
3. 如果你用的是 VSCode編輯器 或 Pycharm,可以直接使用界面下方的Terminal.
pip install schedule
最基本的使用在文首已經(jīng)提到過,下面給大家展示更多的調(diào)度任務(wù)例子:
# Python 實用寶典 import schedule import time def job(): print("I'm working...") # 每十分鐘執(zhí)行任務(wù) schedule.every(10).minutes.do(job) # 每個小時執(zhí)行任務(wù) schedule.every().hour.do(job) # 每天的10:30執(zhí)行任務(wù) schedule.every().day.at("10:30").do(job) # 每個月執(zhí)行任務(wù) schedule.every().monday.do(job) # 每個星期三的13:15分執(zhí)行任務(wù) schedule.every().wednesday.at("13:15").do(job) # 每分鐘的第17秒執(zhí)行任務(wù) schedule.every().minute.at(":17").do(job) while True: schedule.run_pending() time.sleep(1)
可以看到,從月到秒的配置,上面的例子都覆蓋到了。不過如果你想只運行一次任務(wù)的話,可以這么配:
# Python 實用寶典 import schedule import time def job_that_executes_once(): # 此處編寫的任務(wù)只會執(zhí)行一次... return schedule.CancelJob schedule.every().day.at('22:30').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1)
參數(shù)傳遞
如果你有參數(shù)需要傳遞給作業(yè)去執(zhí)行,你只需要這么做:
# Python 實用寶典 import schedule def greet(name): print('Hello', name) # do() 將額外的參數(shù)傳遞給job函數(shù) schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob')
獲取目前所有的作業(yè)
如果你想獲取目前所有的作業(yè):
# Python 實用寶典 import schedule def hello(): print('Hello world') schedule.every().second.do(hello) all_jobs = schedule.get_jobs()
取消所有作業(yè)
如果某些機制觸發(fā)了,你需要立即清除當(dāng)前程序的所有作業(yè):
# Python 實用寶典 import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().second.do(greet) schedule.clear()
標(biāo)簽功能
在設(shè)置作業(yè)的時候,為了后續(xù)方便管理作業(yè),你可以給作業(yè)打個標(biāo)簽,這樣你可以通過標(biāo)簽過濾獲取作業(yè)或取消作業(yè)。
# Python 實用寶典 import schedule def greet(name): print('Hello {}'.format(name)) # .tag 打標(biāo)簽 schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') # get_jobs(標(biāo)簽):可以獲取所有該標(biāo)簽的任務(wù) friends = schedule.get_jobs('friend') # 取消所有 daily-tasks 標(biāo)簽的任務(wù) schedule.clear('daily-tasks')
設(shè)定作業(yè)截止時間
如果你需要讓某個作業(yè)到某個時間截止,你可以通過這個方法:
# Python 實用寶典 import schedule from datetime import datetime, timedelta, time def job(): print('Boo') # 每個小時運行作業(yè),18:30后停止 schedule.every(1).hours.until("18:30").do(job) # 每個小時運行作業(yè),2030-01-01 18:33 today schedule.every(1).hours.until("2030-01-01 18:33").do(job) # 每個小時運行作業(yè),8個小時后停止 schedule.every(1).hours.until(timedelta(hours=8)).do(job) # 每個小時運行作業(yè),11:32:42后停止 schedule.every(1).hours.until(time(11, 33, 42)).do(job) # 每個小時運行作業(yè),2020-5-17 11:36:20后停止 schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
截止日期之后,該作業(yè)將無法運行。
立即運行所有作業(yè),而不管其安排如何
如果某個機制觸發(fā)了,你需要立即運行所有作業(yè),可以調(diào)用 schedule.run_all() :
# Python 實用寶典 import schedule def job_1(): print('Foo') def job_2(): print('Bar') schedule.every().monday.at("12:40").do(job_1) schedule.every().tuesday.at("16:40").do(job_2) schedule.run_all() # 立即運行所有作業(yè),每次作業(yè)間隔10秒 schedule.run_all(delay_seconds=10)
裝飾器安排作業(yè)
如果你覺得設(shè)定作業(yè)這種形式太啰嗦了,也可以使用裝飾器模式:
# Python 實用寶典 from schedule import every, repeat, run_pending import time # 此裝飾器效果等同于 schedule.every(10).minutes.do(job) @repeat(every(10).minutes) def job(): print("I am a scheduled job") while True: run_pending() time.sleep(1)
并行執(zhí)行
默認(rèn)情況下,Schedule 按順序執(zhí)行所有作業(yè)。其背后的原因是,很難找到讓每個人都高興的并行執(zhí)行模型。
不過你可以通過多線程的形式來運行每個作業(yè)以解決此限制:
# Python 實用寶典 import threading import time import schedule def job1(): print("I'm running on thread %s" % threading.current_thread()) def job2(): print("I'm running on thread %s" % threading.current_thread()) def job3(): print("I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job1) schedule.every(10).seconds.do(run_threaded, job2) schedule.every(10).seconds.do(run_threaded, job3) while True: schedule.run_pending() time.sleep(1)
日志記錄
Schedule 模塊同時也支持 logging 日志記錄,這么使用:
# Python 實用寶典 import schedule import logging logging.basicConfig() schedule_logger = logging.getLogger('schedule') # 日志級別為DEBUG schedule_logger.setLevel(level=logging.DEBUG) def job(): print("Hello, Logs") schedule.every().second.do(job) schedule.run_all() schedule.clear()
效果如下:
DEBUG:schedule:Running *all* 1 jobs with 0s delay in between DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={}) Hello, Logs DEBUG:schedule:Deleting *all* jobs
異常處理
Schedule 不會自動捕捉異常,它遇到異常會直接拋出,這會導(dǎo)致一個嚴(yán)重的問題:后續(xù)所有的作業(yè)都會被中斷執(zhí)行,因此我們需要捕捉到這些異常。
你可以手動捕捉,但是某些你預(yù)料不到的情況需要程序進行自動捕獲,加一個裝飾器就能做到了:
# Python 實用寶典 import functools def catch_exceptions(cancel_on_failure=False): def catch_exceptions_decorator(job_func): @functools.wraps(job_func) def wrapper(*args, **kwargs): try: return job_func(*args, **kwargs) except: import traceback print(traceback.format_exc()) if cancel_on_failure: return schedule.CancelJob return wrapper return catch_exceptions_decorator @catch_exceptions(cancel_on_failure=True) def bad_task(): return 1 / 0 schedule.every(5).minutes.do(bad_task)
這樣,bad_task 在執(zhí)行時遇到的任何錯誤,都會被 catch_exceptions 捕獲,這點在保證調(diào)度任務(wù)正常運轉(zhuǎn)的時候非常關(guān)鍵。
到此,相信大家對“Python的周期任務(wù)調(diào)度工具是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。