您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)SpringBoot整合Redisson如何實(shí)現(xiàn)分布式鎖,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
Redisson是架設(shè)在redis基礎(chǔ)上的一個(gè)Java駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)。充分的利用了Redis鍵值數(shù)據(jù)庫(kù)提供的一系列優(yōu)勢(shì),基于Java實(shí)用工具包中常用接口,為使用者提供了一系列具有分布式特性的常用工具類。使得原本作為協(xié)調(diào)單機(jī)多線程并發(fā)程序的工具包獲得了協(xié)調(diào)分布式多機(jī)多線程并發(fā)系統(tǒng)的能力,大大降低了設(shè)計(jì)和研發(fā)大規(guī)模分布式系統(tǒng)的難度。同時(shí)結(jié)合各富特色的分布式服務(wù),更進(jìn)一步簡(jiǎn)化了分布式環(huán)境中程序相互之間的協(xié)作。
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- springboot整合redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- springboot整合redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.13.6</version> </dependency> </dependencies>
server: port: 8000 spring: redis: host: localhost port: 6379 password: null database: 1 timeout: 30000
@Configuration public class MyRedissonConfig { @Value("${spring.redis.host}") String redisHost; @Value("${spring.redis.port}") String redisPort; @Value("${spring.redis.password}") String redisPassword; @Value("${spring.redis.timeout}") Integer redisTimeout; /** * Redisson配置 * @return */ @Bean RedissonClient redissonClient() { //1、創(chuàng)建配置 Config config = new Config(); redisHost = redisHost.startsWith("redis://") ? redisHost : "redis://" + redisHost; SingleServerConfig serverConfig = config.useSingleServer() .setAddress(redisHost + ":" + redisPort) .setTimeout(redisTimeout); if (StringUtils.isNotBlank(redisPassword)) { serverConfig.setPassword(redisPassword); } return Redisson.create(config); } }
//單機(jī) RedissonClient redisson = Redisson.create(); Config config = new Config(); config.useSingleServer().setAddress("myredisserver:6379"); RedissonClient redisson = Redisson.create(config); //主從 Config config = new Config(); config.useMasterSlaveServers() .setMasterAddress("127.0.0.1:6379") .addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419") .addSlaveAddress("127.0.0.1:6399"); RedissonClient redisson = Redisson.create(config); //哨兵 Config config = new Config(); config.useSentinelServers() .setMasterName("mymaster") .addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379") .addSentinelAddress("127.0.0.1:26319"); RedissonClient redisson = Redisson.create(config); //集群 Config config = new Config(); config.useClusterServers() .setScanInterval(2000) // cluster state scan interval in milliseconds .addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001") .addNodeAddress("127.0.0.1:7002"); RedissonClient redisson = Redisson.create(config);
基于Redis的Redisson分布式可重入鎖RLock對(duì)象實(shí)現(xiàn)了java.util.concurrent.locks.Lock接口。
@RequestMapping("/redisson") public String testRedisson(){ //獲取分布式鎖,只要鎖的名字一樣,就是同一把鎖 RLock lock = redissonClient.getLock("lock"); //加鎖(阻塞等待),默認(rèn)過期時(shí)間是無(wú)限期 lock.lock(); try{ //如果業(yè)務(wù)執(zhí)行過長(zhǎng),Redisson會(huì)自動(dòng)給鎖續(xù)期 Thread.sleep(1000); System.out.println("加鎖成功,執(zhí)行業(yè)務(wù)邏輯"); } catch (InterruptedException e) { e.printStackTrace(); } finally { //解鎖,如果業(yè)務(wù)執(zhí)行完成,就不會(huì)繼續(xù)續(xù)期 lock.unlock(); } return "Hello Redisson!"; }
如果拿到分布式鎖的節(jié)點(diǎn)宕機(jī),且這個(gè)鎖正好處于鎖住的狀態(tài)時(shí),會(huì)出現(xiàn)鎖死的狀態(tài),為了避免這種情況的發(fā)生,鎖都會(huì)設(shè)置一個(gè)過期時(shí)間。這樣也存在一個(gè)問題,一個(gè)線程拿到了鎖設(shè)置了30s超時(shí),在30s后這個(gè)線程還沒有執(zhí)行完畢,鎖超時(shí)釋放了,就會(huì)導(dǎo)致問題,Redisson給出了自己的答案,就是 watch dog 自動(dòng)延期機(jī)制。
Redisson提供了一個(gè)監(jiān)控鎖的看門狗,它的作用是在Redisson實(shí)例被關(guān)閉前,不斷的延長(zhǎng)鎖的有效期,也就是說,如果一個(gè)拿到鎖的線程一直沒有完成邏輯,那么看門狗會(huì)幫助線程不斷的延長(zhǎng)鎖超時(shí)時(shí)間,鎖不會(huì)因?yàn)槌瑫r(shí)而被釋放。
默認(rèn)情況下,看門狗的續(xù)期時(shí)間是30s,也可以通過修改Config.lockWatchdogTimeout來(lái)另行指定。
另外Redisson 還提供了可以指定leaseTime參數(shù)的加鎖方法來(lái)指定加鎖的時(shí)間。超過這個(gè)時(shí)間后鎖便自動(dòng)解開了,不會(huì)延長(zhǎng)鎖的有效期。
在RedissonLock類的renewExpiration()方法中,會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù)每隔30/3=10秒給鎖續(xù)期。如果業(yè)務(wù)執(zhí)行期間,應(yīng)用掛了,那么不會(huì)自動(dòng)續(xù)期,到過期時(shí)間之后,鎖會(huì)自動(dòng)釋放。
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
另外Redisson還提供了leaseTime的參數(shù)來(lái)指定加鎖的時(shí)間。超過這個(gè)時(shí)間后鎖便自動(dòng)解開了。
// 加鎖以后10秒鐘自動(dòng)解鎖 // 無(wú)需調(diào)用unlock方法手動(dòng)解鎖 lock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動(dòng)解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
如果指定了鎖的超時(shí)時(shí)間,底層直接調(diào)用lua腳本,進(jìn)行占鎖。如果超過leaseTime,業(yè)務(wù)邏輯還沒有執(zhí)行完成,則直接釋放鎖,所以在指定leaseTime時(shí),要讓leaseTime大于業(yè)務(wù)執(zhí)行時(shí)間。RedissonLock類的tryLockInnerAsync()方法
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
分布式可重入讀寫鎖允許同時(shí)有多個(gè)讀鎖和一個(gè)寫鎖處于加鎖狀態(tài)。在讀寫鎖中,讀讀共享、讀寫互斥、寫寫互斥。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock"); // 最常見的使用方法 rwlock.readLock().lock(); // 或 rwlock.writeLock().lock();
讀寫鎖測(cè)試類,當(dāng)訪問write接口時(shí),read接口會(huì)被阻塞住。
@RestController public class TestController { @Autowired RedissonClient redissonClient; @Autowired StringRedisTemplate redisTemplate; @RequestMapping("/write") public String write(){ RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock"); RLock writeLock = readWriteLock.writeLock(); String s = UUID.randomUUID().toString(); writeLock.lock(); try { redisTemplate.opsForValue().set("wr-lock-key", s); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); }finally { writeLock.unlock(); } return s; } @RequestMapping("/read") public String read(){ RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock"); RLock readLock = readWriteLock.readLock(); String s = ""; readLock.lock(); try { s = redisTemplate.opsForValue().get("wr-lock-key"); } finally { readLock.unlock(); } return s; } }
基于Redis的Redisson的分布式信號(hào)量(Semaphore)Java對(duì)象RSemaphore
采用了與java.util.concurrent.Semaphore
類似的接口和用法
關(guān)于信號(hào)量的使用你們能夠想象一下這個(gè)場(chǎng)景,有三個(gè)停車位,當(dāng)三個(gè)停車位滿了后,其余車就不停了。能夠把車位比做信號(hào),如今有三個(gè)信號(hào),停一次車,用掉一個(gè)信號(hào),車離開就是釋放一個(gè)信號(hào)。
咱們用 Redisson 來(lái)演示上述停車位的場(chǎng)景。
先定義一個(gè)占用停車位的方法:
/** * 停車,占用停車位 * 總共 3 個(gè)車位 */ @ResponseBody @RequestMapping("park") public String park() throws InterruptedException { // 獲取信號(hào)量(停車場(chǎng)) RSemaphore park = redisson.getSemaphore("park"); // 獲取一個(gè)信號(hào)(停車位) park.acquire(); return "OK"; }
再定義一個(gè)離開車位的方法:
/** * 釋放車位 * 總共 3 個(gè)車位 */ @ResponseBody @RequestMapping("leave") public String leave() throws InterruptedException { // 獲取信號(hào)量(停車場(chǎng)) RSemaphore park = redisson.getSemaphore("park"); // 釋放一個(gè)信號(hào)(停車位) park.release(); return "OK"; }
為了簡(jiǎn)便,我用 Redis 客戶端添加了一個(gè) key:“park”,值等于 3,表明信號(hào)量為 park,總共有三個(gè)值。
而后用 postman 發(fā)送 park 請(qǐng)求占用一個(gè)停車位。
而后在 redis 客戶端查看 park 的值,發(fā)現(xiàn)已經(jīng)改成 2 了。繼續(xù)調(diào)用兩次,發(fā)現(xiàn) park 的等于 0,當(dāng)調(diào)用第四次的時(shí)候,會(huì)發(fā)現(xiàn)請(qǐng)求一直處于等待中
,說明車位不夠了。若是想要不阻塞,能夠用 tryAcquire 或 tryAcquireAsync。
咱們?cè)僬{(diào)用離開車位的方法,park 的值變?yōu)榱?1,表明車位剩余 1 個(gè)。
注意:屢次執(zhí)行釋放信號(hào)量操做,剩余信號(hào)量會(huì)一直增長(zhǎng),而不是到 3 后就封頂了。
CountDownLatch作用:某一線程,等待其他線程執(zhí)行完畢之后,自己再繼續(xù)執(zhí)行。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.trySetCount(1); latch.await(); // 在其他線程或其他JVM里 RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.countDown();
在TestController中添加測(cè)試方法,訪問close接口時(shí),調(diào)用await()方法進(jìn)入阻塞狀態(tài),直到有三次訪問release接口時(shí),close接口才會(huì)返回。
@RequestMapping("/close") public String close() throws InterruptedException { RCountDownLatch close = redissonClient.getCountDownLatch("close"); close.trySetCount(3); close.await(); return "close"; } @RequestMapping("/release") public String release(){ RCountDownLatch close = redissonClient.getCountDownLatch("close"); close.countDown(); return "release"; }
關(guān)于“SpringBoot整合Redisson如何實(shí)現(xiàn)分布式鎖”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。