您好,登錄后才能下訂單哦!
這篇文章主要介紹了Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約的相關(guān)知識,內(nèi)容詳細(xì)易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約文章都會有所收獲,下面我們一起來看看吧。
使用當(dāng)前(2022年12月初)最新的版本:3.18.1;
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.18.1</version> </dependency>
案例
案例采用redis-cluster集群的方式;
public class Main { public static void main(String[] args) throws Exception { // 1.配置Redis-Cluster集群節(jié)點的ip和port Config config = new Config(); config.useClusterServers() .addNodeAddress("redis://127.0.0.1:7001") .addNodeAddress("redis://127.0.0.1:7002") .addNodeAddress("redis://127.0.0.1:7003") .addNodeAddress("redis://127.0.0.1:7004"); // 2.創(chuàng)建Redisson的客戶端 RedissonClient redisson = Redisson.create(config); // 3.測試Redisson可重?鎖的加鎖、釋放鎖 testLock(redisson); } private static void testLock(RedissonClient redisson) throws InterruptedException { // 1.獲取key為"anyLock"的鎖對象 final RLock lock = redisson.getLock("test_lock"); boolean locked = true; try { //2.1:加鎖 lock.lock(); // 2.2:加鎖,并設(shè)置嘗試獲取鎖超時時間30s、鎖超時?動釋放的時間10s // locked = lock.tryLock(30, 10, TimeUnit.SECONDS); if (locked) System.out.println("加鎖成功!" + new Date()); Thread.sleep(20 * 1000); System.out.println("鎖邏輯執(zhí)行完畢!" + new Date()); } finally { // 3.釋放鎖 lock.unlock(); } } }
redission支持4種連接redis方式,分別為單機、主從、Sentinel、Cluster 集群;在分布式鎖的實現(xiàn)上區(qū)別在于hash槽的獲取方式。
具體配置方式見Redisson的GitHub
分布式鎖主要需要以下redis命令:
EXISTS key:當(dāng) key 存在,返回1;不存在,返回0。
GETSET key value:將給定 key 的值設(shè)為 value ,并返回 key 的舊值 (old value);當(dāng) key 存在但不是字符串類型時,返回一個錯誤;當(dāng)key不存在時,返回nil。
GET key:返回 key 所關(guān)聯(lián)的字符串值,如果 key 不存在那么返回 nil。
DEL key [KEY …]:刪除給定的一個或多個 key(不存在的 key 會被忽略),返回實際刪除的key的個數(shù)(integer)。
DEL key1 key2 key3
HSET key field value:給一個key 設(shè)置一個{field=value}的組合值,如果key沒有就直接賦值并返回1;如果field已有,那么就更新value的值,并返回0。
HEXISTS key field:當(dāng)key中存儲著field的時候返回1,如果key或者field有一個不存在返回0。
HINCRBY key field increment:將存儲在key中的哈希(Hash)對象中的指定字段field的值加上增量increment;
如果鍵key不存在,一個保存了哈希對象{field=value}的key將被創(chuàng)建;如果字段field不存在,在進行當(dāng)前操作前,feild將被創(chuàng)建,且對應(yīng)的值被置為0;返回值是increment。PEXPIRE key milliseconds:設(shè)置存活時間,單位是毫秒。EXPIRE操作單位是秒。
PUBLISH channel message:向channel post一個message內(nèi)容的消息,返回接收消息的客戶端數(shù)。
Redisson源碼中,執(zhí)行redis命令的是lua腳本,其中主要有如下幾個概念:
redis.call():執(zhí)行redis命令。
KEYS[n]:指腳本中第n個參數(shù),比如KEYS[1]指腳本中的第一個參數(shù)。
ARGV[n]:指腳本中第n個參數(shù)的值,比如ARGV[1]指腳本中的第一個參數(shù)的值。
返回值中nil與false同一個意思。
在redis執(zhí)行l(wèi)ua腳本時,相當(dāng)于一個redis級別的鎖,不能執(zhí)行其他操作,類似于原子操作,這也是redisson實現(xiàn)的一個關(guān)鍵點。
另外,如果lua腳本執(zhí)行過程中出現(xiàn)了異?;蛘遰edis服務(wù)器宕機了,會將腳本中已經(jīng)執(zhí)行的命令在AOF、RDB日志中刪除;即LUA腳本執(zhí)行報錯會進行回滾操作。
RLock接口主要繼承了Lock接口,并擴展了部分方法,比如:tryLock(long waitTime, long leaseTime, TimeUnit unit)方法中加入的leaseTime參數(shù),用來設(shè)置鎖的過期時間,如果超過leaseTime還沒有解鎖的話,redis就強制解鎖;leaseTime的默認(rèn)時間是30s。
獲取RLock對象
RLock lock = redissonClient.getLock("test_lock");
RLock對象表示?個鎖對象,我們要某一個key加鎖時,需要先獲取?個鎖對象。
這里并沒有具體請求Redis進行加鎖的邏輯,而只是調(diào)用RedissonLock的構(gòu)造函數(shù),設(shè)置一些變量。
進入到Rlock#lock()方法,先看主流程;關(guān)于競爭鎖等待時間、鎖超時釋放時間的配置、使用,在流程中穿插著聊。
0)加鎖流程圖
1)加鎖到哪臺機器
lock()方法執(zhí)行鏈路:
走到這里,已經(jīng)可以看到加鎖的底層邏輯:LUA腳本。
而lua腳本只是??串字符串,作為evalWriteAsync()?法的?個參數(shù)?已;所以下?步進到evalWriteAsync()?法中:
走到這里會調(diào)用ConnectionManager#getEntry(String)方法;
在創(chuàng)建RedissonClient時,筆者配置的是Redis-Cluster,而走到這里卻會進入到MasterSlaveConnectionManager
,實際上實例化的ConnectionManager就是RedisCluster模式下的ClusterConnectionManager,而ClusterConnectionManager
繼承自MasterSlaveConnectionManager
,并且ClusterConnectionManager
沒有重寫getEntry(String)方法,所以會進入到MasterSlaveConnectionManager
#getEntry(String)方法。
ConnectionManager#getEntry(String)方法會根據(jù)傳入的key名稱找到相應(yīng)的Redis節(jié)點、目標(biāo)master。
Redis-Cluster集群中的數(shù)據(jù)分布式是 通過?個?個的hash slot來實現(xiàn)的,Redis-Cluster集群總共16384個hash slot,它們都 會被均勻分布到所有的master節(jié)點上;這里ClusterConnectionManager通過key名稱計算出相應(yīng)的hash slot方式如下:
?先通過key計算出CRC16值,然后 CRC16值對16384進?取模,進?得到hash slot。
@Override public int calcSlot(String key) { if (key == null) { return 0; } int start = key.indexOf('{'); if (start != -1) { int end = key.indexOf('}'); if (end != -1 && start + 1 < end) { key = key.substring(start + 1, end); } } int result = CRC16.crc16(key.getBytes()) % MAX_SLOT; log.debug("slot {} for {}", result, key); return result; }
這?計算出key的hash slot之后,就可以通過hash slot 去看?看哪個master上有這個hash slot,如果某個master上有個這個hash slot,那么這個 key當(dāng)然就會落到該master節(jié)點上,執(zhí)?加鎖指令也就應(yīng)該在該master上執(zhí)?。
下面進入本文重點,可重入鎖的各種加鎖、釋放鎖。
2)Client第一次加鎖
在尋找應(yīng)該在哪臺Redis機器上加鎖時,在RedissonLock#tryLockInnerAsync()方法中我們看到了一堆LUA腳本:
LUA腳本參數(shù)解析:
KEYS[1] 表示的是 getName() ,即鎖key的名稱,比如案例中的 test_lock;
ARGV[1] 表示的是 internalLockLeaseTime 默認(rèn)值是30s;
ARGV[2] 表示的是 getLockName(threadId) ,唯一標(biāo)識當(dāng)前訪問線程,使用鎖對象id+線程id(UUID:ThreadId)方式表示,用于區(qū)分不同服務(wù)器上的線程。
UUID用來唯?標(biāo)識?個客戶端,因為會有多個客戶端的多個線程加鎖;
結(jié)合起來的UUID:ThreadId 表示:具體哪個客戶端上的哪個線程過來加鎖,通 過這樣的組合?式唯?標(biāo)識?個線程。
LUA腳本邏輯:
如果鎖名稱不存在;
則向redis中添加一個key為test_lock的HASH結(jié)構(gòu)、添加一個field為線程id,值=1的鍵值對{field:increment},表示此線程的重入次數(shù)為1;
設(shè)置test_lock的過期時間,防止當(dāng)前服務(wù)器出問題后導(dǎo)致死鎖,然后return nil; end;返回nil,lua腳本執(zhí)行完畢;
如果鎖存在,檢測當(dāng)前線程是否持有鎖;
如果是當(dāng)前線程持有鎖,hincrby將該線程重入的次數(shù)++;并重新設(shè)置鎖的過期時間;返回nil,lua腳本執(zhí)行完畢;
如果不是當(dāng)前線程持有鎖,pttl返回鎖的過期時間,單位ms。
第一次加鎖時,key肯定不存在與master節(jié)點上;
會執(zhí)行下列LUA腳本對應(yīng)的Redis指令:
hset test_lock UUID:ThreadId 1 pexpire test_lock 30000
此時,Redis中多一個Hash結(jié)構(gòu)的key(test_lock):
test_lock : { UUID:ThreadId:1 }
這里的1使用來做鎖重入的。
pexpire
指令為test_lock這個key設(shè)置過期時間為30s,即:30s后這個key會?動過期被刪除,key對應(yīng)的鎖在那時也就被釋放了。
總體來看,加鎖的邏輯很簡單:
在key對應(yīng)的hash數(shù)據(jù)結(jié)構(gòu)中記錄了? 下當(dāng)前是哪個客戶端的哪個線程過來加鎖了,然后設(shè)置了?下key的過期時間為30s。 3)加鎖成功之后的鎖續(xù)約
成功加鎖后,lua腳本返回nil,即null。
加鎖成功之后,tryLockInnerAsync()?法返回;再結(jié)合Java8的Stream,對加鎖結(jié)果進一步處理;
因為加鎖成功后返回的是nil,這是lua腳本的返回形式,體現(xiàn)到j(luò)ava代碼中的返回值為:null。
又由于RLock#lock()方法傳入的leaseTime是-1,所以進入到scheduleExpirationRenewal(long)
方法做鎖續(xù)約。
renewExpirationAsync()方法負(fù)責(zé)做具體的鎖續(xù)約:
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
這里L(fēng)UA腳本的邏輯很簡單:
判斷當(dāng)前key中,是否還被線程UUID:ThreadId持有鎖,持有則設(shè)置過期時間為30s(續(xù)命)。
鎖續(xù)約(看門狗機制)其實就是每次加鎖成功后,會?上開啟?個后臺線程, 每隔10s檢查?下key是否存在,如果存在就為key續(xù)期30s。
這里的10s,取自配置的lockWatchdogTimeout
參數(shù),默認(rèn)為30 * 1000 ms;
所以?個key往往當(dāng)過期時間慢慢消逝到20s左右時就?會被定時任務(wù)重置為了30s,這樣就能保證:只要這個定時任務(wù)還在、這個key還在,就?直維持加鎖。
如果當(dāng)前持有鎖的線程被中斷了,會停止鎖續(xù)約,即殺死看門狗;
protected void cancelExpirationRenewal(Long threadId) { ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (task == null) { return; } if (threadId != null) { task.removeThreadId(threadId); } if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null) { timeout.cancel(); } EXPIRATION_RENEWAL_MAP.remove(getEntryName()); } }
所謂的停止鎖續(xù)約,實際就是將當(dāng)前線程的threadId從看門狗緩存中移除,后續(xù)在執(zhí)行鎖續(xù)約時,如果發(fā)現(xiàn)看門狗緩存中已經(jīng)沒有了當(dāng)前線程threadId,則直接退出鎖續(xù)約 并且 不再延時10s開啟一個定時任務(wù)。
如果加鎖時指定了leaseTime > 0,則不會開門狗機制,表示強制鎖leaseTime 毫秒后過期。一共有三種加鎖方式可以做到,如下:
RLock#lock(long leaseTime, TimeUnit unit)
RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)
RLock#lockInterruptibly(long leaseTime, TimeUnit unit)
4)重入加鎖(相同線程多次加鎖)
再次回到加鎖的LUA腳本:
同一個線程對分布式鎖多次加鎖時,會走以下邏輯:
判斷當(dāng)前key是否被當(dāng)前線程持有,如果是則增加鎖重入的次數(shù),并重新設(shè)置鎖的過期時間為30s;
對應(yīng)的Redis命令為:
hexists test_lock UUID:ThreadId hincrby test_lock UUID:ThreadId 1 pexpire test_lock 30000
此時Redis中test_key對應(yīng)的數(shù)據(jù)結(jié)構(gòu)從
test_lock : { UUID:ThreadId:1 }
變成:
test_lock : { UUID:ThreadId:2 }
并將key的過期時間重新設(shè)置為30s。
鎖重入成功之后,后臺也會開啟?個watchdog后臺線程做鎖續(xù)約,每隔10s檢查?下key,如果key存在就將key的過期時間重新設(shè)置為30s。
Redisson可重?加鎖的語義,實際是通過Hash結(jié)構(gòu)的key中某個線程(UUID:ThreadId)對應(yīng)的加鎖次數(shù)來表示的。
5)鎖競爭(其他線程加鎖失敗)
再再次回到加鎖的LUA腳本:
如果分布式鎖已經(jīng)被其他線程持有,LUA腳本會執(zhí)行以下邏輯:
返回當(dāng)前key的剩余存活時間,因為不是返回nil,也就代表著加鎖失??;
對應(yīng)的Redis的命令為:
pttl test_lock
針對加鎖方式的不同,加鎖失敗的邏輯也不同;可以分兩大類:指定了加鎖失敗的等待時間waitTime和未指定waitTime。
未執(zhí)行加鎖失敗的等待時間waitTime
:獲取分布式鎖失敗會一直重試,直到獲取鎖成功。比如下列加鎖方法:
Rlock#lock()
:一直嘗試獲取分布式鎖,直到獲取鎖成功。
RLock#lockInterruptibly(long leaseTime, TimeUnit unit)
RLock#lock(long leaseTime, TimeUnit unit)
指定了加鎖失敗的等待時間waitTime
:獲取分布式鎖會超時,超時之后返回加鎖失??;
Rlock#tryLock(long waitTime, TimeUnit unit)
:指定獲取鎖失敗的等待時間。在等待時間范圍之內(nèi)進行重試,超時則返回加鎖失敗。
Rlock#tryLock(long waitTime, long leaseTime, TimeUnit unit)
:同樣是指定獲取鎖失敗的等待時間,并且強制指定鎖過期的時間(不開啟看門狗)。在等待時間范圍之內(nèi)進行重試,超時則返回加鎖失敗。
可以簡單的概述為RLock接口下的tryLock()
方法獲取鎖會失敗,lock()
方法獲取鎖一定會成功。
1> 一直重試直到加鎖成功
以Rlock#lock()
方法為例:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } CompletableFuture<RedissonLockEntry> future = subscribe(threadId); pubSub.timeout(future); RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { // lock() 或 lockInterruptibly()為入口走到這里時。leaseTime為-1,表示會開始開門狗;如果leaseTime大于0,則不會開啟開門狗; ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { // 因為Semaphore的可用資源為0,所以這里就等價于Thread.sleep(ttl); entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(entry, threadId); } }
首先訂閱解鎖channel(命名格式:
redisson_lock__channel:{keyName}
),其他線程解鎖后,會發(fā)布解鎖的消息;這里收到消息會立即嘗試獲取鎖;訂閱解鎖channel的超時時間默認(rèn)為7.5s。也就說獲取鎖失敗7.5s之內(nèi),如果其他線程釋放鎖,當(dāng)前線程可以立即嘗試獲取到鎖。獲取鎖失敗之后會進??個while死循環(huán)中:
每休息鎖的存活時間ttl
之后,就嘗試去獲取鎖,直到成功獲取到鎖才會跳出while死循環(huán)。
2> 等待鎖超時返回加鎖失敗
以Rlock#tryLock(long waitTime, TimeUnit unit)
為例:
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 獲取鎖剩余的等待時長 time -= System.currentTimeMillis() - current; if (time <= 0) { // 獲取鎖超時,返回獲取分布式鎖失敗 acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); try { // 訂閱解鎖channel的超時時長為 獲取鎖剩余的等待時長 subscribeFuture.get(time, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { if (!subscribeFuture.completeExceptionally(new RedisTimeoutException( "Unable to acquire subscription lock after " + time + "ms. " + "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) { subscribeFuture.whenComplete((res, ex) -> { if (ex == null) { unsubscribe(res, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } catch (ExecutionException e) { acquireFailed(waitTime, unit, threadId); return false; } try { // 收到解鎖channel的消息之后,走到這里,再次判斷獲取鎖等待時長是否超時 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // while循環(huán)中嘗試去獲取鎖 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { // 如果獲取鎖失敗后,鎖存活時長 小于 剩余鎖等待時長,則線程睡眠 鎖存活時長 commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { // 如果獲取鎖失敗后,鎖存活時間 大于等于 剩余鎖等待時長,則線程睡眠 鎖等待時長 commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(commandExecutor.getNow(subscribeFuture), threadId); } }
加鎖存在超時時間 相比于 一直重試直到加鎖成功,只是多一個時間限制,具體差異體現(xiàn)在:訂閱解鎖channel的超時時長、獲取鎖失敗后線程的睡眠時長、重試獲取鎖次數(shù)的限制;
獲取分布式鎖失敗之后,立即判斷當(dāng)前獲取鎖是否超時,如果超時,則返回加鎖失敗;
否者,訂閱解鎖channel(命名格式:redisson_lock__channel:{keyName}
),其他線程解鎖后,會發(fā)布解鎖的消息;
訂閱解鎖channel的超時時間為 獲取鎖剩余的等待時長
。 在這個時間范圍之內(nèi),如果其他線程釋放鎖,當(dāng)前線程收到解鎖channel的消息之后再次判斷獲取鎖是否超時,如果不超時,嘗試獲取鎖。
獲取鎖之后會進??個while死循環(huán)中: 如果獲取鎖超時,則返回加鎖失?。?/blockquote>否者讓線程睡眠: 如果鎖存活時長ttl
小于 剩余鎖等待時長,則線程睡眠 鎖存活時長;如果鎖存活時間ttl
大于等于 剩余鎖等待時長,則線程睡眠 鎖等待時長;線程睡眠完之后,判斷獲取鎖是否超時,不超時則嘗試去獲取鎖。3、釋放鎖流程
1)Client主動嘗試釋放鎖
進入到Rlock#unlock()方法;
和加鎖的方式?樣,釋放鎖也是通過lua腳本來完成的;
LUA腳本參數(shù)解析:
KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock;
KEYS[2] 表示getChanelName() 表示的是發(fā)布訂閱過程中使用的Chanel;
ARGV[1] 表示的是LockPubSub.unLockMessage,解鎖消息,實際代表的是數(shù)字 0,代表解鎖消息;
ARGV[2] 表示的是internalLockLeaseTime 默認(rèn)的有效時間 30s;
ARGV[3] 表示的是 getLockName(thread.currentThread().getId()) 代表的是 UUID:ThreadId 用鎖對象id+線程id, 表示當(dāng)前訪問線程,用于區(qū)分不同服務(wù)器上的線程。
LUA腳本邏輯:
如果鎖名稱不存在;
可能是因為鎖過期導(dǎo)致鎖不存在,也可能是并發(fā)解鎖。
則發(fā)布鎖解除的消息,返回1,lua腳本執(zhí)行完畢;
如果鎖存在,檢測當(dāng)前線程是否持有鎖;
如果是當(dāng)前線程持有鎖,定義變量counter,接收執(zhí)行incrby將該線程重入的次數(shù)–的結(jié)果;
如果重入次數(shù)大于0,表示該線程還有其他任務(wù)需要執(zhí)行;重新設(shè)置鎖的過期時間;返回0,lua腳本執(zhí)行完畢;
否則表示該線程執(zhí)行結(jié)束,del刪除該鎖;并且publish發(fā)布該鎖解除的消息;返回1,lua腳本執(zhí)行完畢;
如果不是當(dāng)前線程持有鎖 或 其他情況,都返回nil,lua腳本執(zhí)行完畢。
腳本執(zhí)行結(jié)束之后,如果返回值不是0或1,即當(dāng)前線程去釋放其他線程的加鎖時,拋出異常。
通過LUA腳本釋放鎖成功之后,會將看門狗殺死;
2)Client主動強制釋放鎖
forceUnlockAsync()
方法被調(diào)用的地方很多,大多都是在清理資源時刪除鎖。@Override public RFuture<Boolean> forceUnlockAsync() { cancelExpirationRenewal(null); return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE); }LUA腳本邏輯:
邏輯比較簡單粗暴:刪除鎖成功則并發(fā)布鎖被刪除的消息,返回1結(jié)束,否則返回0結(jié)束。
3)Client宕機,鎖超時釋放
如果Redisson客戶端剛加鎖成功,并且未指定releaseTime,后臺會啟動一個定時任務(wù)watchdog每隔10s檢查key:key如果存在就為它?動續(xù)命到30s;在watchdog定時任務(wù)存在的情況下,如果不是主動釋放鎖,那么key將會?直的被watchdog這個定時任務(wù)維持加鎖。
但是如果客戶端宕機了,定時任務(wù)watchdog也就沒了,也就沒有鎖續(xù)約機制了,那么過完30s之后,key會?動被刪除、key對應(yīng)的鎖也自動被釋放了。
4)不啟動鎖續(xù)約的超時釋放鎖
如果在加鎖時指定了leaseTime,加鎖成功之后,后臺并不會啟動一個定時任務(wù)watchdog做鎖續(xù)約;key存活leaseTime 毫秒之后便會自動被刪除、key對應(yīng)的鎖也就自動被釋放了;無論當(dāng)前線程的業(yè)務(wù)邏輯是否執(zhí)行完畢。
比如使用如下方式加鎖:
RLock#lock(long leaseTime, TimeUnit unit)
RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)
RLock#lockInterruptibly(long leaseTime, TimeUnit unit)
關(guān)于“Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約”知識都有一定的了解,大家如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。