您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關(guān)Springboot整合Redis如何實現(xiàn)超賣問題,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
寫一段簡單正常的超賣邏輯代碼,多個用戶同時操作同一段數(shù)據(jù),探究出現(xiàn)的問題。
Redis中存儲一項數(shù)據(jù)信息,請求對應接口,獲取商品數(shù)量信息;
商品數(shù)量信息如果大于0,則扣減1,重新存儲Redis中;
運行代碼測試問題。
/** * Redis數(shù)據(jù)庫操作,超賣問題模擬 * @author * */ @RestController public class RedisController { // 引入String類型redis操作模板 @Autowired private StringRedisTemplate stringRedisTemplate; // 測試數(shù)據(jù)設置接口 @RequestMapping("/setStock") public String setStock() { stringRedisTemplate.opsForValue().set("stock", "100"); return "ok"; } // 模擬商品超賣代碼 @RequestMapping("/deductStock") public String deductStock() { // 獲取Redis數(shù)據(jù)庫中的商品數(shù)量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 減庫存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣減成功,剩余商品:"+realStock); }else { System.out.println("庫存不足....."); } return "end"; } }
在單應用模式下,使用jmeter
壓測。
測試結(jié)果:
每個請求相當于一個線程,當幾個線程同時拿到數(shù)據(jù)時,線程A拿到庫存為84,這個時候線程B也進入程序,并且搶占了CPU,訪問庫存為84,最后兩個線程都對庫存減一,導致最后修改為83,實際上多賣出去了一件
既然線程和線程之間,數(shù)據(jù)處理不一致,能否使用synchronized
加鎖測試?
依舊還是先測試單服務器
// 模擬商品超賣代碼, // 設置synchronized同步鎖 @RequestMapping("/deductStock1") public String deductStock1() { synchronized (this) { // 獲取Redis數(shù)據(jù)庫中的商品數(shù)量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 減庫存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣減成功,剩余商品:"+realStock); }else { System.out.println("庫存不足....."); } } return "end"; }
數(shù)量100
重新壓測,得到的日志信息如下所示:
在單機模式下,添加synchronized關(guān)鍵字,的確能夠避免商品的超賣現(xiàn)象!
但是在分布式微服務中,針對該服務設置了集群,synchronized依舊還能保證數(shù)據(jù)的正確性嗎?
假設多個請求,被注冊中心負載均衡,每個微服務中的該處理接口,都添加有synchronized,
依然會出現(xiàn)類似的超賣
問題:
synchronized
只是針對單一服務器
的JVM
進行加鎖
,但是分布式是很多個不同的服務器,導致兩個線程或多個在不同服務器上共同對商品數(shù)量信息做了操作!
在Redis中存在一條命令setnx (set if not exists)
setnx key value
如果不存在key,則可以設置成功;否則設置失敗。
修改處理接口,增加key
// 模擬商品超賣代碼 @RequestMapping("/deductStock2") public String deductStock2() { // 創(chuàng)建一個key,保存至redis String key = "lock"; // setnx // 由于redis是一個單線程,執(zhí)行命令采取“隊列”形式排隊! // 優(yōu)先進入隊列的命令先執(zhí)行,由于是setnx,第一個執(zhí)行后,其他操作執(zhí)行失敗。 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); // 當不存在key時,可以設置成功,回執(zhí)true;如果存在key,則無法設置,返回false if (!result) { // 前端監(jiān)測,redis中存在,則不能讓這個搶購操作執(zhí)行,予以提示! return "err"; } // 獲取Redis數(shù)據(jù)庫中的商品數(shù)量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 減庫存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣減成功,剩余商品:"+realStock); }else { System.out.println("庫存不足....."); } // 程序執(zhí)行完成,則刪除這個key stringRedisTemplate.delete(key); return "end"; }
1、請求進入接口中,如果redis中不存在key,則會新建一個setnx;如果存在,則不會新建,同時返回錯誤編碼,不會繼續(xù)執(zhí)行搶購邏輯。
2、當創(chuàng)建成功后,執(zhí)行搶購邏輯。
3、搶購邏輯執(zhí)行完成后,刪除數(shù)據(jù)庫中對應的setnx
的key
。讓其他請求能夠設置并操作。
這種邏輯來說比之前單一使用syn
合理的多,但是如果執(zhí)行搶購操作中出現(xiàn)了異常,導致這個key
無法被刪除
。以至于其他處理請求,一直無法拿到key
,程序邏輯死鎖!
可以采取try … finally進行操作
/** * 模擬商品超賣代碼 設置 * * @return */ @RequestMapping("/deductStock3") public String deductStock3() { // 創(chuàng)建一個key,保存至redis String key = "lock"; // setnx // 由于redis是一個單線程,執(zhí)行命令采取隊列形式排隊!優(yōu)先進入隊列的命令先執(zhí)行,由于是setnx,第一個執(zhí)行后,其他操作執(zhí)行失敗 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); // 當不存在key時,可以設置成功,回執(zhí)true;如果存在key,則無法設置,返回false if (!result) { // 前端監(jiān)測,redis中存在,則不能讓這個搶購操作執(zhí)行,予以提示! return "err"; } try { // 獲取Redis數(shù)據(jù)庫中的商品數(shù)量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 減庫存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣減成功,剩余商品:" + realStock); } else { System.out.println("庫存不足....."); } } finally { // 程序執(zhí)行完成,則刪除這個key // 放置于finally中,保證即使上述邏輯出問題,也能del掉 stringRedisTemplate.delete(key); } return "end"; }
這個邏輯相比上面其他的邏輯來說,顯得更加的嚴謹。
但是,如果一套服務器,因為斷電、系統(tǒng)崩潰等原因出現(xiàn)宕機
,導致本該執(zhí)行finally
中的語句未成功執(zhí)行完成?。⊥瑯映霈F(xiàn)key一直存在
,導致死鎖
!
在設置成功setnx
后,以及搶購代碼邏輯執(zhí)行前,增加key的限時。
/** * 模擬商品超賣代碼 設置setnx保證分布式環(huán)境下,數(shù)據(jù)處理安全行問題;<br> * 但如果某個代碼段執(zhí)行異常,導致key無法清理,出現(xiàn)死鎖,添加try...finally;<br> * 如果某個服務因某些問題導致釋放key不能執(zhí)行,導致死鎖,此時解決思路為:增加key的有效時間;<br> * 為了保證設置key的值和設置key的有效時間,兩條命令構(gòu)成同一條原子命令,將下列邏輯換成其他代碼。 * * @return */ @RequestMapping("/deductStock4") public String deductStock4() { // 創(chuàng)建一個key,保存至redis String key = "lock"; // setnx // 由于redis是一個單線程,執(zhí)行命令采取隊列形式排隊!優(yōu)先進入隊列的命令先執(zhí)行,由于是setnx,第一個執(zhí)行后,其他操作執(zhí)行失敗 //boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); //讓設置key和設置key的有效時間都可以同時執(zhí)行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock", 10, TimeUnit.SECONDS); // 當不存在key時,可以設置成功,回執(zhí)true;如果存在key,則無法設置,返回false if (!result) { // 前端監(jiān)測,redis中存在,則不能讓這個搶購操作執(zhí)行,予以提示! return "err"; } // 設置key有效時間 //stringRedisTemplate.expire(key, 10, TimeUnit.SECONDS); try { // 獲取Redis數(shù)據(jù)庫中的商品數(shù)量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 減庫存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣減成功,剩余商品:" + realStock); } else { System.out.println("庫存不足....."); } } finally { // 程序執(zhí)行完成,則刪除這個key // 放置于finally中,保證即使上述邏輯出問題,也能del掉 stringRedisTemplate.delete(key); } return "end"; }
但是上述代碼的邏輯中依舊會有問題:
如果處理邏輯中,出現(xiàn)
超時
問題。
當邏輯執(zhí)行時,時間超過設定key有效時間,此時會出現(xiàn)什么問題?
從上圖可以清楚的發(fā)現(xiàn)問題:
如果一個請求執(zhí)行時間超過了key的有效時間。
新的請求執(zhí)行過來時,必然可以拿到key并設置時間;
此時的redis中保存的key并不是請求1的key,而是別的請求設置的。
當請求1執(zhí)行完成后,此處刪除key,刪除的是別的請求設置的key!
依然出現(xiàn)了key形同虛設
的問題!如果失效一直存在,超賣問題依舊不會解決。
既然出現(xiàn)key形同虛設的現(xiàn)象,是否可以增加條件,當finally中需要執(zhí)行刪除操作時,獲取數(shù)據(jù)判斷值是否是該請求中對應的,如果是則刪除,不是則不管!
修改上述代碼如下所示:
/** * 模擬商品超賣代碼 <br> * 解決`deductStock6`中,key形同虛設的問題。 * * @return */ @RequestMapping("/deductStock5") public String deductStock5() { // 創(chuàng)建一個key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); // setnx //讓設置key和設置key的有效時間都可以同時執(zhí)行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, lock_value, 10, TimeUnit.SECONDS); // 當不存在key時,可以設置成功,回執(zhí)true;如果存在key,則無法設置,返回false if (!result) { // 前端監(jiān)測,redis中存在,則不能讓這個搶購操作執(zhí)行,予以提示! return "err"; } try { // 獲取Redis數(shù)據(jù)庫中的商品數(shù)量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 減庫存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣減成功,剩余商品:" + realStock); } else { System.out.println("庫存不足....."); } } finally { // 程序執(zhí)行完成,則刪除這個key // 放置于finally中,保證即使上述邏輯出問題,也能del掉 // 判斷redis中該數(shù)據(jù)是否是這個接口處理時的設置的,如果是則刪除 if(lock_value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(key))) { stringRedisTemplate.delete(key); } } return "end"; }
由于獲得鎖的線程必須執(zhí)行完減庫存邏輯才能釋放鎖,所以在此期間所有其他的線程都會由于沒獲得鎖,而直接結(jié)束程序,導致有很多庫存根本沒有賣出去,所以這里應該可以優(yōu)化,讓沒獲得鎖的線程等待,或者循環(huán)檢查鎖
我們將鎖封裝到一個實體類中,然后加入兩個方法,加鎖和解鎖
@Component public class RedisLock { private final Logger log = LoggerFactory.getLogger(this.getClass()); private final long acquireTimeout = 10*1000; // 獲取鎖之前的超時時間(獲取鎖的等待重試時間) private final int timeOut = 20; // 獲取鎖之后的超時時間(防止死鎖) @Autowired private StringRedisTemplate stringRedisTemplate; // 引入String類型redis操作模板 /** * 獲取分布式鎖 * @return 鎖標識 */ public boolean getRedisLock(String lockName,String lockValue) { // 1.計算獲取鎖的時間 Long endTime = System.currentTimeMillis() + acquireTimeout; // 2.嘗試獲取鎖 while (System.currentTimeMillis() < endTime) { //3. 獲取鎖成功就設置過期時間 讓設置key和設置key的有效時間都可以同時執(zhí)行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockName, lockValue, timeOut, TimeUnit.SECONDS); if (result) { return true; } } return false; } /** * 釋放分布式鎖 * @param lockName 鎖名稱 * @param lockValue 鎖值 */ public void unRedisLock(String lockName,String lockValue) { if(lockValue.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(lockName))) { stringRedisTemplate.delete(lockName); } } }
@RestController public class RedisController { // 引入String類型redis操作模板 @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RedisLock redisLock; @RequestMapping("/setStock") public String setStock() { stringRedisTemplate.opsForValue().set("stock", "100"); return "ok"; } @RequestMapping("/deductStock") public String deductStock() { // 創(chuàng)建一個key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); try { boolean redisLock = this.redisLock.getRedisLock(key, lock_value);//獲取鎖 if (redisLock) { // 獲取Redis數(shù)據(jù)庫中的商品數(shù)量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 減庫存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣減成功,剩余商品:" + realStock); } else { System.out.println("庫存不足....."); } } } finally { redisLock.unRedisLock(key,lock_value); //釋放鎖 } return "end"; } }
可以看到失敗的線程不會直接結(jié)束,而是會嘗試重試,一直到重試結(jié)束時間,才會結(jié)束
實際上這個最終版依然存在3個問題
1、在finally流程中,由于是先判斷在處理。如果判斷條件結(jié)束后,獲取到的結(jié)果為true。但是在執(zhí)行del操作前,此時jvm在執(zhí)行GC操作(為了保證GC操作獲取GC roots根完全,會暫停java程序),導致程序暫停。GC操作執(zhí)行完成后(暫?;謴秃?,執(zhí)行del操作,但是此時的key還在當前加鎖的key么?
2、問題如圖所示
關(guān)于“Springboot整合Redis如何實現(xiàn)超賣問題”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。