溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Redis中怎樣實(shí)現(xiàn)一個(gè)分布式鎖

發(fā)布時(shí)間:2021-08-03 16:26:23 來源:億速云 閱讀:158 作者:Leah 欄目:數(shù)據(jù)庫

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)Redis中怎樣實(shí)現(xiàn)一個(gè)分布式鎖,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

方案

使用 SETNX 和 EXPIRE 命令

SETNX key value  EXPIRE key seconds  DEL key  if (setnx("item_1_lock", 1)) {      expire("item_1_lock", 30);      try {          ... 邏輯      } catch {          ...      } finally {          del("item_1_lock");      }  }

這種方法看起來可以解決問題,但是有一定的風(fēng)險(xiǎn),因?yàn)?SETNX 和 EXPIRE 這波操作是非原子性的,如果 SETNX 成功之后,出現(xiàn)錯(cuò)誤,導(dǎo)致 EXPIRE 沒有執(zhí)行,導(dǎo)致鎖沒有設(shè)置超時(shí)時(shí)間形成死鎖。

Redis中怎樣實(shí)現(xiàn)一個(gè)分布式鎖

針對(duì)這種情況,我們可以使用 lua 腳本來保持操作原子性,保證 SETNX 和 EXPIRE 兩個(gè)操作要么都成功,要么都不成功。

if (redis.call('setnx', KEYS[1], ARGV[1]) < 1)  then return 0;  end;  redis.call('expire', KEYS[1], tonumber(ARGV[2]));  return 1;

通過這樣的方法,我們初步解決了競(jìng)爭(zhēng)鎖的原子性問題,雖然其他功能還未實(shí)現(xiàn),但是應(yīng)該不會(huì)造成死鎖???。

Redis 2.6.12 以上可靈活使用 SET 命令

SET key value NX EX 30  DEL key  if (set("item_1_lock", 1, "NX", "EX", 30)) {      try {          ... 邏輯      } catch {          ...      } finally {          del("item_1_lock");      }  }

改進(jìn)后的方法不需要借助 lua 腳本就解決了 SETNX 和 EXPIRE 的原子性問題?,F(xiàn)在我們?cè)僮屑?xì)琢磨琢磨,如果 A 拿到了鎖順利進(jìn)入代碼塊執(zhí)行邏輯,但是由于各種原因?qū)е鲁瑫r(shí)自動(dòng)釋放鎖。在這之后 B 成功拿到了鎖進(jìn)入代碼塊執(zhí)行邏輯,但此時(shí)如果 A 執(zhí)行邏輯完畢再來釋放鎖,就會(huì)把 B 剛獲得的鎖釋放了。就好比用自己家的鑰匙開了別家的門,這是不可接受的。

Redis中怎樣實(shí)現(xiàn)一個(gè)分布式鎖

為了解決這個(gè)問題我們可以嘗試在 SET 的時(shí)候設(shè)置一個(gè)鎖標(biāo)識(shí),然后在 DEL 的時(shí)候驗(yàn)證當(dāng)前鎖是否為自己的鎖。

String value = UUID.randomUUID().toString().replaceAll("-", "");  if (set("item_1_lock", value, "NX", "EX", 30)) {      try {          ... 邏輯      } catch {          ...      } finally {          ... lua 腳本保證原子性      }  }  if (redis.call('get', KEYS[1]) == ARGV[1])  then return redis.call('del', KEYS[1])  else return 0  end

到這里,我們終于解決了競(jìng)爭(zhēng)鎖的原子性問題和誤刪鎖問題。但是鎖一般還需要支持可重入、循環(huán)等待和超時(shí)自動(dòng)續(xù)約等功能點(diǎn)。下面我們學(xué)習(xí)使用一個(gè)非常好用的包來解決這些問題。

入門 Redisson

Redission 的鎖,實(shí)現(xiàn)了可重入和超時(shí)自動(dòng)續(xù)約功能,它都幫我們封裝好了,我們只要按照自己的需求調(diào)用它的 API 就可以輕松實(shí)現(xiàn)上面所提到的幾個(gè)功能點(diǎn)。詳細(xì)功能可以查看 Redisson 文檔

在項(xiàng)目中安裝 Redisson

<dependency>      <groupId>org.redisson</groupId>      <artifactId>redisson</artifactId>      <version>3.13.2</version>  </dependency>
implementation 'org.redisson:redisson:3.13.2'

用 Maven 或者 Gradle 構(gòu)建,目前最新版本為 3.13.2,也可以在這里 Redisson 找到你需要的版本。

簡(jiǎn)單嘗試

RedissonClient redissonClient = Redisson.create();  RLock lock = redissonClient.getLock("lock");  boolean res = lock.lock();  if (res) {     try {       ... 邏輯     } finally {         lock.unlock();     }  }

Redisson 將底層邏輯全部做了一個(gè)封裝 ?,我們無需關(guān)心具體實(shí)現(xiàn),幾行代碼就能使用一把完美的鎖。下面我們簡(jiǎn)單折騰折騰源碼 ???。

加鎖

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {      long threadId = Thread.currentThread().getId();      Long ttl = tryAcquire(leaseTime, unit, threadId);      if (ttl == null) {          return;      }      RFuture<RedissonLockEntry> future = subscribe(threadId);      if (interruptibly) {          commandExecutor.syncSubscriptionInterrupted(future);      } else {          commandExecutor.syncSubscription(future);      }      try {          while (true) {              ttl = tryAcquire(leaseTime, unit, threadId);              if (ttl == null) {                  break;              }              if (ttl >= 0) {                  try {                      future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                  } catch (InterruptedException e) {                      if (interruptibly) {                          throw e;                      }                      future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                  }              } else {                  if (interruptibly) {                      future.getNow().getLatch().acquire();                  } else {                      future.getNow().getLatch().acquireUninterruptibly();                  }              }         }      } finally {          unsubscribe(future, threadId);      }  }

Redis中怎樣實(shí)現(xiàn)一個(gè)分布式鎖

獲取鎖

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {      if (leaseTime != -1) {          return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);      }      RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);      ttlRemainingFuture.onComplete((ttlRemaining, e) -> {          if (e != null) {              return;          }          if (ttlRemaining == null) {              scheduleExpirationRenewal(threadId);          }      });      return ttlRemainingFuture;  }  <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {      internalLockLeaseTime = unit.toMillis(leaseTime);      return evalWriteAsync(getName(), LongCodec.INSTANCE, command,              "if (redis.call('exists', KEYS[1]) == 0) then " +                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +                      "return nil; " +                      "end; " +                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +                      "return nil; " +                      "end; " +                      "return redis.call('pttl', KEYS[1]);",              Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));  }

Redis中怎樣實(shí)現(xiàn)一個(gè)分布式鎖

刪除鎖

public RFuture<Void> unlockAsync(long threadId) {      RPromise<Void> result = new RedissonPromise<Void>();      RFuture<Boolean> future = unlockInnerAsync(threadId);      future.onComplete((opStatus, e) -> {          cancelExpirationRenewal(threadId);          if (e != null) {              result.tryFailure(e);              return;          }          if (opStatus == null) {              IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "                      + id + " thread-id: " + threadId);              result.tryFailure(cause);              return;          }          result.trySuccess(null);      });      return result;  }  protected RFuture<Boolean> unlockInnerAsync(long threadId) {      return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,              "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +                      "return nil;" +                      "end; " +                      "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +                      "if (counter > 0) then " +                      "redis.call('pexpire', KEYS[1], ARGV[2]); " +                      "return 0; " +                      "else " +                      "redis.call('del', KEYS[1]); " +                      "redis.call('publish', KEYS[2], ARGV[1]); " +                      "return 1; " +                      "end; " +                      "return nil;",              Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));  }

上述就是小編為大家分享的Redis中怎樣實(shí)現(xiàn)一個(gè)分布式鎖了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI