溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

apiserver的list-watch怎么使用

發(fā)布時(shí)間:2021-12-18 15:30:31 來源:億速云 閱讀:190 作者:iii 欄目:云計(jì)算

本篇內(nèi)容主要講解“apiserver的list-watch怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“apiserver的list-watch怎么使用”吧!

0. list-watch的需求

apiserver的list-watch怎么使用

上圖是一個(gè)典型的Pod創(chuàng)建過程,在這個(gè)過程中,每次當(dāng)kubectl創(chuàng)建了ReplicaSet對(duì)象后,controller-manager都是通過list-watch這種方式得到了最新的ReplicaSet對(duì)象,并執(zhí)行自己的邏輯來創(chuàng)建Pod對(duì)象。其他的幾個(gè)組件,Scheduler/Kubelet也是一樣,通過list-watch得知變化并進(jìn)行處理。這是組件的處理端代碼:

c.NodeLister.Store, c.nodePopulator = framework.NewInformer(
        c.createNodeLW(),                                            ...(1)
		&api.Node{},                                                 ...(2)
		0,                                                           ...(3)
		framework.ResourceEventHandlerFuncs{                         ...(4)
			AddFunc:    c.addNodeToCache,                            ...(5)
			UpdateFunc: c.updateNodeInCache,
			DeleteFunc: c.deleteNodeFromCache,
		},
)

其中(1)是list-watch函數(shù),(4)(5)則是相應(yīng)事件觸發(fā)操作的入口。

list-watch操作需要做這么幾件事:

  1. 由組件向apiserver而不是etcd發(fā)起watch請(qǐng)求,在組件啟動(dòng)時(shí)就進(jìn)行訂閱,告訴apiserver需要知道什么數(shù)據(jù)發(fā)生變化。Watch是一個(gè)典型的發(fā)布-訂閱模式。

  2. 組件向apiserver發(fā)起的watch請(qǐng)求是可以帶條件的,例如,scheduler想要watch的是所有未被調(diào)度的Pod,也就是滿足Pod.destNode=""的Pod來進(jìn)行調(diào)度操作;而kubelet只關(guān)心自己節(jié)點(diǎn)上的Pod列表。apiserver向etcd發(fā)起的watch是沒有條件的,只能知道某個(gè)數(shù)據(jù)發(fā)生了變化或創(chuàng)建、刪除,但不能過濾具體的值。也就是說對(duì)象數(shù)據(jù)的條件過濾必須在apiserver端而不是etcd端完成。

  3. list是watch失敗,數(shù)據(jù)太過陳舊后的彌補(bǔ)手段,這方面詳見 基于list-watch的Kubernetes異步事件處理框架詳解-客戶端部分。list本身是一個(gè)簡(jiǎn)單的列表操作,和其它apiserver的增刪改操作一樣,不再多描述細(xì)節(jié)。

1. watch的API處理

既然watch本身是一個(gè)apiserver提供的http restful的API,那么就按照API的方式去閱讀它的代碼,按照apiserver的基礎(chǔ)功能實(shí)現(xiàn)一文所描述,我們來看它的代碼,

  • 關(guān)鍵的處理API注冊(cè)代碼pkg/apiserver/api_installer.go

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage,...

...
    lister, isLister := storage.(rest.Lister)
	watcher, isWatcher := storage.(rest.Watcher)                     ...(1)
...    
		case "LIST": // List all resources of a kind.                ...(2)
			doc := "list objects of kind " + kind
			if hasSubresource {
				doc = "list " + subresource + " of objects of kind " + kind
			}
			handler := metrics.InstrumentRouteFunc(action.Verb, resource, ListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) ...(3)
  1. 一個(gè)rest.Storage對(duì)象會(huì)被轉(zhuǎn)換為watcherlister對(duì)象

  2. 提供list和watch服務(wù)的入口是同一個(gè),在API接口中是通過 GET /pods?watch=true 這種方式來區(qū)分是list還是watch

  3. API處理函數(shù)是由listerwatcher經(jīng)過ListResource()合體后完成的。

  • 那么就看看ListResource()的具體實(shí)現(xiàn)吧,/pkg/apiserver/resthandler.go

func ListResource(r rest.Lister, rw rest.Watcher,... {
...
		if (opts.Watch || forceWatch) && rw != nil {
			watcher, err := rw.Watch(ctx, &opts)           ...(1)
            ....
			serveWatch(watcher, scope, req, res, timeout)
			return
		}
		result, err := r.List(ctx, &opts)                  ...(2)           
		write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
  1. 每次有一個(gè)watch的url請(qǐng)求過來,都會(huì)調(diào)用rw.Watch()創(chuàng)建一個(gè)watcher,好吧這里的名字和上面那一層的名字重復(fù)了,但我們可以區(qū)分開,然后使用serveWatch()來處理這個(gè)請(qǐng)求。watcher的生命周期是每個(gè)http請(qǐng)求的,這一點(diǎn)非常重要。

  2. list在這里是另外一個(gè)分支,和watch分別處理,可以忽略。

  • 響應(yīng)http請(qǐng)求的過程serveWatch()的代碼在/pkg/apiserver/watch.go里面

func serveWatch(watcher watch.Interface... {
	server.ServeHTTP(res.ResponseWriter, req.Request)
}

func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	for {
		select {
		case event, ok := <-s.watching.ResultChan():

			obj := event.Object

			if err := s.embeddedEncoder.EncodeToStream(obj, buf); 
...
}

這段的操作基本毫無技術(shù)含量,就是從watcher的結(jié)果channel中讀取一個(gè)event對(duì)象,然后持續(xù)不斷的編碼寫入到http response的流當(dāng)中。

  • 這是整個(gè)過程的圖形化描述:

apiserver的list-watch怎么使用

所以,我們的問題就回到了

  1. watcher這個(gè)對(duì)象,嚴(yán)格來說是watch.Interface的對(duì)象,位置在pkg/watch/watch.go中,是怎么被創(chuàng)建出來的?

  2. 這個(gè)watcher對(duì)象是怎么從etcd中獲得變化的數(shù)據(jù)的?又是怎么過濾條件的?

2. 在代碼迷宮中追尋watcher

回到上面的代碼追蹤過程來看,watcher(watch.Interface)對(duì)象是被Rest.Storage對(duì)象創(chuàng)建出來的。從上一篇apiserver的基礎(chǔ)功能實(shí)現(xiàn) 可以知道,所有的Rest.Storage分兩層,一層是每個(gè)對(duì)象自己的邏輯,另一層則是通過通用的操作來搞定,像watch這樣的操作應(yīng)該是通用的,所以我們看這個(gè)源代碼

  • /pkg/registry/generic/registry/store.go

func (e *Store) Watch(ctx api.Context, options *api.ListOptions) (watch.Interface, error) {
...
	return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
}

func (e *Store) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {

			return e.Storage.Watch(ctx, key, resourceVersion, filterFunc)   ...(1)

	return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filterFunc)
}

果然,我們?cè)?1)這里找到了生成Watch的函數(shù),但這個(gè)工作是由e.Storage來完成的,所以我們需要找一個(gè)具體的Storage的生成過程,以Pod為例子

  • /pkg/registry/pod/etcd/etcd.go

func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
	prefix := "/pods"

	storageInterface := opts.Decorator(
		opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Pods), &api.Pod{}, prefix, pod.Strategy, newListFunc)                                        ...(1)

	store := &registry.Store{
        ...
		Storage: storageInterface,                                        ...(2)
	}
	return PodStorage{
		Pod:         &REST{store, proxyTransport},                        ...(3)

這(1)就是Storage的生成現(xiàn)場(chǎng),傳入的參數(shù)包括了一個(gè)緩存Pod的數(shù)量。(2)(3)是和上面代碼的連接點(diǎn)。那么現(xiàn)在問題就轉(zhuǎn)化為追尋Decorator這個(gè)東西具體是怎么生成的,需要重復(fù)剛才的過程,往上搜索opts是怎么搞進(jìn)來的。

  • /pkg/master/master.go - GetRESTOptionsOrDie()

  • /pkg/genericapiserver/genericapiserver.go - StorageDecorator()

  • /pkg/registry/generic/registry/storage_factory.go - StorageWithCacher()

  • /pkg/storage/cacher.go

OK,這樣我們就來到正題,一個(gè)具體的watch緩存的實(shí)現(xiàn)了!

把上面這個(gè)過程用一幅圖表示:

apiserver的list-watch怎么使用

3. watch緩存的具體實(shí)現(xiàn)

看代碼,首要看的是數(shù)據(jù)結(jié)構(gòu),以及考慮這個(gè)數(shù)據(jù)結(jié)構(gòu)和需要解決的問題之間的關(guān)系。

3.1 Cacher(pkg/storage/cacher.go)

對(duì)于cacher這結(jié)構(gòu)來說,我們從外看需求,可以知道這是一個(gè)Storage,用于提供某個(gè)類型的數(shù)據(jù),例如Pod的增刪改查請(qǐng)求,同時(shí)它又用于watch,用于在client端需要對(duì)某個(gè)key的變化感興趣時(shí),創(chuàng)建一個(gè)watcher來源源不斷的提供新的數(shù)據(jù)給客戶端。

那么cacher是怎么滿足這些需求的呢?答案就在它的結(jié)構(gòu)里面:

type Cacher struct {
	// Underlying storage.Interface.
	storage Interface

	// "sliding window" of recent changes of objects and the current state.
	watchCache *watchCache
	reflector  *cache.Reflector

	// Registered watchers.
	watcherIdx int
	watchers   map[int]*cacheWatcher
}

略去里面的鎖(在看代碼的時(shí)候一開始要忽略鎖的存在,鎖是后期為了避免破壞數(shù)據(jù)再加上去的,不影響數(shù)據(jù)流),略去里面的一些非關(guān)鍵的成員,現(xiàn)在我們剩下這3段重要的成員,其中

  • storage是連接etcd的,也就是背后的裸存儲(chǔ)

  • watchCache并不僅僅是和注釋里面說的那樣,是個(gè)滑動(dòng)窗口,里面存儲(chǔ)了所有數(shù)據(jù)+滑動(dòng)窗口

  • watchers這是為每個(gè)請(qǐng)求創(chuàng)建的struct,每個(gè)watch的client上來后都會(huì)被創(chuàng)建一個(gè),所以這里有個(gè)map

當(dāng)然,這3個(gè)成員的作用是我看了所有代碼后,總結(jié)出來的,一開始讀代碼時(shí)不妨先在腦子里面有個(gè)定位,然后在看下面的方法時(shí)不斷修正這個(gè)定位。那么,接下來就看看具體的方法是怎么讓數(shù)據(jù)在這些結(jié)構(gòu)里面流動(dòng)的吧!

  • 初始化方法

func NewCacherFromConfig(config CacherConfig) *Cacher { 
...
					cacher.startCaching(stopCh)
}

func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
...
	if err := c.reflector.ListAndWatch(stopChannel); err != nil {
		glog.Errorf("unexpected ListAndWatch error: %v", err)
	}
}

其他的部分都是陳詞濫調(diào),只有startCaching()這段有點(diǎn)意思,這里啟動(dòng)一個(gè)go協(xié)程,最后啟動(dòng)了c.reflector.ListAndWatch()這個(gè)方法,如果對(duì)k8s的基本有了解的話,這個(gè)其實(shí)就是一個(gè)把遠(yuǎn)端數(shù)據(jù)源源不斷的同步到本地的方法,那么數(shù)據(jù)落在什么地方呢?往上看可以看到

reflector:  cache.NewReflector(listerWatcher, config.Type, watchCache, 0),

也就是說從創(chuàng)建cacher的實(shí)例開始,就會(huì)從etcd中把所有Pod的數(shù)據(jù)同步到watchCache里面來。這也就印證了watchCache是數(shù)據(jù)從etcd過來的第一站。

apiserver的list-watch怎么使用

  • 增刪改方法

func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	return c.storage.Create(ctx, key, obj, out, ttl)
}

大部分方法都很無聊,就是短路到底層的storage直接執(zhí)行。

  • Watch方法

// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {
	
	initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)

	watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
	c.watchers[c.watcherIdx] = watcher
	c.watcherIdx++
	return watcher, nil
}

這里的邏輯就比較清晰,首先從watchCache中拿到從某個(gè)resourceVersion以來的所有數(shù)據(jù)——initEvents,然后用這個(gè)數(shù)據(jù)創(chuàng)建了一個(gè)watcher返回出去為某個(gè)客戶端提供服務(wù)。

  • List方法

// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error {

	filterFunc := filterFunction(key, c.keyFunc, filter)

	objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
	if err != nil {
		return fmt.Errorf("failed to wait for fresh list: %v", err)
	}
	for _, obj := range objs {
		if filterFunc(object) {
			listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
		}
	}

}

從這段代碼中我們可以看出2件事,一是list的數(shù)據(jù)都是從watchCache中獲取的,二是獲取后通過filterFunc過濾了一遍然后返回出去。

apiserver的list-watch怎么使用

3.2 WatchCache(pkg/storage/watch_cache.go)

這個(gè)結(jié)構(gòu)應(yīng)該是緩存的核心結(jié)構(gòu),從上一層的代碼分析中我們已經(jīng)知道了對(duì)這個(gè)結(jié)構(gòu)的需求,包括存儲(chǔ)所有這個(gè)類型的數(shù)據(jù),包括當(dāng)有新的數(shù)據(jù)過來時(shí)把數(shù)據(jù)扔到cacheWatcher里面去,總之,提供List和Watch兩大輸出。

type watchCache struct {
	// cache is used a cyclic buffer - its first element (with the smallest
	// resourceVersion) is defined by startIndex, its last element is defined
	// by endIndex (if cache is full it will be startIndex + capacity).
	// Both startIndex and endIndex can be greater than buffer capacity -
	// you should always apply modulo capacity to get an index in cache array.
	cache      []watchCacheElement
	startIndex int
	endIndex   int

	// store will effectively support LIST operation from the "end of cache
	// history" i.e. from the moment just after the newest cached watched event.
	// It is necessary to effectively allow clients to start watching at now.
	store cache.Store
}

這里的關(guān)鍵數(shù)據(jù)結(jié)構(gòu)依然是2個(gè)

  • cache 環(huán)形隊(duì)列,存儲(chǔ)有限個(gè)數(shù)的最新數(shù)據(jù)

  • store 底層實(shí)際上是個(gè)線程安全的hashMap,存儲(chǔ)全量數(shù)據(jù)

那么繼續(xù)看看方法是怎么運(yùn)轉(zhuǎn)的吧~

  • 增刪改方法

func (w *watchCache) Update(obj interface{}) error {
	event := watch.Event{Type: watch.Modified, Object: object}
	f := func(obj runtime.Object) error { return w.store.Update(obj) }
	return w.processEvent(event, resourceVersion, f)
}


func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {

	previous, exists, err := w.store.Get(event.Object)
	watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion}
		w.onEvent(watchCacheEvent)
	w.updateCache(resourceVersion, watchCacheEvent)

}

// Assumes that lock is already held for write.
func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) {
	w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
	w.endIndex++
}

所有的增刪改方法做的事情都差不多,就是在store里面存具體的數(shù)據(jù),然后調(diào)用processEvent()去增加環(huán)形隊(duì)列里面的數(shù)據(jù),如果詳細(xì)看一下onEvent的操作,就會(huì)發(fā)現(xiàn)這個(gè)操作的本質(zhì)是落在cacher.go里面:

func (c *Cacher) processEvent(event watchCacheEvent) {
	for _, watcher := range c.watchers {
		watcher.add(event)
	}
}

往所有的watcher里面挨個(gè)添加數(shù)據(jù)??傮w來說,我們可以從上面的代碼中得出一個(gè)結(jié)論:cache里面存儲(chǔ)的是Event,也就是有prevObject的,對(duì)于所有操作都會(huì)在cache里面保存,但對(duì)于store來說,只存儲(chǔ)當(dāng)下的數(shù)據(jù),刪了就刪了,改了就改了。

apiserver的list-watch怎么使用

  • WaitUntilFreshAndList()

這里本來應(yīng)該討論List()方法的,但在cacher里面的List()實(shí)際上使用的是這個(gè),所以我們看這個(gè)方法。

func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {
	startTime := w.clock.Now()
	go func() {
		w.cond.Broadcast()
	}()

	for w.resourceVersion < resourceVersion {
		w.cond.Wait()
	}
	return w.store.List(), w.resourceVersion, nil
}

這個(gè)方法比較繞,前面使用了一堆cond通知來和其他協(xié)程通信,最后還是調(diào)用了store.List()把數(shù)據(jù)返回出去。后面來具體分析這里的協(xié)調(diào)機(jī)制。

  • GetAllEventsSinceThreadUnsafe()

這個(gè)方法在cacher的創(chuàng)建cacheWatcher里面使用,把當(dāng)前store里面的所有數(shù)據(jù)都搞出來,然后把store里面的數(shù)據(jù)都轉(zhuǎn)換為AddEvent,配上cache里面的Event,全部返回出去。

3.3 CacheWatcher(pkg/storage/cacher.go)

這個(gè)結(jié)構(gòu)是每個(gè)watch的client都會(huì)擁有一個(gè)的,從上面的分析中我們也能得出這個(gè)結(jié)構(gòu)的需求,就是從watchCache里面搞一些數(shù)據(jù),然后寫到客戶端那邊。

// cacherWatch implements watch.Interface
type cacheWatcher struct {
	sync.Mutex
	input   chan watchCacheEvent
	result  chan watch.Event
	filter  FilterFunc
	stopped bool
	forget  func(bool)
}

這段代碼比較簡(jiǎn)單,就不去分析方法了,簡(jiǎn)單說就是數(shù)據(jù)在增加的時(shí)候放到input這個(gè)channel里面去,通過filter然后輸出到result這個(gè)channel里面去。

到此,相信大家對(duì)“apiserver的list-watch怎么使用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI