溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何解析k8s中的Informer機(jī)制

發(fā)布時(shí)間:2021-12-16 09:31:50 來(lái)源:億速云 閱讀:189 作者:柒染 欄目:云計(jì)算

如何解析k8s中的Informer機(jī)制,相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。

Informer機(jī)制架構(gòu)設(shè)計(jì)總覽

下面是我根據(jù)理解畫(huà)的一個(gè)數(shù)據(jù)流轉(zhuǎn)圖,從全局視角看一下數(shù)據(jù)的整體走向是怎么樣的。

其中虛線的表示的是代碼中的方法。

如何解析k8s中的Informer機(jī)制

首先講一個(gè)結(jié)論:

通過(guò)Informer機(jī)制獲取數(shù)據(jù)的情況下,在初始化的時(shí)候會(huì)從Kubernetes API Server獲取對(duì)應(yīng)Resource的全部Object,后續(xù)只會(huì)通過(guò)Watch機(jī)制接收API Server推送過(guò)來(lái)的數(shù)據(jù),不會(huì)再主動(dòng)從API Server拉取數(shù)據(jù),直接使用本地緩存中的數(shù)據(jù)以減少API Server的壓力。

Watch機(jī)制基于HTTP的Chunk實(shí)現(xiàn),維護(hù)一個(gè)長(zhǎng)連接,這是一個(gè)優(yōu)化點(diǎn),減少請(qǐng)求的數(shù)據(jù)量。第二個(gè)優(yōu)化點(diǎn)是SharedInformer,它可以讓同一種資源使用的是同一個(gè)Informer,例如v1版本的Deployment和v1beta1版本的Deployment同時(shí)存在的時(shí)候,共享一個(gè)Informer。

上面圖中可以看到Informer分為三個(gè)部分,可以理解為三大邏輯。

其中Reflector主要是把從API Server數(shù)據(jù)獲取到的數(shù)據(jù)放到DeltaFIFO隊(duì)列中,充當(dāng)生產(chǎn)者角色。

SharedInformer主要是從DeltaFIFIO隊(duì)列中獲取數(shù)據(jù)并分發(fā)數(shù)據(jù),充當(dāng)消費(fèi)者角色。

最后Indexer是作為本地緩存的存儲(chǔ)組件存在。

Reflector理解

Reflector中主要看Run、ListAndWatch、watchHandler三個(gè)地方就足夠了。

源碼位置是 tools/cache/reflector.go

// Ruvn starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
//開(kāi)始時(shí)執(zhí)行Run,上一層調(diào)用的地方是 controller.go中的Run方法
func (r *Reflector) Run(stopCh <-chan struct{}) {
    
    klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    wait.Until(func() {
         //啟動(dòng)后執(zhí)行一次ListAndWatch
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

...

// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {

// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {

//這里是調(diào)用了各個(gè)資源中的ListFunc函數(shù),例如如果v1版本的Deployment
//則調(diào)用的是informers/apps/v1/deployment.go中的ListFunc
                             return r.listerWatcher.List(opts)
            }))
            if r.WatchListPageSize != 0 {
                pager.Pa1geSize = r.WatchListPageSize
            }
            // Pager falls back to full list if paginated list calls fail due to an "Expired">

數(shù)據(jù)的生產(chǎn)就結(jié)束了,就兩點(diǎn):

  1. 初始化時(shí)從API Server請(qǐng)求數(shù)據(jù)

  2. 監(jiān)聽(tīng)后續(xù)從Watch推送來(lái)的數(shù)據(jù)

DeltaFIFO理解

先看一下數(shù)據(jù)結(jié)構(gòu):

type DeltaFIFO struct {
...
    items map[string]Deltas
    queue []string
...
}

type Delta struct {
    Type   DeltaType
    Object interface{}
}

type Deltas []Delta


type DeltaType string

// Change type definition
const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    Sync DeltaType = "Sync"
)

其中queue存儲(chǔ)的是Object的id,而items存儲(chǔ)的是以O(shè)bjectID為key的這個(gè)Object的事件列表,

可以想象到是這樣的一個(gè)數(shù)據(jù)結(jié)構(gòu),左邊是Key,右邊是一個(gè)數(shù)組對(duì)象,其中每個(gè)元素都是由type和obj組成.

如何解析k8s中的Informer機(jī)制

DeltaFIFO顧名思義存放Delta數(shù)據(jù)的先入先出隊(duì)列,相當(dāng)于一個(gè)數(shù)據(jù)的中轉(zhuǎn)站,將數(shù)據(jù)從一個(gè)地方轉(zhuǎn)移另一個(gè)地方。

主要看的內(nèi)容是queueActionLocked、Pop、Resync

queueActionLocked方法:

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
...
    newDeltas := append(f.items[id], Delta{actionType, obj})
      //去重處理
    newDeltas = dedupDeltas(newDeltas)

    if len(newDeltas) > 0 {
        ... 
               //pop消息
          
        f.cond.Broadcast()
    ...
    return nil
}

Pop方法:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            //阻塞 直到調(diào)用了f.cond.Broadcast()
            f.cond.Wait()
        }
//取出第一個(gè)元素
        id := f.queue[0]
        f.queue = f.queue[1:]
        ...
        item, ok := f.items[id]
...
                delete(f.items, id)
        //這個(gè)process可以在controller.go中的processLoop()找到
        //初始化是在shared_informer.go的Run
        //最終執(zhí)行到shared_informer.go的HandleDeltas方法
        err := process(item)
        //如果處理出錯(cuò)了重新放回隊(duì)列中
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
         ...
    }
}

Resync機(jī)制:

小總結(jié):每次從本地緩存Indexer中獲取數(shù)據(jù)重新放到DeltaFIFO中執(zhí)行任務(wù)邏輯。

啟動(dòng)的Resync地方是reflector.go的resyncChan()方法,在reflector.go的ListAndWatch方法中的調(diào)用開(kāi)始定時(shí)執(zhí)行。

go func() {
               //啟動(dòng)定時(shí)任務(wù)
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
                        //定時(shí)執(zhí)行   調(diào)用會(huì)執(zhí)行到delta_fifo.go的Resync()方法
            if r.ShouldResync == nil || r.ShouldResync() {
                klog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

func (f *DeltaFIFO) Resync() error {
    ...
//從緩存中獲取到所有的key
    keys := f.knownObjects.ListKeys()
    for _, k := range keys {
        if err := f.syncKeyLocked(k); err != nil {
            return err
        }
    }
    return nil

}


func (f *DeltaFIFO) syncKeyLocked(key string) error {
           //獲緩存拿到對(duì)應(yīng)的Object
        obj, exists, err := f.knownObjects.GetByKey(key)
    ...
         //放入到隊(duì)列中執(zhí)行任務(wù)邏輯
    if err := f.queueActionLocked(Sync, obj); err != nil {
        return fmt.Errorf("couldn't queue object: %v", err)
    }
    return nil
}

SharedInformer消費(fèi)消息理解

主要看HandleDeltas方法就好,消費(fèi)消息然后分發(fā)數(shù)據(jù)并且存儲(chǔ)數(shù)據(jù)到緩存的地方

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        
        switch d.Type {
        case Sync, Added, Updated:
            ...
            //查一下是否在Indexer緩存中 如果在緩存中就更新緩存中的對(duì)象
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                //把數(shù)據(jù)分發(fā)到Listener
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                //沒(méi)有在Indexer緩存中 把對(duì)象插入到緩存中
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        ...
        }
    }
    return nil
}

Indexer理解

這塊不會(huì)講述太多內(nèi)容,因?yàn)槲艺J(rèn)為Informer機(jī)制最主要的還是前面數(shù)據(jù)的流轉(zhuǎn),當(dāng)然這并不代表數(shù)據(jù)存儲(chǔ)不重要,而是先理清楚整體的思路,后續(xù)再詳細(xì)更新存儲(chǔ)的部分。

Indexer使用的是threadsafe_store.go中的threadSafeMap存儲(chǔ)數(shù)據(jù),是一個(gè)線程安全并且?guī)в兴饕δ艿膍ap,數(shù)據(jù)只會(huì)存放在內(nèi)存中,每次涉及操作都會(huì)進(jìn)行加鎖。

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}
    indexers Indexers
    indices Indices
}

Indexer還有一個(gè)索引相關(guān)的內(nèi)容就暫時(shí)不展開(kāi)講述。

Example代碼

-------------

package main

import (
    "flag"
    "fmt"
    "path/filepath"
    "time"

    v1 "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

func main() {
    var err error
    var config *rest.Config

    var kubeconfig *string

    if home := homedir.HomeDir(); home != "">

以上示例代碼中程序啟動(dòng)后會(huì)拉取一次Deployment數(shù)據(jù),并且拉取數(shù)據(jù)完成后從本地緩存中List一次default命名空間的Deployment資源并打印,然后每60秒Resync一次Deployment資源。

QA


為什么需要Resync?

在本周有同學(xué)提出一個(gè),我看到這個(gè)問(wèn)題后也感覺(jué)挺奇怪的,因?yàn)镽esync是從本地緩存的數(shù)據(jù)緩存到本地緩存(從開(kāi)始到結(jié)束來(lái)說(shuō)是這樣),為什么需要把數(shù)據(jù)拿出來(lái)又走一遍流程呢?當(dāng)時(shí)鉆牛角尖也是想不明白,后來(lái)?yè)Q個(gè)角度想就知道了。

數(shù)據(jù)從API Server過(guò)來(lái)并且經(jīng)過(guò)處理后放到緩存中,但數(shù)據(jù)并不一定就可以正常處理,也就是說(shuō)可能報(bào)錯(cuò)了,而這個(gè)Resync相當(dāng)于一個(gè)重試的機(jī)制。

可以嘗試實(shí)踐一下: 部署有狀態(tài)服務(wù),存儲(chǔ)使用LocalPV(也可以換成自己熟悉的),這時(shí)候pod會(huì)由于存儲(chǔ)目錄不存在而啟動(dòng)失敗. 然后在pod啟動(dòng)失敗后再創(chuàng)建好對(duì)應(yīng)的目錄,過(guò)一會(huì)pod就啟動(dòng)成功了。

看完上述內(nèi)容,你們掌握如何解析k8s中的Informer機(jī)制的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI