溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

如何解析client-go中workqueue

發(fā)布時(shí)間:2021-12-16 09:34:34 來(lái)源:億速云 閱讀:179 作者:柒染 欄目:云計(jì)算

今天就跟大家聊聊有關(guān)如何解析client-go中workqueue,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。


下面主要講述下client-go中workqueue, 看一下client-go的一個(gè)整體數(shù)據(jù)走向.如下圖:

如何解析client-go中workqueue

而workqueue主要是在listener這里引用,listener使用chan獲取到數(shù)據(jù)之后將數(shù)據(jù)放入到工作隊(duì)列進(jìn)行處理。主要是由于chan過(guò)于簡(jiǎn)單,已經(jīng)無(wú)法滿(mǎn)足K8S的場(chǎng)景,所以衍生出了workqueue,

特性


  1. 有序

  2. 去重

  3. 并發(fā)

  4. 延遲處理

  5. 限速

當(dāng)前有三種workqueue


  1. 基本隊(duì)列

  2. 延遲隊(duì)列

  3. 限速隊(duì)列

其中延遲隊(duì)列是基于基本隊(duì)列實(shí)現(xiàn)的,而限流隊(duì)列基于延遲隊(duì)列實(shí)現(xiàn)

基本隊(duì)列


看一下基本隊(duì)列的接口

// client-go源碼路徑util/workqueue/queue.go
type Interface interface {
    //新增元素 可以是任意對(duì)象
    Add(item interface{})
    //獲取當(dāng)前隊(duì)列的長(zhǎng)度
    Len() int
    // 阻塞獲取頭部元素(先入先出)  返回元素以及隊(duì)列是否關(guān)閉
    Get() (item interface{}, shutdown bool)
    // 顯示標(biāo)記完成元素的處理
    Done(item interface{})
    //關(guān)閉隊(duì)列
    ShutDown()
    //隊(duì)列是否處于關(guān)閉狀態(tài)
    ShuttingDown() bool
}

看一下基本隊(duì)列的數(shù)據(jù)結(jié)構(gòu),只看三個(gè)重點(diǎn)處理的,其他的沒(méi)有展示出來(lái)

type Type struct {
    //含有所有元素的元素的隊(duì)列 保證有序
    queue []t
    //所有需要處理的元素 set是基于map以value為空struct實(shí)現(xiàn)的結(jié)構(gòu),保證去重
    dirty set
    //當(dāng)前正在處理中的元素
    processing set
    ...
}

type empty struct{}
type t interface{}
type set map[t]empty

基本隊(duì)列的hello world也很簡(jiǎn)單

 wq := workqueue.New()
    wq.Add("hello")
    v, _ := wq.Get()

基本隊(duì)列Add


func (q *Type) Add(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    //如果當(dāng)前處于關(guān)閉狀態(tài),則不再新增元素
    if q.shuttingDown {
        return
    }
    //如果元素已經(jīng)在等待處理中,則不再新增
    if q.dirty.has(item) {
        return
    }
    //添加到metrics
    q.metrics.add(item)
    //加入等待處理中
    q.dirty.insert(item)
    //如果目前正在處理該元素 就不將元素添加到隊(duì)列
    if q.processing.has(item) {
        return
    }
    q.queue = append(q.queue, item)
    q.cond.Signal()
}

基本隊(duì)列Get


func (q *Type) Get() (item interface{}, shutdown bool) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    //如果當(dāng)前沒(méi)有元素并且不處于關(guān)閉狀態(tài),則阻塞
    for len(q.queue) == 0 && !q.shuttingDown {
        q.cond.Wait()
    }
    ...
    item, q.queue = q.queue[0], q.queue[1:]
    q.metrics.get(item)
    //把元素添加到正在處理隊(duì)列中
    q.processing.insert(item)
    //把隊(duì)列從等待處理隊(duì)列中刪除
    q.dirty.delete(item)
    return item, false
}

基本隊(duì)列實(shí)例化


func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
    t := &Type{
        clock:                      c,
        dirty:                      set{},
        processing:                 set{},
        cond:                       sync.NewCond(&sync.Mutex{}),
        metrics:                    metrics,
        unfinishedWorkUpdatePeriod: updatePeriod,
    }
        //啟動(dòng)一個(gè)協(xié)程 定時(shí)更新metrics
    go t.updateUnfinishedWorkLoop()
    return t
}

func (q *Type) updateUnfinishedWorkLoop() {
    t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
    defer t.Stop()
    for range t.C() {
        if !func() bool {
            q.cond.L.Lock()
            defer q.cond.L.Unlock()
            if !q.shuttingDown {
                q.metrics.updateUnfinishedWork()
                return true
            }
            return false

        }() {
            return
        }
    }
}

延遲隊(duì)列


延遲隊(duì)列的實(shí)現(xiàn)思路主要是使用優(yōu)先隊(duì)列存放需要延遲添加的元素,每次判斷最小延遲的元素書(shū)否已經(jīng)達(dá)到了加入隊(duì)列的要求(延遲的時(shí)間到了),如果是則判斷下一個(gè)元素,直到?jīng)]有元素或者元素還需要延遲為止。

看一下延遲隊(duì)列的數(shù)據(jù)結(jié)構(gòu)

type delayingType struct {
    Interface
        ...
    //放置延遲添加的元素
    waitingForAddCh chan *waitFor
       ...
}

主要是使用chan來(lái)保存延遲添加的元素,而具體實(shí)現(xiàn)是通過(guò)一個(gè)實(shí)現(xiàn)了一個(gè)AddAfter方法,看一下具體的內(nèi)容

//延遲隊(duì)列的接口
type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item interface{}, duration time.Duration)
}

func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
    ...
    //如果延遲實(shí)現(xiàn)小于等于0 直接添加到隊(duì)列
    if duration <= 0 {
        q.Add(item)
        return
    }
    select {
    case <-q.stopCh:
    //添加到chan,下面會(huì)講一下這個(gè)chan的處理
    case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
    }
}

延遲元素的處理

func (q *delayingType) waitingLoop() {

    defer utilruntime.HandleCrash()

    never := make(<-chan time.Time)

    var nextReadyAtTimer clock.Timer

    waitingForQueue := &waitForPriorityQueue{}
    //這里是初始化一個(gè)優(yōu)先隊(duì)列 具體實(shí)現(xiàn)有興趣的同學(xué)可以研究下
    heap.Init(waitingForQueue)

    waitingEntryByData := map[t]*waitFor{}

    for {
        if q.Interface.ShuttingDown() {
            return
        }

        now := q.clock.Now()

        // Add ready entries
        for waitingForQueue.Len() > 0 {
            entry := waitingForQueue.Peek().(*waitFor)
            //看一下第一個(gè)元素是否已經(jīng)到達(dá)延遲的時(shí)間了
            if entry.readyAt.After(now) {
                break
            }

            //時(shí)間到了,將元素添加到工作的隊(duì)列,并且從延遲的元素中移除
            entry = heap.Pop(waitingForQueue).(*waitFor)
            q.Add(entry.data)
            delete(waitingEntryByData, entry.data)
        }

        // Set up a wait for the first item's readyAt (if one exists)
        nextReadyAt := never
        if waitingForQueue.Len() > 0 {
            if nextReadyAtTimer != nil {
                nextReadyAtTimer.Stop()
            }
            //如果還有需要延遲的元素,計(jì)算第一個(gè)元素的延遲時(shí)間(最小延遲的元素)
            entry := waitingForQueue.Peek().(*waitFor)
            nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
            nextReadyAt = nextReadyAtTimer.C()
        }

        select {
        case <-q.stopCh:
            return
        case <-q.heartbeat.C():
            //定時(shí)檢查下是否有元素達(dá)到延遲的時(shí)間
        case <-nextReadyAt:
            //這里是上面計(jì)算出來(lái)的時(shí)間,時(shí)間到了,處理到達(dá)延遲時(shí)間的元素
        case waitEntry := <-q.waitingForAddCh:
            //檢查是否需要延遲,如果需要延遲就加入到延遲等待
            if waitEntry.readyAt.After(q.clock.Now()) {
                insert(waitingForQueue, waitingEntryByData, waitEntry)
            } else {
                //如果不需要延遲就直接添加到隊(duì)列
                q.Add(waitEntry.data)
            }

            drained := false
            for !drained {
                select {
                case waitEntry := <-q.waitingForAddCh:

上面waitingLoop 是在實(shí)例化延遲隊(duì)列的時(shí)候調(diào)用的,看一下實(shí)例化時(shí)候的邏輯

func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
    //實(shí)例化一個(gè)數(shù)據(jù)結(jié)構(gòu)
    ret := &delayingType{
        Interface:       NewNamed(name),
        clock:           clock,
        heartbeat:       clock.NewTicker(maxWait),
        stopCh:          make(chan struct{}),
        waitingForAddCh: make(chan *waitFor, 1000),
        metrics:         newRetryMetrics(name),
    }

    //放到一個(gè)協(xié)程中處理延遲元素
    go ret.waitingLoop()

    return ret
}

限速隊(duì)列


當(dāng)前限速隊(duì)列支持4中限速模式

  1. 令牌桶算法限速

  2. 排隊(duì)指數(shù)限速

  3. 計(jì)數(shù)器模式

  4. 混合模式(多種限速算法同時(shí)使用)

限速隊(duì)列的底層實(shí)際上還是通過(guò)延遲隊(duì)列來(lái)進(jìn)行限速,通過(guò)計(jì)算出元素的限速時(shí)間作為延遲時(shí)間

來(lái)看一下限速接口

type RateLimiter interface {
    //
    When(item interface{}) time.Duration
    // Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing
    // or for success, we'll stop tracking it
    Forget(item interface{})
    // NumRequeues returns back how many failures the item has had
    NumRequeues(item interface{}) int
}

看一下限速隊(duì)列的數(shù)據(jù)結(jié)構(gòu)

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
    DelayingInterface

    //實(shí)際上底層還是調(diào)用的延遲隊(duì)列,通過(guò)計(jì)算出元素的延遲時(shí)間 進(jìn)行限速
    AddRateLimited(item interface{})

    // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
    // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
    // still have to call `Done` on the queue.
    Forget(item interface{})

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item interface{}) int
}

func (q *rateLimitingType) AddRateLimited(item interface{}) {
         //通過(guò)when方法計(jì)算延遲加入隊(duì)列的時(shí)間
    q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

令牌桶算法


client-go中的令牌桶限速是通過(guò) golang.org/x/time/rat包來(lái)實(shí)現(xiàn)的

可以通過(guò) flowcontrol.NewTokenBucketRateLimiter(qps float32, burst int) 來(lái)使用令牌桶限速算法,其中第一個(gè)參數(shù)qps表示每秒補(bǔ)充多少token,burst表示總token上限為多少。

排隊(duì)指數(shù)算法


排隊(duì)指數(shù)可以通過(guò) workqueue.NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) 來(lái)使用。

這個(gè)算法有兩個(gè)參數(shù):

  1. baseDelay 基礎(chǔ)限速時(shí)間

  2. maxDelay 最大限速時(shí)間

舉個(gè)例子來(lái)理解一下這個(gè)算法,例如快速插入5個(gè)相同元素,baseDelay設(shè)置為1秒,maxDelay設(shè)置為10秒,都在同一個(gè)限速期內(nèi)。第一個(gè)元素會(huì)在1秒后加入到隊(duì)列,第二個(gè)元素會(huì)在2秒后加入到隊(duì)列,第三個(gè)元素會(huì)在4秒后加入到隊(duì)列,第四個(gè)元素會(huì)在8秒后加入到隊(duì)列,第五個(gè)元素會(huì)在10秒后加入到隊(duì)列(指數(shù)計(jì)算的結(jié)果為16,但是最大值設(shè)置了10秒)。

來(lái)看一下源碼的計(jì)算

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    //第一次為0
    exp := r.failures[item]
    //累加1
    r.failures[item] = r.failures[item] + 1

    //通過(guò)當(dāng)前計(jì)數(shù)和baseDelay計(jì)算指數(shù)結(jié)果  baseDelay*(2的exp次方)
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    if backoff > math.MaxInt64 {
        return r.maxDelay
    }

    calculated := time.Duration(backoff)
    if calculated > r.maxDelay {
        return r.maxDelay
    }

    return calculated
}

計(jì)數(shù)器模式


計(jì)數(shù)器模式可以通過(guò) workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int)來(lái)使用,有三個(gè)參數(shù)

  1. fastDelay 快限速時(shí)間

  2. slowDelay 慢限速時(shí)間

  3. maxFastAttempts 快限速元素個(gè)數(shù)

原理是這樣的,假設(shè)fastDelay設(shè)置為1秒,slowDelay設(shè)置為10秒,maxFastAttempts設(shè)置為3,同樣在一個(gè)限速周期內(nèi)快速插入5個(gè)相同的元素。前三個(gè)元素都是以1秒的限速時(shí)間加入到隊(duì)列,添加第四個(gè)元素時(shí)開(kāi)始使用slowDelay限速時(shí)間,也就是10秒后加入到隊(duì)列,后面的元素都將以10秒的限速時(shí)間加入到隊(duì)列,直到限速周期結(jié)束。

來(lái)看一下源碼

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()
    //添加一次就計(jì)數(shù)一次
    r.failures[item] = r.failures[item] + 1
    //計(jì)數(shù)小于maxFastAttempts都以fastDelay為限速時(shí)間,否則以slowDelay為限速時(shí)間
    if r.failures[item] <= r.maxFastAttempts {
        return r.fastDelay
    }
    return r.slowDelay
}

混合模式


最后一種是混合模式,可以組合使用不同的限速算法實(shí)例化限速隊(duì)列

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
    return &MaxOfRateLimiter{limiters: limiters}
}

在k8s-client-go的源碼中可以看到,大量的接口組合運(yùn)用,將各種功能拆分成各個(gè)細(xì)小的庫(kù),是一種非常值得學(xué)習(xí)的代碼風(fēng)格以及思路。

看完上述內(nèi)容,你們對(duì)如何解析client-go中workqueue有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI