您好,登錄后才能下訂單哦!
這篇文章主要介紹了Golang分布式應(yīng)用之Redis怎么使用的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇Golang分布式應(yīng)用之Redis怎么使用文章都會(huì)有所收獲,下面我們一起來(lái)看看吧。
Redis作是一個(gè)高性能的內(nèi)存數(shù)據(jù)庫(kù),常被應(yīng)用于分布式系統(tǒng)中,除了作為分布式緩存或簡(jiǎn)單的內(nèi)存數(shù)據(jù)庫(kù)還有一些特殊的應(yīng)用場(chǎng)景,本文結(jié)合Golang來(lái)編寫(xiě)對(duì)應(yīng)的中間件。
單機(jī)系統(tǒng)中我們可以使用sync.Mutex
來(lái)保護(hù)臨界資源,在分布式系統(tǒng)中同樣有這樣的需求,當(dāng)多個(gè)主機(jī)搶占同一個(gè)資源,需要加對(duì)應(yīng)的“分布式鎖”。
在Redis中我們可以通過(guò)setnx
命令來(lái)實(shí)現(xiàn)
如果key不存在可以設(shè)置對(duì)應(yīng)的值,設(shè)置成功則加鎖成功,key不存在返回失敗
釋放鎖可以通過(guò)del
實(shí)現(xiàn)。
主要邏輯如下:
type RedisLock struct { client *redis.Client key string expiration time.Duration // 過(guò)期時(shí)間,防止宕機(jī)或者異常 } func NewLock(client *redis.Client, key string, expiration time.Duration) *RedisLock { return &RedisLock{ client: client, key: key, expiration: expiration, } } // 加鎖將成功會(huì)將調(diào)用者id保存到redis中 func (l *RedisLock) Lock(id string) (bool, error) { return l.client.SetNX(context.TODO(), l.key, id, l.expiration).Result() } const unLockScript = ` if (redis.call("get", KEYS[1]) == KEYS[2]) then redis.call("del", KEYS[1]) return true end return false ` // 解鎖通過(guò)lua腳本來(lái)保證原子性,只能解鎖當(dāng)前調(diào)用者加的鎖 func (l *RedisLock) UnLock(id string) error { _, err := l.client.Eval(context.TODO(), unLockScript, []string{l.key, id}).Result() if err != nil && err != redis.Nil { return err } return nil }
需要加一個(gè)額外的超時(shí)時(shí)間來(lái)防止系統(tǒng)宕機(jī)或者異常請(qǐng)求造成的死鎖,通過(guò)超時(shí)時(shí)間為最大預(yù)估運(yùn)行時(shí)間的2倍。
解鎖時(shí)通過(guò)lua腳本來(lái)保證原子性,調(diào)用者只會(huì)解自己加的鎖。避免由于超時(shí)造成的混亂,例如:進(jìn)程A在時(shí)間t1獲取了鎖,但由于執(zhí)行緩慢,在時(shí)間t2鎖超時(shí)失效,進(jìn)程B在t3獲取了鎖,這是如果進(jìn)程A執(zhí)行完去解鎖會(huì)取消進(jìn)程B的鎖。
func main() { client := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "123456", DB: 0, // use default DB }) lock := NewLock(client, "counter", 30*time.Second) counter := 0 worker := func(i int) { for { id := fmt.Sprintf("worker%d", i) ok, err := lock.Lock(id) log.Printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err) if !ok { time.Sleep(100 * time.Millisecond) continue } defer lock.UnLock(id) counter++ log.Printf("worker %d, add counter %d", i, counter) break } } wg := sync.WaitGroup{} for i := 1; i <= 5; i++ { wg.Add(1) id := i go func() { defer wg.Done() worker(id) }() } wg.Wait() }
運(yùn)行結(jié)果,可以看到與sync.Mutex
使用效果類(lèi)似
2022/07/22 09:58:09 worker 5 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:09 worker 5, add counter 1
2022/07/22 09:58:09 worker 4 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 4 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 4, add counter 2
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 1, add counter 3
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 2, add counter 4
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 3, add counter 5
特別注意的是,在分布式Redis集群中,如果發(fā)生異常時(shí)(主節(jié)點(diǎn)宕機(jī)),可能會(huì)降低分布式鎖的可用性,可以通過(guò)強(qiáng)一致性的組件etcd、ZooKeeper等實(shí)現(xiàn)。
假設(shè)要開(kāi)發(fā)一個(gè)爬蟲(chóng)服務(wù),爬取百萬(wàn)級(jí)的網(wǎng)頁(yè),怎么判斷某一個(gè)網(wǎng)頁(yè)是否爬取過(guò),除了借助數(shù)據(jù)庫(kù)和HashMap,我們可以借助布隆過(guò)濾器來(lái)做。相比其他方式布隆過(guò)濾器占用極低的空間,而且插入查詢(xún)時(shí)間非常快。
布隆過(guò)濾器用來(lái)判斷某個(gè)元素是否在集合中,利用BitSet
插入數(shù)據(jù)時(shí)將值進(jìn)行多次Hash,將BitSet對(duì)應(yīng)位置1
查詢(xún)時(shí)同樣進(jìn)行多次Hash對(duì)比所有位上是否為1,如是則存在。
布隆過(guò)濾器有一定的誤判率,不適合精確查詢(xún)的場(chǎng)景。另外也不支持刪除元素。通常適用于URL去重、垃圾郵件過(guò)濾、防止緩存擊穿等場(chǎng)景中。
在Redis中,我們可以使用自帶的BitSet實(shí)現(xiàn),同樣也借助lua腳本的原子性來(lái)避免多次查詢(xún)數(shù)據(jù)不一致。
const ( // 插入數(shù)據(jù),調(diào)用setbit設(shè)置對(duì)應(yīng)位 setScript = ` for _, offset in ipairs(ARGV) do redis.call("setbit", KEYS[1], offset, 1) end ` // 查詢(xún)數(shù)據(jù),如果所有位都為1返回true getScript = ` for _, offset in ipairs(ARGV) do if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then return false end end return true ` ) type BloomFilter struct { client *redis.Client key string // 存在redis中的key bits uint // BitSet的大小 maps uint // Hash的次數(shù) } func NewBloomFilter(client *redis.Client, key string, bits, maps uint) *BloomFilter { client.Del(context.TODO(), key) if maps == 0 { maps = 14 } return &BloomFilter{ key: key, client: client, bits: bits, maps: maps, } } // 進(jìn)行多次Hash, 得到位置列表 func (f *BloomFilter) getLocations(data []byte) []uint { locations := make([]uint, f.maps) for i := 0; i < int(f.maps); i++ { val := murmur3.Sum64(append(data, byte(i))) locations[i] = uint(val) % f.bits } return locations } func (f *BloomFilter) Add(data []byte) error { args := getArgs(f.getLocations(data)) _, err := f.client.Eval(context.TODO(), setScript, []string{f.key}, args).Result() if err != nil && err != redis.Nil { return err } return nil } func (f *BloomFilter) Exists(data []byte) (bool, error) { args := getArgs(f.getLocations(data)) resp, err := f.client.Eval(context.TODO(), getScript, []string{f.key}, args).Result() if err != nil { if err == redis.Nil { return false, nil } return false, err } exists, ok := resp.(int64) if !ok { return false, nil } return exists == 1, nil } func getArgs(locations []uint) []string { args := make([]string, 0) for _, l := range locations { args = append(args, strconv.FormatUint(uint64(l), 10)) } return args }
func main() { bf := NewBloomFilter(client,"bf-test", 2^16, 14) exists, err := bf.Exists([]byte("test1")) log.Printf("exist %t, err %v", exists, err) if err := bf.Add([]byte("test1")); err != nil { log.Printf("add err: %v", err) } exists, err = bf.Exists([]byte("test1")) log.Printf("exist %t, err %v", exists, err) exists, err = bf.Exists([]byte("test2")) log.Printf("exist %t, err %v", exists, err) // output // 2022/07/22 10:05:58 exist false, err <nil> // 2022/07/22 10:05:58 exist true, err <nil> // 2022/07/22 10:05:58 exist false, err <nil> }
在golang.org/x/time/rate
包中提供了基于令牌桶的限流器,如果要實(shí)現(xiàn)分布式環(huán)境的限流可以基于Redis Lua腳本實(shí)現(xiàn)。
令牌桶的主要原理如下:
假設(shè)一個(gè)令牌桶容量為burst,每秒按照qps的速率往里面放置令牌
初始時(shí)放滿(mǎn)令牌,令牌溢出則直接丟棄,請(qǐng)求令牌時(shí),如果桶中有足夠令牌則允許,否則拒絕
當(dāng)burst==qps時(shí),嚴(yán)格按照qps限流;當(dāng)burst>qps時(shí),可以允許一定的突增流量
這里主要參考了官方rate
包的實(shí)現(xiàn),將核心邏輯改為L(zhǎng)ua實(shí)現(xiàn)。
--- 相關(guān)Key --- limit rate key值,對(duì)應(yīng)value為當(dāng)前令牌數(shù) local limit_key = KEYS[1] --- 輸入?yún)?shù) --[[ qps: 每秒請(qǐng)求數(shù); burst: 令牌桶容量; now: 當(dāng)前Timestamp; cost: 請(qǐng)求令牌數(shù); max_wait: 最大等待時(shí)間 --]] local qps = tonumber(ARGV[1]) local burst = tonumber(ARGV[2]) local now = ARGV[3] local cost = tonumber(ARGV[4]) local max_wait = tonumber(ARGV[5]) --- 獲取redis中的令牌數(shù) local tokens = redis.call("hget", limit_key, "token") if not tokens then tokens = burst end --- 上次修改時(shí)間 local last_time = redis.call("hget", limit_key, "last_time") if not last_time then last_time = 0 end --- 最新等待時(shí)間 local last_event = redis.call("hget", limit_key, "last_event") if not last_event then last_event = 0 end --- 通過(guò)當(dāng)前時(shí)間與上次修改時(shí)間的差值,qps計(jì)算出當(dāng)前時(shí)間得令牌數(shù) local delta = math.max(0, now-last_time) local new_tokens = math.min(burst, delta * qps + tokens) new_tokens = new_tokens - cost --- 最新令牌數(shù),減少請(qǐng)求令牌 --- 如果最新令牌數(shù)小于0,計(jì)算需要等待的時(shí)間 local wait_period = 0 if new_tokens < 0 and qps > 0 then wait_period = wait_period - new_tokens / qps end wait_period = math.ceil(wait_period) local time_act = now + wait_period --- 滿(mǎn)足等待間隔的時(shí)間戳 --- 允許請(qǐng)求有兩種情況 --- 當(dāng)請(qǐng)求令牌數(shù)小于burst, 等待時(shí)間不超過(guò)最大等待時(shí)間,可以通過(guò)補(bǔ)充令牌滿(mǎn)足請(qǐng)求 --- qps為0時(shí),只要最新令牌數(shù)不小于0即可 local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0) --- 設(shè)置對(duì)應(yīng)值 if ok then redis.call("set", limit_key, new_tokens) redis.call("set", last_time_key, now) redis.call("set", last_event_key, time_act) end --- 返回列表,{是否允許, 等待時(shí)間} return {ok, wait_period}
在Golang中的相關(guān)接口Allow、AllowN、Wait等都是通過(guò)調(diào)用reserveN實(shí)現(xiàn)
// 調(diào)用lua腳本 func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) { // ... res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey}, lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result() if err != nil && err != redis.Nil { return nil, err } //... return &Reservation{ ok: allow == 1, lim: lim, tokens: n, timeToAct: now.Add(time.Duration(wait) * time.Second), }, nil }
func main() { rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "123456", DB: 0, // use default DB }) r, err := NewRedisLimiter(rdb, 1, 2, "testrate") if err != nil { log.Fatal(err) } r.Reset() for i := 0; i < 5; i++ { err := r.Wait(context.TODO()) log.Printf("worker %d allowed: %v", i, err) } } // output // 2022/07/22 12:50:31 worker 0 allowed: <nil> // 2022/07/22 12:50:31 worker 1 allowed: <nil> // 2022/07/22 12:50:32 worker 2 allowed: <nil> // 2022/07/22 12:50:33 worker 3 allowed: <nil> // 2022/07/22 12:50:34 worker 4 allowed: <nil>
前兩個(gè)請(qǐng)求在burst內(nèi),直接可以獲得,后面的請(qǐng)求按照qps的速率生成。
除此之外,Redis還可以用作全局計(jì)數(shù)、去重(set)、發(fā)布訂閱等場(chǎng)景。Redis官方也提供了一些通用模塊,通過(guò)加載這些模塊也可以實(shí)現(xiàn)過(guò)濾、限流等特性,參考modules。
關(guān)于“Golang分布式應(yīng)用之Redis怎么使用”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“Golang分布式應(yīng)用之Redis怎么使用”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。