溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Go結(jié)合Redis怎么實(shí)現(xiàn)分布式鎖

發(fā)布時(shí)間:2022-01-21 09:41:47 來源:億速云 閱讀:139 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要介紹了Go結(jié)合Redis怎么實(shí)現(xiàn)分布式鎖,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

    單Redis實(shí)例場(chǎng)景

    如果熟悉Redis的命令,可能會(huì)馬上想到使用Redis的set if not exists操作來實(shí)現(xiàn),并且現(xiàn)在標(biāo)準(zhǔn)的實(shí)現(xiàn)方式是SET resource_name my_random_value NX PX 30000這串命令,其中:

    • resource_name表示要鎖定的資源

    • NX表示如果不存在則設(shè)置

    • PX 30000表示過期時(shí)間為30000毫秒,也就是30秒

    • my_random_value這個(gè)值在所有的客戶端必須是唯一的,所有同一key的獲取者(競爭者)這個(gè)值都不能一樣。

    value的值必須是隨機(jī)數(shù)主要是為了更安全的釋放鎖,釋放鎖的時(shí)候使用腳本告訴Redis:只有key存在并且存儲(chǔ)的值和我指定的值一樣才能告訴我刪除成功。可以通過以下Lua腳本實(shí)現(xiàn):

    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end

    舉個(gè)例子:客戶端A取得資源鎖,但是緊接著被一個(gè)其他操作阻塞了,當(dāng)客戶端A運(yùn)行完畢其他操作后要釋放鎖時(shí),原來的鎖早已超時(shí)并且被Redis自動(dòng)釋放,并且在這期間資源鎖又被客戶端B再次獲取到。

    使用Lua腳本是因?yàn)榕袛嗪蛣h除是兩個(gè)操作,所以有可能A剛判斷完鎖就過期自動(dòng)釋放了,然后B就獲取到了鎖,然后A又調(diào)用了Del,導(dǎo)致把B的鎖給釋放了。

    加解鎖示例

    package main
    
    import (
       "context"
       "errors"
       "fmt"
       "github.com/brianvoe/gofakeit/v6"
       "github.com/go-redis/redis/v8"
       "sync"
       "time"
    )
    
    var client *redis.Client
    
    const unlockScript = `
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end`
    
    func lottery(ctx context.Context) error {
       // 加鎖
       myRandomValue := gofakeit.UUID()
       resourceName := "resource_name"
       ok, err := client.SetNX(ctx, resourceName, myRandomValue, time.Second*30).Result()
       if err != nil {
          return err
       }
       if !ok {
          return errors.New("系統(tǒng)繁忙,請(qǐng)重試")
       }
       // 解鎖
       defer func() {
          script := redis.NewScript(unlockScript)
          script.Run(ctx, client, []string{resourceName}, myRandomValue)
       }()
    
       // 業(yè)務(wù)處理
       time.Sleep(time.Second)
       return nil
    }
    
    func main() {
       client = redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
       })
       var wg sync.WaitGroup
       wg.Add(2)
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       wg.Wait()
    }

    我們先看lottery()函數(shù),這里模擬一個(gè)抽獎(jiǎng)操作,在進(jìn)入函數(shù)時(shí),先使用SET resource_name my_random_value NX PX 30000加鎖,這里使用UUID作為隨機(jī)值,如果操作失敗,直接返回,讓用戶重試,如果成功在defer里面執(zhí)行解鎖邏輯,解鎖邏輯就是執(zhí)行前面說到得lua腳本,然后再進(jìn)行業(yè)務(wù)處理。

    我們?cè)趍ain()函數(shù)里面執(zhí)行了兩個(gè)goroutine并發(fā)調(diào)用lottery()函數(shù),其中有一個(gè)操作會(huì)因?yàn)槟貌坏芥i而直接失敗。

    小結(jié)

    • 生成隨機(jī)值

    • 使用SET resource_name my_random_value NX PX 30000加鎖

    • 如果加鎖失敗,直接返回

    • defer添加解鎖邏輯,保證在函數(shù)退出的時(shí)候會(huì)執(zhí)行

    • 執(zhí)行業(yè)務(wù)邏輯

    多Redis實(shí)例場(chǎng)景

    在單實(shí)例情況下,如果這個(gè)實(shí)例掛了,那么所有請(qǐng)求都會(huì)因?yàn)槟貌坏芥i而失敗,所以我們需要多個(gè)分布在不同機(jī)器上的Redis實(shí)例,并且拿到其中大多數(shù)節(jié)點(diǎn)的鎖才能加鎖成功,這也就是RedLock算法。它其實(shí)也是基于上面的單實(shí)例算法的,只是我們需要同時(shí)對(duì)多個(gè)Redis實(shí)例獲取鎖。

    加解鎖示例

    package main
    
    import (
       "context"
       "errors"
       "fmt"
       "github.com/brianvoe/gofakeit/v6"
       "github.com/go-redis/redis/v8"
       "sync"
       "time"
    )
    
    var clients []*redis.Client
    
    const unlockScript = `
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end`
    
    func lottery(ctx context.Context) error {
       // 加鎖
       myRandomValue := gofakeit.UUID()
       resourceName := "resource_name"
       var wg sync.WaitGroup
       wg.Add(len(clients))
       // 這里主要是確保不要加鎖太久,這樣會(huì)導(dǎo)致業(yè)務(wù)處理的時(shí)間變少
       lockCtx, _ := context.WithTimeout(ctx, time.Millisecond*5)
       // 成功獲得鎖的Redis實(shí)例的客戶端
       successClients := make(chan *redis.Client, len(clients))
       for _, client := range clients {
          go func(client *redis.Client) {
             defer wg.Done()
             ok, err := client.SetNX(lockCtx, resourceName, myRandomValue, time.Second*30).Result()
             if err != nil {
                return
             }
             if !ok {
                return
             }
             successClients <- client
          }(client)
       }
       wg.Wait() // 等待所有獲取鎖操作完成
       close(successClients)
       // 解鎖,不管加鎖是否成功,最后都要把已經(jīng)獲得的鎖給釋放掉
       defer func() {
          script := redis.NewScript(unlockScript)
          for client := range successClients {
             go func(client *redis.Client) {
                script.Run(ctx, client, []string{resourceName}, myRandomValue)
             }(client)
          }
       }()
       // 如果成功加鎖得客戶端少于客戶端數(shù)量的一半+1,表示加鎖失敗
       if len(successClients) < len(clients)/2+1 {
          return errors.New("系統(tǒng)繁忙,請(qǐng)重試")
       }
    
       // 業(yè)務(wù)處理
       time.Sleep(time.Second)
       return nil
    }
    
    func main() {
       clients = append(clients, redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   0,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   1,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   2,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   3,
       }), redis.NewClient(&redis.Options{
          Addr: "127.0.0.1:6379",
          DB:   4,
       }))
       var wg sync.WaitGroup
       wg.Add(2)
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       go func() {
          defer wg.Done()
          ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
          err := lottery(ctx)
          if err != nil {
             fmt.Println(err)
          }
       }()
       wg.Wait()
       time.Sleep(time.Second) 
    }

    在上面的代碼中,我們使用Redis的多數(shù)據(jù)庫模擬多個(gè)Redis master實(shí)例,一般我們會(huì)選擇5個(gè)Redis實(shí)例,真實(shí)環(huán)境中這些實(shí)例應(yīng)該是分布在不同機(jī)器上的,避免同時(shí)失效。
    在加鎖邏輯里,我們主要是對(duì)每個(gè)Redis實(shí)例執(zhí)行SET resource_name my_random_value NX PX 30000獲取鎖,然后把成功獲取鎖的客戶端放到一個(gè)channel里(這里使用slice可能有并發(fā)問題),同時(shí)使用sync.WaitGroup等待所以獲取鎖操作結(jié)束。
    然后添加defer釋放鎖邏輯,釋放鎖邏輯很簡單,只是把成功拿到的鎖給釋放掉即可。
    最后判斷成功獲取到的鎖的數(shù)量是否大于一半,如果沒有得到一半以上的鎖,說明加鎖失敗。
    如果加鎖成功接下來就是進(jìn)行業(yè)務(wù)處理。

    小結(jié)

    • 生成隨機(jī)值

    • 并發(fā)給每個(gè)Redis實(shí)例使用SET resource_name my_random_value NX PX 30000加鎖

    • 等待所有獲取鎖操作完成

    • defer添加解鎖邏輯,保證在函數(shù)退出的時(shí)候會(huì)執(zhí)行,這里先defer再判斷是因?yàn)橛锌赡塬@取到一部分Redis實(shí)例的鎖,但是因?yàn)闆]有超過一半,還是會(huì)判斷為加鎖失敗

    • 判斷是否拿到一半以上Redis實(shí)例的鎖,如果沒有說明加鎖失敗,直接返回

    • 執(zhí)行業(yè)務(wù)邏輯

    感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Go結(jié)合Redis怎么實(shí)現(xiàn)分布式鎖”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來學(xué)習(xí)!

    向AI問一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI