溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python3如何實現(xiàn)定時任務

發(fā)布時間:2021-05-07 11:57:21 來源:億速云 閱讀:196 作者:小新 欄目:開發(fā)技術(shù)

小編給大家分享一下Python3如何實現(xiàn)定時任務,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

python有哪些常用庫

python常用的庫:1.requesuts;2.scrapy;3.pillow;4.twisted;5.numpy;6.matplotlib;7.pygama;8.ipyhton等。

最近做一個小程序開發(fā)任務,主要負責后臺部分開發(fā);根據(jù)項目需求,需要實現(xiàn)三個定時任務:

1>定時更新微信token,需要2小時更新一次;

2>商品定時上線;

3>定時檢測后臺服務是否存活;

使用Python去實現(xiàn)這三個任務,這里需要使用定時相關(guān)知識點;

Python實現(xiàn)定點與定時任務方式比較多,找到下面四中實現(xiàn)方式,每個方式都有自己應用場景;下面來快速介紹Python中常用的定時任務實現(xiàn)方式:

1>循環(huán)+sleep;

2>線程模塊中Timer類;

3>schedule模塊;

4>定時框架:APScheduler

在開始之前先設(shè)定一個任務(這樣不用依賴外部環(huán)境):

1:定時或者定點監(jiān)測CPU與內(nèi)存使用率;

2:將時間,CPU,內(nèi)存使用情況保存到日志文件;

先來實現(xiàn)系統(tǒng)監(jiān)測功能:

準備工作:安裝psutil:pip install psutil

功能實現(xiàn)

#psutil:獲取系統(tǒng)信息模塊,可以獲取CPU,內(nèi)存,磁盤等的使用情況
import psutil
import time
import datetime
#logfile:監(jiān)測信息寫入文件
def MonitorSystem(logfile = None):
 #獲取cpu使用情況
 cpuper = psutil.cpu_percent()
 #獲取內(nèi)存使用情況:系統(tǒng)內(nèi)存大小,使用內(nèi)存,有效內(nèi)存,內(nèi)存使用率
 mem = psutil.virtual_memory()
 #內(nèi)存使用率
 memper = mem.percent
 #獲取當前時間
 now = datetime.datetime.now()
 ts = now.strftime('%Y-%m-%d %H:%M:%S')
 line = f'{ts} cpu:{cpuper}%, mem:{memper}%'
 print(line)
 if logfile:
  logfile.write(line)

代碼運行結(jié)果:

2019-03-21 14:23:41 cpu:0.6%, mem:77.2%

接下來我們要實現(xiàn)定時監(jiān)測,比如3s監(jiān)測一下系統(tǒng)資源使用情況。

最簡單使用方式:sleep

這種方式最簡單,直接使用while+sleep就可以實現(xiàn):

def loopMonitor():
 while True:
  MonitorSystem()
  #2s檢查一次
  time.sleep(3)
loopMonitor()

輸出結(jié)果:

2019-03-21 14:28:42 cpu:1.5%, mem:77.6%
2019-03-21 14:28:45 cpu:1.6%, mem:77.6%
2019-03-21 14:28:48 cpu:1.4%, mem:77.6%
2019-03-21 14:28:51 cpu:1.4%, mem:77.6%
2019-03-21 14:28:54 cpu:1.3%, mem:77.6%

這種方式存在問題:只能處理單個定時任務。

如果你依然在編程的世界里迷茫,不知道自己的未來規(guī)劃

自己是一名高級python開發(fā)工程師,從基礎(chǔ)的python腳本到web開發(fā)、爬蟲、django、數(shù)據(jù)挖掘等,零基礎(chǔ)到項目實戰(zhàn)的資料都有整理。送給每一位python的小伙伴!分享一些學習的方法和需要注意的小細節(jié)

又來了新任務:需要每秒監(jiān)測網(wǎng)絡(luò)收發(fā)字節(jié),代碼實現(xiàn)如下:

def MonitorNetWork(logfile = None):
 #獲取網(wǎng)絡(luò)收信息
 netinfo = psutil.net_io_counters()
 #獲取當前時間
 now = datetime.datetime.now()
 ts = now.strftime('%Y-%m-%d %H:%M:%S')
 line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}'
 print(line)
 if logfile:
  logfile.write(line)
MonitorNetWork()

代碼執(zhí)行結(jié)果:

2019-03-21 14:47:21 bytessent=169752183, bytesrecv=1107900973

如果我們同時在while循環(huán)中監(jiān)測兩個任務會有等待問題,不能每秒監(jiān)測網(wǎng)絡(luò)情況。

Timer實現(xiàn)方式

timer最基本理解就是定時器,我們可以啟動多個定時任務,這些定時器任務是異步執(zhí)行,所以不存在等待順序執(zhí)行問題。

先來看Timer的基本使用:

導入:from threading import Timer

主要方法:

