您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關(guān)分布式鎖如何在Redis數(shù)據(jù)庫中使用,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
模擬一個電商里面下單減庫存的場景。
1.首先在redis里加入商品庫存數(shù)量。
2.新建一個Spring Boot項目,在pom里面引入相關(guān)的依賴。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
3.接下來,在application.yml配置redis屬性和指定應(yīng)用的端口號:
server: port: 8090 spring: redis: host: 192.168.0.60 port: 6379
4.新建一個Controller類,扣減庫存第一版代碼:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.Objects; @RestController public class StockController { private static final Logger logger = LoggerFactory.getLogger(StockController.class); @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping("/reduceStock") public String reduceStock() { // 從redis中獲取庫存數(shù)量 int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount"))); if (stock > 0) { // 減庫存 int restStock = stock - 1; // 剩余庫存再重新設(shè)置到redis中 stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock)); logger.info("扣減成功,剩余庫存:{}", restStock); } else { logger.info("庫存不足,扣減失敗。"); } return "success"; } }
上面第一版的代碼存在什么問題:超賣。假如多個線程同時調(diào)用獲取庫存數(shù)量的代碼,那么每個線程拿到的都是100,判斷庫存都大于0,都可以執(zhí)行減庫存的操作。假如兩個線程都做減庫存更新緩存,那么緩存的庫存變成99,但實際上,應(yīng)該是減掉2個庫存。
那么很多人的第一個想法是加synchronized同步代碼塊,因為獲取數(shù)量和減庫存不是原子性操作,有多個線程來執(zhí)行代碼的時候,只允許一個線程執(zhí)行代碼塊里的代碼。那么改完的第二版的代碼如下:
@RequestMapping("/reduceStock") public String reduceStock() { synchronized (this) { // 從redis中獲取庫存數(shù)量 int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount"))); if (stock > 0) { // 減庫存 int restStock = stock - 1; // 剩余庫存再重新設(shè)置到redis中 stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock)); logger.info("扣減成功,剩余庫存:{}", restStock); } else { logger.info("庫存不足,扣減失敗。"); } } return "success"; }
但使用synchronize存在的問題,就是只能保證單機環(huán)境運行時沒有問題的。但現(xiàn)在的軟件公司里,基本上都是集群架構(gòu),是多實例,前面使用Nginx做負載均衡,大概架構(gòu)如下:
Nginx分發(fā)請求,把請求發(fā)送到不同的Tomcat容器,而synchronize只能保證一個應(yīng)用是沒有問題的。
那么代碼改進第三版,就是引入redis分布式鎖,具體代碼如下:
@RequestMapping("/reduceStock") public String reduceStock() { String lockKey = "stockKey"; try { boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1"); if (!result) { return "errorCode"; } // 從redis中獲取庫存數(shù)量 int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount"))); if (stock > 0) { // 減庫存 int restStock = stock - 1; // 剩余庫存再重新設(shè)置到redis中 stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock)); logger.info("扣減成功,剩余庫存:{}", restStock); } else { logger.info("庫存不足,扣減失敗。"); } } finally { stringRedisTemplate.delete(lockKey) } return "success"; }
如果有一個線程拿到鎖,那么其他的線程就會等待。一定要記得在finally里面把使用完的鎖要刪除掉。否則一旦拋出異常,只有一個線程會一直持有鎖,其他線程沒有機會獲取。
但如果在執(zhí)行if (stock > 0) {
代碼塊里的代碼,因為宕機或重啟沒有執(zhí)行完,也會一直持有鎖,所以,這里需要把鎖加一個超時時間:
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1"); stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
但如果上面兩行代碼在中間執(zhí)行出問題了,設(shè)置超時時間的代碼還沒執(zhí)行,也會出現(xiàn)鎖不能釋放的問題。好在有對應(yīng)的方法:就是把上面兩行代碼設(shè)置成一個原子操作:
// 這里默認設(shè)置超時時間為10秒 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
到此為止,如果并發(fā)量不是很大的話,基本上是沒有問題的。
但是,如果請求的并發(fā)量很大,就會出現(xiàn)新的問題:有種比較特殊的情況,第一個線程執(zhí)行了15秒,但是執(zhí)行到10秒鐘的時候,鎖已經(jīng)失效釋放了,那么在高并發(fā)場景下,第二個線程發(fā)現(xiàn)鎖已經(jīng)失效,那么它就可以拿到這把鎖進行加鎖,
假設(shè)第二個線程執(zhí)行需要8秒,它執(zhí)行到5秒鐘后,此時第一個線程已經(jīng)執(zhí)行完了,執(zhí)行完那一刻,進行了刪除key的操作,但是此時的鎖是第二個線程加的,這樣第一個線程把第二個線程加的鎖刪掉了。
那意味著第三個線程又可以拿到鎖,第三個線程執(zhí)行了3秒鐘,此時第二個線程執(zhí)行完畢,那么第二個線程把第三個線程的鎖又刪除了。導(dǎo)致鎖失效。
那么解決的思路就是,我自己加的鎖,不要被別人刪掉。那么可以為每個進來的請求生成一個唯一的id,作為分布式鎖的值,然后在釋放時,判斷一下當(dāng)前線程的id,是不是和緩存里的id是否相等。
@RequestMapping("/reduceStock") public String reduceStock() { String lockKey = "stockKey"; String id = UUID.randomUUID().toString(); try { // 這里默認設(shè)置超時時間為30秒 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, id, 30, TimeUnit.SECONDS); if (!result) { return "errorCode"; } // 從redis中獲取庫存數(shù)量 int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount"))); if (stock > 0) { // 減庫存 int restStock = stock - 1; // 剩余庫存再重新設(shè)置到redis中 stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock)); logger.info("扣減成功,剩余庫存:{}", restStock); } else { logger.info("庫存不足,扣減失敗。"); } } finally { if (id.contentEquals(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(lockKey)))) { stringRedisTemplate.delete(lockKey); } } return "success"; }
到此為止,一個比較完善的鎖就實現(xiàn)了,可以應(yīng)付大部分場景。
當(dāng)然,上面的代碼還有一個問題,就是一個線程執(zhí)行時間超過了過期時間,后面的代碼還沒有執(zhí)行完,鎖就已經(jīng)刪除了,還是會有些bug存在。解決的方法是給鎖續(xù)命的操作。
在當(dāng)前主線程獲取到鎖以后,可以fork出一個線程,執(zhí)行Timer定時器操作,假如默認超時時間為30秒,那么定時器每隔10秒去看下這把鎖還是否存在,存在就說明這個鎖里的邏輯還沒有執(zhí)行完,那么就可以把當(dāng)前主線程的超時時間重新設(shè)置為30秒;如果不存在,就直接結(jié)束掉。
但是上面的邏輯,在高并發(fā)場景下,實現(xiàn)比較完善還是比較困難的。好在現(xiàn)在已經(jīng)有比較成熟的框架,那就是Redisson。官方地址https://redisson.org。
下面用Redisson來實現(xiàn)分布式鎖。
首先引入依賴包:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.6.5</version> </dependency>
配置類:
@Configuration public class RedissonConfig { @Bean public Redisson redisson() { // 單機模式 Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.0.60:6379").setDatabase(0); return (Redisson) Redisson.create(config); } }
接下來用redisson重寫上面的減庫存操作:
@Resource private Redisson redisson; @RequestMapping("/reduceStock") public String reduceStock() { String lockKey = "stockKey"; RLock redissonLock = redisson.getLock(lockKey); try { // 加鎖,鎖續(xù)命 redissonLock.lock(); // 從redis中獲取庫存數(shù)量 int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount"))); if (stock > 0) { // 減庫存 int restStock = stock - 1; // 剩余庫存再重新設(shè)置到redis中 stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock)); logger.info("扣減成功,剩余庫存:{}", restStock); } else { logger.info("庫存不足,扣減失敗。"); } } finally { redissonLock.unlock(); } return "success"; }
其實就是三個步驟:獲取鎖,加鎖,釋放鎖。
先簡單看下Redisson的實現(xiàn)原理:
這里先說一下Redis很多操作使用Lua腳本來實現(xiàn)原子性操作,關(guān)于Lua語法,可以去網(wǎng)上找下相關(guān)教程。
使用Lua腳本的好處有:
1.減少網(wǎng)絡(luò)開銷,多個命令可以使用一次請求完成;
2.實現(xiàn)了原子性操作,Redis會把Lua腳本作為一個整體去執(zhí)行;
3.實現(xiàn)事務(wù),Redis自帶的事務(wù)功能有限,而Lua腳本實現(xiàn)了事務(wù)的常規(guī)操作,而且還支持回滾。
但是Lua實際上不會使用很多,如果Lua腳本執(zhí)行時間過長,因為Redis是單線程,因此會導(dǎo)致堵塞。
最后,說下Redisson分布式鎖的代碼實現(xiàn),
找到上面的redissonLock.lock();
lock方法點進去,一直點到RedissonLock類里面的lockInterruptibly方法:
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { // 獲取線程id long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); }
重點看下tryAcquire方法,把線程id作為一個參數(shù)傳遞進來,在這個方法里面,找到tryLockInnerAsync方法點進去,
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
這里就是一堆Lua腳本,先看第一個if命令,先去判斷 KEYS[1](就是對應(yīng)的鎖key的名字),如果不存在,在hashmap里,設(shè)置一個屬性為線程id,值為1,再把map的過期時間設(shè)置為internalLockLeaseTime,這個值默認是30秒,
上面的操作對應(yīng)的命令是:
hset keyname id:thread 1 pexpire keyname 30
然后返回nil,相當(dāng)于null,那程序return了。
另外,Redisson還支持重入鎖,那第二個if就是執(zhí)行重入鎖的操作,會判斷鎖是否存在,并且傳入的線程id是否是當(dāng)前線程的id,若果是,支持重復(fù)加鎖進行自增操作;
如果是其他線程調(diào)用lock方法,上面兩個if判斷不會走,會返回鎖剩余過期時間。
接著返回到tryAcquireAsync方法里面往下看:
實際上是加了一個監(jiān)聽器,在監(jiān)聽器里面有個很重要的方法scheduleExpirationRenewal,一看這個名字就能大概猜出是什么功能,
里面有個定時任務(wù)的輪詢,
private void scheduleExpirationRenewal(final long threadId) { if (expirationRenewalMap.containsKey(getEntryName())) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { // 判斷傳遞進來的線程id是否是我們之前主線程設(shè)置的id,如果是,則增加續(xù)命,增加30秒。 RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); future.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { expirationRenewalMap.remove(getEntryName()); if (!future.isSuccess()) { log.error("Can't update lock " + getName() + " expiration", future.cause()); return; } if (future.getNow()) { // reschedule itself scheduleExpirationRenewal(threadId); } } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) { task.cancel(); } }
接著推遲10秒鐘(internalLockLeaseTime / 3),再執(zhí)行續(xù)命操作邏輯。
到最后,再回到lockInterruptibly方法,如果ttl 為null,說明加鎖成功了,就返回null,那如果其他線程的話,就會返回剩余過期時間,那么就會進入到while死循環(huán)里,一直嘗試加鎖,調(diào)用tryAcquire方法,在瑣失效以后,再會嘗試獲取加鎖。
看完上述內(nèi)容,你們對分布式鎖如何在Redis數(shù)據(jù)庫中使用有進一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。