溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

使用Redis實(shí)現(xiàn)定時(shí)任務(wù)多節(jié)點(diǎn)部署及自動(dòng)任務(wù)分發(fā)的方法

發(fā)布時(shí)間:2020-10-19 18:19:33 來源:億速云 閱讀:562 作者:小新 欄目:編程語言

這篇文章主要介紹使用Redis實(shí)現(xiàn)定時(shí)任務(wù)多節(jié)點(diǎn)部署及自動(dòng)任務(wù)分發(fā)的方法,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

最近部署一個(gè)定時(shí)任務(wù)項(xiàng)目遇到一個(gè)問題:部署的代碼每天會(huì)定時(shí)啟動(dòng)多個(gè)任務(wù)在后臺(tái)串行執(zhí)行,且每個(gè)任務(wù)耗時(shí)比較長。我們的生產(chǎn)環(huán)境會(huì)自動(dòng)開啟三個(gè)相同的節(jié)點(diǎn)把部署的代碼同步運(yùn)行,這就導(dǎo)致三個(gè)節(jié)點(diǎn)的定時(shí)任務(wù)會(huì)重復(fù)執(zhí)行,我只好強(qiáng)行關(guān)閉另外兩個(gè)節(jié)點(diǎn),以單節(jié)點(diǎn)的方式串行執(zhí)行定時(shí)任務(wù)。

這樣代碼是能跑起來,但是效率低且浪費(fèi)了另外兩個(gè)服務(wù)器。我需要改造代碼以便能支持多節(jié)點(diǎn)運(yùn)行,且能讓每個(gè)節(jié)點(diǎn)分發(fā)到不同的任務(wù)去執(zhí)行。

思考過后,我借助redis實(shí)現(xiàn)了一個(gè)簡單的分布式定時(shí)任務(wù),在這里記錄一下。

思路

要實(shí)現(xiàn)分布式,必須要使用到每個(gè)節(jié)點(diǎn)都能訪問到的公共變量作為全局鎖,這里我使用了redis來實(shí)現(xiàn)鎖。
對(duì)每個(gè)不同類的任務(wù)設(shè)置唯一的字符串鍵,當(dāng)定時(shí)任務(wù)開始時(shí),每個(gè)節(jié)點(diǎn)將會(huì)去遍歷任務(wù)搶占鎖,只有獲取到鎖的節(jié)點(diǎn)可以執(zhí)行鎖對(duì)應(yīng)的任務(wù),而未獲取到鎖的節(jié)點(diǎn)將繼續(xù)遍歷其他任務(wù)鎖,直到所有任務(wù)鎖被獲取并執(zhí)行。

示例代碼:task_manager.py

from redis import Redisfrom apscheduler.schedulers.blocking import BlockingSchedulerimport timeimport randomr = Redis(host='localhost', port=6379, db=0, decode_responses=True)TASK_LIST = ["任務(wù)參數(shù)1", "任務(wù)參數(shù)2", "任務(wù)參數(shù)3", "任務(wù)參數(shù)4"]# 存放當(dāng)前已完成的集合鍵FINISHED_TASK_KEY = "task:finished"# 給每個(gè)任務(wù)順序分配不同的redis鍵TASK_KEYS = ["task: {}".format(task) for task in range(len(TASK_LIST))]# redis鍵對(duì)應(yīng)的任務(wù)參數(shù)TASK_MAP = {key: task for key, task in zip(TASK_KEYS, TASK_LIST)}EXPIRE = 60 * 60 * 23# 模擬定時(shí)執(zhí)行的任務(wù),可接受不同的參數(shù)def task(param):
    print("----------------------------------")
    print("開始執(zhí)行任務(wù):{}....".format(param))
    time.sleep(random.randint(15, 30))
    print("完成{}".format(param))
    print("----------------------------------")def start():
    # 每日任務(wù)開始前先刪除redis中已完成任務(wù)集合
    r.delete(FINISHED_TASK_KEY)
    # 添加redis中唯一任務(wù)鎖,若添加成功且任務(wù)完成表中無當(dāng)前任務(wù)則執(zhí)行
    for key in TASK_KEYS:
        status = r.set(key, 1, ex=EXPIRE, nx=True)
        # 檢查當(dāng)前任務(wù)是否已執(zhí)行
        is_finished = r.sismember(FINISHED_TASK_KEY, key)
        if status is True and is_finished is False:
            print("當(dāng)前節(jié)點(diǎn)獲取任務(wù):{} 成功, 準(zhǔn)備執(zhí)行。。".format(key))
            # 獲取當(dāng)前redis鍵對(duì)應(yīng)的任務(wù)參數(shù)并執(zhí)行任務(wù)
            task_param = TASK_MAP.get(key)
            task(task_param)
            # 任務(wù)執(zhí)行完成將當(dāng)前任務(wù)添加到已完成集合,不允許其他節(jié)點(diǎn)再獲取該任務(wù)
            r.sadd(FINISHED_TASK_KEY, key)
            r.delete(key)
        elif status is True and is_finished is True:
            print("未執(zhí)行: {},其他節(jié)點(diǎn)已完成該任務(wù)".format(key))
        else:
            print("未執(zhí)行:{},其他節(jié)點(diǎn)正在執(zhí)行當(dāng)前任務(wù)".format(key))if __name__ == '__main__':
    # 這里可使用apscheduler定時(shí)任務(wù)框架調(diào)度start函數(shù),該處模擬未使用定時(shí)任務(wù)
    start()

模擬多節(jié)點(diǎn)運(yùn)行

先同時(shí)打開三個(gè)命令行窗口,并鍵入執(zhí)行命名,然后盡量相近時(shí)間點(diǎn)擊回車運(yùn)行,模擬多節(jié)點(diǎn)下定時(shí)任務(wù)同時(shí)觸發(fā)的場(chǎng)景:

使用Redis實(shí)現(xiàn)定時(shí)任務(wù)多節(jié)點(diǎn)部署及自動(dòng)任務(wù)分發(fā)的方法

執(zhí)行情況如下

使用Redis實(shí)現(xiàn)定時(shí)任務(wù)多節(jié)點(diǎn)部署及自動(dòng)任務(wù)分發(fā)的方法

模擬多節(jié)點(diǎn)同步運(yùn)行結(jié)果

根據(jù)打印的執(zhí)行情況看,整體符合預(yù)期,一個(gè)任務(wù)只能被一個(gè)節(jié)點(diǎn)執(zhí)行,且能做到自動(dòng)任務(wù)分發(fā),執(zhí)行完當(dāng)前任務(wù)的節(jié)點(diǎn)會(huì)自動(dòng)去獲取還未執(zhí)行的任務(wù),直到所有任務(wù)執(zhí)行完畢。

最后

當(dāng)然,以上只是一個(gè)demo,還需要處理很多異常判斷任務(wù),任務(wù)完成情況之類的,針對(duì)不同的任務(wù)情景還可以有其他變通方法。這里只是整理一下思路和分享,希望對(duì)你也有所啟發(fā)。

以上是使用Redis實(shí)現(xiàn)定時(shí)任務(wù)多節(jié)點(diǎn)部署及自動(dòng)任務(wù)分發(fā)的方法的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI