您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“golang高并發(fā)系統(tǒng)限流策略漏桶和令牌桶算法源碼分析”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“golang高并發(fā)系統(tǒng)限流策略漏桶和令牌桶算法源碼分析”吧!
漏桶算法比較好理解,假設(shè)我們現(xiàn)在有一個(gè)水桶,我們向這個(gè)水桶里添水,雖然我們我們無(wú)法預(yù)計(jì)一次會(huì)添多少水,也無(wú)法預(yù)計(jì)水流入的速度,但是可以固定出水的速度,不論添水的速率有多大,都按照固定的速率流出,如果桶滿了,溢出的上方水直接拋棄。我們把水當(dāng)作HTTP請(qǐng)求,每次都把請(qǐng)求放到一個(gè)桶中,然后以固定的速率處理請(qǐng)求,說(shuō)了這么多,不如看一個(gè)圖加深理解:
原理其實(shí)很簡(jiǎn)單,就看我們?cè)趺磳?shí)現(xiàn)它了,uber團(tuán)隊(duì)有一個(gè)開(kāi)源的uber-go/ratelimit庫(kù),這個(gè)庫(kù)就是漏桶的一種實(shí)現(xiàn),下面我們一起來(lái)看一看他的實(shí)現(xiàn)思路。
學(xué)習(xí)一個(gè)新東西的時(shí)候,往往是從會(huì)用開(kāi)始的,慢慢才能明白其實(shí)現(xiàn)原理,所以我們先來(lái)看看這個(gè)庫(kù)是怎樣使用的,這里我們直接提供一個(gè)實(shí)際使用例子,配合Gin框架,我們添加一個(gè)限流中間件,來(lái)達(dá)到請(qǐng)求限流的作用,測(cè)試代碼如下:
// 定義全局限流器對(duì)象 var rateLimit ratelimit.Limiter // 在 gin.HandlerFunc 加入限流邏輯 func leakyBucket() gin.HandlerFunc { prev := time.Now() return func(c *gin.Context) { now := rateLimit.Take() fmt.Println(now.Sub(prev)) // 為了打印時(shí)間間隔 prev = now // 記錄上一次的時(shí)間,沒(méi)有這個(gè)打印的會(huì)有問(wèn)題 } } func main() { rateLimit = ratelimit.New(10) r := gin.Default() r.GET("/ping", leakyBucket(), func(c *gin.Context) { c.JSON(200, true) }) r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080") }
我們簡(jiǎn)單使用壓測(cè)工具ab測(cè)試一下:ab -n 10 -c 2 http://127.0.0.1:8080/ping,執(zhí)行結(jié)果部分如下:
觀察結(jié)果可知,每次處理請(qǐng)求的時(shí)間間隔是10ms,并且后面的請(qǐng)求耗時(shí)越來(lái)越久,為什么會(huì)這樣呢? 這里先賣(mài)個(gè)小關(guān)子,看完uber的實(shí)現(xiàn)你就知道了~
我們首先來(lái)看一下其核心結(jié)構(gòu):
type limiter struct { sync.Mutex last time.Time sleepFor time.Duration perRequest time.Duration maxSlack time.Duration clock Clock } type Limiter interface { // Take should block to make sure that the RPS is met. Take() time.Time }
限制器接口只提供了一個(gè)方法take(),take()方法會(huì)阻塞確保兩次請(qǐng)求之間的時(shí)間走完,具體實(shí)現(xiàn)我們?cè)谙旅孢M(jìn)行分析。實(shí)現(xiàn)限制器接口的結(jié)構(gòu)體中各個(gè)字段的意義如下:
sync.Mutext:互斥鎖,控制并發(fā)的作用
last:記錄上一次的時(shí)刻
sleepFor:距離處理下一次請(qǐng)求需要等待的時(shí)間
perRequest:每次請(qǐng)求的時(shí)間間隔
maxSlack:最大松弛量,用來(lái)解決突發(fā)流量
clock:一個(gè)時(shí)鐘或模擬時(shí)鐘,提供了now和sleep方法,是實(shí)例化速率限制器
要是用該限制器,首先需要通過(guò)New方法進(jìn)行初始化,一個(gè)必傳的參數(shù)是rate,代表的是每秒請(qǐng)求量(RPS),還有一個(gè)可選參數(shù),參數(shù)類(lèi)型option,也就是我們可以自定義limit,不過(guò)一般使用場(chǎng)景不多,這里就不過(guò)多介紹了。我主要看一下他是怎么保證固定速率的,截取New方法部分代碼如下:
l := &limiter{ perRequest: time.Second / time.Duration(rate), maxSlack: -10 * time.Second / time.Duration(rate), }
根據(jù)我們傳入的請(qǐng)求數(shù)量,能計(jì)算出1s內(nèi)要通過(guò)n個(gè)請(qǐng)求,每個(gè)請(qǐng)求之間的間隔時(shí)間是多少,這樣在take方法中就可以根據(jù)這個(gè)字段來(lái)處理請(qǐng)求的固定速率問(wèn)題,這里還初始化了最大松弛化字段,他的值是負(fù)數(shù),默認(rèn)最大松弛量是10個(gè)請(qǐng)求的時(shí)間間隔。
接下來(lái)我們主要看一下take方法:
func (t *limiter) Take() time.Time { t.Lock() defer t.Unlock() now := t.clock.Now() if t.last.IsZero() { t.last = now return t.last } t.sleepFor += t.perRequest - now.Sub(t.last) if t.sleepFor < t.maxSlack { t.sleepFor = t.maxSlack } if t.sleepFor > 0 { t.clock.Sleep(t.sleepFor) t.last = now.Add(t.sleepFor) t.sleepFor = 0 } else { t.last = now } return t.last }
take()方法的執(zhí)行步驟如下:
為了控制并發(fā),所以進(jìn)入該方法就需要進(jìn)行上鎖,該鎖的粒度比較大,整個(gè)方法都加上了鎖
通過(guò)IsZero方法來(lái)判斷當(dāng)前是否是第一次請(qǐng)求,如果是第一次請(qǐng)求,直接取now時(shí)間即可返回。
如果不是第一次請(qǐng)求,就需要計(jì)算距離處理下一次請(qǐng)求需要等待的時(shí)間,這里有一個(gè)要注意點(diǎn)的是累加需要等待的時(shí)間,目的是可以給后面的抵消使用
如果當(dāng)前累加需要等待的時(shí)間大于最大松弛量了,將等待的時(shí)間設(shè)置為最大松弛量的時(shí)間。
如果當(dāng)前請(qǐng)求多余的時(shí)間無(wú)法完全抵消此次的所需量,調(diào)用sleep方法進(jìn)行阻塞,同時(shí)清空等待的時(shí)間。如果sleepFor小于0,說(shuō)明此次請(qǐng)求時(shí)間間隔大于預(yù)期間隔,也就說(shuō)無(wú)需等待可以直接處理請(qǐng)求。
步驟其實(shí)不是很多,主要需要注意一個(gè)知識(shí)點(diǎn) —— 最大松弛量。
漏桶算法有個(gè)天然缺陷就是無(wú)法應(yīng)對(duì)突發(fā)流量(勻速,兩次請(qǐng)求 req1 和 req2 之間的延遲至少應(yīng)該 >=perRequest),舉個(gè)例子說(shuō)明:假設(shè)我們現(xiàn)在有三個(gè)請(qǐng)求req1、req2、req3按順序處理,每個(gè)請(qǐng)求處理間隔為100ms,req1請(qǐng)求處理完成之后150ms,req2請(qǐng)求到來(lái),依據(jù)限速策略可以對(duì) req2 立即處理,當(dāng) req2 完成后,50ms 后, req3 到來(lái),這個(gè)時(shí)候距離上次請(qǐng)求還不足 100ms,因此還需要等待 50ms 才能繼續(xù)執(zhí)行, 但是,對(duì)于這種情況,實(shí)際上這三個(gè)請(qǐng)求一共消耗了 250ms 才完成,并不是預(yù)期的 200ms。
對(duì)于上面這種情況,我們可以把之前間隔比較長(zhǎng)的請(qǐng)求的時(shí)間勻給后面的請(qǐng)求判斷限流時(shí)使用,減少請(qǐng)求等待的時(shí)間了,但是當(dāng)兩個(gè)請(qǐng)求之間到達(dá)的間隔比較大時(shí),就會(huì)產(chǎn)生很大的可抵消時(shí)間,以至于后面大量請(qǐng)求瞬間到達(dá)時(shí),也無(wú)法抵消這個(gè)時(shí)間,那樣就已經(jīng)失去了限流的意義,所以引入了最大松弛量 (maxSlack) 的概念, 該值為負(fù)值,表示允許抵消的最長(zhǎng)時(shí)間,防止以上情況的出現(xiàn)。
以上就是漏桶實(shí)現(xiàn)的基本思路了,整體還是很簡(jiǎn)單的,你學(xué)會(huì)了嗎?
令牌桶其實(shí)和漏桶的原理類(lèi)似,令牌桶就是想象有一個(gè)固定大小的桶,系統(tǒng)會(huì)以恒定速率向桶中放 Token,桶滿則暫時(shí)不放。從網(wǎng)上找了圖,表述非常恰當(dāng):
關(guān)于令牌桶限流算法的實(shí)現(xiàn),Github有一個(gè)高效的基于令牌桶限流算法實(shí)現(xiàn)的限流庫(kù):github.com/juju/ratelimit,Golang的timer/rate也是令牌桶的一種實(shí)現(xiàn),本文就不介紹juju/ratelimit庫(kù)了,有興趣的自己學(xué)習(xí)一下的他的實(shí)現(xiàn)思想吧,我們主要來(lái)看一看time/rate是如何實(shí)現(xiàn)的。
還是老樣子,我們還是結(jié)合gin寫(xiě)一個(gè)限流中間件看看他是怎么使用的,例子如下:
import ( "net/http" "time" "github.com/gin-gonic/gin" "golang.org/x/time/rate" ) var rateLimit *rate.Limiter func tokenBucket() gin.HandlerFunc { return func(c *gin.Context) { if rateLimit.Allow() { c.String(http.StatusOK, "rate limit,Drop") c.Abort() return } c.Next() } } func main() { limit := rate.Every(100 * time.Millisecond) rateLimit = rate.NewLimiter(limit, 10) r := gin.Default() r.GET("/ping", tokenBucket(), func(c *gin.Context) { c.JSON(200, true) }) r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080") }
上面的例子我們首先調(diào)用NewLimiter方法構(gòu)造一個(gè)限流器,第一個(gè)參數(shù)是r limit,代表每秒可以向Token桶中產(chǎn)生多少token,第二個(gè)參數(shù)是b int,代表Token桶的容量大小,對(duì)于上面的例子,表示每100ms往桶中放一個(gè)token,也就是1s鐘產(chǎn)生10個(gè),桶的容量就是10。消費(fèi)token的方法這里我們使用Allow方法,Allow 實(shí)際上就是 AllowN(time.Now(),1),AllowN 方法表示,截止到某一時(shí)刻,目前桶中數(shù)目是否至少為 n 個(gè),滿足則返回 true,同時(shí)從桶中消費(fèi) n 個(gè) token。反之返回不消費(fèi) Token。對(duì)應(yīng)上面的例子,當(dāng)桶中的數(shù)目不足于1個(gè)時(shí),就會(huì)丟掉該請(qǐng)求。
time/rate自定義了一個(gè)limit類(lèi)型,其實(shí)他本質(zhì)就是float64的別名,Limit定了事件的最大頻率,表示每秒事件的數(shù)據(jù)量,0就表示無(wú)限制。Inf是無(wú)限的速率限制;它允許所有事件(即使突發(fā)為0)。還提供 Every 方法來(lái)指定向 Token 桶中放置 Token 的間隔,計(jì)算出每秒時(shí)間的數(shù)據(jù)量。
type Limit float64 // Inf is the infinite rate limit; it allows all events (even if burst is zero). const Inf = Limit(math.MaxFloat64) // Every converts a minimum time interval between events to a Limit. func Every(interval time.Duration) Limit { if interval <= 0 { return Inf } return 1 / Limit(interval.Seconds()) }
type Limiter struct { mu sync.Mutex limit Limit burst int tokens float64 // last is the last time the limiter's tokens field was updated last time.Time // lastEvent is the latest time of a rate-limited event (past or future) lastEvent time.Time }
各個(gè)字段含義如下:
mu:互斥鎖、為了控制并發(fā)
limit:每秒允許處理的事件數(shù)量,即每秒處理事件的頻率
burst:令牌桶的最大數(shù)量,如果burst為0,并且limit == Inf,則允許處理任何事件,否則不允許
tokens:令牌桶中可用的令牌數(shù)量
last:記錄上次limiter的tokens被更新的時(shí)間
lastEvent:lastEvent記錄速率受限制(桶中沒(méi)有令牌)的時(shí)間點(diǎn),該時(shí)間點(diǎn)可能是過(guò)去的,也可能是將來(lái)的(Reservation預(yù)定的結(jié)束時(shí)間點(diǎn))
type Reservation struct { ok bool lim *Limiter tokens int timeToAct time.Time // This is the Limit at reservation time, it can change later. limit Limit }
各個(gè)字段含義如下:
ok
:到截至?xí)r間是否可以獲取足夠的令牌
lim
:limiter對(duì)象
tokens
:需要獲取的令牌數(shù)量
timeToAct
:需要等待的時(shí)間點(diǎn)
limit
:代表預(yù)定的時(shí)間,是可以更改的。
reservation就是一個(gè)預(yù)定令牌的操作,timeToAct是本次預(yù)約需要等待到的指定時(shí)間點(diǎn)才有足夠預(yù)約的令牌。
Limiter有三個(gè)token的消費(fèi)方法,分別是Allow、Reserve和Wait,最終三種消費(fèi)方式都調(diào)用了 reserveN 、advance這兩個(gè)方法來(lái)生成和消費(fèi) Token。所以我們主要看看reserveN、advance函數(shù)的具體實(shí)現(xiàn)。
advance方法的實(shí)現(xiàn):
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { //last不能在當(dāng)前時(shí)間now之后,否則計(jì)算出來(lái)的elapsed為負(fù)數(shù),會(huì)導(dǎo)致令牌桶數(shù)量減少 last := lim.last if now.Before(last) { last = now } //根據(jù)令牌桶的缺數(shù)計(jì)算出令牌桶未進(jìn)行更新的最大時(shí)間 maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) elapsed := now.Sub(last) //令牌桶未進(jìn)行更新的時(shí)間段 if elapsed > maxElapsed { elapsed = maxElapsed } //根據(jù)未更新的時(shí)間(未向桶中加入令牌的時(shí)間段)計(jì)算出產(chǎn)生的令牌數(shù) delta := lim.limit.tokensFromDuration(elapsed) tokens := lim.tokens + delta //計(jì)算出可用的令牌數(shù) if burst := float64(lim.burst); tokens > burst { tokens = burst } return now, last, tokens }
advance方法的作用是更新令牌桶的狀態(tài),計(jì)算出令牌桶未更新的時(shí)間(elapsed),根據(jù)elapsed算出需要向桶中加入的令牌數(shù)delta,然后算出桶中可用的令牌數(shù)newTokens.
reserveN方法的實(shí)現(xiàn):reserveN是 AllowN, ReserveN及 WaitN的輔助方法,用于判斷在maxFutureReserve時(shí)間內(nèi)是否有足夠的令牌。
// @param n 要消費(fèi)的token數(shù)量 // @param maxFutureReserve 愿意等待的最長(zhǎng)時(shí)間 func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { lim.mu.Lock() // 如果沒(méi)有限制 if lim.limit == Inf { lim.mu.Unlock() return Reservation{ ok: true, //桶中有足夠的令牌 lim: lim, tokens: n, timeToAct: now, } } //更新令牌桶的狀態(tài),tokens為目前可用的令牌數(shù)量 now, last, tokens := lim.advance(now) // 計(jì)算取完之后桶還能剩能下多少token tokens -= float64(n) var waitDuration time.Duration // 如果token < 0, 說(shuō)明目前的token不夠,需要等待一段時(shí)間 if tokens < 0 { waitDuration = lim.limit.durationFromTokens(-tokens) } ok := n <= lim.burst && waitDuration <= maxFutureReserve r := Reservation{ ok: ok, lim: lim, limit: lim.limit, } // timeToAct表示當(dāng)桶中滿足token數(shù)目等于n的時(shí)間 if ok { r.tokens = n r.timeToAct = now.Add(waitDuration) } // 更新桶里面的token數(shù)目 // 更新last時(shí)間 // lastEvent if ok { lim.last = now lim.tokens = tokens lim.lastEvent = r.timeToAct } else { lim.last = last } lim.mu.Unlock() return r }
上面的代碼我已經(jīng)進(jìn)行了注釋?zhuān)@里在總結(jié)一下流程:
首選判斷是否擁有速率限制,沒(méi)有速率限制也就是桶中一致?lián)碛凶銐虻牧钆啤?/p>
計(jì)算從上次取 Token 的時(shí)間到當(dāng)前時(shí)刻,期間一共新產(chǎn)生了多少 Token:我們只在取 Token 之前生成新的 Token,也就意味著每次取Token的間隔,實(shí)際上也是生成 Token 的間隔。我們可以利用 tokensFromDuration, 輕易的算出這段時(shí)間一共產(chǎn)生 Token 的數(shù)目。所以當(dāng)前 Token 數(shù)目 = 新產(chǎn)生的 Token 數(shù)目 + 之前剩余的 Token 數(shù)目 - 要消費(fèi)的 Token 數(shù)目。
如果消費(fèi)后剩余 Token 數(shù)目大于零,說(shuō)明此時(shí) Token 桶內(nèi)仍不為空,此時(shí) Token 充足,無(wú)需調(diào)用側(cè)等待。 如果 Token 數(shù)目小于零,則需等待一段時(shí)間。那么這個(gè)時(shí)候,我們可以利用 durationFromTokens 將當(dāng)前負(fù)值的 Token 數(shù)轉(zhuǎn)化為需要等待的時(shí)間。
將需要等待的時(shí)間等相關(guān)結(jié)果返回給調(diào)用方
其實(shí)整個(gè)過(guò)程就是利用了 Token 數(shù)可以和時(shí)間相互轉(zhuǎn)化 的原理。而如果 Token 數(shù)為負(fù),則需要等待相應(yīng)時(shí)間即可。
上面提到了durationFromTokens、tokensFromDuration這兩個(gè)方法,是關(guān)鍵,他們的實(shí)現(xiàn)如下:
func (limit Limit) durationFromTokens(tokens float64) time.Duration { seconds := tokens / float64(limit) return time.Nanosecond * time.Duration(1e9*seconds) } func (limit Limit) tokensFromDuration(d time.Duration) float64 { // Split the integer and fractional parts ourself to minimize rounding errors. // See golang.org/issues/34861. sec := float64(d/time.Second) * float64(limit) nsec := float64(d%time.Second) * float64(limit) return sec + nsec/1e9 }
durationFromTokens:功能是計(jì)算出生成N 個(gè)新的 Token 一共需要多久。
tokensFromDuration:給定一段時(shí)長(zhǎng),這段時(shí)間一共可以生成多少個(gè) Token。
細(xì)心的網(wǎng)友會(huì)發(fā)現(xiàn)tokensFromDuration方法既然是計(jì)算一段時(shí)間一共可以生成多少個(gè) Token,為什么不直接進(jìn)行相乘呢?其實(shí)Golang最初的版本就是采用d.Seconds() * float64(limit)直接相乘實(shí)現(xiàn)的,雖然看上去一點(diǎn)問(wèn)題沒(méi)有,但是這里是兩個(gè)小數(shù)相乘,會(huì)帶來(lái)精度損失,所以采用現(xiàn)在這種方法實(shí)現(xiàn),分別求出秒的整數(shù)部分和小數(shù)部分,進(jìn)行相乘后再相加,這樣可以得到最精確的精度。
既然我們可以消費(fèi)Token,那么對(duì)應(yīng)也可以取消此次消費(fèi),將token歸還,當(dāng)調(diào)用 Cancel() 函數(shù)時(shí),消費(fèi)的 Token 數(shù)將會(huì)盡可能歸還給 Token 桶。歸還也并不是那么簡(jiǎn)單,接下我們我們看看歸還token是如何實(shí)現(xiàn)的。
func (r *Reservation) CancelAt(now time.Time) { if !r.ok { return } r.lim.mu.Lock() defer r.lim.mu.Unlock() /* 1.如果無(wú)需限流 2. tokens為0 (需要獲取的令牌數(shù)量為0) 3. 已經(jīng)過(guò)了截至?xí)r間 以上三種情況無(wú)需處理取消操作 */ if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { return } //計(jì)算出需要還原的令牌數(shù)量 //這里的r.lim.lastEvent可能是本次Reservation的結(jié)束時(shí)間,也可能是后來(lái)的Reservation的結(jié)束時(shí)間,所以要把本次結(jié)束時(shí)間點(diǎn)(r.timeToAct)之后產(chǎn)生的令牌數(shù)減去 restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) // 當(dāng)小于0,表示已經(jīng)都預(yù)支完了,不能歸還了 if restoreTokens <= 0 { return } //從新計(jì)算令牌桶的狀態(tài) now, _, tokens := r.lim.advance(now) //還原當(dāng)前令牌桶的令牌數(shù)量,當(dāng)前的令牌數(shù)tokens加上需要還原的令牌數(shù)restoreTokens tokens += restoreTokens //如果tokens大于桶的最大容量,則將tokens置為桶的最大容量 if burst := float64(r.lim.burst); tokens > burst { tokens = burst } // update state r.lim.last = now //記錄桶的更新時(shí)間 r.lim.tokens = tokens //更新令牌數(shù)量 // 如果都相等,說(shuō)明跟沒(méi)消費(fèi)一樣。直接還原成上次的狀態(tài)吧 if r.timeToAct == r.lim.lastEvent { prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) if !prevEvent.Before(now) { r.lim.lastEvent = prevEvent } } return }
注釋已經(jīng)添加,就不在詳細(xì)解釋了,重點(diǎn)是這一行代碼:
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)),
r.tokens指的是本次消費(fèi)的token數(shù),
r.timeToAcr指的是Token桶可以滿足本次消費(fèi)數(shù)目的時(shí)刻,也就是消費(fèi)的時(shí)刻+等待的時(shí)長(zhǎng)
r.lim.lastEvent指的是最近一次消費(fèi)的timeToAct的值,
通過(guò)r.limit.tokensFromDuration方法得出的結(jié)果指的是從該次消費(fèi)到當(dāng)前時(shí)間,一共又消費(fèi)了多少Token數(shù)目,所以最終得出這一段的代碼含義是:
要?dú)w還的Token = 該次消費(fèi)的Token - 新消費(fèi)的token。
到此,相信大家對(duì)“golang高并發(fā)系統(tǒng)限流策略漏桶和令牌桶算法源碼分析”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢(xún),關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。