您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)Redis分布式鎖該怎么實現(xiàn)續(xù)期,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
據(jù)肥朝了解,很多同學(xué)在用分布式鎖時,都是直接百度搜索找一個Redis分布式鎖工具類就直接用了.關(guān)鍵是該工具類中還充斥著很多System.out.println();等語句.其實Redis分布式鎖比較正確的姿勢是采用redisson這個客戶端工具.具體介紹可以搜索最大的同性交友網(wǎng)站github.
首先如果你之前用Redis的分布式鎖的姿勢正確,并且看過相應(yīng)的官方文檔的話,這個問題So easy.我們來看
坦白說,如果你英文棒棒噠那么看英文文檔可能更好理解
By default lock watchdog timeout is 30 seconds and can be changed through Config.lockWatchdogTimeout setting.
但是你如果看的是中文文檔
看門狗檢查鎖的超時時間默認(rèn)是30秒
這句話肥朝從語文角度分析就是一個歧義句,他有兩個意思
1.看門狗默認(rèn)30秒去檢查一次鎖的超時時間
2.看們狗會去檢查鎖的超時時間,鎖的時間時間默認(rèn)是30秒
看到這里,我希望大家不要黑我的小學(xué)體育老師,雖然他和語文老師是同個人.語文不行,我們可以源碼來湊!
我們根據(jù)官方文檔給出的例子,寫了一個最簡單的demo,例子根據(jù)上面截圖中Ctr+C和Ctr+V一波操作,如下
public class DemoMain { public static void main(String[] args) throws Exception { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); RedissonClient redisson = Redisson.create(config); RLock lock = redisson.getLock("anyLock"); lock.lock(); //lock.unlock(); } }
create
從這里我們知道,internalLockLeaseTime 和 lockWatchdogTimeout這兩個參數(shù)是相等的.
lockWatchdogTimeout默認(rèn)值如下
public class Config { private long lockWatchdogTimeout = 30 * 1000; public long getLockWatchdogTimeout() { return lockWatchdogTimeout; } //省略無關(guān)代碼 }
從internalLockLeaseTime這個單詞也可以看出,這個加的分布式鎖的超時時間默認(rèn)是30秒.但是還有一個問題,那就是這個看門狗,多久來延長一次有效期呢?我們往下看
lock
從我圖中框起來的地方我們就知道了,獲取鎖成功就會開啟一個定時任務(wù),也就是watchdog,定時任務(wù)會定期檢查去續(xù)期renewExpirationAsync(threadId).
這里定時用的是netty-common包中的HashedWheelTimer,肥朝公眾號已經(jīng)和各大搜索引擎建立了密切的合作關(guān)系,你只需要把這個類在任何搜索引擎一搜,都能知道相關(guān)API參數(shù)的意義.
從圖中我們明白,該定時調(diào)度每次調(diào)用的時間差是internalLockLeaseTime / 3.也就10秒.
通過源碼分析我們知道,默認(rèn)情況下,加鎖的時間是30秒.如果加鎖的業(yè)務(wù)沒有執(zhí)行完,那么到 30-10 = 20秒的時候,就會進(jìn)行一次續(xù)期,把鎖重置成30秒.那這個時候可能又有同學(xué)問了,那業(yè)務(wù)的機(jī)器萬一宕機(jī)了呢?宕機(jī)了定時任務(wù)跑不了,就續(xù)不了期,那自然30秒之后鎖就解開了唄.
這種情況是一種低級錯誤,就是我上邊犯的錯,由于當(dāng)前線程 獲取到redis 鎖,處理完業(yè)務(wù)后未及時釋放鎖,導(dǎo)致其它線程會一直嘗試獲取鎖阻塞,例如:用Jedis客戶端會報如下的錯誤信息
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
redis線程池已經(jīng)沒有空閑線程來處理客戶端命令。
解決的方法也很簡單,只要我們細(xì)心一點,拿到鎖的線程處理完業(yè)務(wù)及時釋放鎖,如果是重入鎖未拿到鎖后,線程可以釋放當(dāng)前連接并且sleep一段時間。
public void lock() { while (true) { boolean flag = this.getLock(key); if (flag) { TODO ......... } else { // 釋放當(dāng)前redis連接 redis.close(); // 休眠1000毫秒 sleep(1000); } } }
我們知道Redis實現(xiàn)鎖的原理在于 SETNX命令。當(dāng) key不存在時將 key的值設(shè)為 value ,返回值為 1;若給定的 key已經(jīng)存在,則 SETNX不做任何動作,返回值為 0 。
SETNX key value
我們來設(shè)想一下這個場景:A、B兩個線程來嘗試給key myLock加鎖,A線程先拿到鎖(假如鎖3秒后過期),B線程就在等待嘗試獲取鎖,到這一點毛病沒有。
那如果此時業(yè)務(wù)邏輯比較耗時,執(zhí)行時間已經(jīng)超過redis鎖過期時間,這時A線程的鎖自動釋放(刪除key),B線程檢測到myLock這個key不存在,執(zhí)行 SETNX命令也拿到了鎖。
但是,此時A線程執(zhí)行完業(yè)務(wù)邏輯之后,還是會去釋放鎖(刪除key),這就導(dǎo)致B線程的鎖被A線程給釋放了。
為避免上邊的情況,一般我們在每個線程加鎖時要帶上自己獨有的value值來標(biāo)識,只釋放指定value的key,否則就會出現(xiàn)釋放鎖混亂的場景。
emm~ 聊redis鎖咋還扯到數(shù)據(jù)庫事務(wù)上來了?別著急往下看,看下邊這段代碼:
@Transaction public void lock() { while (true) { boolean flag = this.getLock(key); if (flag) { insert(); } } }
給這個方法添加一個@Transaction注解開啟事務(wù),如代碼中拋出異常進(jìn)行回滾,要知道數(shù)據(jù)庫事務(wù)可是有超時時間限制的,并不會無條件的一直等一個耗時的數(shù)據(jù)庫操作。
比如:我們解析一個大文件,再將數(shù)據(jù)存入到數(shù)據(jù)庫,如果執(zhí)行時間太長,就會導(dǎo)致事務(wù)超時自動回滾。
一旦你的key長時間獲取不到鎖,獲取鎖等待的時間遠(yuǎn)超過數(shù)據(jù)庫事務(wù)超時時間,程序就會報異常。
一般為解決這種問題,我們就需要將數(shù)據(jù)庫事務(wù)改為手動提交、回滾事務(wù)。
@Autowired DataSourceTransactionManager dataSourceTransactionManager; @Transaction public void lock() { //手動開啟事務(wù) TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); try { while (true) { boolean flag = this.getLock(key); if (flag) { insert(); //手動提交事務(wù) dataSourceTransactionManager.commit(transactionStatus); } } } catch (Exception e) { //手動回滾事務(wù) dataSourceTransactionManager.rollback(transactionStatus); } }
這種情況和我們上邊提到的第二種比較類似,但解決思路上略有不同。
同樣是redis分布式鎖過期,而業(yè)務(wù)邏輯沒執(zhí)行完的場景,不過,這里換一種思路想問題,把redis鎖的過期時間再弄長點不就解決了嗎?
那還是有問題,我們可以在加鎖的時候,手動調(diào)長redis鎖的過期時間,可這個時間多長合適?業(yè)務(wù)邏輯的執(zhí)行時間是不可控的,調(diào)的過長又會影響操作性能。
要是redis鎖的過期時間能夠自動續(xù)期就好了。
為了解決這個問題我們使用redis客戶端redisson,redisson很好的解決了redis在分布式環(huán)境下的一些棘手問題,它的宗旨就是讓使用者減少對Redis的關(guān)注,將更多精力用在處理業(yè)務(wù)邏輯上。
redisson對分布式鎖做了很好封裝,只需調(diào)用API即可。
RLock lock = redissonClient.getLock("stockLock");
redisson在加鎖成功后,會注冊一個定時任務(wù)監(jiān)聽這個鎖,每隔10秒就去查看這個鎖,如果還持有鎖,就對過期時間進(jìn)行續(xù)期。默認(rèn)過期時間30秒。這個機(jī)制也被叫做:“看門狗”,這名字。。。
舉例子:假如加鎖的時間是30秒,過10秒檢查一次,一旦加鎖的業(yè)務(wù)沒有執(zhí)行完,就會進(jìn)行一次續(xù)期,把鎖的過期時間再次重置成30秒。
通過分析下邊redisson的源碼實現(xiàn)可以發(fā)現(xiàn),不管是加鎖、解鎖、續(xù)約都是客戶端把一些復(fù)雜的業(yè)務(wù)邏輯,通過封裝在Lua腳本中發(fā)送給redis,保證這段復(fù)雜業(yè)務(wù)邏輯執(zhí)行的原子性。
@Slf4j @Service public class RedisDistributionLockPlus { /** * 加鎖超時時間,單位毫秒, 即:加鎖時間內(nèi)執(zhí)行完操作,如果未完成會有并發(fā)現(xiàn)象 */ private static final long DEFAULT_LOCK_TIMEOUT = 30; private static final long TIME_SECONDS_FIVE = 5 ; /** * 每個key的過期時間 {@link LockContent} */ private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512); /** * redis執(zhí)行成功的返回 */ private static final Long EXEC_SUCCESS = 1L; /** * 獲取鎖lua腳本, k1:獲鎖key, k2:續(xù)約耗時key, arg1:requestId,arg2:超時時間 */ private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " + "if redis.call('exists', KEYS[1]) == 0 then " + "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " + "for k, v in pairs(t) do " + "if v == 'OK' then return tonumber(ARGV[2]) end " + "end " + "return 0 end"; /** * 釋放鎖lua腳本, k1:獲鎖key, k2:續(xù)約耗時key, arg1:requestId,arg2:業(yè)務(wù)耗時 arg3: 業(yè)務(wù)開始設(shè)置的timeout */ private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "local ctime = tonumber(ARGV[2]) " + "local biz_timeout = tonumber(ARGV[3]) " + "if ctime > 0 then " + "if redis.call('exists', KEYS[2]) == 1 then " + "local avg_time = redis.call('get', KEYS[2]) " + "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " + "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " + "else redis.call('del', KEYS[2]) end " + "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " + "end " + "return redis.call('del', KEYS[1]) " + "else return 0 end"; /** * 續(xù)約lua腳本 */ private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end"; private final StringRedisTemplate redisTemplate; public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; ScheduleTask task = new ScheduleTask(this, lockContentMap); // 啟動定時任務(wù) ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS); } /** * 加鎖 * 取到鎖加鎖,取不到鎖一直等待知道獲得鎖 * * @param lockKey * @param requestId 全局唯一 * @param expire 鎖過期時間, 單位秒 * @return */ public boolean lock(String lockKey, String requestId, long expire) { log.info("開始執(zhí)行加鎖, lockKey ={}, requestId={}", lockKey, requestId); for (; ; ) { // 判斷是否已經(jīng)有線程持有鎖,減少redis的壓力 LockContent lockContentOld = lockContentMap.get(lockKey); boolean unLocked = null == lockContentOld; // 如果沒有被鎖,就獲取鎖 if (unLocked) { long startTime = System.currentTimeMillis(); // 計算超時時間 long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire; String lockKeyRenew = lockKey + "_renew"; RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class); List<String> keys = new ArrayList<>(); keys.add(lockKey); keys.add(lockKeyRenew); Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire)); if (null != lockExpire && lockExpire > 0) { // 將鎖放入map LockContent lockContent = new LockContent(); lockContent.setStartTime(startTime); lockContent.setLockExpire(lockExpire); lockContent.setExpireTime(startTime + lockExpire * 1000); lockContent.setRequestId(requestId); lockContent.setThread(Thread.currentThread()); lockContent.setBizExpire(bizExpire); lockContent.setLockCount(1); lockContentMap.put(lockKey, lockContent); log.info("加鎖成功, lockKey ={}, requestId={}", lockKey, requestId); return true; } } // 重復(fù)獲取鎖,在線程池中由于線程復(fù)用,線程相等并不能確定是該線程的鎖 if (Thread.currentThread() == lockContentOld.getThread() && requestId.equals(lockContentOld.getRequestId())){ // 計數(shù) +1 lockContentOld.setLockCount(lockContentOld.getLockCount()+1); return true; } // 如果被鎖或獲取鎖失敗,則等待100毫秒 try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { // 這里用lombok 有問題 log.error("獲取redis 鎖失敗, lockKey ={}, requestId={}", lockKey, requestId, e); return false; } } } /** * 解鎖 * * @param lockKey * @param lockValue */ public boolean unlock(String lockKey, String lockValue) { String lockKeyRenew = lockKey + "_renew"; LockContent lockContent = lockContentMap.get(lockKey); long consumeTime; if (null == lockContent) { consumeTime = 0L; } else if (lockValue.equals(lockContent.getRequestId())) { int lockCount = lockContent.getLockCount(); // 每次釋放鎖, 計數(shù) -1,減到0時刪除redis上的key if (--lockCount > 0) { lockContent.setLockCount(lockCount); return false; } consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000; } else { log.info("釋放鎖失敗,不是自己的鎖。"); return false; } // 刪除已完成key,先刪除本地緩存,減少redis壓力, 分布式鎖,只有一個,所以這里不加鎖 lockContentMap.remove(lockKey); RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class); List<String> keys = new ArrayList<>(); keys.add(lockKey); keys.add(lockKeyRenew); Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime), Long.toString(lockContent.getBizExpire())); return EXEC_SUCCESS.equals(result); } /** * 續(xù)約 * * @param lockKey * @param lockContent * @return true:續(xù)約成功,false:續(xù)約失敗(1、續(xù)約期間執(zhí)行完成,鎖被釋放 2、不是自己的鎖,3、續(xù)約期間鎖過期了(未解決)) */ public boolean renew(String lockKey, LockContent lockContent) { // 檢測執(zhí)行業(yè)務(wù)線程的狀態(tài) Thread.State state = lockContent.getThread().getState(); if (Thread.State.TERMINATED == state) { log.info("執(zhí)行業(yè)務(wù)的線程已終止,不再續(xù)約 lockKey ={}, lockContent={}", lockKey, lockContent); return false; } String requestId = lockContent.getRequestId(); long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000; RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class); List<String> keys = new ArrayList<>(); keys.add(lockKey); Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut)); log.info("續(xù)約結(jié)果,True成功,F(xiàn)alse失敗 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result)); return EXEC_SUCCESS.equals(result); } static class ScheduleExecutor { public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) { long delay = unit.toMillis(initialDelay); long period_ = unit.toMillis(period); // 定時執(zhí)行 new Timer("Lock-Renew-Task").schedule(task, delay, period_); } } static class ScheduleTask extends TimerTask { private final RedisDistributionLockPlus redisDistributionLock; private final Map<String, LockContent> lockContentMap; public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) { this.redisDistributionLock = redisDistributionLock; this.lockContentMap = lockContentMap; } @Override public void run() { if (lockContentMap.isEmpty()) { return; } Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet(); for (Map.Entry<String, LockContent> entry : entries) { String lockKey = entry.getKey(); LockContent lockContent = entry.getValue(); long expireTime = lockContent.getExpireTime(); // 減少線程池中任務(wù)數(shù)量 if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) { //線程池異步續(xù)約 ThreadPool.submit(() -> { boolean renew = redisDistributionLock.renew(lockKey, lockContent); if (renew) { long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000; lockContent.setExpireTime(expireTimeNew); } else { // 續(xù)約失敗,說明已經(jīng)執(zhí)行完 OR redis 出現(xiàn)問題 lockContentMap.remove(lockKey); } }); } } } } }
redis高可用最常見的方案就是主從復(fù)制(master-slave),這種模式也給redis分布式鎖挖了一坑。
redis cluster集群環(huán)境下,假如現(xiàn)在A客戶端想要加鎖,它會根據(jù)路由規(guī)則選擇一臺master節(jié)點寫入key mylock,在加鎖成功后,master節(jié)點會把key異步復(fù)制給對應(yīng)的slave節(jié)點。
如果此時redis master節(jié)點宕機(jī),為保證集群可用性,會進(jìn)行主備切換,slave變?yōu)榱藃edis master。B客戶端在新的master節(jié)點上加鎖成功,而A客戶端也以為自己還是成功加了鎖的。
此時就會導(dǎo)致同一時間內(nèi)多個客戶端對一個分布式鎖完成了加鎖,導(dǎo)致各種臟數(shù)據(jù)的產(chǎn)生。
至于解決辦法嘛,目前看還沒有什么根治的方法,只能盡量保證機(jī)器的穩(wěn)定性,減少發(fā)生此事件的概率。
小結(jié)一下:上面就是我在使用Redis 分布式鎖時遇到的一些坑,有點小感慨,經(jīng)常用一個方法填上這個坑,沒多久就發(fā)現(xiàn)另一個坑又出來了,其實根本沒有什么十全十美的解決方案,哪有什么銀彈,只不過是在權(quán)衡利弊后,選一個在接受范圍內(nèi)的折中方案而已。
關(guān)于Redis分布式鎖該怎么實現(xiàn)續(xù)期就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。