溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Springboot基于Redisson如何實現(xiàn)Redis分布式可重入鎖源碼解析

發(fā)布時間:2022-03-03 13:40:37 來源:億速云 閱讀:162 作者:小新 欄目:開發(fā)技術

這篇文章主要介紹了Springboot基于Redisson如何實現(xiàn)Redis分布式可重入鎖源碼解析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

      一、前言

      我們在實現(xiàn)使用Redis實現(xiàn)分布式鎖,最開始一般使用SET resource-name anystring NX EX max-lock-time進行加鎖,使用Lua腳本保證原子性進行實現(xiàn)釋放鎖。這樣手動實現(xiàn)比較麻煩,對此Redis官網(wǎng)也明確說Java版使用Redisson來實現(xiàn)。小編也是看了官網(wǎng)慢慢的摸索清楚,特寫此記錄一下。從官網(wǎng)到整合Springboot到源碼解讀,以單節(jié)點為例。

      二、為什么使用Redisson

      1. 我們打開官網(wǎng)

      redis中文官網(wǎng)

      2. 我們可以看到官方讓我們去使用其他

      Springboot基于Redisson如何實現(xiàn)Redis分布式可重入鎖源碼解析

      3. 打開官方推薦

      Springboot基于Redisson如何實現(xiàn)Redis分布式可重入鎖源碼解析

      4. 找到文檔

      Redisson地址

      Springboot基于Redisson如何實現(xiàn)Redis分布式可重入鎖源碼解析

      5. Redisson結構

      Springboot基于Redisson如何實現(xiàn)Redis分布式可重入鎖源碼解析

      三、Springboot整合Redisson

      1. 導入依賴

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis</artifactId>
      </dependency>
      <dependency>
          <groupId>redis.clients</groupId>
          <artifactId>jedis</artifactId>
      </dependency>
      <!--redis分布式鎖-->
      <dependency>
          <groupId>org.redisson</groupId>
          <artifactId>redisson</artifactId>
          <version>3.12.0</version>
      </dependency>

      2. 以官網(wǎng)為例查看如何配置

      Springboot基于Redisson如何實現(xiàn)Redis分布式可重入鎖源碼解析

      3. 編寫配置類

      import org.redisson.Redisson;
      import org.redisson.api.RedissonClient;
      import org.redisson.config.Config;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      /**
       * @author wangzhenjun
       * @date 2022/2/9 9:57
       */
      @Configuration
      public class MyRedissonConfig {
      
          /**
           * 所有對redisson的使用都是通過RedissonClient來操作的
           * @return
           */
          @Bean(destroyMethod="shutdown")
          public RedissonClient redisson(){
              // 1. 創(chuàng)建配置
              Config config = new Config();
              // 一定要加redis://
              config.useSingleServer().setAddress("redis://192.168.17.130:6379");
              // 2. 根據(jù)config創(chuàng)建出redissonClient實例
              RedissonClient redissonClient = Redisson.create(config);
              return redissonClient;
          }
      }

      4. 官網(wǎng)測試加鎖例子

      Springboot基于Redisson如何實現(xiàn)Redis分布式可重入鎖源碼解析

      5. 根據(jù)官網(wǎng)簡單Controller接口編寫

      @ResponseBody
      @GetMapping("/hello")
      public String hello(){
          // 1.獲取一把鎖,只要鎖名字一樣,就是同一把鎖
          RLock lock = redisson.getLock("my-lock");
          // 2. 加鎖
          lock.lock();// 阻塞試等待  默認加的都是30s
          // 帶參數(shù)情況
          // lock.lock(10, TimeUnit.SECONDS);// 10s自動解鎖,自動解鎖時間一定要大于業(yè)務的執(zhí)行時間。
          try {
              System.out.println("加鎖成功" + Thread.currentThread().getId());
              Thread.sleep(30000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          } finally {
              // 3. 解鎖
              System.out.println("解鎖成功:" + Thread.currentThread().getId());
              lock.unlock();
          }
          return "hello";
      }

      6. 測試

      Springboot基于Redisson如何實現(xiàn)Redis分布式可重入鎖源碼解析

      四、lock.lock()源碼分析

      1. 打開RedissonLock實現(xiàn)類

      Springboot基于Redisson如何實現(xiàn)Redis分布式可重入鎖源碼解析

      2. 找到實現(xiàn)方法

      @Override
      public void lock() {
          try {
          	// 我們發(fā)現(xiàn)不穿過期時間源碼默認過期時間為-1
              lock(-1, null, false);
          } catch (InterruptedException e) {
              throw new IllegalStateException();
          }
      }

      3. 按住Ctrl進去lock方法

      private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
      	// 獲取線程的id,占有鎖的時候field的值為UUID:線程號id
          long threadId = Thread.currentThread().getId();
          // 嘗試獲得鎖
          Long ttl = tryAcquire(leaseTime, unit, threadId);
          // lock acquired 獲得鎖,返回
          if (ttl == null) {
              return;
          }
      	// 這里說明獲取鎖失敗,就通過線程id訂閱這個鎖
          RFuture<RedissonLockEntry> future = subscribe(threadId);
          if (interruptibly) {
              commandExecutor.syncSubscriptionInterrupted(future);
          } else {
              commandExecutor.syncSubscription(future);
          }
      
          try {
          	// 這里進行自旋,不斷嘗試獲取鎖
              while (true) {
              	// 繼續(xù)嘗試獲取鎖
                  ttl = tryAcquire(leaseTime, unit, threadId);
                  // lock acquired 獲取成功
                  if (ttl == null) {
                  	// 直接返回,挑出自旋
                      break;
                  }
      
                  // waiting for message 繼續(xù)等待獲得鎖
                  if (ttl >= 0) {
                      try {
                          future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                      } catch (InterruptedException e) {
                          if (interruptibly) {
                              throw e;
                          }
                          future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                      }
                  } else {
                      if (interruptibly) {
                          future.getNow().getLatch().acquire();
                      } else {
                          future.getNow().getLatch().acquireUninterruptibly();
                      }
                  }
              }
          } finally {
           	// 取消訂閱
              unsubscribe(future, threadId);
          }
      //        get(lockAsync(leaseTime, unit));
      }

      4. 進去嘗試獲取鎖方法

      Springboot基于Redisson如何實現(xiàn)Redis分布式可重入鎖源碼解析

      private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
      	// 直接進入異步方法
          return get(tryAcquireAsync(leaseTime, unit, threadId));
      }
      
      private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
          // 這里進行判斷如果沒有設置參數(shù)leaseTime = -1
          if (leaseTime != -1) {
              return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
          }
          // 此方法進行獲得鎖,過期時間為看門狗的默認時間
          // private long lockWatchdogTimeout = 30 * 1000;看門狗默認過期時間為30s
          // 加鎖和過期時間要保證原子性,這個方法后面肯定調用執(zhí)行了Lua腳本,我們下面在看
          RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
          // 開啟一個定時任務進行不斷刷新過期時間
          ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
              if (e != null) {
                  return;
              }
              // lock acquired 獲得鎖
              if (ttlRemaining == null) {
              	// 刷新過期時間方法,我們下一步詳細說一下
                  scheduleExpirationRenewal(threadId);
          });
          return ttlRemainingFuture;

      5. 查看tryLockInnerAsync()方法

      <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
          internalLockLeaseTime = unit.toMillis(leaseTime);
      
          return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
          		  // 首先判斷鎖是否存在
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                    		// 存在則獲取鎖
                        "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                        // 然后設置過期時間
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                    "end; " +
                    // hexists查看哈希表的指定字段是否存在,存在鎖并且是當前線程持有鎖
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    		// hincrby自增一
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        	// 鎖的值大于1,說明是可重入鎖,重置過期時間
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                    "end; " +
                    // 鎖已存在,且不是本線程,則返回過期時間ttl
                    "return redis.call('pttl', KEYS[1]);",
                      Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
      }

      6. 進入4留下的定時任務scheduleExpirationRenewal()方法

      一步步往下找源碼:scheduleExpirationRenewal --->renewExpiration

      根據(jù)下面源碼,定時任務刷新時間為:internalLockLeaseTime / 3,是看門狗的1/3,即為10s刷新一次

      private void renewExpiration() {
          ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
          if (ee == null) {
              return;
          }
          
          Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
              @Override
              public void run(Timeout timeout) throws Exception {
                  ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                  if (ent == null) {
                      return;
                  }
                  Long threadId = ent.getFirstThreadId();
                  if (threadId == null) {
                      return;
                  }
                  
                  RFuture<Boolean> future = renewExpirationAsync(threadId);
                  future.onComplete((res, e) -> {
                      if (e != null) {
                          log.error("Can't update lock " + getName() + " expiration", e);
                          return;
                      }
                      
                      if (res) {
                          // reschedule itself
                          renewExpiration();
                      }
                  });
              }
          }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
          
          ee.setTimeout(task);
      }

      五、lock.lock(10, TimeUnit.SECONDS)源碼分析

      1. 打開實現(xiàn)類

      @Override
      public void lock(long leaseTime, TimeUnit unit) {
          try {
          	// 這里的過期時間為我們輸入的10
              lock(leaseTime, unit, false);
          } catch (InterruptedException e) {
              throw new IllegalStateException();
          }
      }

      2. 方法lock()實現(xiàn)展示,同三.3源碼

      3. 直接來到嘗試獲得鎖tryAcquireAsync()方法

      private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
          // 這里進行判斷如果沒有設置參數(shù)leaseTime = -1,此時我們?yōu)?0
          if (leaseTime != -1) {
          	// 來到此方法
              return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
          }
          // 此處省略后面內容,前面以詳細說明。。。。
      }

      4. 打開tryLockInnerAsync()方法

      我們不難發(fā)現(xiàn)和沒有傳過期時間的方法一樣,只不過leaseTime的值變了。

      <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
          internalLockLeaseTime = unit.toMillis(leaseTime);
      
          return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
          		  // 首先判斷鎖是否存在
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                    		// 存在則獲取鎖
                        "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                        // 然后設置過期時間
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                    "end; " +
                    // hexists查看哈希表的指定字段是否存在,存在鎖并且是當前線程持有鎖
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    		// hincrby自增一
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        	// 鎖的值大于1,說明是可重入鎖,重置過期時間
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                    "end; " +
                    // 鎖已存在,且不是本線程,則返回過期時間ttl
                    "return redis.call('pttl', KEYS[1]);",
                      Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
      }

      六、lock.unlock()源碼分析

      1. 打開方法實現(xiàn)

      @Override
      public void unlock() {
          try {
          	// 點擊進入釋放鎖方法
              get(unlockAsync(Thread.currentThread().getId()));
          } catch (RedisException e) {
              if (e.getCause() instanceof IllegalMonitorStateException) {
                  throw (IllegalMonitorStateException) e.getCause();
              } else {
                  throw e;
              }
          }
          
      //        Future<Void> future = unlockAsync();
      //        future.awaitUninterruptibly();
      //        if (future.isSuccess()) {
      //            return;
      //        }
      //        if (future.cause() instanceof IllegalMonitorStateException) {
      //            throw (IllegalMonitorStateException)future.cause();
      //        }
      //        throw commandExecutor.convertException(future);
      }

      2. 打開unlockAsync()方法

      @Override
      public RFuture<Void> unlockAsync(long threadId) {
          RPromise<Void> result = new RedissonPromise<Void>();
          // 解鎖方法,后面展開說
          RFuture<Boolean> future = unlockInnerAsync(threadId);
      	// 完成
          future.onComplete((opStatus, e) -> {
              if (e != null) {
              	// 取消到期續(xù)訂
                  cancelExpirationRenewal(threadId);
                  // 將這個未來標記為失敗并通知所有人
                  result.tryFailure(e);
                  return;
              }
      		// 狀態(tài)為空,說明解鎖的線程和當前鎖不是同一個線程
              if (opStatus == null) {
                  IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                          + id + " thread-id: " + threadId);
                  result.tryFailure(cause);
                  return;
              }
              
              cancelExpirationRenewal(threadId);
              result.trySuccess(null);
          });
      
          return result;
      }

      3. 打開unlockInnerAsync()方法

      protected RFuture<Boolean> unlockInnerAsync(long threadId) {
          return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
          		// 判斷釋放鎖的線程和已存在鎖的線程是不是同一個線程,不是返回空
                  "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                      "return nil;" +
                  "end; " +
                  // 釋放鎖后,加鎖次數(shù)減一
                  "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                  // 判斷剩余數(shù)量是否大于0
                  "if (counter > 0) then " +
                  	// 大于0 ,則刷新過期時間
                      "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                      "return 0; " +
                  "else " +
                  	// 釋放鎖,刪除key并發(fā)布鎖釋放的消息
                      "redis.call('del', KEYS[1]); " +
                      "redis.call('publish', KEYS[2], ARGV[1]); " +
                      "return 1; "+
                  "end; " +
                  "return nil;",
                  Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
      
      }

      感謝你能夠認真閱讀完這篇文章,希望小編分享的“Springboot基于Redisson如何實現(xiàn)Redis分布式可重入鎖源碼解析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業(yè)資訊頻道,更多相關知識等著你來學習!

      向AI問一下細節(jié)

      免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

      AI