您好,登錄后才能下訂單哦!
一. 如何調(diào)用
def f1(arg1, arg2): print('f1', arg1, arg2) def f2(arg1): print('f2', arg1) def f3(): print('f3') def f4(): print('周期任務(wù)', int(time.time())) timer = TaskTimer() # 把任務(wù)加入任務(wù)隊(duì)列 timer.join_task(f1, [1, 2], timing=15.5) # 每天15:30執(zhí)行 timer.join_task(f2, [3], timing=14) # 每天14:00執(zhí)行 timer.join_task(f3, [], timing=15) # 每天15:00執(zhí)行 timer.join_task(f4, [], interval=10) # 每10秒執(zhí)行1次 # 開始執(zhí)行(此時(shí)才會創(chuàng)建線程) timer.start()
f1~f4是我們需要定時(shí)執(zhí)行的函數(shù)。
首先創(chuàng)建TaskTimer對象(TaskTimer的代碼在下面)。調(diào)用join_task函數(shù),把需要執(zhí)行的函數(shù)加入到任務(wù)隊(duì)列。最后調(diào)用start,任務(wù)就開始執(zhí)行了。
join_task參數(shù):
fun:需要執(zhí)行的函數(shù)
arg:fun的參數(shù),如果沒有就傳一個(gè)空列表
interval:如果有此參數(shù),說明任務(wù)是周期任務(wù),單位為秒(注意interval最少5秒)
timing:如果有此參數(shù),說明任務(wù)是定時(shí)任務(wù),單位為時(shí)
注意:interval和timing只能選填1個(gè)
二. 源碼
import datetime import time from threading import Thread from time import sleep class TaskTimer: __instance = None def __new__(cls, *args, **kwargs): """ 單例模式 """ if not cls.__instance: cls.__instance = object.__new__(cls) return cls.__instance def __init__(self): if not hasattr(self, 'task_queue'): setattr(self, 'task_queue', []) if not hasattr(self, 'is_running'): setattr(self, 'is_running', False) def write_log(self, level, msg): cur_time = datetime.datetime.now() with open('./task.log', mode='a+', encoding='utf8') as file: s = "[" + str(cur_time) + "][" + level + "] " + msg print(s) file.write(s + "\n") def work(self): """ 處理任務(wù)隊(duì)列 """ while True: for task in self.task_queue: if task['interval']: self.cycle_task(task) elif task['timing']: self.timing_task(task) sleep(5) def cycle_task(self, task): """ 周期任務(wù) """ if task['next_sec'] <= int(time.time()): try: task['fun'](*task['arg']) self.write_log("正常", "周期任務(wù):" + task['fun'].__name__ + " 已執(zhí)行") except Exception as e: self.write_log("異常", "周期任務(wù):" + task['fun'].__name__ + " 函數(shù)內(nèi)部異常:" + str(e)) finally: task['next_sec'] = int(time.time()) + task['interval'] def timing_task(self, task): """ 定時(shí)任務(wù) """ # 今天已過秒數(shù) today_sec = self.get_today_until_now() # 到了第二天,就重置任務(wù)狀態(tài) if task['today'] != self.get_today(): task['today'] = self.get_today() task['today_done'] = False # 第一次執(zhí)行 if task['first_work']: if today_sec >= task['task_sec']: task['today_done'] = True task['first_work'] = False else: task['first_work'] = False # 今天還沒有執(zhí)行 if not task['today_done']: if today_sec >= task['task_sec']: # 到點(diǎn)了,開始執(zhí)行任務(wù) try: task['fun'](*task['arg']) self.write_log("正常", "定時(shí)任務(wù):" + task['fun'].__name__ + " 已執(zhí)行") except Exception as e: self.write_log("異常", "定時(shí)任務(wù):" + task['fun'].__name__ + " 函數(shù)內(nèi)部異常:" + str(e)) finally: task['today_done'] = True if task['first_work']: task['first_work'] = False def get_today_until_now(self): """ 獲取今天凌晨到現(xiàn)在的秒數(shù) """ i = datetime.datetime.now() return i.hour * 3600 + i.minute * 60 + i.second def get_today(self): """ 獲取今天的日期 """ i = datetime.datetime.now() return i.day def join_task(self, fun, arg, interval=None, timing=None): """ interval和timing只能存在1個(gè) :param fun: 你要調(diào)用的任務(wù) :param arg: fun的參數(shù) :param interval: 周期任務(wù),單位秒 :param timing: 定時(shí)任務(wù),取值:[0,24) """ # 參數(shù)校驗(yàn) if (interval != None and timing != None) or (interval == None and timing == None): raise Exception('interval和timing只能選填1個(gè)') if timing and not 0 <= timing < 24: raise Exception('timing的取值范圍為[0,24)') if interval and interval < 5: raise Exception('interval最少為5') # 封裝一個(gè)task task = { 'fun': fun, 'arg': arg, 'interval': interval, 'timing': timing, } # 封裝周期或定時(shí)任務(wù)相應(yīng)的參數(shù) if timing: task['task_sec'] = timing * 3600 task['today_done'] = False task['first_work'] = True task['today'] = self.get_today() elif interval: task['next_sec'] = int(time.time()) + interval # 把task加入任務(wù)隊(duì)列 self.task_queue.append(task) self.write_log("正常", "新增任務(wù):" + fun.__name__) def start(self): """ 開始執(zhí)行任務(wù) 返回線程標(biāo)識符 """ if not self.is_running: thread = Thread(target=self.work) thread.start() self.is_running = True self.write_log("正常", "TaskTimer已開始運(yùn)行!") return thread.ident self.write_log("警告", "TaskTimer已運(yùn)行,請勿重復(fù)啟動!")
以上這篇python異步實(shí)現(xiàn)定時(shí)任務(wù)和周期任務(wù)的方法就是小編分享給大家的全部內(nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持億速云。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。