Timer方法說明
Timer(interval, function, args=None, kwargs=None)創(chuàng)建定時器
cancel()取消定時器
start()使用線程方式執(zhí)行
join(self, timeout=None)等待線程執(zhí)行結(jié)束

定時器只能執(zhí)行一次,如果需要重復執(zhí)行,需要重新添加任務;

我們先來看基本使用:

from threading import Timer
#記錄當前時間
print(datetime.datetime.now())
#3S執(zhí)行一次
sTimer = Timer(3, MonitorSystem)
#1S執(zhí)行一次
nTimer = Timer(1, MonitorNetWork)
#使用線程方式執(zhí)行
sTimer.start()
nTimer.start()
#等待結(jié)束
sTimer.join()
nTimer.join()
#記錄結(jié)束時間
print(datetime.datetime.now())

輸出結(jié)果:

2019-03-21 15:13:36.739798
2019-03-21 15:13:37 bytessent=171337324, bytesrecv=1109002349
2019-03-21 15:13:39 cpu:1.4%, mem:93.2%
2019-03-21 15:13:39.745187

可以看到,花費時間為3S,但是我們想要做的是每秒監(jiān)控網(wǎng)絡(luò)狀態(tài);如何處理。

Timer只能執(zhí)行一次,所以執(zhí)行完成之后需要再次添加任務,我們對代碼進行修改:

from threading import Timer
import psutil
import time
import datetime
def MonitorSystem(logfile = None):
 cpuper = psutil.cpu_percent()
 mem = psutil.virtual_memory()
 memper = mem.percent
 now = datetime.datetime.now()
 ts = now.strftime('%Y-%m-%d %H:%M:%S')
 line = f'{ts} cpu:{cpuper}%, mem:{memper}%'
 print(line)
 if logfile:
  logfile.write(line)
 #啟動定時器任務,每三秒執(zhí)行一次
 Timer(3, MonitorSystem).start()
def MonitorNetWork(logfile = None):
 netinfo = psutil.net_io_counters()
 now = datetime.datetime.now()
 ts = now.strftime('%Y-%m-%d %H:%M:%S')
 line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}'
 print(line)
 if logfile:
  logfile.write(line)
 #啟動定時器任務,每秒執(zhí)行一次
 Timer(1, MonitorNetWork).start()
MonitorSystem()
MonitorNetWork()

執(zhí)行結(jié)果:

2019-03-21 15:18:21 cpu:1.5%, mem:93.2%
2019-03-21 15:18:21 bytessent=171376522, bytesrecv=1109124678
2019-03-21 15:18:22 bytessent=171382215, bytesrecv=1109128294
2019-03-21 15:18:23 bytessent=171384278, bytesrecv=1109129702
2019-03-21 15:18:24 cpu:1.9%, mem:93.2%
2019-03-21 15:18:24 bytessent=171386341, bytesrecv=1109131110
2019-03-21 15:18:25 bytessent=171388527, bytesrecv=1109132600
2019-03-21 15:18:26 bytessent=171390590, bytesrecv=1109134008

從時間中可以看到,這兩個任務可以同時進行不存在等待問題。

Timer的實質(zhì)是使用線程方式去執(zhí)行任務,每次執(zhí)行完后會銷毀,所以不必擔心資源問題。

調(diào)度模塊:schedule

schedule是一個第三方輕量級的任務調(diào)度模塊,可以按照秒,分,小時,日期或者自定義事件執(zhí)行時間;

安裝方式:

pip install schedule

我們來看一個例子:

import datetime
import schedule
import time
def func():
 now = datetime.datetime.now()
 ts = now.strftime('%Y-%m-%d %H:%M:%S')
 print('do func time :',ts)
def func2():
 now = datetime.datetime.now()
 ts = now.strftime('%Y-%m-%d %H:%M:%S')
 print('do func2 time:',ts)
def tasklist():
 #清空任務
 schedule.clear()
 #創(chuàng)建一個按秒間隔執(zhí)行任務
 schedule.every(1).seconds.do(func)
 #創(chuàng)建一個按2秒間隔執(zhí)行任務
 schedule.every(2).seconds.do(func2)
 #執(zhí)行10S
 for i in range(10):
  schedule.run_pending()
  time.sleep(1)
tasklist()

執(zhí)行結(jié)果:

do func  time : 2019-03-22 08:51:38
do func2 time: 2019-03-22 08:51:39
do func  time : 2019-03-22 08:51:39
do func  time : 2019-03-22 08:51:40
do func2 time: 2019-03-22 08:51:41
do func  time : 2019-03-22 08:51:41
do func  time : 2019-03-22 08:51:42
do func2 time: 2019-03-22 08:51:43
do func  time : 2019-03-22 08:51:43
do func  time : 2019-03-22 08:51:44
do func2 time: 2019-03-22 08:51:45
do func  time : 2019-03-22 08:51:45
do func  time : 2019-03-22 08:51:46

執(zhí)行過程分析:

>1>因為在jupyter下執(zhí)行,所以先將schedule任務清空;
>2>按時間間在schedule中隔添加任務;
>3>這里按照秒間隔添加func,按照兩秒間隔添加func2;
>4>schedule添加任務后,需要查詢?nèi)蝿詹?zhí)行任務;
>5>為了防止占用資源,每秒查詢到點任務,然后順序執(zhí)行;

第5個順序執(zhí)行怎么理解,我們修改func函數(shù),里面添加time.sleep(2)

然后只執(zhí)行func工作,輸出結(jié)果:

do func  time : 2019-03-22 09:00:59
do func  time : 2019-03-22 09:01:02
do func  time : 2019-03-22 09:01:05

可以看到時間間隔為3S,為什么不是1S?

因為這個按照順序執(zhí)行,func休眠2S,循環(huán)任務查詢休眠1S,所以會存在這個問題。

在我們使用這種方式執(zhí)行任務需要注意這種阻塞現(xiàn)象。

我們看下schedule模塊常用使用方法:

#schedule.every(1)創(chuàng)建Job, seconds.do(func)按秒間隔查詢并執(zhí)行
schedule.every(1).seconds.do(func)
#添加任務按分執(zhí)行
schedule.every(1).minutes.do(func)
#添加任務按天執(zhí)行
schedule.every(1).days.do(func)
#添加任務按周執(zhí)行
schedule.every().weeks.do(func)
#添加任務每周1執(zhí)行,執(zhí)行時間為下周一這一時刻時間
schedule.every().monday.do(func)
#每周1,1點15開始執(zhí)行
schedule.every().monday.at("12:00").do(job)

這種方式局限性:如果工作任務回非常耗時就會影響其他任務執(zhí)行。我們可以考慮使用并發(fā)機制配置這個模塊使用。

任務框架APScheduler

APScheduler是Python的一個定時任務框架,用于執(zhí)行周期或者定時任務,

可以基于日期、時間間隔,及類似于Linux上的定時任務crontab類型的定時任務;

該該框架不僅可以添加、刪除定時任務,還可以將任務存儲到數(shù)據(jù)庫中,實現(xiàn)任務的持久化,使用起來非常方便。

安裝方式:pip install apscheduler

apscheduler組件及簡單說明:

1>triggers(觸發(fā)器):觸發(fā)器包含調(diào)度邏輯,每一個作業(yè)有它自己的觸發(fā)器

2>job stores(作業(yè)存儲):用來存儲被調(diào)度的作業(yè),默認的作業(yè)存儲器是簡單地把作業(yè)任務保存在內(nèi)存中,支持存儲到MongoDB,Redis數(shù)據(jù)庫中

3> executors(執(zhí)行器):執(zhí)行器用來執(zhí)行定時任務,只是將需要執(zhí)行的任務放在新的線程或者線程池中運行

4>schedulers(調(diào)度器):調(diào)度器是將其它部分聯(lián)系在一起,對使用者提供接口,進行任務添加,設(shè)置,刪除。

來看一個簡單例子:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
def func():
 now = datetime.datetime.now()
 ts = now.strftime('%Y-%m-%d %H:%M:%S')
 print('do func time :',ts)
def func2():
 #耗時2S
 now = datetime.datetime.now()
 ts = now.strftime('%Y-%m-%d %H:%M:%S')
 print('do func2 time:',ts)
 time.sleep(2)
def dojob():
 #創(chuàng)建調(diào)度器:BlockingScheduler
 scheduler = BlockingScheduler()
 #添加任務,時間間隔2S
 scheduler.add_job(func, 'interval', seconds=2, id='test_job1')
 #添加任務,時間間隔5S
 scheduler.add_job(func2, 'interval', seconds=3, id='test_job2')
 scheduler.start()
dojob()

輸出結(jié)果:

do func  time : 2019-03-22 10:32:20
do func2 time: 2019-03-22 10:32:21
do func  time : 2019-03-22 10:32:22
do func  time : 2019-03-22 10:32:24
do func2 time: 2019-03-22 10:32:24
do func  time : 2019-03-22 10:32:26

輸出結(jié)果中可以看到:任務就算是有延時,也不會影響其他任務執(zhí)行。

APScheduler框架提供豐富接口去實現(xiàn)定時任務,可以去參考官方文檔去查看使用方式。

最后選擇:

簡單總結(jié)上面四種定時定點任務實現(xiàn):

1:循環(huán)+sleep方式適合簡答測試,

2:timer可以實現(xiàn)定時任務,但是對定點任務來說,需要檢查當前時間點;

3:schedule可以定點定時執(zhí)行,但是需要在循環(huán)中檢測任務,而且存在阻塞;

4:APScheduler框架更加強大,可以直接在里面添加定點與定時任務;

綜合考慮,決定使用APScheduler框架,實現(xiàn)簡單,只需要直接創(chuàng)建任務,并將添加到調(diào)度器中即可。

以上是“Python3如何實現(xiàn)定時任務”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學習更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI