溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

發(fā)布時間:2023-03-07 11:52:05 來源:億速云 閱讀:291 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約的相關(guān)知識,內(nèi)容詳細(xì)易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約文章都會有所收獲,下面我們一起來看看吧。

一、基礎(chǔ)

0)Redisson版本說明、案例

使用當(dāng)前(2022年12月初)最新的版本:3.18.1;

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.18.1</version>
</dependency>

案例

案例采用redis-cluster集群的方式;

public class Main {
    public static void main(String[] args) throws Exception {
        // 1.配置Redis-Cluster集群節(jié)點的ip和port 
        Config config = new Config();
        config.useClusterServers()
                .addNodeAddress("redis://127.0.0.1:7001")
                .addNodeAddress("redis://127.0.0.1:7002")
                .addNodeAddress("redis://127.0.0.1:7003")
                .addNodeAddress("redis://127.0.0.1:7004");
        // 2.創(chuàng)建Redisson的客戶端 
        RedissonClient redisson = Redisson.create(config);
        // 3.測試Redisson可重?鎖的加鎖、釋放鎖
        testLock(redisson);
    }

    private static void testLock(RedissonClient redisson) throws InterruptedException {
        // 1.獲取key為"anyLock"的鎖對象
        final RLock lock = redisson.getLock("test_lock");
        boolean locked = true;
        try {
            //2.1:加鎖 
            lock.lock();
            // 2.2:加鎖,并設(shè)置嘗試獲取鎖超時時間30s、鎖超時?動釋放的時間10s 
//            locked = lock.tryLock(30, 10, TimeUnit.SECONDS);
            if (locked)
                System.out.println("加鎖成功!" + new Date());
            
            Thread.sleep(20 * 1000);
            System.out.println("鎖邏輯執(zhí)行完畢!" + new Date());

        } finally {
            // 3.釋放鎖 
            lock.unlock();
        }
    }
}

1)Redisson連接Redis的方式

redission支持4種連接redis方式,分別為單機、主從、Sentinel、Cluster 集群;在分布式鎖的實現(xiàn)上區(qū)別在于hash槽的獲取方式。

具體配置方式見Redisson的GitHub

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

2)用到的Redis命令

分布式鎖主要需要以下redis命令:

EXISTS key:當(dāng) key 存在,返回1;不存在,返回0。

GETSET key value:將給定 key 的值設(shè)為 value ,并返回 key 的舊值 (old value);當(dāng) key 存在但不是字符串類型時,返回一個錯誤;當(dāng)key不存在時,返回nil。

GET key:返回 key 所關(guān)聯(lián)的字符串值,如果 key 不存在那么返回 nil。

DEL key [KEY &hellip;]:刪除給定的一個或多個 key(不存在的 key 會被忽略),返回實際刪除的key的個數(shù)(integer)。

DEL key1 key2 key3

HSET key field value:給一個key 設(shè)置一個{field=value}的組合值,如果key沒有就直接賦值并返回1;如果field已有,那么就更新value的值,并返回0。

HEXISTS key field:當(dāng)key中存儲著field的時候返回1,如果key或者field有一個不存在返回0。

HINCRBY key field increment:將存儲在key中的哈希(Hash)對象中的指定字段field的值加上增量increment;

如果鍵key不存在,一個保存了哈希對象{field=value}的key將被創(chuàng)建;如果字段field不存在,在進行當(dāng)前操作前,feild將被創(chuàng)建,且對應(yīng)的值被置為0;返回值是increment。

PEXPIRE key milliseconds:設(shè)置存活時間,單位是毫秒。EXPIRE操作單位是秒。

PUBLISH channel message:向channel post一個message內(nèi)容的消息,返回接收消息的客戶端數(shù)。

3)用到的lua腳本語義

Redisson源碼中,執(zhí)行redis命令的是lua腳本,其中主要有如下幾個概念:

  • redis.call():執(zhí)行redis命令。

  • KEYS[n]:指腳本中第n個參數(shù),比如KEYS[1]指腳本中的第一個參數(shù)。

  • ARGV[n]:指腳本中第n個參數(shù)的值,比如ARGV[1]指腳本中的第一個參數(shù)的值。

  • 返回值中nil與false同一個意思。

在redis執(zhí)行l(wèi)ua腳本時,相當(dāng)于一個redis級別的鎖,不能執(zhí)行其他操作,類似于原子操作,這也是redisson實現(xiàn)的一個關(guān)鍵點。

另外,如果lua腳本執(zhí)行過程中出現(xiàn)了異?;蛘遰edis服務(wù)器宕機了,會將腳本中已經(jīng)執(zhí)行的命令在AOF、RDB日志中刪除;即LUA腳本執(zhí)行報錯會進行回滾操作。

二、源碼分析

1、RLock

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

RLock接口主要繼承了Lock接口,并擴展了部分方法,比如:tryLock(long waitTime, long leaseTime, TimeUnit unit)方法中加入的leaseTime參數(shù),用來設(shè)置鎖的過期時間,如果超過leaseTime還沒有解鎖的話,redis就強制解鎖;leaseTime的默認(rèn)時間是30s。

獲取RLock對象

RLock lock = redissonClient.getLock("test_lock");

RLock對象表示?個鎖對象,我們要某一個key加鎖時,需要先獲取?個鎖對象。

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

這里并沒有具體請求Redis進行加鎖的邏輯,而只是調(diào)用RedissonLock的構(gòu)造函數(shù),設(shè)置一些變量。

2、加鎖流程

進入到Rlock#lock()方法,先看主流程;關(guān)于競爭鎖等待時間、鎖超時釋放時間的配置、使用,在流程中穿插著聊。

0)加鎖流程圖

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

1)加鎖到哪臺機器

lock()方法執(zhí)行鏈路:

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

走到這里,已經(jīng)可以看到加鎖的底層邏輯:LUA腳本。

而lua腳本只是??串字符串,作為evalWriteAsync()?法的?個參數(shù)?已;所以下?步進到evalWriteAsync()?法中:

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

走到這里會調(diào)用ConnectionManager#getEntry(String)方法;

在創(chuàng)建RedissonClient時,筆者配置的是Redis-Cluster,而走到這里卻會進入到MasterSlaveConnectionManager,實際上實例化的ConnectionManager就是RedisCluster模式下的ClusterConnectionManager,而ClusterConnectionManager繼承自MasterSlaveConnectionManager,并且ClusterConnectionManager沒有重寫getEntry(String)方法,所以會進入到MasterSlaveConnectionManager#getEntry(String)方法。

ConnectionManager#getEntry(String)方法會根據(jù)傳入的key名稱找到相應(yīng)的Redis節(jié)點、目標(biāo)master。

Redis-Cluster集群中的數(shù)據(jù)分布式是 通過?個?個的hash slot來實現(xiàn)的,Redis-Cluster集群總共16384個hash slot,它們都 會被均勻分布到所有的master節(jié)點上;這里ClusterConnectionManager通過key名稱計算出相應(yīng)的hash slot方式如下:

?先通過key計算出CRC16值,然后 CRC16值對16384進?取模,進?得到hash slot。

@Override
public int calcSlot(String key) {
    if (key == null) {
        return 0;
    }

    int start = key.indexOf('{');
    if (start != -1) {
        int end = key.indexOf('}');
        if (end != -1 && start + 1 < end) {
            key = key.substring(start + 1, end);
        }
    }

    int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
    log.debug("slot {} for {}", result, key);
    return result;
}

這?計算出key的hash slot之后,就可以通過hash slot 去看?看哪個master上有這個hash slot,如果某個master上有個這個hash slot,那么這個 key當(dāng)然就會落到該master節(jié)點上,執(zhí)?加鎖指令也就應(yīng)該在該master上執(zhí)?。

下面進入本文重點,可重入鎖的各種加鎖、釋放鎖。

2)Client第一次加鎖

在尋找應(yīng)該在哪臺Redis機器上加鎖時,在RedissonLock#tryLockInnerAsync()方法中我們看到了一堆LUA腳本:

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

LUA腳本參數(shù)解析:

  • KEYS[1] 表示的是 getName() ,即鎖key的名稱,比如案例中的 test_lock;

  • ARGV[1] 表示的是 internalLockLeaseTime 默認(rèn)值是30s;

  • ARGV[2] 表示的是 getLockName(threadId) ,唯一標(biāo)識當(dāng)前訪問線程,使用鎖對象id+線程id(UUID:ThreadId)方式表示,用于區(qū)分不同服務(wù)器上的線程。

    • UUID用來唯?標(biāo)識?個客戶端,因為會有多個客戶端的多個線程加鎖;

    • 結(jié)合起來的UUID:ThreadId 表示:具體哪個客戶端上的哪個線程過來加鎖,通 過這樣的組合?式唯?標(biāo)識?個線程。

LUA腳本邏輯:

  • 如果鎖名稱不存在;

    • 則向redis中添加一個key為test_lock的HASH結(jié)構(gòu)、添加一個field為線程id,值=1的鍵值對{field:increment},表示此線程的重入次數(shù)為1;

    • 設(shè)置test_lock的過期時間,防止當(dāng)前服務(wù)器出問題后導(dǎo)致死鎖,然后return nil; end;返回nil,lua腳本執(zhí)行完畢;

  • 如果鎖存在,檢測當(dāng)前線程是否持有鎖;

    • 如果是當(dāng)前線程持有鎖,hincrby將該線程重入的次數(shù)++;并重新設(shè)置鎖的過期時間;返回nil,lua腳本執(zhí)行完畢;

    • 如果不是當(dāng)前線程持有鎖,pttl返回鎖的過期時間,單位ms。

第一次加鎖時,key肯定不存在與master節(jié)點上;

會執(zhí)行下列LUA腳本對應(yīng)的Redis指令:

hset test_lock UUID:ThreadId 1 
pexpire test_lock 30000

此時,Redis中多一個Hash結(jié)構(gòu)的key(test_lock):

test_lock : 
{
    UUID:ThreadId:1
}

這里的1使用來做鎖重入的。

pexpire指令為test_lock這個key設(shè)置過期時間為30s,即:30s后這個key會?動過期被刪除,key對應(yīng)的鎖在那時也就被釋放了。

總體來看,加鎖的邏輯很簡單:

在key對應(yīng)的hash數(shù)據(jù)結(jié)構(gòu)中記錄了? 下當(dāng)前是哪個客戶端的哪個線程過來加鎖了,然后設(shè)置了?下key的過期時間為30s。 3)加鎖成功之后的鎖續(xù)約

成功加鎖后,lua腳本返回nil,即null。

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

加鎖成功之后,tryLockInnerAsync()?法返回;再結(jié)合Java8的Stream,對加鎖結(jié)果進一步處理;

因為加鎖成功后返回的是nil,這是lua腳本的返回形式,體現(xiàn)到j(luò)ava代碼中的返回值為:null。
又由于RLock#lock()方法傳入的leaseTime是-1,所以進入到scheduleExpirationRenewal(long)方法做鎖續(xù)約。

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

renewExpirationAsync()方法負(fù)責(zé)做具體的鎖續(xù)約:

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

這里L(fēng)UA腳本的邏輯很簡單:

  • 判斷當(dāng)前key中,是否還被線程UUID:ThreadId持有鎖,持有則設(shè)置過期時間為30s(續(xù)命)。

鎖續(xù)約(看門狗機制)其實就是每次加鎖成功后,會?上開啟?個后臺線程, 每隔10s檢查?下key是否存在,如果存在就為key續(xù)期30s。

  • 這里的10s,取自配置的lockWatchdogTimeout參數(shù),默認(rèn)為30 * 1000 ms;

  • 所以?個key往往當(dāng)過期時間慢慢消逝到20s左右時就?會被定時任務(wù)重置為了30s,這樣就能保證:只要這個定時任務(wù)還在、這個key還在,就?直維持加鎖。

如果當(dāng)前持有鎖的線程被中斷了,會停止鎖續(xù)約,即殺死看門狗;

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

protected void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    
    if (threadId != null) {
        task.removeThreadId(threadId);
    }

    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
            timeout.cancel();
        }
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}

所謂的停止鎖續(xù)約,實際就是將當(dāng)前線程的threadId從看門狗緩存中移除,后續(xù)在執(zhí)行鎖續(xù)約時,如果發(fā)現(xiàn)看門狗緩存中已經(jīng)沒有了當(dāng)前線程threadId,則直接退出鎖續(xù)約 并且 不再延時10s開啟一個定時任務(wù)。

如果加鎖時指定了leaseTime > 0,則不會開門狗機制,表示強制鎖leaseTime 毫秒后過期。一共有三種加鎖方式可以做到,如下:

  • RLock#lock(long leaseTime, TimeUnit unit)

  • RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)

  • RLock#lockInterruptibly(long leaseTime, TimeUnit unit)

4)重入加鎖(相同線程多次加鎖)

再次回到加鎖的LUA腳本:

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

同一個線程對分布式鎖多次加鎖時,會走以下邏輯:

  • 判斷當(dāng)前key是否被當(dāng)前線程持有,如果是則增加鎖重入的次數(shù),并重新設(shè)置鎖的過期時間為30s;

對應(yīng)的Redis命令為:

hexists test_lock UUID:ThreadId
hincrby test_lock UUID:ThreadId 1
pexpire test_lock 30000

此時Redis中test_key對應(yīng)的數(shù)據(jù)結(jié)構(gòu)從

test_lock : 
{
    UUID:ThreadId:1
}

變成:

test_lock : 
{
    UUID:ThreadId:2
}

并將key的過期時間重新設(shè)置為30s。

鎖重入成功之后,后臺也會開啟?個watchdog后臺線程做鎖續(xù)約,每隔10s檢查?下key,如果key存在就將key的過期時間重新設(shè)置為30s。

Redisson可重?加鎖的語義,實際是通過Hash結(jié)構(gòu)的key中某個線程(UUID:ThreadId)對應(yīng)的加鎖次數(shù)來表示的。

5)鎖競爭(其他線程加鎖失敗)

再再次回到加鎖的LUA腳本:

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

如果分布式鎖已經(jīng)被其他線程持有,LUA腳本會執(zhí)行以下邏輯:

返回當(dāng)前key的剩余存活時間,因為不是返回nil,也就代表著加鎖失??;

對應(yīng)的Redis的命令為:

pttl test_lock

針對加鎖方式的不同,加鎖失敗的邏輯也不同;可以分兩大類:指定了加鎖失敗的等待時間waitTime和未指定waitTime。

  • 未執(zhí)行加鎖失敗的等待時間waitTime:獲取分布式鎖失敗會一直重試,直到獲取鎖成功。比如下列加鎖方法:

    • Rlock#lock():一直嘗試獲取分布式鎖,直到獲取鎖成功。

    • RLock#lockInterruptibly(long leaseTime, TimeUnit unit)

    • RLock#lock(long leaseTime, TimeUnit unit)

  • 指定了加鎖失敗的等待時間waitTime:獲取分布式鎖會超時,超時之后返回加鎖失??;

    • Rlock#tryLock(long waitTime, TimeUnit unit):指定獲取鎖失敗的等待時間。在等待時間范圍之內(nèi)進行重試,超時則返回加鎖失敗。

    • Rlock#tryLock(long waitTime, long leaseTime, TimeUnit unit):同樣是指定獲取鎖失敗的等待時間,并且強制指定鎖過期的時間(不開啟看門狗)。在等待時間范圍之內(nèi)進行重試,超時則返回加鎖失敗。

可以簡單的概述為RLock接口下的tryLock()方法獲取鎖會失敗,lock()方法獲取鎖一定會成功。

1> 一直重試直到加鎖成功

Rlock#lock()方法為例:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    pubSub.timeout(future);
    RedissonLockEntry entry;
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }

    try {
        while (true) {
            // lock() 或 lockInterruptibly()為入口走到這里時。leaseTime為-1,表示會開始開門狗;如果leaseTime大于0,則不會開啟開門狗;
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    // 因為Semaphore的可用資源為0,所以這里就等價于Thread.sleep(ttl);
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    entry.getLatch().acquire();
                } else {
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(entry, threadId);
    }
}

首先訂閱解鎖channel(命名格式:redisson_lock__channel:{keyName}),其他線程解鎖后,會發(fā)布解鎖的消息;這里收到消息會立即嘗試獲取鎖;訂閱解鎖channel的超時時間默認(rèn)為7.5s。也就說獲取鎖失敗7.5s之內(nèi),如果其他線程釋放鎖,當(dāng)前線程可以立即嘗試獲取到鎖。

獲取鎖失敗之后會進??個while死循環(huán)中:

每休息鎖的存活時間ttl之后,就嘗試去獲取鎖,直到成功獲取到鎖才會跳出while死循環(huán)。

2> 等待鎖超時返回加鎖失敗

Rlock#tryLock(long waitTime, TimeUnit unit)為例:

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return true;
    }

    // 獲取鎖剩余的等待時長
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        // 獲取鎖超時,返回獲取分布式鎖失敗
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    
    current = System.currentTimeMillis();
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        // 訂閱解鎖channel的超時時長為 獲取鎖剩余的等待時長
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
                "Unable to acquire subscription lock after " + time + "ms. " +
                        "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
            subscribeFuture.whenComplete((res, ex) -> {
                if (ex == null) {
                    unsubscribe(res, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
    } catch (ExecutionException e) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    try {
        // 收到解鎖channel的消息之后,走到這里,再次判斷獲取鎖等待時長是否超時
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
    
        // while循環(huán)中嘗試去獲取鎖
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            // waiting for message
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                // 如果獲取鎖失敗后,鎖存活時長 小于 剩余鎖等待時長,則線程睡眠 鎖存活時長
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                // 如果獲取鎖失敗后,鎖存活時間 大于等于 剩余鎖等待時長,則線程睡眠 鎖等待時長
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
}

加鎖存在超時時間 相比于 一直重試直到加鎖成功,只是多一個時間限制,具體差異體現(xiàn)在:訂閱解鎖channel的超時時長、獲取鎖失敗后線程的睡眠時長、重試獲取鎖次數(shù)的限制;

獲取分布式鎖失敗之后,立即判斷當(dāng)前獲取鎖是否超時,如果超時,則返回加鎖失敗;
否者,訂閱解鎖channel(命名格式:redisson_lock__channel:{keyName}),其他線程解鎖后,會發(fā)布解鎖的消息;
訂閱解鎖channel的超時時間為 獲取鎖剩余的等待時長。 在這個時間范圍之內(nèi),如果其他線程釋放鎖,當(dāng)前線程收到解鎖channel的消息之后再次判斷獲取鎖是否超時,如果不超時,嘗試獲取鎖。
獲取鎖之后會進??個while死循環(huán)中: 如果獲取鎖超時,則返回加鎖失?。?/blockquote>
否者讓線程睡眠: 如果鎖存活時長ttl 小于 剩余鎖等待時長,則線程睡眠 鎖存活時長;
如果鎖存活時間ttl 大于等于 剩余鎖等待時長,則線程睡眠 鎖等待時長;
線程睡眠完之后,判斷獲取鎖是否超時,不超時則嘗試去獲取鎖。

3、釋放鎖流程

1)Client主動嘗試釋放鎖

進入到Rlock#unlock()方法;

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

和加鎖的方式?樣,釋放鎖也是通過lua腳本來完成的;

LUA腳本參數(shù)解析:

  • KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock;

  • KEYS[2] 表示getChanelName() 表示的是發(fā)布訂閱過程中使用的Chanel;

  • ARGV[1] 表示的是LockPubSub.unLockMessage,解鎖消息,實際代表的是數(shù)字 0,代表解鎖消息;

  • ARGV[2] 表示的是internalLockLeaseTime 默認(rèn)的有效時間 30s;

  • ARGV[3] 表示的是 getLockName(thread.currentThread().getId()) 代表的是 UUID:ThreadId 用鎖對象id+線程id, 表示當(dāng)前訪問線程,用于區(qū)分不同服務(wù)器上的線程。

LUA腳本邏輯:

  • 如果鎖名稱不存在;

  • 可能是因為鎖過期導(dǎo)致鎖不存在,也可能是并發(fā)解鎖。

  • 則發(fā)布鎖解除的消息,返回1,lua腳本執(zhí)行完畢;

  • 如果鎖存在,檢測當(dāng)前線程是否持有鎖;

  • 如果是當(dāng)前線程持有鎖,定義變量counter,接收執(zhí)行incrby將該線程重入的次數(shù)&ndash;的結(jié)果;

  • 如果重入次數(shù)大于0,表示該線程還有其他任務(wù)需要執(zhí)行;重新設(shè)置鎖的過期時間;返回0,lua腳本執(zhí)行完畢;

  • 否則表示該線程執(zhí)行結(jié)束,del刪除該鎖;并且publish發(fā)布該鎖解除的消息;返回1,lua腳本執(zhí)行完畢;

  • 如果不是當(dāng)前線程持有鎖 或 其他情況,都返回nil,lua腳本執(zhí)行完畢。

腳本執(zhí)行結(jié)束之后,如果返回值不是0或1,即當(dāng)前線程去釋放其他線程的加鎖時,拋出異常。

通過LUA腳本釋放鎖成功之后,會將看門狗殺死;

Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約

2)Client主動強制釋放鎖

forceUnlockAsync()方法被調(diào)用的地方很多,大多都是在清理資源時刪除鎖。

@Override
public RFuture<Boolean> forceUnlockAsync() {
    cancelExpirationRenewal(null);
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('del', KEYS[1]) == 1) then "
                    + "redis.call('publish', KEYS[2], ARGV[1]); "
                    + "return 1 "
                    + "else "
                    + "return 0 "
                    + "end",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
}

LUA腳本邏輯:

邏輯比較簡單粗暴:刪除鎖成功則并發(fā)布鎖被刪除的消息,返回1結(jié)束,否則返回0結(jié)束。

3)Client宕機,鎖超時釋放

如果Redisson客戶端剛加鎖成功,并且未指定releaseTime,后臺會啟動一個定時任務(wù)watchdog每隔10s檢查key:key如果存在就為它?動續(xù)命到30s;在watchdog定時任務(wù)存在的情況下,如果不是主動釋放鎖,那么key將會?直的被watchdog這個定時任務(wù)維持加鎖。

但是如果客戶端宕機了,定時任務(wù)watchdog也就沒了,也就沒有鎖續(xù)約機制了,那么過完30s之后,key會?動被刪除、key對應(yīng)的鎖也自動被釋放了。

4)不啟動鎖續(xù)約的超時釋放鎖

如果在加鎖時指定了leaseTime,加鎖成功之后,后臺并不會啟動一個定時任務(wù)watchdog做鎖續(xù)約;key存活leaseTime 毫秒之后便會自動被刪除、key對應(yīng)的鎖也就自動被釋放了;無論當(dāng)前線程的業(yè)務(wù)邏輯是否執(zhí)行完畢。

比如使用如下方式加鎖:

  • RLock#lock(long leaseTime, TimeUnit unit)

  • RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)

  • RLock#lockInterruptibly(long leaseTime, TimeUnit unit)

關(guān)于“Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約”知識都有一定的了解,大家如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI