您好,登錄后才能下訂單哦!
這篇文章主要講解了“Java從零實(shí)現(xiàn)Redis分布式鎖的方法教程”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Java從零實(shí)現(xiàn)Redis分布式鎖的方法教程”吧!
在 jdk 中為我們提供了加鎖的方式:
(1)synchronized 關(guān)鍵字
(2)volatile + CAS 實(shí)現(xiàn)的樂(lè)觀鎖
(3)ReadWriteLock 讀寫鎖
(4)ReenTrantLock 可重入鎖
等等,這些鎖為我們變成提供極大的便利性,保證在多線程的情況下,保證線程安全。
但是在分布式系統(tǒng)中,上面的鎖就統(tǒng)統(tǒng)沒(méi)用了。
我們想要解決分布式系統(tǒng)中的并發(fā)問(wèn)題,就需要引入分布式鎖的概念。
首先是對(duì)鎖實(shí)現(xiàn)原理的一個(gè)實(shí)現(xiàn),理論指導(dǎo)實(shí)踐,實(shí)踐完善理論。
晚上關(guān)于 redis 分布式鎖的文章一大堆,但是也都稂莠不齊。
redis 分布式鎖工具有時(shí)候中間件團(tuán)隊(duì)不見(jiàn)得會(huì)提供,提供了也不見(jiàn)得經(jīng)常維護(hù),不如自己實(shí)現(xiàn)一個(gè),知道原理,也方便修改。
為了便于和 JDK 復(fù)用,我們讓接口繼承自 jdk 的 Lock 接口。
package com.github.houbb.lock.api.core; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; /** * 鎖定義 * @author binbin.hou * @since 0.0.1 */ public interface ILock extends Lock { /** * 嘗試加鎖 * @param time 時(shí)間 * @param unit 當(dāng)為 * @param key key * @return 返回 * @throws InterruptedException 異常 * @since 0.0.1 */ boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException; /** * 嘗試加鎖 * @param key key * @return 返回 * @since 0.0.1 */ boolean tryLock(String key); /** * 解鎖 * @param key key * @since 0.0.1 */ void unlock(String key); }
方法我們只添加了三個(gè)比較常用的核心方法,作為第一個(gè)版本,簡(jiǎn)單點(diǎn)。
后續(xù)陸續(xù)添加即可。
為了便于后期添加更多的所實(shí)現(xiàn),這里首先實(shí)現(xiàn)了一個(gè)公用的抽象父類。
package com.github.houbb.lock.redis.core; import com.github.houbb.lock.api.core.ILock; import com.github.houbb.lock.redis.constant.LockRedisConst; import com.github.houbb.wait.api.IWait; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; /** * 抽象實(shí)現(xiàn) * @author binbin.hou * @since 0.0.1 */ public abstract class AbstractLockRedis implements ILock { /** * 鎖等待 * @since 0.0.1 */ private final IWait wait; protected AbstractLockRedis(IWait wait) { this.wait = wait; } @Override public void lock() { throw new UnsupportedOperationException(); } @Override public void lockInterruptibly() throws InterruptedException { throw new UnsupportedOperationException(); } @Override public boolean tryLock() { return tryLock(LockRedisConst.DEFAULT_KEY); } @Override public void unlock() { unlock(LockRedisConst.DEFAULT_KEY); } @Override public boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException { long startTimeMills = System.currentTimeMillis(); // 一次獲取,直接成功 boolean result = this.tryLock(key); if(result) { return true; } // 時(shí)間判斷 if(time <= 0) { return false; } long durationMills = unit.toMillis(time); long endMills = startTimeMills + durationMills; // 循環(huán)等待 while (System.currentTimeMillis() < endMills) { result = tryLock(key); if(result) { return true; } // 等待 10ms wait.wait(TimeUnit.MILLISECONDS, 10); } return false; } @Override public synchronized boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return tryLock(time, unit, LockRedisConst.DEFAULT_KEY); } @Override public Condition newCondition() { throw new UnsupportedOperationException(); } }
最核心的實(shí)際上是 public boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException 方法。
這個(gè)方法會(huì)調(diào)用 this.tryLock(key) 獲取鎖,如果成功,直接返回;如果不成功,則循環(huán)等待。
這里設(shè)置了超時(shí)時(shí)間,如果超時(shí),則直接返回 true。
我們實(shí)現(xiàn)的 redis 分布鎖,繼承自上面的抽象類。
package com.github.houbb.lock.redis.core; import com.github.houbb.heaven.util.lang.StringUtil; import com.github.houbb.id.api.Id; import com.github.houbb.id.core.util.IdThreadLocalHelper; import com.github.houbb.lock.redis.constant.LockRedisConst; import com.github.houbb.lock.redis.exception.LockRedisException; import com.github.houbb.lock.redis.support.operator.IOperator; import com.github.houbb.wait.api.IWait; /** * 這里是基于 redis 實(shí)現(xiàn) * * 實(shí)際上也可以基于 zk/數(shù)據(jù)庫(kù)等實(shí)現(xiàn)。 * * @author binbin.hou * @since 0.0.1 */ public class LockRedis extends AbstractLockRedis { /** * redis 操作實(shí)現(xiàn) * @since 0.0.1 */ private final IOperator redisOperator; /** * 主鍵標(biāo)識(shí) * @since 0.0.1 */ private final Id id; public LockRedis(IWait wait, IOperator redisOperator, Id id) { super(wait); this.redisOperator = redisOperator; this.id = id; } @Override public boolean tryLock(String key) { final String requestId = id.id(); IdThreadLocalHelper.put(requestId); return redisOperator.lock(key, requestId, LockRedisConst.DEFAULT_EXPIRE_MILLS); } @Override public void unlock(String key) { final String requestId = IdThreadLocalHelper.get(); if(StringUtil.isEmpty(requestId)) { String threadName = Thread.currentThread().getName(); throw new LockRedisException("Thread " + threadName +" not contains requestId"); } boolean unlock = redisOperator.unlock(key, requestId); if(!unlock) { throw new LockRedisException("Unlock key " + key + " result is failed!"); } } }
這里就是 redis 鎖的核心實(shí)現(xiàn)了,如果不太理解,建議回顧一下原理篇:
redis 分布式鎖原理詳解
加鎖部分,這里會(huì)生成一個(gè) id 標(biāo)識(shí),用于區(qū)分當(dāng)前操作者。
為了安全也設(shè)置了默認(rèn)的超時(shí)時(shí)間。
當(dāng)然這里是為了簡(jiǎn)化調(diào)用者的使用成本,開發(fā)在使用的時(shí)候只需要關(guān)心自己要加鎖的 key 即可。
當(dāng)然,甚至連加鎖的 key 都可以進(jìn)一步抽象掉,比如封裝 @DistributedLock 放在方法上,即可實(shí)現(xiàn)分布式鎖。這個(gè)后續(xù)有時(shí)間可以拓展,原理也不難。
解鎖的時(shí)候,就會(huì)獲取當(dāng)前進(jìn)程的持有標(biāo)識(shí)。
憑借當(dāng)前線程持有的 id 標(biāo)識(shí),去解鎖。
我們對(duì) redis 的操作進(jìn)行了抽象,為什么抽象呢?
因?yàn)?redis 服務(wù)種類實(shí)際很多,可以是 redis 單點(diǎn),集群,主從,哨兵。
連接的客戶端也可以很多,jedis,spring redisTemplate, codis, redisson 等等。
這里為了后期拓展方便,就對(duì)操作進(jìn)行了抽象。
定義接口如下:
package com.github.houbb.lock.redis.support.operator; /** * Redis 客戶端 * @author binbin.hou * @since 0.0.1 */ public interface IOperator { /** * 嘗試獲取分布式鎖 * * @param lockKey 鎖 * @param requestId 請(qǐng)求標(biāo)識(shí) * @param expireTimeMills 超期時(shí)間 * @return 是否獲取成功 * @since 0.0.1 */ boolean lock(String lockKey, String requestId, int expireTimeMills); /** * 解鎖 * @param lockKey 鎖 key * @param requestId 請(qǐng)求標(biāo)識(shí) * @return 結(jié)果 * @since 0.0.1 */ boolean unlock(String lockKey, String requestId); }
我們實(shí)現(xiàn)一個(gè) jedis 單點(diǎn)版本的:
package com.github.houbb.lock.redis.support.operator.impl; import com.github.houbb.lock.redis.constant.LockRedisConst; import com.github.houbb.lock.redis.support.operator.IOperator; import redis.clients.jedis.Jedis; import java.util.Collections; /** * Redis 客戶端 * @author binbin.hou * @since 0.0.1 */ public class JedisOperator implements IOperator { /** * jedis 客戶端 * @since 0.0.1 */ private final Jedis jedis; public JedisOperator(Jedis jedis) { this.jedis = jedis; } /** * 嘗試獲取分布式鎖 * * expireTimeMills 保證當(dāng)前進(jìn)程掛掉,也能釋放鎖 * * requestId 保證解鎖的是當(dāng)前進(jìn)程(鎖的持有者) * * @param lockKey 鎖 * @param requestId 請(qǐng)求標(biāo)識(shí) * @param expireTimeMills 超期時(shí)間 * @return 是否獲取成功 * @since 0.0.1 */ @Override public boolean lock(String lockKey, String requestId, int expireTimeMills) { String result = jedis.set(lockKey, requestId, LockRedisConst.SET_IF_NOT_EXIST, LockRedisConst.SET_WITH_EXPIRE_TIME, expireTimeMills); return LockRedisConst.LOCK_SUCCESS.equals(result); } /** * 解鎖 * * (1)使用 requestId,保證為當(dāng)前鎖的持有者 * (2)使用 lua 腳本,保證執(zhí)行的原子性。 * * @param lockKey 鎖 key * @param requestId 請(qǐng)求標(biāo)識(shí) * @return 結(jié)果 * @since 0.0.1 */ @Override public boolean unlock(String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); return LockRedisConst.RELEASE_SUCCESS.equals(result); } }
這里時(shí)最核心的部分。
別看簡(jiǎn)單幾行代碼,需要注意的點(diǎn)還是很多的。
加鎖時(shí)附帶 requestId,用來(lái)標(biāo)識(shí)自己為鎖的持有者。
SETNX 當(dāng) key 不存在時(shí)才進(jìn)行加鎖。
設(shè)置加鎖的過(guò)期時(shí)間,避免因異常等原因未釋放鎖,導(dǎo)致鎖的長(zhǎng)時(shí)間占用。
使用 lua 腳本,保證操作的原子性。
為了證明為鎖的持有者,傳入 requestId。
<dependency> <groupId>com.github.houbb</groupId> <artifactId>lock-core</artifactId> <version>0.0.1</version> </dependency>
Jedis jedis = new Jedis("127.0.0.1", 6379); IOperator operator = new JedisOperator(jedis); // 獲取鎖 ILock lock = LockRedisBs.newInstance().operator(operator).lock(); try { boolean lockResult = lock.tryLock(); System.out.println(lockResult); // 業(yè)務(wù)處理 } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); }
到這里,一個(gè)簡(jiǎn)單版本的 redis 分布式鎖就實(shí)現(xiàn)完成了。
當(dāng)然還有很多可以改進(jìn)的地方:
(1)比如引入遞增的 sequence,避免分布式鎖中的 GC 導(dǎo)致的問(wèn)題
(2)對(duì)于更多 redis 服務(wù)端+客戶端的支持
(3)對(duì)于注解式 redis 分布式鎖的支持
感謝各位的閱讀,以上就是“Java從零實(shí)現(xiàn)Redis分布式鎖的方法教程”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Java從零實(shí)現(xiàn)Redis分布式鎖的方法教程這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。