您好,登錄后才能下訂單哦!
這篇“基于Redis分布式鎖的任務(wù)調(diào)度怎么實現(xiàn)”文章的知識點大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“基于Redis分布式鎖的任務(wù)調(diào)度怎么實現(xiàn)”文章吧。
在分布式大批量數(shù)據(jù)采集過程中,信源的管理尤為重要。為保證同一任務(wù)在同一時間,只能被一個采集器處理,必須保證任務(wù)調(diào)度的唯一性。通常我們在進(jìn)行分布式數(shù)據(jù)采集時,一般情況下都會有一個調(diào)度模塊,其主要的職責(zé)就是負(fù)責(zé)采集任務(wù)的分發(fā),同時保證任務(wù)的唯一性。
由于是分布式,涉及到多臺服務(wù)器(多機),每臺服務(wù)器又涉及到多個采集器(多進(jìn)程),每個采集器又有可能涉及到多線程,所以,任務(wù)調(diào)度模塊中的鎖機制顯得尤為重要。一般情況下,鎖的實現(xiàn)方式,按照應(yīng)用的實現(xiàn)架構(gòu),可能會有以下幾種類型:
如果處理程序是單進(jìn)程多線程的,在 python下,就可以使用 threading 模塊的 Lock 對象來限制對共享變量的同步訪問,實現(xiàn)線程安全。
單機多進(jìn)程的情況,在 python 下,可以使用 multiprocessing 的 Lock 對象來處理。
多機多進(jìn)程部署的情況,就得依賴一個第三方組件(存儲鎖對象)來實現(xiàn)一個分布式的同步鎖了。
由于調(diào)度模塊是多機多進(jìn)程多線程的處理機制,所以符合第三種方式。
分布式鎖實現(xiàn)方式
目前主流的分布式鎖實現(xiàn)方式有以下幾種:
基于數(shù)據(jù)庫來實現(xiàn),如 mysql
基于緩存來實現(xiàn),如 redis
基于 zookeeper 來實現(xiàn)
每種實現(xiàn)方式各有千秋,綜合考量,Redis是最為合適的選擇。主要原因是:
redis 是基于內(nèi)存來操作,存取速度比數(shù)據(jù)庫快,在高并發(fā)下,加鎖之后的性能不會下降太多
redis 可以設(shè)置鍵值的生存時間(TTL)
redis 的使用方式簡單,總體實現(xiàn)開銷小
但是,使用 redis 實現(xiàn)的分布鎖還需要具備以下幾個條件:
同一個時刻只能有一個線程占有鎖,其他線程必須等待直到鎖被釋放
鎖的操作必須滿足原子性
不會發(fā)生死鎖,例如已獲得鎖的線程在釋放鎖之前突然異常退出,導(dǎo)致其他線程會一直在循環(huán)等待鎖被釋放
鎖的添加和釋放必須由同一個線程來設(shè)置
我們使用 redis 來實現(xiàn)一個分布式同步鎖,來保證數(shù)據(jù)的一致性,需滿足一下特點:
滿足互斥性,同一個時刻只能有一個線程可以獲取鎖
利用 redis 的 ttl 來確保不會出現(xiàn)死鎖,但同時也會帶來由于鎖過期引發(fā)的多線程同時占有鎖的問題,需要我們合理設(shè)置鎖的過期時間來避免
利用鎖的唯一性來確保不會出現(xiàn)誤刪鎖的情況
我在實際操作過程中,把調(diào)度模塊從整個采集系統(tǒng)中拆離了出來,基于Java客戶端Jredis(JRedis是一個高性能的Java客戶端,用來連接到Redis分布式哈希鍵-值數(shù)據(jù)庫。提供同步和異步)+SpringBoot,實現(xiàn)了一個獨立的服務(wù)。以便其他各個采集器,通過HTTP方式請求所要處理的采集任務(wù)。其處理過程大致如下:
采集器通過HTTP方式,向調(diào)度中心發(fā)送任務(wù)請求;
調(diào)度中心判斷鎖是否存在,如果存在則直接返回空集合;
如果不存在鎖,則對請求加鎖,然后根據(jù)信源規(guī)則獲取相應(yīng)的采集任務(wù);
返回獲取到的任務(wù)(如果沒有待處理任務(wù),則返回空),然后刪除鎖。
調(diào)度模塊的代碼實現(xiàn),大致如下所示:
public static List<Object> fetchTask(String lockKeyValue, RedisHashUtils redisHashUtils, HttpServletRequest request,
HashServiceInterface hif, ZSetServiceInterface zScoreSet, String dicName) {
List<Object> result = new ArrayList<Object>();
try {
String dicNameLock = "Dispatcher_Task_Lock";// 任務(wù)調(diào)度鎖;
if (!redisHashUtils.keyIsExit(dicNameLock, lockKeyValue)) {// 判斷鎖是否存在
// 添加鎖(把任務(wù)唯一性標(biāo)識寫入記錄);
redisHashUtils.addOneData(dicNameLock, lockKeyValue,
DateUtil.getYMDHMS());
// 處理任務(wù)邏輯
..............................................
// 刪除鎖(任務(wù)唯一性標(biāo)識);
hsdi.remove(redisHashUtils, dicNameLock, lockKeyValue);
} else {
//鎖已存在
System.out.println("正在處理任務(wù),暫時返回空集合....");
}
} catch (
Exception e) {e.printStackTrace();
}
return result;
}
在實際的操作過程中,在進(jìn)行鎖添加時,必須要給鎖加上過期時間,否則出現(xiàn)某些不可知的異常時,可能會導(dǎo)致鎖無法釋放,采集器一直無法獲取到采集任務(wù)的情況。
以上就是關(guān)于“基于Redis分布式鎖的任務(wù)調(diào)度怎么實現(xiàn)”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對大家有幫助,若想了解更多相關(guān)的知識內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。