溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

SpringBoot整合Redisson如何實(shí)現(xiàn)分布式鎖

發(fā)布時(shí)間:2021-11-13 09:11:09 來(lái)源:億速云 閱讀:546 作者:小新 欄目:開發(fā)技術(shù)

這篇文章將為大家詳細(xì)講解有關(guān)SpringBoot整合Redisson如何實(shí)現(xiàn)分布式鎖,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

Redisson是架設(shè)在redis基礎(chǔ)上的一個(gè)Java駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)。充分的利用了Redis鍵值數(shù)據(jù)庫(kù)提供的一系列優(yōu)勢(shì),基于Java實(shí)用工具包中常用接口,為使用者提供了一系列具有分布式特性的常用工具類。使得原本作為協(xié)調(diào)單機(jī)多線程并發(fā)程序的工具包獲得了協(xié)調(diào)分布式多機(jī)多線程并發(fā)系統(tǒng)的能力,大大降低了設(shè)計(jì)和研發(fā)大規(guī)模分布式系統(tǒng)的難度。同時(shí)結(jié)合各富特色的分布式服務(wù),更進(jìn)一步簡(jiǎn)化了分布式環(huán)境中程序相互之間的協(xié)作。

一、添加依賴

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
 
        <!-- springboot整合redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
 
        <!-- springboot整合redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.13.6</version>
        </dependency>
    </dependencies>

二、redis配置文件

server:
  port: 8000
 
spring:
  redis:
    host: localhost
    port: 6379
    password: null
    database: 1
    timeout: 30000

三、新建配置類

@Configuration
public class MyRedissonConfig {
 
    @Value("${spring.redis.host}")
    String redisHost;
 
    @Value("${spring.redis.port}")
    String redisPort;
 
    @Value("${spring.redis.password}")
    String redisPassword;
 
    @Value("${spring.redis.timeout}")
    Integer redisTimeout;
 
    /**
     * Redisson配置
     * @return
     */
    @Bean
    RedissonClient redissonClient() {
      //1、創(chuàng)建配置
        Config config = new Config();
        
        redisHost = redisHost.startsWith("redis://") ? redisHost : "redis://" + redisHost;
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(redisHost + ":" + redisPort)
                .setTimeout(redisTimeout);
        
        if (StringUtils.isNotBlank(redisPassword)) {
            serverConfig.setPassword(redisPassword);
        }
        
        return Redisson.create(config);
    }
    
}
//單機(jī)
RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("myredisserver:6379");
RedissonClient redisson = Redisson.create(config);
 
 
//主從
 
Config config = new Config();
config.useMasterSlaveServers()
    .setMasterAddress("127.0.0.1:6379")
    .addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419")
    .addSlaveAddress("127.0.0.1:6399");
RedissonClient redisson = Redisson.create(config);
 
 
//哨兵
Config config = new Config();
config.useSentinelServers()
    .setMasterName("mymaster")
    .addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379")
    .addSentinelAddress("127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);
 
 
//集群
Config config = new Config();
config.useClusterServers()
    .setScanInterval(2000) // cluster state scan interval in milliseconds
    .addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001")
    .addNodeAddress("127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);

四、使用分布式鎖

可重入鎖

基于Redis的Redisson分布式可重入鎖RLock對(duì)象實(shí)現(xiàn)了java.util.concurrent.locks.Lock接口。

@RequestMapping("/redisson")
    public String testRedisson(){
        //獲取分布式鎖,只要鎖的名字一樣,就是同一把鎖
        RLock lock = redissonClient.getLock("lock");
 
        //加鎖(阻塞等待),默認(rèn)過期時(shí)間是無(wú)限期
        lock.lock();
        try{
            //如果業(yè)務(wù)執(zhí)行過長(zhǎng),Redisson會(huì)自動(dòng)給鎖續(xù)期
            Thread.sleep(1000);
            System.out.println("加鎖成功,執(zhí)行業(yè)務(wù)邏輯");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //解鎖,如果業(yè)務(wù)執(zhí)行完成,就不會(huì)繼續(xù)續(xù)期
            lock.unlock();
        }
 
        return "Hello Redisson!";
    }

如果拿到分布式鎖的節(jié)點(diǎn)宕機(jī),且這個(gè)鎖正好處于鎖住的狀態(tài)時(shí),會(huì)出現(xiàn)鎖死的狀態(tài),為了避免這種情況的發(fā)生,鎖都會(huì)設(shè)置一個(gè)過期時(shí)間。這樣也存在一個(gè)問題,一個(gè)線程拿到了鎖設(shè)置了30s超時(shí),在30s后這個(gè)線程還沒有執(zhí)行完畢,鎖超時(shí)釋放了,就會(huì)導(dǎo)致問題,Redisson給出了自己的答案,就是 watch dog 自動(dòng)延期機(jī)制。
Redisson提供了一個(gè)監(jiān)控鎖的看門狗,它的作用是在Redisson實(shí)例被關(guān)閉前,不斷的延長(zhǎng)鎖的有效期,也就是說,如果一個(gè)拿到鎖的線程一直沒有完成邏輯,那么看門狗會(huì)幫助線程不斷的延長(zhǎng)鎖超時(shí)時(shí)間,鎖不會(huì)因?yàn)槌瑫r(shí)而被釋放。
默認(rèn)情況下,看門狗的續(xù)期時(shí)間是30s,也可以通過修改Config.lockWatchdogTimeout來(lái)另行指定。
另外Redisson 還提供了可以指定leaseTime參數(shù)的加鎖方法來(lái)指定加鎖的時(shí)間。超過這個(gè)時(shí)間后鎖便自動(dòng)解開了,不會(huì)延長(zhǎng)鎖的有效期。

在RedissonLock類的renewExpiration()方法中,會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù)每隔30/3=10秒給鎖續(xù)期。如果業(yè)務(wù)執(zhí)行期間,應(yīng)用掛了,那么不會(huì)自動(dòng)續(xù)期,到過期時(shí)間之后,鎖會(huì)自動(dòng)釋放。

private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); 
        ee.setTimeout(task);
    }

另外Redisson還提供了leaseTime的參數(shù)來(lái)指定加鎖的時(shí)間。超過這個(gè)時(shí)間后鎖便自動(dòng)解開了。

// 加鎖以后10秒鐘自動(dòng)解鎖
// 無(wú)需調(diào)用unlock方法手動(dòng)解鎖
lock.lock(10, TimeUnit.SECONDS);
 
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動(dòng)解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);

如果指定了鎖的超時(shí)時(shí)間,底層直接調(diào)用lua腳本,進(jìn)行占鎖。如果超過leaseTime,業(yè)務(wù)邏輯還沒有執(zhí)行完成,則直接釋放鎖,所以在指定leaseTime時(shí),要讓leaseTime大于業(yè)務(wù)執(zhí)行時(shí)間。RedissonLock類的tryLockInnerAsync()方法

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
 
        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

讀寫鎖

分布式可重入讀寫鎖允許同時(shí)有多個(gè)讀鎖和一個(gè)寫鎖處于加鎖狀態(tài)。在讀寫鎖中,讀讀共享、讀寫互斥、寫寫互斥。

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

讀寫鎖測(cè)試類,當(dāng)訪問write接口時(shí),read接口會(huì)被阻塞住。

@RestController
public class TestController {
 
    @Autowired
    RedissonClient redissonClient;
 
    @Autowired
    StringRedisTemplate redisTemplate;
 
    @RequestMapping("/write")
    public String write(){
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock");
        RLock writeLock = readWriteLock.writeLock();
        String s = UUID.randomUUID().toString();
        writeLock.lock();
        try {
            redisTemplate.opsForValue().set("wr-lock-key", s);
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            writeLock.unlock();
        }
        return s;
    }
 
    @RequestMapping("/read")
    public String read(){
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock");
        RLock readLock = readWriteLock.readLock();
        String s = "";
        readLock.lock();
        try {
            s = redisTemplate.opsForValue().get("wr-lock-key");
        } finally {
            readLock.unlock();
        }
        return s;
    }
}

信號(hào)量(Semaphore)

基于Redis的Redisson的分布式信號(hào)量(Semaphore)Java對(duì)象RSemaphore采用了與java.util.concurrent.Semaphore類似的接口和用法

關(guān)于信號(hào)量的使用你們能夠想象一下這個(gè)場(chǎng)景,有三個(gè)停車位,當(dāng)三個(gè)停車位滿了后,其余車就不停了。能夠把車位比做信號(hào),如今有三個(gè)信號(hào),停一次車,用掉一個(gè)信號(hào),車離開就是釋放一個(gè)信號(hào)。

SpringBoot整合Redisson如何實(shí)現(xiàn)分布式鎖

咱們用 Redisson 來(lái)演示上述停車位的場(chǎng)景。

先定義一個(gè)占用停車位的方法:

/**
* 停車,占用停車位
* 總共 3 個(gè)車位
*/
@ResponseBody
@RequestMapping("park")
public String park() throws InterruptedException {
  // 獲取信號(hào)量(停車場(chǎng))
  RSemaphore park = redisson.getSemaphore("park");
  // 獲取一個(gè)信號(hào)(停車位)
  park.acquire();
 
  return "OK";
}

再定義一個(gè)離開車位的方法:

/**
 * 釋放車位
 * 總共 3 個(gè)車位
 */
@ResponseBody
@RequestMapping("leave")
public String leave() throws InterruptedException {
    // 獲取信號(hào)量(停車場(chǎng))
    RSemaphore park = redisson.getSemaphore("park");
    // 釋放一個(gè)信號(hào)(停車位)
    park.release();
 
    return "OK";
}

為了簡(jiǎn)便,我用 Redis 客戶端添加了一個(gè) key:“park”,值等于 3,表明信號(hào)量為 park,總共有三個(gè)值。

SpringBoot整合Redisson如何實(shí)現(xiàn)分布式鎖

 而后用 postman 發(fā)送 park 請(qǐng)求占用一個(gè)停車位。

SpringBoot整合Redisson如何實(shí)現(xiàn)分布式鎖

而后在 redis 客戶端查看 park 的值,發(fā)現(xiàn)已經(jīng)改成 2 了。繼續(xù)調(diào)用兩次,發(fā)現(xiàn) park 的等于 0,當(dāng)調(diào)用第四次的時(shí)候,會(huì)發(fā)現(xiàn)請(qǐng)求一直處于等待中,說明車位不夠了。若是想要不阻塞,能夠用 tryAcquire 或 tryAcquireAsync。

咱們?cè)僬{(diào)用離開車位的方法,park 的值變?yōu)榱?1,表明車位剩余 1 個(gè)。

注意:屢次執(zhí)行釋放信號(hào)量操做,剩余信號(hào)量會(huì)一直增長(zhǎng),而不是到 3 后就封頂了。

閉鎖(CountDownLatch)

CountDownLatch作用:某一線程,等待其他線程執(zhí)行完畢之后,自己再繼續(xù)執(zhí)行。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
 
// 在其他線程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

在TestController中添加測(cè)試方法,訪問close接口時(shí),調(diào)用await()方法進(jìn)入阻塞狀態(tài),直到有三次訪問release接口時(shí),close接口才會(huì)返回。

@RequestMapping("/close")
    public String close() throws InterruptedException {
        RCountDownLatch close = redissonClient.getCountDownLatch("close");
        close.trySetCount(3);
        close.await();
        return "close";
    }
 
    @RequestMapping("/release")
    public String release(){
        RCountDownLatch close = redissonClient.getCountDownLatch("close");
        close.countDown();
        return "release";
    }

關(guān)于“SpringBoot整合Redisson如何實(shí)現(xiàn)分布式鎖”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI