您好,登錄后才能下訂單哦!
小編給大家分享一下使用redis實(shí)現(xiàn)分布式鎖的方法,希望大家閱讀完這篇文章后大所收獲,下面讓我們一起去探討吧!
使用Redis實(shí)現(xiàn)分布式鎖
redis特性介紹
1、支持豐富的數(shù)據(jù)類型,如String、List、Map、Set、ZSet等。
2、支持?jǐn)?shù)據(jù)持久化,RDB和AOF兩種方式
3、支持集群工作模式,分區(qū)容錯(cuò)性強(qiáng)
4、單線程,順序處理命令
5、支持事務(wù)
6、支持發(fā)布與訂閱
Redis實(shí)現(xiàn)分布式鎖使用了SETNX命令:
SETNX key value
將key的值設(shè)為value ,當(dāng)且僅當(dāng)key不存在。
若給定的key已經(jīng)存在,則SETNX不做任何動(dòng)作。
SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡(jiǎn)寫。
可用版本:>= 1.0.0時(shí)間復(fù)雜度:O(1)返回值:
設(shè)置成功,返回 1 。
設(shè)置失敗,返回 0 。
redis> EXISTS job # job 不存在 (integer) 0 redis> SETNX job "programmer" # job 設(shè)置成功 (integer) 1 redis> SETNX job "code-farmer" # 嘗試覆蓋 job ,失敗 (integer) 0 redis> GET job # 沒(méi)有被覆蓋 "programmer"
首先,我們需要封裝一個(gè)公共的Redis訪問(wèn)工具類。該類需要注入RedisTemplate實(shí)例和ValueOperations實(shí)例,使用ValueOperations實(shí)例是因?yàn)镽edis實(shí)現(xiàn)的分布式鎖使用了最簡(jiǎn)單的String類型。另外,我們需要封裝3個(gè)方法,分別是setIfObsent (String key, String value)、 expire (String key, long timeout, TimeUnit unit) 、delete (String key) ,分別對(duì)應(yīng)Redis的SETNX、expire、del命令。以下是Redis訪問(wèn)工具類的具體實(shí)現(xiàn):
@Component public class RedisDao { @Autowired private RedisTemplate redisTemplate; @Resource(name="redisTemplate") private ValueOperations<Object, Object> valOpsObj; /** * 如果key不存在,就存儲(chǔ)一個(gè)key-value,相當(dāng)于SETNX命令 * @param key 鍵 * @param value 值,可以為空 * @return */ public boolean setIfObsent (String key, String value) { return valOpsObj.setIfAbsent(key, value); } /** * 為key設(shè)置失效時(shí)間 * @param key 鍵 * @param timeout 時(shí)間大小 * @param unit 時(shí)間單位 */ public boolean expire (String key, long timeout, TimeUnit unit) { return redisTemplate.expire(key, timeout, unit); } /** * 刪除key * @param key 鍵 */ public void delete (String key) { redisTemplate.delete(key); } }
完成了Redis訪問(wèn)工具類的實(shí)現(xiàn),現(xiàn)在需要考慮的是如何去模擬競(jìng)爭(zhēng)分布式鎖。因?yàn)镽edis本身就是支持分布式集群的,所以只需要模擬出多線程處理業(yè)務(wù)場(chǎng)景。這里采用線程池來(lái)模擬,以下是測(cè)試類的具體實(shí)現(xiàn):
@RestController @RequestMapping("test") public class TestController { private static final Logger LOG = LoggerFactory.getLogger(TestController.class); //日志對(duì)象 @Autowired private RedisDao redisDao; //定義的分布式鎖key private static final String LOCK_KEY = "MyTestLock"; @RequestMapping(value={"testRedisLock"}, method=RequestMethod.GET) public void testRedisLock () { ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { executorService.submit(new Runnable() { @Override public void run() { //獲取分布式鎖 boolean flag = redisDao.setIfObsent(LOCK_KEY, "lock"); if (flag) { LOG.info(Thread.currentThread().getName() + ":獲取Redis分布式鎖成功"); //獲取鎖成功后設(shè)置失效時(shí)間 redisDao.expire(LOCK_KEY, 2, TimeUnit.SECONDS); try { LOG.info(Thread.currentThread().getName() + ":處理業(yè)務(wù)開(kāi)始"); Thread.sleep(1000); //睡眠1000ms模擬處理業(yè)務(wù) LOG.info(Thread.currentThread().getName() + ":處理業(yè)務(wù)結(jié)束"); //處理業(yè)務(wù)完成后刪除鎖 redisDao.delete(LOCK_KEY); } catch (InterruptedException e) { LOG.error("處理業(yè)務(wù)異常:", e); } } else { LOG.info(Thread.currentThread().getName() + ":獲取Redis分布式鎖失敗"); } } }); } } }
通過(guò)上面這段代碼,可能會(huì)產(chǎn)生以下幾個(gè)疑問(wèn):
線程如果獲取分布式鎖失敗,為什么不嘗試重新獲取鎖?
線程獲取分布式鎖成功后,設(shè)置了鎖的失效時(shí)間,這個(gè)失效時(shí)間長(zhǎng)短如何確定?
線程業(yè)務(wù)處理結(jié)束后,為什么要做刪除鎖的操作?
針對(duì)這幾個(gè)疑問(wèn),我們可以來(lái)討論下。
第一,Redis的SETNX命令,如果key已經(jīng)存在,則不會(huì)做任何操作,所以SETNX實(shí)現(xiàn)的分布式鎖并不是可重入鎖。當(dāng)然,也可以自己通過(guò)代碼實(shí)現(xiàn)重試n次或者直至獲取到分布式鎖為止。但是,這不能保證競(jìng)爭(zhēng)的公平性,某個(gè)線程會(huì)因?yàn)橐恢钡却i而阻塞。因此,Redis實(shí)現(xiàn)的分布式鎖更適用于對(duì)共享資源一寫多讀的場(chǎng)景。
第二,分布式鎖必須設(shè)置失效時(shí)間,而且失效時(shí)間必須大于業(yè)務(wù)處理所需的時(shí)間(保證數(shù)據(jù)一致性)。所以,在測(cè)試階段盡可能準(zhǔn)確的預(yù)測(cè)出業(yè)務(wù)正常處理所需的時(shí)間,設(shè)置失效時(shí)間是防止因?yàn)闃I(yè)務(wù)處理過(guò)程的某些原因?qū)е滤梨i的情況。
第三,業(yè)務(wù)處理結(jié)束,必須要做刪除鎖的操作。
上面設(shè)置分布式鎖和為鎖設(shè)置失效時(shí)間是通過(guò)兩個(gè)操作步驟完成的,更合理的方式應(yīng)該是把設(shè)置分布式鎖和為鎖設(shè)置失效時(shí)間通過(guò)一個(gè)操作完成。要么都成功,要么都失敗。實(shí)現(xiàn)代碼如下:
/** * Redis訪問(wèn)工具類 */ @Component public class RedisDao { private static Logger logger = LoggerFactory.getLogger(RedisDao.class); @Autowired private StringRedisTemplate stringRedisTemplate; /** * 設(shè)置分布式鎖 * @param key 鍵 * @param value 值 * @param timeout 失效時(shí)間 * @return */ public boolean setDistributeLock (String key, String value, long timeout) { RedisConnection connection = null; boolean flag = false; try { //獲取一個(gè)連接 connection = stringRedisTemplate.getConnectionFactory().getConnection(); //設(shè)置分布式鎖的同時(shí)為鎖設(shè)置失效時(shí)間 connection.set(key.getBytes(), value.getBytes(), Expiration.seconds(timeout), RedisStringCommands.SetOption.SET_IF_ABSENT); flag = true; } catch (Exception e) { logger.error("set automic lock error:", e); } finally { //使用后關(guān)閉連接 connection.close(); } return flag; } /** * 查詢key的失效時(shí)間 * @param key 鍵 * @param timeUnit 時(shí)間單位 * @return */ public long ttl (String key, TimeUnit timeUnit) { return stringRedisTemplate.getExpire(key, timeUnit); } } /** * 單元測(cè)試類 */ @RunWith(SpringRunner.class) @SpringBootTest public class Demo1ApplicationTests { private static final Logger LOG = LoggerFactory.getLogger(Demo1ApplicationTests.class); @Autowired private RedisDao redisDao; @Test public void testDistributeLock () { String key = "MyDistributeLock"; //設(shè)置分布式鎖,失效時(shí)間20s boolean result = redisDao.setDistributeLock(key, "1", 20); if (result) { LOG.info("設(shè)置分布式鎖成功"); long ttl = redisDao.ttl(key, TimeUnit.SECONDS); LOG.info("{}距離失效還有{}s", key, ttl); } } }
運(yùn)行單元測(cè)試類,結(jié)果如下:
2019-05-15 13:07:10.827 - 設(shè)置分布式鎖成功 2019-05-15 13:07:10.838 - MyDistributeLock距離失效還有19s
看完了這篇文章,相信你對(duì)使用redis實(shí)現(xiàn)分布式鎖的方法有了一定的了解,想了解更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。