溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Python怎么用sched模塊實現(xiàn)定時任務(wù)

發(fā)布時間:2023-04-03 15:11:45 來源:億速云 閱讀:129 作者:iii 欄目:開發(fā)技術(shù)

本文小編為大家詳細(xì)介紹“Python怎么用sched模塊實現(xiàn)定時任務(wù)”,內(nèi)容詳細(xì),步驟清晰,細(xì)節(jié)處理妥當(dāng),希望這篇“Python怎么用sched模塊實現(xiàn)定時任務(wù)”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學(xué)習(xí)新知識吧。

牛刀小試

我們先來看下面這個案例,代碼如下

import sched
import time

def say_hello(name):
    print(f"Hello, world, {name}")

scheduler = sched.scheduler(time.time, time.sleep)

scheduler.enter(5, 1, say_hello, ("張三", ))
scheduler.run()

那么上述的代碼中,第一步首先則是實例化一個定時器,通過如下的代碼

import sched

scheduler = sched.scheduler()

接下來我們通過enter()方法來執(zhí)行定時任務(wù)的操作,其中的參數(shù)分別是延遲的時間、任務(wù)的優(yōu)先級以及具體的執(zhí)行函數(shù)和執(zhí)行函數(shù)中的參數(shù)。像如上的代碼就會在延遲5秒鐘之后執(zhí)行say_hello()函數(shù)

當(dāng)然要是延遲的時間相等的時候,我們可以設(shè)置任務(wù)執(zhí)行的優(yōu)先級來指定函數(shù)方法運行的順序,例如有如下的代碼

import sched
import time

def say_hello(name):
    print(f"Hello, world, {name}")

def say_hello_2(name):
    print(f"Hello, {name}")

scheduler = sched.scheduler(time.time, time.sleep)

scheduler.enter(5, 2, say_hello, ("張三", ))
scheduler.enter(5, 1, say_hello_2, ("李四", ))
scheduler.run()

如上述代碼,盡管延遲的時間都是一樣的,但是say_hello()方法的優(yōu)先級明顯要比say_hello_2()方法要低一些,因此后者會優(yōu)先執(zhí)行。

進階使用

除了讓函數(shù)延遲執(zhí)行,我們還可以讓其重復(fù)執(zhí)行,具體這樣來操作,代碼如下

import sched
import time

def say_hello():
    print("Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

def repeat_task():
    scheduler.enter(5, 1, say_hello, ())
    scheduler.enter(5, 1, repeat_task, ())

repeat_task()
scheduler.run()

這里我們新建了一個repeat_task()自定義函數(shù),調(diào)用了scheduler.enter()方法5秒鐘執(zhí)行一次之前定義的say_hello()函數(shù)

在固定時間執(zhí)行任務(wù)

同時我們還可以讓任務(wù)在指定的時間執(zhí)行,這里用到scheduler.entertabs()方法,代碼如下

import sched
import time

def say_hello():
    print("Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# 指定時間執(zhí)行任務(wù)
specific_time = time.time() + 5  # 距離現(xiàn)在的5秒鐘之后執(zhí)行
scheduler.enterabs(specific_time, 1, say_hello, ())

scheduler.run()

我們傳入其中參數(shù)使其在指定的時間,也就是距離當(dāng)下的5秒鐘之后來執(zhí)行任務(wù)

執(zhí)行多個任務(wù)

這里仍然是調(diào)用enter()方法來運行多個任務(wù),代碼如下

import sched
import time

def task_one():
    print("Task One - Hello, world!")
    
def task_two():
    print("Task Two - Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# 任務(wù)一在兩秒鐘只有執(zhí)行
scheduler.enter(2, 1, task_one, ())

# 任務(wù)二在五秒鐘之后運行
scheduler.enter(5, 1, task_two, ())

scheduler.run()

這里定義了兩個函數(shù),task_onetask_two里面分是同樣的執(zhí)行邏輯,打印出“Hello, world!”,然后task_one()是在兩秒鐘之后執(zhí)行而task_two()則是在5秒鐘之后執(zhí)行,兩者執(zhí)行的優(yōu)先級都是一樣的。

以不同的優(yōu)先級執(zhí)行不同的任務(wù)

這回我們給task_one()task_two()賦予不同的優(yōu)先級,看一看執(zhí)行的結(jié)果如下

import sched
import time

def task_one():
    print("Task One - Hello, world!")
    
def task_two():
    print("Task Two - Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# 優(yōu)先級是1
scheduler.enter(2, 2, task_one, ())

# 優(yōu)先級是2
scheduler.enter(5, 1, task_two, ())

scheduler.run()

output

Task One - Hello, world!
Task Two - Hello, world!

上述的代碼會在停頓兩秒之后運行task_one()函數(shù),再停頓3秒之后執(zhí)行task_two()函數(shù)

定時任務(wù)加上取消方法

我們給定時任務(wù)添加上取消的方法,代碼如下

import sched
import time

def task_one():
    print("Task One - Hello, world!")
    
def task_two():
    print("Task Two - Hello, world!")

scheduler = sched.scheduler(time.time, time.sleep)

# 任務(wù)一在兩秒鐘只有執(zhí)行
task_one_event = scheduler.enter(2, 1, task_one, ())

# 任務(wù)二在五秒鐘之后運行
task_two_event = scheduler.enter(5, 1, task_two, ())

# 取消執(zhí)行task_one
scheduler.cancel(task_one_event)

scheduler.run()

我們將兩秒鐘之后執(zhí)行的task_one()方法給取消掉,最后就只執(zhí)行了task_two()方法,也就打印出來“Task Two - Hello, world!”

執(zhí)行備份程序

我們來寫一個備份的腳本,在每天固定的時間將文件備份,代碼如下

import sched
import time
import shutil

def backup_files():
    source = '路徑/files'
    destination = '路徑二'
    shutil.copytree(source, destination)

def schedule_backup():
    # 創(chuàng)建新的定時器
    scheduler = sched.scheduler(time.time, time.sleep)

    # 備份程序在每天的1點來執(zhí)行
    backup_time = time.strptime('01:00:00', '%H:%M:%S')
    backup_event = scheduler.enterabs(time.mktime(backup_time), 1, backup_files, ())

    # 開啟定時任務(wù)
    scheduler.run()

schedule_backup()

我們通過shutil模塊當(dāng)中的copytree()方法來執(zhí)行拷貝文件,然后在每天的1點準(zhǔn)時執(zhí)行

執(zhí)行定時分發(fā)郵件的程序

最后我們來執(zhí)行定時分發(fā)郵件的程序,代碼如下

import sched
import time
import smtplib
from email.mime.text import MIMEText

def send_email(subject, message, from_addr, to_addr, smtp_server):
    # 郵件的主體信息
    email = MIMEText(message)
    email['Subject'] = subject
    email['From'] = from_addr
    email['To'] = to_addr

    # 發(fā)郵件
    with smtplib.SMTP(smtp_server) as server:
        server.send_message(email)

def send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time):
    # 創(chuàng)建定時任務(wù)的示例
    scheduler = sched.scheduler(time.time, time.sleep)

    # 定時郵件
    scheduler.enterabs(scheduled_time, 1, send_email, argument=(subject, message, from_addr, to_addr, smtp_server))

    # 開啟定時器
    scheduler.run()

subject = 'Test Email'
message = 'This is a test email'
from_addr = 'test@example.com'
to_addr = 'test@example.com'
smtp_server = 'smtp.test.com'

scheduled_time = time.time() + 60 # 一分鐘之后執(zhí)行程序
send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time)

讀到這里,這篇“Python怎么用sched模塊實現(xiàn)定時任務(wù)”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領(lǐng)會,如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI