您好,登錄后才能下訂單哦!
這篇文章主要介紹“如何理解分布式系統(tǒng)下基于Redis的分布式鎖”,在日常操作中,相信很多人在如何理解分布式系統(tǒng)下基于Redis的分布式鎖問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何理解分布式系統(tǒng)下基于Redis的分布式鎖”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
新接手的項(xiàng)目,偶爾會出現(xiàn)賬不平的問題。之前的技術(shù)老大臨走時(shí)給的解釋是:排查了,沒找到原因,之后太忙就沒再解決,可能是框架的原因……
既然項(xiàng)目交付到手中,這樣的問題是必須要解決的。梳理了所有賬務(wù)處理邏輯,最終找到了原因:數(shù)據(jù)庫并發(fā)操作熱點(diǎn)賬戶導(dǎo)致。就這這個(gè)問題,來聊一聊分布式系統(tǒng)下基于Redis的分布式鎖。順便也分解一下問題形成原因及解決方案。
系統(tǒng)并發(fā)量并不高,存在熱點(diǎn)賬戶,但也不至于那么嚴(yán)重。問題的根源在于系統(tǒng)架構(gòu)設(shè)計(jì),人為的制造了并發(fā)。場景是這樣的:商戶批量導(dǎo)入一批數(shù)據(jù),系統(tǒng)會進(jìn)行前置處理,并對賬戶余額進(jìn)行增減。
此時(shí),另外一個(gè)定時(shí)任務(wù),也會對賬戶進(jìn)行掃描更新。而且對同一賬戶的操作分布到各個(gè)系統(tǒng)當(dāng)中,熱點(diǎn)賬戶也就出現(xiàn)了。
針對此問題的解決方案,從架構(gòu)層面可以考慮將賬務(wù)系統(tǒng)進(jìn)行抽離,集中在一個(gè)系統(tǒng)中進(jìn)行處理,所有的數(shù)據(jù)庫事務(wù)及執(zhí)行順序由賬務(wù)系統(tǒng)來統(tǒng)籌處理。從技術(shù)方面來講,則可以通過鎖機(jī)制來對熱點(diǎn)賬戶進(jìn)行加鎖。
本篇文章就針對熱點(diǎn)賬戶基于分布式鎖的實(shí)現(xiàn)方式進(jìn)行詳細(xì)的講解。
在Java的多線程環(huán)境下,通常有幾類鎖可以使用:
JVM內(nèi)存模型級別的鎖,常用的有:synchronized、Lock等;
數(shù)據(jù)庫鎖,比如樂觀鎖,悲觀鎖等;
分布式鎖;
JVM內(nèi)存級別的鎖,可以保證單體服務(wù)下線程的安全性,比如多個(gè)線程訪問/修改一個(gè)全局變量。但當(dāng)系統(tǒng)進(jìn)行集群部署時(shí),JVM級別的本地鎖就無能為力了。
像上述案例中,熱點(diǎn)賬戶就屬于分布式系統(tǒng)中的共享資源,我們通常會采用數(shù)據(jù)庫鎖或分布式鎖來進(jìn)行解決。
數(shù)據(jù)庫鎖,又分為樂觀鎖和悲觀鎖。
悲觀鎖是基于數(shù)據(jù)庫(Mysql的InnoDB)提供的排他鎖來實(shí)現(xiàn)的。在進(jìn)行事務(wù)操作時(shí),通過select ... for update語句,MySQL會對查詢結(jié)果集中每行數(shù)據(jù)都添加排他鎖,其他線程對該記錄的更新與刪除操作都會阻塞。從而達(dá)到共享資源的順序執(zhí)行(修改);
樂觀鎖是相對悲觀鎖而言的,樂觀鎖假設(shè)數(shù)據(jù)一般情況不會造成沖突,所以在數(shù)據(jù)進(jìn)行提交更新的時(shí)候,才會正式對數(shù)據(jù)的沖突與否進(jìn)行檢測。如果沖突則返回給用戶異常信息,讓用戶決定如何去做。樂觀鎖適用于讀多寫少的場景,這樣可以提高程序的吞吐量。在樂觀鎖實(shí)現(xiàn)時(shí)通常會基于記錄狀態(tài)或添加version版本來進(jìn)行實(shí)現(xiàn)。
項(xiàng)目中使用了悲觀鎖,但悲觀鎖卻失效了。這也是使用悲觀鎖時(shí),常見的誤區(qū),下面來分析一下。
正常使用悲觀鎖的流程:
通過select ... for update鎖定記錄;
計(jì)算新余額,修改金額并存儲;
執(zhí)行完成釋放鎖;
經(jīng)常犯錯(cuò)的處理流程:
查詢賬戶余額,計(jì)算新余額;
通過select ... for update鎖定記錄;
修改金額并存儲;
執(zhí)行完成釋放鎖;
錯(cuò)誤的流程中,比如A和B服務(wù)查詢到的余額都是100,A扣減50,B扣減40,然后A鎖定記錄,更新數(shù)據(jù)庫為50;A釋放鎖之后,B鎖定記錄,更新數(shù)據(jù)庫為60。顯然,后者把前者的更新給覆蓋掉了。解決的方案就是擴(kuò)大鎖的范圍,將鎖提前到計(jì)算新余額之前。
通常悲觀鎖對數(shù)據(jù)庫的壓力是非常大的,在實(shí)踐中通常會根據(jù)場景使用樂觀鎖或分布式鎖等方式來實(shí)現(xiàn)。
下面進(jìn)入正題,講講基于Redis的分布式鎖實(shí)現(xiàn)。
這里以Spring Boot、Redis、Lua腳本為例來演示分布式鎖的實(shí)現(xiàn)。為了簡化處理,示例中Redis既承擔(dān)了分布式鎖的功能,也承擔(dān)了數(shù)據(jù)庫的功能。
集群環(huán)境下,對同一個(gè)賬戶的金額進(jìn)行操作,基本步驟:
從數(shù)據(jù)庫讀取用戶金額;
程序修改金額;
再將最新金額存儲到數(shù)據(jù)庫;
下面從最初不加鎖,不同步處理,逐步推演出最終的分布式鎖。
準(zhǔn)備一個(gè)不加鎖處理的基礎(chǔ)業(yè)務(wù)環(huán)境。
首先在Spring Boot項(xiàng)目中引入相關(guān)依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
賬戶對應(yīng)實(shí)體類UserAccount:
public class UserAccount { //用戶ID private String userId; //賬戶內(nèi)金額 private int amount; //添加賬戶金額 public void addAmount(int amount) { this.amount = this.amount + amount; } // 省略構(gòu)造方法和getter/setter }
創(chuàng)建一個(gè)線程實(shí)現(xiàn)類AccountOperationThread:
public class AccountOperationThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class); private static final Long RELEASE_SUCCESS = 1L; private String userId; private RedisTemplate<Object, Object> redisTemplate; public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) { this.userId = userId; this.redisTemplate = redisTemplate; } @Override public void run() { noLock(); } /** * 不加鎖 */ private void noLock() { try { Random random = new Random(); // 模擬線程進(jìn)行業(yè)務(wù)處理 TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1); } catch (InterruptedException e) { e.printStackTrace(); } //模擬數(shù)據(jù)庫中獲取用戶賬號 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); // 金額+1 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模擬存回?cái)?shù)據(jù)庫 redisTemplate.opsForValue().set(userId, userAccount); } }
其中RedisTemplate的實(shí)例化交給了Spring Boot:
@Configuration public class RedisConfig { @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 設(shè)置value的序列化規(guī)則和 key的序列化規(guī)則 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
最后,再準(zhǔn)備一個(gè)TestController來進(jìn)行觸發(fā)多線程的運(yùn)行:
@RestController public class TestController { private final static Logger logger = LoggerFactory.getLogger(TestController.class); private static ExecutorService executorService = Executors.newFixedThreadPool(10); @Autowired private RedisTemplate<Object, Object> redisTemplate; @GetMapping("/test") public String test() throws InterruptedException { // 初始化用戶user_001到Redis,賬戶金額為0 redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0)); // 開啟10個(gè)線程進(jìn)行同步測試,每個(gè)線程為賬戶增加1元 for (int i = 0; i < 10; i++) { logger.info("創(chuàng)建線程i=" + i); executorService.execute(new AccountOperationThread("user_001", redisTemplate)); } // 主線程休眠1秒等待線程跑完 TimeUnit.MILLISECONDS.sleep(1000); // 查詢Redis中的user_001賬戶 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001"); logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount()); return "success"; } }
執(zhí)行上述程序,正常來說10個(gè)線程,每個(gè)線程加1,結(jié)果應(yīng)該是10。但多執(zhí)行幾次,會發(fā)現(xiàn),結(jié)果變化很大,基本上都要比10小。
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 1 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 1 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 2 [pool-1-thread-2] c.s.redis.thread.AccountOperationThread : pool-1-thread-2 : user id : user_001 amount : 2 [pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 2 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 3 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 4 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 5 [nio-8080-exec-1] c.s.redis.controller.TestController : user id : user_001 amount : 5
以上述日志為例,前四個(gè)線程都將值改為1,也就是后面三個(gè)線程都將前面的修改進(jìn)行了覆蓋,導(dǎo)致最終結(jié)果不是10,只有5。這顯然是有問題的。
針對上面的情況,在同一個(gè)JVM當(dāng)中,我們可以通過線程加鎖來完成。但在分布式環(huán)境下,JVM級別的鎖是沒辦法實(shí)現(xiàn)的,這里可以采用Redis同步鎖實(shí)現(xiàn)。
基本思路:第一個(gè)線程進(jìn)入時(shí),在Redis中進(jìn)記錄,當(dāng)后續(xù)線程過來請求時(shí),判斷Redis是否存在該記錄,如果存在則說明處于鎖定狀態(tài),進(jìn)行等待或返回。如果不存在,則進(jìn)行后續(xù)業(yè)務(wù)處理。
/** * 1.搶占資源時(shí)判斷是否被鎖。 * 2.如未鎖則搶占成功且加鎖,否則等待鎖釋放。 * 3.業(yè)務(wù)完成后釋放鎖,讓給其它線程。 * <p> * 該方案并未解決同步問題,原因:線程獲得鎖和加鎖的過程,并非原子性操作,可能會導(dǎo)致線程A獲得鎖,還未加鎖時(shí),線程B也獲得了鎖。 */ private void redisLock() { Random random = new Random(); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1); } catch (InterruptedException e) { e.printStackTrace(); } while (true) { Object lock = redisTemplate.opsForValue().get(userId + ":syn"); if (lock == null) { // 獲得鎖 -> 加鎖 -> 跳出循環(huán) logger.info(Thread.currentThread().getName() + ":獲得鎖"); redisTemplate.opsForValue().set(userId + ":syn", "lock"); break; } try { // 等待500毫秒重試獲得鎖 TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } try { //模擬數(shù)據(jù)庫中獲取用戶賬號 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //設(shè)置金額 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模擬存回?cái)?shù)據(jù)庫 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //釋放鎖 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":釋放鎖"); } }
在while代碼塊中,先判斷對應(yīng)用戶ID是否在Redis中存在,如果不存在,則進(jìn)行set加鎖,如果存在,則跳出循環(huán)繼續(xù)等待。
上述代碼,看起來實(shí)現(xiàn)了加鎖的功能,但當(dāng)執(zhí)行程序時(shí),會發(fā)現(xiàn)與未加鎖一樣,依舊存在并發(fā)問題。原因是:獲取鎖和加鎖的操作并不是原子的。比如兩個(gè)線程發(fā)現(xiàn)lock都是null,都進(jìn)行了加鎖,此時(shí)并發(fā)問題依舊存在。
針對上述問題,可將獲取鎖和加鎖的過程原子化處理?;趕pring-boot-data-redis提供的原子化API可以實(shí)現(xiàn):
// 該方法使用了redis的指令:SETNX key value // 1.key不存在,設(shè)置成功返回value,setIfAbsent返回true; // 2.key存在,則設(shè)置失敗返回null,setIfAbsent返回false; // 3.原子性操作; Boolean setIfAbsent(K var1, V var2);
上述方法的原子化操作是對Redis的setnx命令的封裝,在Redis中setnx的使用如下實(shí)例:
redis> SETNX mykey "Hello" (integer) 1 redis> SETNX mykey "World" (integer) 0 redis> GET mykey "Hello"
第一次,設(shè)置mykey時(shí),并不存在,則返回1,表示設(shè)置成功;第二次設(shè)置mykey時(shí),已經(jīng)存在,則返回0,表示設(shè)置失敗。再次查詢mykey對應(yīng)的值,會發(fā)現(xiàn)依舊是第一次設(shè)置的值。也就是說redis的setnx保證了唯一的key只能被一個(gè)服務(wù)設(shè)置成功。
理解了上述API及底層原理,來看看線程中的實(shí)現(xiàn)方法代碼如下:
/** * 1.原子操作加鎖 * 2.競爭線程循環(huán)重試獲得鎖 * 3.業(yè)務(wù)完成釋放鎖 */ private void atomicityRedisLock() { //Spring data redis 支持的原子性操作 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) { try { // 等待100毫秒重試獲得鎖 TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info(Thread.currentThread().getName() + ":獲得鎖"); try { //模擬數(shù)據(jù)庫中獲取用戶賬號 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //設(shè)置金額 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模擬存回?cái)?shù)據(jù)庫 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //釋放鎖 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":釋放鎖"); } }
再次執(zhí)行代碼,會發(fā)現(xiàn)結(jié)果正確了,也就是說可以成功的對分布式線程進(jìn)行了加鎖。
雖然上述代碼執(zhí)行結(jié)果沒問題,但如果應(yīng)用異常宕機(jī),沒來得及執(zhí)行finally中釋放鎖的方法,那么其他線程則永遠(yuǎn)無法獲得這個(gè)鎖。
此時(shí)可采用setIfAbsent的重載方法:
Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
基于該方法,可以設(shè)置鎖的過期時(shí)間。這樣即便獲得鎖的線程宕機(jī),在Redis中數(shù)據(jù)過期之后,其他線程可正常獲得該鎖。
示例代碼如下:
private void atomicityAndExRedisLock() { try { //Spring data redis 支持的原子性操作,并設(shè)置5秒過期時(shí)間 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重試獲得鎖 logger.info(Thread.currentThread().getName() + ":嘗試循環(huán)獲取鎖"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":獲得鎖--------"); // 應(yīng)用在這里宕機(jī),進(jìn)程退出,無法執(zhí)行 finally; Thread.currentThread().interrupt(); // 業(yè)務(wù)邏輯... } catch (InterruptedException e) { e.printStackTrace(); } finally { //釋放鎖 if (!Thread.currentThread().isInterrupted()) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":釋放鎖"); } } }
上面添加了Redis所的超時(shí)時(shí)間,看似解決了問題,但又引入了新的問題。
比如,正常情況下線程A在5秒內(nèi)可正常處理完業(yè)務(wù),但偶發(fā)會出現(xiàn)超過5秒的情況。如果將超時(shí)時(shí)間設(shè)置為5秒,線程A獲得了鎖,但業(yè)務(wù)邏輯處理需要6秒。此時(shí),線程A還在正常業(yè)務(wù)邏輯,線程B已經(jīng)獲得了鎖。當(dāng)線程A處理完時(shí),有可能將線程B的鎖給釋放掉。
在上述場景中有兩個(gè)問題點(diǎn):
第一,線程A和線程B可能會同時(shí)在執(zhí)行,存在并發(fā)問題。
第二,線程A可能會把線程B的鎖給釋放掉,導(dǎo)致一系列的惡性循環(huán)。
當(dāng)然,可以通過在Redis中設(shè)置value值來判斷鎖是屬于線程A還是線程B。但仔細(xì)分析會發(fā)現(xiàn),這個(gè)問題的本質(zhì)是因?yàn)榫€程A執(zhí)行業(yè)務(wù)邏輯耗時(shí)超出了鎖超時(shí)的時(shí)間。
那么就有兩個(gè)解決方案了:
第一,將超時(shí)時(shí)間設(shè)置的足夠長,確保業(yè)務(wù)代碼能夠在鎖釋放之前執(zhí)行完成;
第二,為鎖添加守護(hù)線程,為將要過期釋放但未釋放的鎖增加時(shí)間;
第一種方式需要全行大多數(shù)情況下業(yè)務(wù)邏輯的耗時(shí),進(jìn)行超時(shí)時(shí)間的設(shè)定。
第二種方式,可通過如下守護(hù)線程的方式來動態(tài)增加鎖超時(shí)時(shí)間。
public class DaemonThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class); // 是否需要守護(hù) 主線程關(guān)閉則結(jié)束守護(hù)線程 private volatile boolean daemon = true; // 守護(hù)鎖 private String lockKey; private RedisTemplate<Object, Object> redisTemplate; public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) { this.lockKey = lockKey; this.redisTemplate = redisTemplate; } @Override public void run() { try { while (daemon) { long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS); // 剩余有效期小于1秒則續(xù)命 if (time < 1000) { logger.info("守護(hù)進(jìn)程: " + Thread.currentThread().getName() + " 延長鎖時(shí)間 5000 毫秒"); redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS); } TimeUnit.MILLISECONDS.sleep(300); } logger.info(" 守護(hù)進(jìn)程: " + Thread.currentThread().getName() + "關(guān)閉 "); } catch (InterruptedException e) { e.printStackTrace(); } } // 主線程主動調(diào)用結(jié)束 public void stop() { daemon = false; } }
上述線程每隔300毫秒獲取一下Redis中鎖的超時(shí)時(shí)間,如果小于1秒,則延長5秒。當(dāng)主線程調(diào)用關(guān)閉時(shí),守護(hù)線程也隨之關(guān)閉。
主線程中相關(guān)代碼實(shí)現(xiàn):
private void deamonRedisLock() { //守護(hù)線程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并設(shè)置5秒過期時(shí)間 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重試獲得鎖 logger.info(Thread.currentThread().getName() + ":嘗試循環(huán)獲取鎖"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":獲得鎖----"); // 開啟守護(hù)線程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 業(yè)務(wù)邏輯執(zhí)行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } finally { //釋放鎖 這里也需要原子操作,今后通過 Redis + Lua 講 String result = (String) redisTemplate.opsForValue().get(userId + ":syn"); if (value.equals(result)) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":釋放鎖-----"); } //關(guān)閉守護(hù)線程 if (daemonThread != null) { daemonThread.stop(); } } }
其中在獲得鎖之后,開啟守護(hù)線程,在finally中將守護(hù)線程關(guān)閉。
在上述邏輯中,我們是基于spring-boot-data-redis提供的原子化操作來保證鎖判斷和執(zhí)行的原子化的。在非Spring Boot項(xiàng)目中,則可以基于Lua腳本來實(shí)現(xiàn)。
首先定義加鎖和解鎖的Lua腳本及對應(yīng)的DefaultRedisScript
對象,在RedisConfig
配置類中添加如下實(shí)例化代碼:
@Configuration public class RedisConfig { //lock script private static final String LOCK_SCRIPT = " if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " + " then redis.call('expire',KEYS[1],ARGV[2]) " + " return 1 " + " else return 0 end "; private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call" + "('del', KEYS[1]) else return 0 end"; // ... 省略部分代碼 @Bean public DefaultRedisScript<Boolean> lockRedisScript() { DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Boolean.class); defaultRedisScript.setScriptText(LOCK_SCRIPT); return defaultRedisScript; } @Bean public DefaultRedisScript<Long> unlockRedisScript() { DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Long.class); defaultRedisScript.setScriptText(UNLOCK_SCRIPT); return defaultRedisScript; } }
再通過在AccountOperationThread
類中新建構(gòu)造方法,將上述兩個(gè)對象傳入類中(省略此部分演示)。然后,就可以基于RedisTemplate
來調(diào)用了,改造之后的代碼實(shí)現(xiàn)如下:
private void deamonRedisLockWithLua() { //守護(hù)線程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并設(shè)置5秒過期時(shí)間 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) { // 等待1000毫秒重試獲得鎖 logger.info(Thread.currentThread().getName() + ":嘗試循環(huán)獲取鎖"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":獲得鎖----"); // 開啟守護(hù)線程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 業(yè)務(wù)邏輯執(zhí)行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { logger.error("異常", e); } finally { //使用Lua腳本:先判斷是否是自己設(shè)置的鎖,再執(zhí)行刪除 // key存在,當(dāng)前值=期望值時(shí),刪除key;key存在,當(dāng)前值!=期望值時(shí),返回0; Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value); logger.info("redis解鎖:{}", RELEASE_SUCCESS.equals(result)); if (RELEASE_SUCCESS.equals(result)) { if (daemonThread != null) { //關(guān)閉守護(hù)線程 daemonThread.stop(); logger.info(Thread.currentThread().getName() + ":釋放鎖---"); } } } }
其中while循環(huán)中加鎖和finally中的釋放鎖都是基于Lua腳本來實(shí)現(xiàn)了。
除了上述實(shí)例,在使用Redis分布式鎖時(shí),還可以考慮以下情況及方案。
當(dāng)線程在持有鎖的情況下再次請求加鎖,如果一個(gè)鎖支持一個(gè)線程多次加鎖,那么這個(gè)鎖就是可重入的。如果一個(gè)不可重入鎖被再次加鎖,由于該鎖已經(jīng)被持有,再次加鎖會失敗。Redis可通過對鎖進(jìn)行重入計(jì)數(shù),加鎖時(shí)加 1,解鎖時(shí)減 1,當(dāng)計(jì)數(shù)歸 0時(shí)釋放鎖。
可重入鎖雖然高效但會增加代碼的復(fù)雜性,這里就不舉例說明了。
有的業(yè)務(wù)場景,發(fā)現(xiàn)被鎖則直接返回。但有的場景下,客戶端需要等待鎖釋放然后去搶鎖。上述示例就屬于后者。針對等待鎖釋放也有兩種方案:
客戶端輪訓(xùn):當(dāng)未獲得鎖時(shí),等待一段時(shí)間再重新獲取,直到成功。上述示例就是基于這種方式實(shí)現(xiàn)的。這種方式的缺點(diǎn)也很明顯,比較耗費(fèi)服務(wù)器資源,當(dāng)并發(fā)量大時(shí)會影響服務(wù)器的效率。
使用Redis的訂閱發(fā)布功能:當(dāng)獲取鎖失敗時(shí),訂閱鎖釋放消息,獲取鎖成功后釋放時(shí),發(fā)送釋放消息。
在Redis包含主從同步的集群部署方式中,如果主節(jié)點(diǎn)掛掉,從節(jié)點(diǎn)提升為主節(jié)點(diǎn)。如果客戶端A在主節(jié)點(diǎn)加鎖成功,指令還未同步到從節(jié)點(diǎn),此時(shí)主節(jié)點(diǎn)掛掉,從節(jié)點(diǎn)升為主節(jié)點(diǎn),新的主節(jié)點(diǎn)中沒有鎖的數(shù)據(jù)。這種情況下,客戶端B就可能加鎖成功,從而出現(xiàn)并發(fā)的場景。
當(dāng)集群發(fā)生腦裂時(shí),Redis master節(jié)點(diǎn)跟slave 節(jié)點(diǎn)和 sentinel 集群處于不同的網(wǎng)絡(luò)分區(qū)。sentinel集群無法感知到master的存在,會將 slave 節(jié)點(diǎn)提升為 master 節(jié)點(diǎn),此時(shí)就會存在兩個(gè)不同的 master 節(jié)點(diǎn)。從而也會導(dǎo)致并發(fā)問題的出現(xiàn)。Redis Cluster集群部署方式同理。
到此,關(guān)于“如何理解分布式系統(tǒng)下基于Redis的分布式鎖”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。