您好,登錄后才能下訂單哦!
這篇文章主要介紹使用Redis實(shí)現(xiàn)定時(shí)任務(wù)多節(jié)點(diǎn)部署及自動(dòng)任務(wù)分發(fā)的方法,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!
最近部署一個(gè)定時(shí)任務(wù)項(xiàng)目遇到一個(gè)問題:部署的代碼每天會(huì)定時(shí)啟動(dòng)多個(gè)任務(wù)在后臺(tái)串行執(zhí)行,且每個(gè)任務(wù)耗時(shí)比較長。我們的生產(chǎn)環(huán)境會(huì)自動(dòng)開啟三個(gè)相同的節(jié)點(diǎn)把部署的代碼同步運(yùn)行,這就導(dǎo)致三個(gè)節(jié)點(diǎn)的定時(shí)任務(wù)會(huì)重復(fù)執(zhí)行,我只好強(qiáng)行關(guān)閉另外兩個(gè)節(jié)點(diǎn),以單節(jié)點(diǎn)的方式串行執(zhí)行定時(shí)任務(wù)。
這樣代碼是能跑起來,但是效率低且浪費(fèi)了另外兩個(gè)服務(wù)器。我需要改造代碼以便能支持多節(jié)點(diǎn)運(yùn)行,且能讓每個(gè)節(jié)點(diǎn)分發(fā)到不同的任務(wù)去執(zhí)行。
思考過后,我借助redis實(shí)現(xiàn)了一個(gè)簡單的分布式定時(shí)任務(wù),在這里記錄一下。
思路
要實(shí)現(xiàn)分布式,必須要使用到每個(gè)節(jié)點(diǎn)都能訪問到的公共變量作為全局鎖,這里我使用了redis來實(shí)現(xiàn)鎖。
對(duì)每個(gè)不同類的任務(wù)設(shè)置唯一的字符串鍵,當(dāng)定時(shí)任務(wù)開始時(shí),每個(gè)節(jié)點(diǎn)將會(huì)去遍歷任務(wù)搶占鎖,只有獲取到鎖的節(jié)點(diǎn)可以執(zhí)行鎖對(duì)應(yīng)的任務(wù),而未獲取到鎖的節(jié)點(diǎn)將繼續(xù)遍歷其他任務(wù)鎖,直到所有任務(wù)鎖被獲取并執(zhí)行。
示例代碼:task_manager.py
from redis import Redisfrom apscheduler.schedulers.blocking import BlockingSchedulerimport timeimport randomr = Redis(host='localhost', port=6379, db=0, decode_responses=True)TASK_LIST = ["任務(wù)參數(shù)1", "任務(wù)參數(shù)2", "任務(wù)參數(shù)3", "任務(wù)參數(shù)4"]# 存放當(dāng)前已完成的集合鍵FINISHED_TASK_KEY = "task:finished"# 給每個(gè)任務(wù)順序分配不同的redis鍵TASK_KEYS = ["task: {}".format(task) for task in range(len(TASK_LIST))]# redis鍵對(duì)應(yīng)的任務(wù)參數(shù)TASK_MAP = {key: task for key, task in zip(TASK_KEYS, TASK_LIST)}EXPIRE = 60 * 60 * 23# 模擬定時(shí)執(zhí)行的任務(wù),可接受不同的參數(shù)def task(param): print("----------------------------------") print("開始執(zhí)行任務(wù):{}....".format(param)) time.sleep(random.randint(15, 30)) print("完成{}".format(param)) print("----------------------------------")def start(): # 每日任務(wù)開始前先刪除redis中已完成任務(wù)集合 r.delete(FINISHED_TASK_KEY) # 添加redis中唯一任務(wù)鎖,若添加成功且任務(wù)完成表中無當(dāng)前任務(wù)則執(zhí)行 for key in TASK_KEYS: status = r.set(key, 1, ex=EXPIRE, nx=True) # 檢查當(dāng)前任務(wù)是否已執(zhí)行 is_finished = r.sismember(FINISHED_TASK_KEY, key) if status is True and is_finished is False: print("當(dāng)前節(jié)點(diǎn)獲取任務(wù):{} 成功, 準(zhǔn)備執(zhí)行。。".format(key)) # 獲取當(dāng)前redis鍵對(duì)應(yīng)的任務(wù)參數(shù)并執(zhí)行任務(wù) task_param = TASK_MAP.get(key) task(task_param) # 任務(wù)執(zhí)行完成將當(dāng)前任務(wù)添加到已完成集合,不允許其他節(jié)點(diǎn)再獲取該任務(wù) r.sadd(FINISHED_TASK_KEY, key) r.delete(key) elif status is True and is_finished is True: print("未執(zhí)行: {},其他節(jié)點(diǎn)已完成該任務(wù)".format(key)) else: print("未執(zhí)行:{},其他節(jié)點(diǎn)正在執(zhí)行當(dāng)前任務(wù)".format(key))if __name__ == '__main__': # 這里可使用apscheduler定時(shí)任務(wù)框架調(diào)度start函數(shù),該處模擬未使用定時(shí)任務(wù) start()
模擬多節(jié)點(diǎn)運(yùn)行
先同時(shí)打開三個(gè)命令行窗口,并鍵入執(zhí)行命名,然后盡量相近時(shí)間點(diǎn)擊回車運(yùn)行,模擬多節(jié)點(diǎn)下定時(shí)任務(wù)同時(shí)觸發(fā)的場(chǎng)景:
執(zhí)行情況如下
模擬多節(jié)點(diǎn)同步運(yùn)行結(jié)果
根據(jù)打印的執(zhí)行情況看,整體符合預(yù)期,一個(gè)任務(wù)只能被一個(gè)節(jié)點(diǎn)執(zhí)行,且能做到自動(dòng)任務(wù)分發(fā),執(zhí)行完當(dāng)前任務(wù)的節(jié)點(diǎn)會(huì)自動(dòng)去獲取還未執(zhí)行的任務(wù),直到所有任務(wù)執(zhí)行完畢。
最后
當(dāng)然,以上只是一個(gè)demo,還需要處理很多異常判斷任務(wù),任務(wù)完成情況之類的,針對(duì)不同的任務(wù)情景還可以有其他變通方法。這里只是整理一下思路和分享,希望對(duì)你也有所啟發(fā)。
以上是使用Redis實(shí)現(xiàn)定時(shí)任務(wù)多節(jié)點(diǎn)部署及自動(dòng)任務(wù)分發(fā)的方法的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。