溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么在SpringBoot中使用Redis實現(xiàn)分布式鎖

發(fā)布時間:2023-03-29 11:20:22 來源:億速云 閱讀:127 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了怎么在SpringBoot中使用Redis實現(xiàn)分布式鎖的相關(guān)知識,內(nèi)容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇怎么在SpringBoot中使用Redis實現(xiàn)分布式鎖文章都會有所收獲,下面我們一起來看看吧。

一、Redis實現(xiàn)分布式鎖原理

為什么需要分布式鎖

在聊分布式鎖之前,有必要先解釋一下,為什么需要分布式鎖。

與分布式鎖相對就的是單機鎖,我們在寫多線程程序時,避免同時操作一個共享變量產(chǎn)生數(shù)據(jù)問題,通常會使用一把鎖來互斥以保證共享變量的正確性,其使用范圍是在同一個進程中。如果換做是多個進程,需要同時操作一個共享資源,如何互斥呢?現(xiàn)在的業(yè)務(wù)應用通常是微服務(wù)架構(gòu),這也意味著一個應用會部署多個進程,多個進程如果需要修改MySQL中的同一行記錄,為了避免操作亂序?qū)е屡K數(shù)據(jù),此時就需要引入分布式鎖了。

怎么在SpringBoot中使用Redis實現(xiàn)分布式鎖

想要實現(xiàn)分布式鎖,必須借助一個外部系統(tǒng),所有進程都去這個系統(tǒng)上申請加鎖。而這個外部系統(tǒng),必須要實現(xiàn)互斥能力,即兩個請求同時進來,只會給一個進程加鎖成功,另一個失敗。這個外部系統(tǒng)可以是數(shù)據(jù)庫,也可以是Redis或Zookeeper,但為了追求性能,我們通常會選擇使用Redis或Zookeeper來做。

Redis本身可以被多個客戶端共享訪問,正好就是一個共享存儲系統(tǒng),可以用來保存分布式鎖。而且 Redis 的讀寫性能高,可以應對高并發(fā)的鎖操作場景。本文主要探討如何基于Redis實現(xiàn)分布式鎖以及實現(xiàn)過程中可能面臨的問題。

分布式鎖如何實現(xiàn)

作為分布式鎖實現(xiàn)過程中的共享存儲系統(tǒng),Redis可以使用鍵值對來保存鎖變量,在接收和處理不同客戶端發(fā)送的加鎖和釋放鎖的操作請求。那么,鍵值對的鍵和值具體是怎么定的呢?我們要賦予鎖變量一個變量名,把這個變量名作為鍵值對的鍵,而鎖變量的值,則是鍵值對的值,這樣一來,Redis就能保存鎖變量了,客戶端也就可以通過Redis的命令操作來實現(xiàn)鎖操作。

想要實現(xiàn)分布式鎖,必須要求Redis有互斥的能力。可以使用SETNX命令,其含義是SET IF NOT EXIST,即如果key不存在,才會設(shè)置它的值,否則什么也不做。兩個客戶端進程可以執(zhí)行這個命令,達到互斥,就可以實現(xiàn)一個分布式鎖。

以下展示了Redis使用key/value對保存鎖變量,以及兩個客戶端同時請求加鎖的操作過程。

怎么在SpringBoot中使用Redis實現(xiàn)分布式鎖

加鎖操作完成后,加鎖成功的客戶端,就可以去操作共享資源,例如,修改MySQL的某一行數(shù)據(jù)。操作完成后,還要及時釋放鎖,給后來者讓出操作共享資源的機會。如何釋放鎖呢?直接使用DEL命令刪除這個key即可。這個邏輯非常簡單,整體的流程寫成偽代碼就是下面這樣。

// 加鎖
SETNX lock_key 1
// 業(yè)務(wù)邏輯
DO THINGS
// 釋放鎖
DEL lock_key

但是,以上實現(xiàn)存在一個很大的問題,當客戶端1拿到鎖后,如果發(fā)生下面的場景,就會造成死鎖。

程序處理業(yè)務(wù)邏輯異常,沒及時釋放鎖進程掛了,沒機會釋放鎖

以上情況會導致已經(jīng)獲得鎖的客戶端一直占用鎖,其他客戶端永遠無法獲取到鎖。

如何避免死鎖

為了解決以上死鎖問題,最容易想到的方案是在申請鎖時,在Redis中實現(xiàn)時,給鎖設(shè)置一個過期時間,假設(shè)操作共享資源的時間不會超過10s,那么加鎖時,給這個key設(shè)置10s過期即可。

但以上操作還是有問題,加鎖、設(shè)置過期時間是2條命令,有可能只執(zhí)行了第一條,第二條卻執(zhí)行失敗,例如:

1.SETNX執(zhí)行成功,執(zhí)行EXPIRE時由于網(wǎng)絡(luò)問題,執(zhí)行失敗
2.SETNX執(zhí)行成功,Redis異常宕機,EXPIRE沒有機會執(zhí)行
3.SETNX執(zhí)行成功,客戶端異常崩潰,EXPIRE沒有機會執(zhí)行

總之這兩條命令如果不能保證是原子操作,就有潛在的風險導致過期時間設(shè)置失敗,依舊有可能發(fā)生死鎖問題。幸好在Redis 2.6.12之后,Redis擴展了SET命令的參數(shù),可以在SET的同時指定EXPIRE時間,這條操作是原子的,例如以下命令是設(shè)置鎖的過期時間為10秒。

SET lock_key 1 EX 10 NX

至此,解決了死鎖問題,但還是有其他問題。想像下面這個這樣一種場景:

怎么在SpringBoot中使用Redis實現(xiàn)分布式鎖

  1. 客戶端1加鎖成功,開始操作共享資源

  2. 客戶端1操作共享資源耗時太久,超過了鎖的過期時間,鎖失效(鎖被自動釋放)

  3. 客戶端2加鎖成功,開始操作共享資源

  4. 客戶端1操作共享資源完成,在finally塊中手動釋放鎖,但此時它釋放的是客戶端2的鎖。

這里存在兩個嚴重的問題:

  • 鎖過期

  • 釋放了別人的鎖

第1個問題是評估操作共享資源的時間不準確導致的,如果只是一味增大過期時間,只能緩解問題降低出現(xiàn)問題的概率,依舊無法徹底解決問題。原因在于客戶端在拿到鎖之后,在操作共享資源時,遇到的場景是很復雜的,既然是預估的時間,也只能是大致的計算,不可能覆蓋所有導致耗時變長的場景。

第2個問題是釋放了別人的鎖,原因在于釋放鎖的操作是無腦操作,并沒有檢查這把鎖的歸屬,這樣解鎖不嚴謹。如何解決呢?

鎖被別人給釋放了

解決辦法是,客戶端在加鎖時,設(shè)置一個只有自己知道的唯一標識進去,例如可以是自己的線程ID,如果是redis實現(xiàn),就是SET key unique_value EX 10 NX。之后在釋放鎖時,要先判斷這把鎖是否歸自己持有,只有是自己的才能釋放它。

//釋放鎖 比較unique_value是否相等,避免誤釋放
if redis.get("key") == unique_value then
    return redis.del("key")

這里釋放鎖使用的是GET + DEL兩條命令,這時又會遇到原子性問題了。

  1. 客戶端1執(zhí)行GET,判斷鎖是自己的

  2. 客戶端2執(zhí)行了SET命令,強制獲取到鎖(雖然發(fā)生概念很低,但要嚴謹考慮鎖的安全性)

  3. 客戶端1執(zhí)行DEL,卻釋放了客戶端2的鎖

由此可見,以上GET + DEL兩個命令還是必須原子的執(zhí)行才行。怎樣原子執(zhí)行兩條命令呢?答案是Lua腳本,可以把以上邏輯寫成Lua腳本,讓Redis執(zhí)行。因為Redis處理每個請求是單線程執(zhí)行的,在執(zhí)行一個Lua腳本時其它請求必須等待,直到這個Lua腳本處理完成,這樣一來GET+DEL之間就不會有其他命令執(zhí)行了。

以下是使用Lua腳本(unlock.script)實現(xiàn)的釋放鎖操作的偽代碼,其中,KEYS[1]表示lock_key,ARGV[1]是當前客戶端的唯一標識,這兩個值都是我們在執(zhí)行 Lua腳本時作為參數(shù)傳入的。

//Lua腳本語言,釋放鎖 比較unique_value是否相等,避免誤釋放
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

最后我們執(zhí)行以下命令,即可

redis-cli  --eval  unlock.script lock_key , unique_value

這樣一路優(yōu)先下來,整個加鎖、解鎖流程就更嚴謹了,先小結(jié)一下,基于Redis實現(xiàn)的分布式鎖,一個嚴謹?shù)牧鞒倘缦拢?/p>

  1. 加鎖時要設(shè)置過期時間SET lock_key unique_value EX expire_time NX

  2. 操作共享資源

  3. 釋放鎖:Lua腳本,先GET判斷鎖是否歸屬自己,再DEL釋放鎖

有了這個嚴謹?shù)逆i模型,我們還需要重新思考之前的那個問題,鎖的過期時間不好評估怎么辦。

如何確定鎖的過期時間

前面提到過,過期時間如果評估得不好,這個鎖就會有提前過期的風險,一種妥協(xié)的解決方案是,盡量冗余過期時間,降低鎖提前過期的概率,但這個方案并不能完美解決問題。是否可以設(shè)置這樣的方案,加鎖時,先設(shè)置一個預估的過期時間,然后開啟一個守護線程,定時去檢測這個鎖的失效時間,如果鎖快要過期了,操作共享資源還未完成,那么就自動對鎖進行續(xù)期,重新設(shè)置過期時間。

這是一種比較好的方案,已經(jīng)有一個庫把這些工作都封裝好了,它就是Redisson。Redisson是一個Java語言實現(xiàn)的Redis SDK客戶端,在使用分布式鎖時,它就采用了自動續(xù)期的方案來避免鎖過期,這個守護線程我們一般叫它看門狗線程。這個SDK提供的API非常友好,它可以像操作本地鎖一樣操作分布式鎖??蛻舳艘坏┘渔i成功,就會啟動一個watch dog看門狗線程,它是一個后臺線程,會每隔一段時間(這段時間的長度與設(shè)置的鎖的過期時間有關(guān))檢查一下,如果檢查時客戶端還持有鎖key(也就是說還在操作共享資源),那么就會延長鎖key的生存時間。

怎么在SpringBoot中使用Redis實現(xiàn)分布式鎖

那如果客戶端在加鎖成功后就宕機了呢?宕機了那么看門狗任務(wù)就不存在了,也就無法為鎖續(xù)期了,鎖到期自動失效。

Redis的部署方式對鎖的影響

上面討論的情況,都是鎖在單個Redis 實例中可能產(chǎn)生的問題,并沒有涉及到Redis的部署架構(gòu)細節(jié)。

Redis發(fā)展到現(xiàn)在,幾種常見的部署架構(gòu)有:

  • 單機模式;

  • 主從模式;

  • 哨兵(sentinel)模式;

  • 集群模式;

我們使用Redis時,一般會采用主從集群+哨兵的模式部署,哨兵的作用就是監(jiān)測redis節(jié)點的運行狀態(tài)。普通的主從模式,當master崩潰時,需要手動切換讓slave成為master,使用主從+哨兵結(jié)合的好處在于,當master異常宕機時,哨兵可以實現(xiàn)故障自動切換,把slave提升為新的master,繼續(xù)提供服務(wù),以此保證可用性。那么當主從發(fā)生切換時,分布式鎖依舊安全嗎?

怎么在SpringBoot中使用Redis實現(xiàn)分布式鎖

想像這樣的場景:

  1. 客戶端1在master上執(zhí)行SET命令,加鎖成功

  2. 此時,master異常宕機,SET命令還未同步到slave上(主從復制是異步的)

  3. 哨兵將slave提升為新的master,但這個鎖在新的master上丟失了,導致客戶端2來加鎖成功了,兩個客戶端共同操作共享資源

可見,當引入Redis副本后,分布式鎖還是可能受到影響。即使Redis通過sentinel保證高可用,如果這個master節(jié)點由于某些原因發(fā)生了主從切換,那么就會出現(xiàn)鎖丟失的情況。

集群模式+Redlock實現(xiàn)高可靠的分布式鎖

為了避免Redis實例故障而導致的鎖無法工作的問題,Redis的開發(fā)者 Antirez提出了分布式鎖算法Redlock。Redlock算法的基本思路,是讓客戶端和多個獨立的Redis實例依次請求加鎖,如果客戶端能夠和半數(shù)以上的實例成功地完成加鎖操作,那么我們就認為,客戶端成功地獲得分布式鎖了,否則加鎖失敗。這樣一來,即使有單個Redis實例發(fā)生故障,因為鎖變量在其它實例上也有保存,所以,客戶端仍然可以正常地進行鎖操作,鎖變量并不會丟失。

來具體看下Redlock算法的執(zhí)行步驟。Redlock算法的實現(xiàn)要求Redis采用集群部署模式,無哨兵節(jié)點,需要有N個獨立的Redis實例(官方推薦至少5個實例)。接下來,我們可以分成3步來完成加鎖操作。

怎么在SpringBoot中使用Redis實現(xiàn)分布式鎖

第一步是,客戶端獲取當前時間。

第二步是,客戶端按順序依次向N個Redis實例執(zhí)行加鎖操作。

這里的加鎖操作和在單實例上執(zhí)行的加鎖操作一樣,使用SET命令,帶上NX、EX/PX選項,以及帶上客戶端的唯一標識。當然,如果某個Redis實例發(fā)生故障了,為了保證在這種情況下,Redlock算法能夠繼續(xù)運行,我們需要給加鎖操作設(shè)置一個超時時間。如果客戶端在和一個Redis實例請求加鎖時,一直到超時都沒有成功,那么此時,客戶端會和下一個Redis實例繼續(xù)請求加鎖。加鎖操作的超時時間需要遠遠地小于鎖的有效時間,一般也就是設(shè)置為幾十毫秒。

第三步是,一旦客戶端完成了和所有Redis實例的加鎖操作,客戶端就要計算整個加鎖過程的總耗時。

客戶端只有在滿足兩個條件時,才能認為是加鎖成功,條件一是客戶端從超過半數(shù)(大于等于 N/2+1)的Redis實例上成功獲取到了鎖;條件二是客戶端獲取鎖的總耗時沒有超過鎖的有效時間。

為什么大多數(shù)實例加鎖成功才能算成功呢?多個Redis實例一起來用,其實就組成了一個分布式系統(tǒng)。在分布式系統(tǒng)中總會出現(xiàn)異常節(jié)點,所以在談?wù)摲植际较到y(tǒng)時,需要考慮異常節(jié)點達到多少個,也依舊不影響整個系統(tǒng)的正確運行。這是一個分布式系統(tǒng)的容錯問題,這個問題的結(jié)論是:如果只存在故障節(jié)點,只要大多數(shù)節(jié)點正常,那么整個系統(tǒng)依舊可以提供正確服務(wù)。

在滿足了這兩個條件后,我們需要重新計算這把鎖的有效時間,計算的結(jié)果是鎖的最初有效時間減去客戶端為獲取鎖的總耗時。如果鎖的有效時間已經(jīng)來不及完成共享數(shù)據(jù)的操作了,我們可以釋放鎖,以免出現(xiàn)還沒完成共享資源操作,鎖就過期了的情況。

當然,如果客戶端在和所有實例執(zhí)行完加鎖操作后,沒能同時滿足這兩個條件,那么,客戶端就要向所有Redis節(jié)點發(fā)起釋放鎖的操作。為什么釋放鎖,要操作所有的節(jié)點呢,不能只操作那些加鎖成功的節(jié)點嗎?因為在某一個Redis節(jié)點加鎖時,可能因為網(wǎng)絡(luò)原因?qū)е录渔i失敗,例如一個客戶端在一個Redis實例上加鎖成功,但在讀取響應結(jié)果時由于網(wǎng)絡(luò)問題導致讀取失敗,那這把鎖其實已經(jīng)在Redis上加鎖成功了。所以釋放鎖時,不管之前有沒有加鎖成功,需要釋放所有節(jié)點上的鎖以保證清理節(jié)點上的殘留的鎖。

在Redlock算法中,釋放鎖的操作和在單實例上釋放鎖的操作一樣,只要執(zhí)行釋放鎖的 Lua腳本就可以了。這樣一來,只要N個Redis實例中的半數(shù)以上實例能正常工作,就能保證分布式鎖的正常工作了。所以,在實際的業(yè)務(wù)應用中,如果你想要提升分布式鎖的可靠性,就可以通過Redlock算法來實現(xiàn)。

二、代碼實現(xiàn)Redis分布式鎖

1.SpringBoot整合redis用到最多的當然屬于我們的老朋友RedisTemplate,pom依賴如下:

<!-- springboot整合redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.Redis配置類:

package com.example.redisdemo.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @description: Redis配置類
 * @author Keson
 * @date 21:20 2022/11/14
 * @Param
 * @return
 * @version 1.0
 */
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        // 設(shè)置序列化
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);// key序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化
        redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

3.Service層面

package com.example.redisdemo.service;

import com.example.redisdemo.entity.CustomerBalance;
import java.util.concurrent.Callable;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO
 * @date 2022/11/14 15:12
 */
public interface RedisService {

    <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception;
}
package com.example.redisdemo.service.impl;

import com.example.redisdemo.entity.CustomerBalance;
import com.example.redisdemo.service.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO Redis實現(xiàn)分布式鎖
 * @date 2022/11/14 15:13
 */
@Service
@Slf4j
public class RedisServiceImpl implements RedisService {

    //設(shè)置默認過期時間
    private final static int DEFAULT_LOCK_EXPIRY_TIME = 20;
    //自定義lock key前綴
    private final static String LOCK_PREFIX = "LOCK:CUSTOMER_BALANCE";

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception{
        //自定義lock key
        String lockKey = getLockKey(customerBalance.getCustomerNumber(), customerBalance.getSubAccountNumber(), customerBalance.getCurrencyCode());
        //將UUID當做value,確保唯一性
        String lockReference = UUID.randomUUID().toString();

        try {
            if (!lock(lockKey, lockReference, DEFAULT_LOCK_EXPIRY_TIME, TimeUnit.SECONDS)) {
                throw new Exception("lock加鎖失敗");
            }
            return callable.call();
        } finally {
            unlock(lockKey, lockReference);
        }
    }

    //定義lock key
    String getLockKey(String customerNumber, String subAccountNumber, String currencyCode) {
        return String.format("%s:%s:%s:%s", LOCK_PREFIX, customerNumber, subAccountNumber, currencyCode);
    }

    //redis加鎖
    private boolean lock(String key, String value, long timeout, TimeUnit timeUnit) {
        Boolean locked;
        try {
            //SET_IF_ABSENT --> NX: Only set the key if it does not already exist.
            //SET_IF_PRESENT --> XX: Only set the key if it already exist.
            locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
                    connection.set(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8),
                            Expiration.from(timeout, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT));
        } catch (Exception e) {
            log.error("Lock failed for redis key: {}, value: {}", key, value);
            locked = false;
        }
        return locked != null && locked;
    }

    //redis解鎖
    private boolean unlock(String key, String value) {
        try {
            //使用lua腳本保證刪除的原子性,確保解鎖
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
                            "then return redis.call('del', KEYS[1]) " +
                            "else return 0 end";
            Boolean unlockState = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
                    connection.eval(script.getBytes(), ReturnType.BOOLEAN, 1,
                            key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8)));
            return unlockState == null || !unlockState;
        } catch (Exception e) {
            log.error("unLock failed for redis key: {}, value: {}", key, value);
            return false;
        }
    }
}

4.業(yè)務(wù)調(diào)用實現(xiàn)分布式鎖示例:

    @Override
    public int updateById(CustomerBalance customerBalance) throws Exception {
        return redisService.callWithLock(customerBalance, ()-> customerBalanceMapper.updateById(customerBalance));
    }

關(guān)于“怎么在SpringBoot中使用Redis實現(xiàn)分布式鎖”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“怎么在SpringBoot中使用Redis實現(xiàn)分布式鎖”知識都有一定的了解,大家如果還想學習更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI