溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Redis分布式鎖實(shí)例分析

發(fā)布時(shí)間:2022-03-29 14:20:02 來(lái)源:億速云 閱讀:173 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要介紹“Redis分布式鎖實(shí)例分析”的相關(guān)知識(shí),小編通過(guò)實(shí)際案例向大家展示操作過(guò)程,操作方法簡(jiǎn)單快捷,實(shí)用性強(qiáng),希望這篇“Redis分布式鎖實(shí)例分析”文章能幫助大家解決問(wèn)題。

分布式鎖概覽

在多線程的環(huán)境下,為了保證一個(gè)代碼塊在同一時(shí)間只能由一個(gè)線程訪問(wèn),Java中我們一般可以使用synchronized語(yǔ)法和ReetrantLock去保證,這實(shí)際上是本地鎖的方式。但是現(xiàn)在公司都是流行分布式架構(gòu),在分布式環(huán)境下,如何保證不同節(jié)點(diǎn)的線程同步執(zhí)行呢?因此就引出了分布式鎖,它是控制分布式系統(tǒng)之間互斥訪問(wèn)共享資源的一種方式。

在一個(gè)分布式系統(tǒng)中,多臺(tái)機(jī)器上部署了多個(gè)服務(wù),當(dāng)客戶端一個(gè)用戶發(fā)起一個(gè)數(shù)據(jù)插入請(qǐng)求時(shí),如果沒(méi)有分布式鎖機(jī)制保證,那么那多臺(tái)機(jī)器上的多個(gè)服務(wù)可能進(jìn)行并發(fā)插入操作,導(dǎo)致數(shù)據(jù)重復(fù)插入,對(duì)于某些不允許有多余數(shù)據(jù)的業(yè)務(wù)來(lái)說(shuō),這就會(huì)造成問(wèn)題。而分布式鎖機(jī)制就是為了解決類似這類問(wèn)題,保證多個(gè)服務(wù)之間互斥的訪問(wèn)共享資源,如果一個(gè)服務(wù)搶占了分布式鎖,其他服務(wù)沒(méi)獲取到鎖,就不進(jìn)行后續(xù)操作。大致意思如下圖所示:

Redis分布式鎖實(shí)例分析

分布式鎖的特點(diǎn)

分布式鎖一般有如下的特點(diǎn):

  • 互斥性: 同一時(shí)刻只能有一個(gè)線程持有鎖

  • 可重入性: 同一節(jié)點(diǎn)上的同一個(gè)線程如果獲取了鎖之后能夠再次獲取鎖

  • 鎖超時(shí):和J.U.C中的鎖一樣支持鎖超時(shí),防止死鎖

  • 高性能和高可用: 加鎖和解鎖需要高效,同時(shí)也需要保證高可用,防止分布式鎖失效

  • 具備阻塞和非阻塞性:能夠及時(shí)從阻塞狀態(tài)中被喚醒

分布式鎖的實(shí)現(xiàn)方式

我們一般實(shí)現(xiàn)分布式鎖有以下幾種方式:

  • 基于數(shù)據(jù)庫(kù)

  • 基于Redis

  • 基于zookeeper

Redis普通分布式鎖存在的問(wèn)題

說(shuō)到Redis分布式鎖,大部分人都會(huì)想到:setnx+lua(redis保證執(zhí)行l(wèi)ua腳本時(shí)不執(zhí)行其他操作,保證操作的原子性),或者知道set key value px milliseconds nx。后一種方式的核心實(shí)現(xiàn)命令如下:

- 獲取鎖(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000

- 釋放鎖(lua腳本中,一定要比較value,防止誤解鎖)
if redis.call("get",KEYS[1]) == ARGV[1] then 
    return redis.call("del",KEYS[1])
else   
    return 0
end

這種實(shí)現(xiàn)方式有3大要點(diǎn)(也是面試概率非常高的地方):

  1. set命令要用set key value px milliseconds nx;

  2. value要具有唯一性;

  3. 釋放鎖時(shí)要驗(yàn)證value值,不能誤解鎖;

事實(shí)上這類鎖最大的缺點(diǎn)就是它加鎖時(shí)只作用在一個(gè)Redis節(jié)點(diǎn)上,即使Redis通過(guò)sentinel保證高可用,如果這個(gè)master節(jié)點(diǎn)由于某些原因發(fā)生了主從切換,那么就會(huì)出現(xiàn)鎖丟失的情況:

  1. 在Redis的master節(jié)點(diǎn)上拿到了鎖;

  2. 但是這個(gè)加鎖的key還沒(méi)有同步到slave節(jié)點(diǎn);

  3. master故障,發(fā)生故障轉(zhuǎn)移,slave節(jié)點(diǎn)升級(jí)為master節(jié)點(diǎn);

  4. 導(dǎo)致鎖丟失。

為了避免單點(diǎn)故障問(wèn)題,Redis作者antirez基于分布式環(huán)境下提出了一種更高級(jí)的分布式鎖的實(shí)現(xiàn)方式:Redlock。Redlock也是Redis所有分布式鎖實(shí)現(xiàn)方式中唯一能讓面試官高潮的方式。

Redis高級(jí)分布式鎖:Redlock

antirez提出的redlock算法大概是這樣的:

在Redis的分布式環(huán)境中,我們假設(shè)有N個(gè)Redis master。這些節(jié)點(diǎn)完全互相獨(dú)立,不存在主從復(fù)制或者其他集群協(xié)調(diào)機(jī)制。我們確保將在N個(gè)實(shí)例上使用與在Redis單實(shí)例下相同方法獲取和釋放鎖。現(xiàn)在我們假設(shè)有5個(gè)Redis master節(jié)點(diǎn),同時(shí)我們需要在5臺(tái)服務(wù)器上面運(yùn)行這些Redis實(shí)例,這樣保證他們不會(huì)同時(shí)都宕掉。

為了取到鎖,客戶端應(yīng)該執(zhí)行以下操作:

  • 獲取當(dāng)前Unix時(shí)間,以毫秒為單位。

  • 依次嘗試從5個(gè)實(shí)例,使用相同的key和具有唯一性的value(例如UUID)獲取鎖。當(dāng)向Redis請(qǐng)求獲取鎖時(shí),客戶端應(yīng)該設(shè)置一個(gè)網(wǎng)絡(luò)連接和響應(yīng)超時(shí)時(shí)間,這個(gè)超時(shí)時(shí)間應(yīng)該小于鎖的失效時(shí)間。例如你的鎖自動(dòng)失效時(shí)間TTL為10秒,則超時(shí)時(shí)間應(yīng)該在5-50毫秒之間。這樣可以避免服務(wù)器端Redis已經(jīng)掛掉的情況下,客戶端還在死死地等待響應(yīng)結(jié)果。如果服務(wù)器端沒(méi)有在規(guī)定時(shí)間內(nèi)響應(yīng),客戶端應(yīng)該盡快嘗試去另外一個(gè)Redis實(shí)例請(qǐng)求獲取鎖。

  • 客戶端使用當(dāng)前時(shí)間減去開始獲取鎖時(shí)間(步驟1記錄的時(shí)間)就得到獲取鎖使用的時(shí)間。當(dāng)且僅當(dāng)從大多數(shù)(N/2+1,這里是3個(gè)節(jié)點(diǎn))的Redis節(jié)點(diǎn)都取到鎖,并且使用的時(shí)間小于鎖失效時(shí)間時(shí),鎖才算獲取成功。

  • 如果取到了鎖,key的真正有效時(shí)間等于有效時(shí)間減去獲取鎖所使用的時(shí)間(步驟3計(jì)算的結(jié)果)。

  • 如果因?yàn)槟承┰?,獲取鎖失?。](méi)有在至少N/2+1個(gè)Redis實(shí)例取到鎖或者取鎖時(shí)間已經(jīng)超過(guò)了有效時(shí)間),客戶端應(yīng)該在所有的Redis實(shí)例上進(jìn)行解鎖(即便某些Redis實(shí)例根本就沒(méi)有加鎖成功,防止某些節(jié)點(diǎn)獲取到鎖但是客戶端沒(méi)有得到響應(yīng)而導(dǎo)致接下來(lái)的一段時(shí)間不能被重新獲取鎖)。

  • 此處不討論時(shí)鐘漂移

Redis分布式鎖實(shí)例分析

Redlock源碼

redisson已經(jīng)有對(duì)redlock算法封裝,接下來(lái)對(duì)其用法進(jìn)行簡(jiǎn)單介紹,并對(duì)核心源碼進(jìn)行分析(假設(shè)5個(gè)redis實(shí)例)。

1. Redlock依賴

<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.3.2</version>
</dependency>

2. Redlock用法

首先,我們來(lái)看一下redission封裝的redlock算法實(shí)現(xiàn)的分布式鎖用法,非常簡(jiǎn)單,跟重入鎖(ReentrantLock)有點(diǎn)類似:

Config config = new Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389")
        .setMasterName("masterName")
        .setPassword("password").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
// 還可以getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try {
    isLock = redLock.tryLock();
    // 500ms拿不到鎖, 就認(rèn)為獲取鎖失敗。10000ms即10s是鎖失效時(shí)間。
    isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
    if (isLock) {
        //TODO if get lock success, do something;
    }
} catch (Exception e) {
} finally {
    // 無(wú)論如何, 最后都要解鎖
    redLock.unlock();
}

3. Redlock唯一ID

實(shí)現(xiàn)分布式鎖的一個(gè)非常重要的點(diǎn)就是set的value要具有唯一性,redisson的value是怎樣保證value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock(“REDLOCK_KEY”),源碼在Redisson.java和RedissonLock.java中:

protected final UUID id = UUID.randomUUID();
String getLockName(long threadId) {
    return id + ":" + threadId;
}

4. Redlock獲取鎖

獲取鎖的代碼為redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),兩者的最終核心源碼都是下面這段代碼,只不過(guò)前者獲取鎖的默認(rèn)租約時(shí)間(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 獲取鎖時(shí)向5個(gè)redis實(shí)例發(fā)送的命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              // 首先分布式鎖的KEY不能存在,如果確實(shí)不存在,那么執(zhí)行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通過(guò)pexpire設(shè)置失效時(shí)間(也是鎖的租約時(shí)間)
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 如果分布式鎖的KEY已經(jīng)存在,并且value也匹配,表示是當(dāng)前線程持有的鎖,那么重入次數(shù)加1,并且設(shè)置失效時(shí)間
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 獲取分布式鎖的KEY的失效時(shí)間毫秒數(shù)
              "return redis.call('pttl', KEYS[1]);",
              // 這三個(gè)參數(shù)分別對(duì)應(yīng)KEYS[1],ARGV[1]和ARGV[2]
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

獲取鎖的命令中,

  • KEYS[1]就是Collections.singletonList(getName()),表示分布式鎖的key,即REDLOCK_KEY;

  • ARGV[1]就是internalLockLeaseTime,即鎖的租約時(shí)間,默認(rèn)30s;

  • ARGV[2]就是getLockName(threadId),是獲取鎖時(shí)set的唯一值,即UUID+threadId:

5. Redlock釋放鎖

釋放鎖的代碼為redLock.unlock(),核心源碼如下:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    // 向5個(gè)redis實(shí)例都執(zhí)行如下命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 如果分布式鎖KEY不存在,那么向channel發(fā)布一條消息
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            // 如果分布式鎖存在,但是value不匹配,表示鎖已經(jīng)被占用,那么直接返回
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 如果就是當(dāng)前線程占有分布式鎖,那么將重入次數(shù)減1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            // 重入次數(shù)減1后的值如果大于0,表示分布式鎖有重入過(guò),那么只設(shè)置失效時(shí)間,還不能刪除
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // 重入次數(shù)減1后的值如果為0,表示分布式鎖只獲取過(guò)1次,那么刪除這個(gè)KEY,并發(fā)布解鎖消息
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            // 這5個(gè)參數(shù)分別對(duì)應(yīng)KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

Redis實(shí)現(xiàn)的分布式鎖輪子

下面利用SpringBoot + Jedis + AOP的組合來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)易的分布式鎖。

1. 自定義注解

自定義一個(gè)注解,被注解的方法會(huì)執(zhí)行獲取分布式鎖的邏輯

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisLock {
    /**
     * 業(yè)務(wù)鍵
     *
     * @return
     */
    String key();
    /**
     * 鎖的過(guò)期秒數(shù),默認(rèn)是5秒
     *
     * @return
     */
    int expire() default 5;

    /**
     * 嘗試加鎖,最多等待時(shí)間
     *
     * @return
     */
    long waitTime() default Long.MIN_VALUE;
    /**
     * 鎖的超時(shí)時(shí)間單位
     *
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;
}

2. AOP攔截器實(shí)現(xiàn)

在AOP中我們?nèi)?zhí)行獲取分布式鎖和釋放分布式鎖的邏輯,代碼如下:

@Aspect
@Component
public class LockMethodAspect {
    @Autowired
    private RedisLockHelper redisLockHelper;
    @Autowired
    private JedisUtil jedisUtil;
    private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class);

    @Around("@annotation(com.redis.lock.annotation.RedisLock)")
    public Object around(ProceedingJoinPoint joinPoint) {
        Jedis jedis = jedisUtil.getJedis();
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        RedisLock redisLock = method.getAnnotation(RedisLock.class);
        String value = UUID.randomUUID().toString();
        String key = redisLock.key();
        try {
            final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit());
            logger.info("isLock : {}",islock);
            if (!islock) {
                logger.error("獲取鎖失敗");
                throw new RuntimeException("獲取鎖失敗");
            }
            try {
                return joinPoint.proceed();
            } catch (Throwable throwable) {
                throw new RuntimeException("系統(tǒng)異常");
            }
        }  finally {
            logger.info("釋放鎖");
            redisLockHelper.unlock(jedis,key, value);
            jedis.close();
        }
    }
}

3. Redis實(shí)現(xiàn)分布式鎖核心類

@Component
public class RedisLockHelper {
    private long sleepTime = 100;
    /**
     * 直接使用setnx + expire方式獲取分布式鎖
     * 非原子性
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) {
        Long result = jedis.setnx(key, value);
        // result = 1時(shí),設(shè)置成功,否則設(shè)置失敗
        if (result == 1L) {
            return jedis.expire(key, timeout) == 1L;
        } else {
            return false;
        }
    }

    /**
     * 使用Lua腳本,腳本中使用setnex+expire命令進(jìn)行加鎖操作
     *
     * @param jedis
     * @param key
     * @param UniqueId
     * @param seconds
     * @return
     */
    public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) {
        String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
                "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
        List<String> keys = new ArrayList<>();
        List<String> values = new ArrayList<>();
        keys.add(key);
        values.add(UniqueId);
        values.add(String.valueOf(seconds));
        Object result = jedis.eval(lua_scripts, keys, values);
        //判斷是否成功
        return result.equals(1L);
    }

    /**
     * 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) {
        long seconds = timeUnit.toSeconds(timeout);
        return "OK".equals(jedis.set(key, value, "NX", "EX", seconds));
    }

    /**
     * 自定義獲取鎖的超時(shí)時(shí)間
     *
     * @param jedis
     * @param key
     * @param value
     * @param timeout
     * @param waitTime
     * @param timeUnit
     * @return
     * @throws InterruptedException
     */
    public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException {
        long seconds = timeUnit.toSeconds(timeout);
        while (waitTime >= 0) {
            String result = jedis.set(key, value, "nx", "ex", seconds);
            if ("OK".equals(result)) {
                return true;
            }
            waitTime -= sleepTime;
            Thread.sleep(sleepTime);
        }
        return false;
    }
    /**
     * 錯(cuò)誤的解鎖方法—直接刪除key
     *
     * @param key
     */
    public void unlock_with_del(Jedis jedis,String key) {
        jedis.del(key);
    }

    /**
     * 使用Lua腳本進(jìn)行解鎖操縱,解鎖的時(shí)候驗(yàn)證value值
     *
     * @param jedis
     * @param key
     * @param value
     * @return
     */
    public boolean unlock(Jedis jedis,String key,String value) {
        String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
                "return redis.call('del',KEYS[1]) else return 0 end";
        return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
    }
}

4. Controller層控制

定義一個(gè)TestController來(lái)測(cè)試我們實(shí)現(xiàn)的分布式鎖

@RestController
public class TestController {
    @RedisLock(key = "redis_lock")
    @GetMapping("/index")
    public String index() {
        return "index";
    }
}

關(guān)于“Redis分布式鎖實(shí)例分析”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI