溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

使用Redisson怎么實(shí)現(xiàn)一個(gè)分布式鎖

發(fā)布時(shí)間:2021-06-17 11:18:41 來(lái)源:億速云 閱讀:154 作者:Leah 欄目:編程語(yǔ)言

使用Redisson怎么實(shí)現(xiàn)一個(gè)分布式鎖,針對(duì)這個(gè)問(wèn)題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問(wèn)題的小伙伴找到更簡(jiǎn)單易行的方法。

Redisson鎖繼承Implements Reentrant Lock,所以具備 Reentrant Lock 鎖中的一些特性:超時(shí),重試,可中斷等。加上Redisson中Redis具備分布式的特性,所以非常適合用來(lái)做Java中的分布式鎖。 下面我們對(duì)其加鎖、解鎖過(guò)程中的源碼細(xì)節(jié)進(jìn)行一一分析。

鎖的接口定義了一下方法:

使用Redisson怎么實(shí)現(xiàn)一個(gè)分布式鎖

分布式鎖當(dāng)中加鎖,我們常用的加鎖接口:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

下面我們來(lái)看一下方法的具體實(shí)現(xiàn):

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  long time = unit.toMillis(waitTime);
  long current = System.currentTimeMillis();
  final long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
   return true;
  }
  
  time -= (System.currentTimeMillis() - current);
  if (time <= 0) {
   acquireFailed(threadId);
   return false;
  }
  
  current = System.currentTimeMillis();
  final RFuture subscribeFuture = subscribe(threadId);
  if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
   if (!subscribeFuture.cancel(false)) {
    subscribeFuture.addListener(new FutureListener() {
     @Override
     public void operationComplete(Future future) throws Exception {
      if (subscribeFuture.isSuccess()) {
       unsubscribe(subscribeFuture, threadId);
      }
     }
    });
   }
   acquireFailed(threadId);
   return false;
  }

  try {
   time -= (System.currentTimeMillis() - current);
   if (time <= 0) {
    acquireFailed(threadId);
    return false;
   }
  
   while (true) {
    long currentTime = System.currentTimeMillis();
    ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
     return true;
    }

    time -= (System.currentTimeMillis() - currentTime);
    if (time = 0 && ttl < time) {
     getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
     getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    }

    time -= (System.currentTimeMillis() - currentTime);
    if (time <= 0) {
     acquireFailed(threadId);
     return false;
    }
   }
  } finally {
   unsubscribe(subscribeFuture, threadId);
  }
//  return get(tryLockAsync(waitTime, leaseTime, unit));
 }

首先我們看到調(diào)用tryAcquire嘗試獲取鎖,在這里是否能獲取到鎖,是根據(jù)鎖名稱的過(guò)期時(shí)間TTL來(lái)判定的(TTL

下面我們接著看一下tryAcquire的實(shí)現(xiàn):

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
 return get(tryAcquireAsync(leaseTime, unit, threadId));
}

可以看到真正獲取鎖的操作經(jīng)過(guò)一層get操作里面執(zhí)行的,這里為何要這么操作,本人也不是太理解,如有理解錯(cuò)誤,歡迎指正。

get 是由CommandAsyncExecutor(一個(gè)線程Executor)封裝的一個(gè)Executor

設(shè)置一個(gè)單線程的同步控制器CountDownLatch,用于控制單個(gè)線程的中斷信息。個(gè)人理解經(jīng)過(guò)中間的這么一步:主要是為了支持線程可中斷操作。

public V get(RFuture future) {
 if (!future.isDone()) {
  final CountDownLatch l = new CountDownLatch(1);
  future.addListener(new FutureListener() {
   @Override
   public void operationComplete(Future future) throws Exception {
    l.countDown();
   }
  });
  
  boolean interrupted = false;
  while (!future.isDone()) {
   try {
    l.await();
   } catch (InterruptedException e) {
    interrupted = true;
   }
  }
  
  if (interrupted) {
   Thread.currentThread().interrupt();
  }
 }

 // commented out due to blocking issues up to 200 ms per minute for each thread:由于每個(gè)線程的阻塞問(wèn)題,每分鐘高達(dá)200毫秒
 // future.awaitUninterruptibly();
 if (future.isSuccess()) {
  return future.getNow();
 }

 throw convertException(future);
}

我們進(jìn)一步往下看:

private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
 if (leaseTime != -1) {
  return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
 }
 RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
 ttlRemainingFuture.addListener(new FutureListener() {
  @Override
  public void operationComplete(Future future) throws Exception {
   if (!future.isSuccess()) {
    return;
   }

   Long ttlRemaining = future.getNow();
   // lock acquired
   if (ttlRemaining == null) {
    scheduleExpirationRenewal(threadId);
   }
  }
 });
 return ttlRemainingFuture;
}

首先判斷鎖是否有超時(shí)時(shí)間,有過(guò)期時(shí)間的話,會(huì)在后面獲取鎖的時(shí)候設(shè)置進(jìn)去。沒(méi)有過(guò)期時(shí)間的話,則會(huì)用默認(rèn)的

private long lockWatchdogTimeout = 30 * 1000;

下面我們?cè)谶M(jìn)一步往下分析真正獲取鎖的操作:

RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
 internalLockLeaseTime = unit.toMillis(leaseTime);

 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
    "if (redis.call('exists', KEYS[1]) == 0) then " +
     "redis.call('hset', KEYS[1], ARGV[2], 1); " +
     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     "return nil; " +
    "end; " +
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     "return nil; " +
    "end; " +
    "return redis.call('pttl', KEYS[1]);",
    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

我把里面的重點(diǎn)信息做了以下三點(diǎn)總結(jié):

1:真正執(zhí)行的是一段具有原子性的Lua腳本,并且最終也是由CommandAsynExecutor去執(zhí)行。

2:鎖真正持久化到Redis時(shí),用的hash類型key field value

3:獲取鎖的三個(gè)參數(shù):getName()是邏輯鎖名稱,例如:分布式鎖要鎖住的methodName+params;internalLockLeaseTime是毫秒單位的鎖過(guò)期時(shí)間;getLockName則是鎖對(duì)應(yīng)的線程級(jí)別的名稱,因?yàn)橹С窒嗤€程可重入,不同線程不可重入,所以這里的鎖的生成方式是:UUID+":"threadId。有的同學(xué)可能會(huì)問(wèn),這樣不是很縝密:不同的JVM可能會(huì)生成相同的threadId,所以Redission這里加了一個(gè)區(qū)分度很高的UUID;

Lua腳本中的執(zhí)行分為以下三步:

1:exists檢查redis中是否存在鎖名稱;如果不存在,則獲取成功;同時(shí)把邏輯鎖名稱KEYS[1],線程級(jí)別的鎖名稱[ARGV[2],value=1,設(shè)置到redis。并設(shè)置邏輯鎖名稱的過(guò)期時(shí)間ARGV[2],返回;

2:如果檢查到存在KEYS[1],[ARGV[2],則說(shuō)明獲取成功,此時(shí)會(huì)自增對(duì)應(yīng)的value值,記錄重入次數(shù);并更新鎖的過(guò)期時(shí)間

3:key不存,直接返回key的剩余過(guò)期時(shí)間(-2)

關(guān)于使用Redisson怎么實(shí)現(xiàn)一個(gè)分布式鎖問(wèn)題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒(méi)有解開(kāi),可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識(shí)。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI