溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何通過源碼分析Informer機制

發(fā)布時間:2021-10-12 14:17:41 來源:億速云 閱讀:168 作者:柒染 欄目:云計算

本篇文章給大家分享的是有關(guān)如何通過源碼分析Informer機制,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

通過源碼分析Informer機制

在之前的文章kubernetes之面向終態(tài)設(shè)計與控制器中了解主要了解了控制器通過ListAndWatch方式實現(xiàn)終態(tài)架構(gòu)。而并未分析內(nèi)部實現(xiàn)的細節(jié),故本章主要分析內(nèi)部實現(xiàn)的細節(jié)。

參考項目為官方的sample-controller

其實,每個Kubernetes擴展組件,都可以看做是一個控制器,用于處理自己關(guān)心的資源類型(API對象)。而要把自己關(guān)系的資源和自己的控制器關(guān)聯(lián)起來,則是通過Informer機制,可稱為“通知器”,而Informer與API對象都是一一對應(yīng)的,因此可實現(xiàn)我們創(chuàng)建一個自定義資源,我們的控制器就可以通過該Informer機制

如何通過源碼分析Informer機制

官方架構(gòu)圖

如何通過源碼分析Informer機制

通過上圖,可知一個Kubernetes的擴展組件主要分為兩大部分:client-go組件和自定義的Controller組件。

  • client-go:

    • Reflector:主要對接Kubernetes的APIServer,依托ListWatch實現(xiàn)數(shù)據(jù)從ETCD中同步到本地緩存(Delta Fifo queue)中。

    • Informer:用于把本地緩存的數(shù)據(jù)構(gòu)建索引及調(diào)用事先注冊好的ResourceEventHandler。

    • Indexer:用于構(gòu)建索引,底層采用一個線程安全的Map存儲。每個資源默認的Key為<namespace>/<name>。

  • Custom Controller

    • Workqueue是一個去重隊列,內(nèi)部除了items列表外還帶有processing和dirty set記錄.

    • 同一個資源對象的多次事件觸發(fā),入隊列后會去重;

    • 同一個資源對象不會被多個worker同時處理。詳細可見Learning Concurrent Reconciling controller對資源對象的查詢都應(yīng)該從Informer中查cache,而不是直接調(diào)用kube-apiserver查詢。

    • Informer reference : 編寫自定義Controller時,需要創(chuàng)建一個關(guān)注自已資源的Informer對象。

    • ResourceEventHandler: 用于注冊相關(guān)的事件,待有數(shù)據(jù)時,Informer會進行相關(guān)的回調(diào)。

    • ProcessItem: 通過Workqueue去數(shù)據(jù),并通過下發(fā)給Handler進行處理。

    • Workqueue:工作隊列工具類,每個controller都需要有一個工作隊列。從event handler觸發(fā)的事件會先放入工作隊列,然后由controller的ProcessItem取出來處理。

整體執(zhí)行流程

  • Client-go

    • HandleDeltas中會進行調(diào)用Indexer進行索引構(gòu)建,并最終存儲在本地的一個線程安全的Map中

    • 之后,會進行該事件的分發(fā),通知所有的listener進行調(diào)用用戶注冊好的ResourceEventHandler進行處理。

    • Reflector首先通過List進行全量數(shù)據(jù)同步,由ETCD到本地的Delta Fifo queue中。Reflector是最終和Kubernetes APIServer建立連接的。

    • Reflector其次再通過最新的ResourceVersion進行Watch數(shù)據(jù),此時若有未同步到的數(shù)據(jù),將進行補齊(因List完成之后,可能存在新數(shù)據(jù)的增加,因此可能存在遺漏)。

    • 啟動自定義控制器時,通過Informer調(diào)用Reflector執(zhí)行List&Watch進行數(shù)據(jù)同步及注冊觀察事件。

    • 當用戶創(chuàng)建了一個自定義資源時,會被Reflector的Watch觀察到,并放入本地的Delta緩存中。

    • Informer通過chche中的Controller定時(1s)調(diào)用processLoop方法,并Pop出隊列(Delta)中的數(shù)據(jù),交給Informer的HandleDeltas處理;

  • Custom Controller

    • 自定的ResourceEventHandler中會進行相關(guān)過濾處理,并最終加入到Workqueue中,該工作隊列只存儲KEY,不會存儲具體對象

    • 一旦加入后,ProcessItem方案就會Pop出數(shù)據(jù),并交給Handle Object方法進行處理

    • Handle Object會根據(jù)Key調(diào)用Indexer reference獲取到具體對象,并開始處理業(yè)務(wù)。注意:這里一定要通過緩存去數(shù)據(jù),不要去直接調(diào)用Kubernetes的API,否則會影響性能。

原生Controller代碼分析

初始化部分
  • main方法中初始化相關(guān)結(jié)構(gòu)及啟動Informer

	//https://github.com/kubernetes/sample-controller/blob/master/main.go#L62
	
	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

	controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

	// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
	// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)

	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}

以上的代碼涵蓋兩部分:

  • Client-go部分,構(gòu)建SharedInformerFactory并啟動,實現(xiàn)ListAndWatch,第一部分解析

  • 自定義Controller部分,內(nèi)部主要業(yè)務(wù)為等待事件,并做響應(yīng),第二部分解析

client-go代碼分析

用戶通過構(gòu)建Informer并啟動后,就會進入到client-go內(nèi)部的Informer內(nèi),主要邏輯如下

  • 調(diào)用Start方法初始化請求的Informer

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}
  • 調(diào)用Run正式開始啟動相關(guān)業(yè)務(wù)

//client-go@v2.0.0-alpha.0.0.20180910083459-2cefa64ff137+incompatible/tools/cache/shared_informer.go:189
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		//注冊DeltaFIFO隊列
		Queue:            fifo,
		//注冊listerWatcher,后續(xù)會和APIServer建立連接
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		//檢查是否需要進行Resync,該方法會把需要Resync的listener加入到需要同步的隊列中
		ShouldResync:     s.processor.shouldResync,
		//這里先注冊用于構(gòu)建索引和分發(fā)事件的方法
		Process: s.HandleDeltas,
	}
	......
	//啟動用于緩存比較的方法
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	//啟動用于接收事件消息并回調(diào)用戶注冊的ResourceEventHandler
	wg.StartWithChannel(processorStopCh, s.processor.run)
	......
	//運行內(nèi)部的Controller
	s.controller.Run(stopCh)
}
  • 啟動內(nèi)置的Controller

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	......
   //調(diào)用reflector中的Run,進行啟動ListAndWatch,同APIServer建立連接
   //client-go@v2.0.0-alpha.0.0.20180910083459-2cefa64ff137+incompatible/tools/cache/shared_informer.go:219
   wg.StartWithChannel(stopCh, r.Run)
	//啟動定時器,每秒運行一次,用于調(diào)用processLoop進行讀取數(shù)據(jù)
	wait.Until(c.processLoop, time.Second, stopCh)
}
  • ListAndWatch,實現(xiàn)同APIServer建立連接

    	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/reflector.go:121
    	func (r *Reflector) Run(stopCh <-chan struct{}) {
    		wait.Until(func() {
    			if err := r.ListAndWatch(stopCh); err != nil {
    				utilruntime.HandleError(err)
    			}
    		}, r.period, stopCh)
    	}

     

    	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/reflector.go:159
    	// ListAndWatch first lists all items and get the resource version at the moment of call,
    	// and then use the resource version to watch.
    	// It returns error if ListAndWatch didn't even try to initialize watch.
    	func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    		......
    		// Explicitly set "0" as resource version - it's fine for the List()
    		// to be served from cache and potentially be delayed relative to
    		// etcd contents. Reflector framework will catch up via Watch() eventually.
    		options := metav1.ListOptions{ResourceVersion: "0"}
    
    		if err := func() error {
    			......
    			go func() {
    				.....
    				// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
    				// list request will return the full response.
    				pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
    					//這里是真正同Kubernetes連接并獲取數(shù)據(jù)的地方
    					return r.listerWatcher.List(opts)
    				}))
    				......
    				//這里會執(zhí)行l(wèi)ist,獲取數(shù)據(jù)
    				list, err = pager.List(context.Background(), options)
    				close(listCh)
    			}()
    			......
    			//解析數(shù)據(jù)類型
    			listMetaInterface, err := meta.ListAccessor(list)
    			......
    
    			resourceVersion = listMetaInterface.GetResourceVersion()
    			//抽取list中的數(shù)據(jù)
    			items, err := meta.ExtractList(list)
    			......
    			//通過list到的數(shù)據(jù)進行數(shù)據(jù)全量本地隊列(Delta FIFO Queue)數(shù)據(jù)替換
    			if err := r.syncWith(items, resourceVersion); err != nil {
    				return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
    			}
    			initTrace.Step("SyncWith done")
    			r.setLastSyncResourceVersion(resourceVersion)
    			initTrace.Step("Resource version updated")
    			return nil
    		}(); err != nil {
    			return err
    		}
    
    		......
    
    		for {
    			......
    			//獲取Watch對象
    			w, err := r.listerWatcher.Watch(options)
    			......
    			//開始處理Watch到的數(shù)據(jù)
    			if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
    				......
    			}
    		}
    	}

     

    	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/reflector.go:319
    	// watchHandler watches w and keeps *resourceVersion up to date.
    	func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    	......
    
    	loop:
    		for {
    			select {
    			case <-stopCh:
    				return errorStopRequested
    			case err := <-errc:
    				return err
    			case event, ok := <-w.ResultChan(): //這里當Watch到數(shù)據(jù)后,會觸發(fā)該CHANEL
    				......
    				// 通過得到的事件對象,訪問到具體的數(shù)據(jù)
    				meta, err := meta.Accessor(event.Object)
    				......
    				newResourceVersion := meta.GetResourceVersion()
    				//根據(jù)獲取到的事件類型,觸發(fā)相應(yīng)動作
    				switch event.Type {
    				case watch.Added:
    					err := r.store.Add(event.Object)
    					......
    				case watch.Modified:
    					err := r.store.Update(event.Object)
    					......
    				case watch.Deleted:
    					// TODO: Will any consumers need access to the "last known
    					// state", which is passed in event.Object? If so, may need
    					// to change this.
    					err := r.store.Delete(event.Object)
    					......
    				case watch.Bookmark:
    					// A `Bookmark` means watch has synced here, just update the resourceVersion
    				default:
    					utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
    				}
    				*resourceVersion = newResourceVersion
    				//設(shè)置最新需要Watch的版本
    				r.setLastSyncResourceVersion(newResourceVersion)
    				eventCount++
    			}
    		}
    		......
    	}


    通過以上步驟,實現(xiàn)了Kubernetes存儲在ETCD中的數(shù)據(jù)到Controller本地緩存中的過程。接下來就需要對存儲在Delta FIFO Queue中的數(shù)據(jù)進行處理的過程。

    • 處理Watch到的數(shù)據(jù)

    • 執(zhí)行List和Watch邏輯

    • 定時調(diào)用ListAndWatch

  • 處理Delta中的數(shù)據(jù),建立索引及分發(fā)事件

    	func (c *controller) processLoop() {
    		for {
    			//讀取Delta中的數(shù)據(jù)并調(diào)用之前設(shè)置好的方法HandleDelta,進行業(yè)務(wù)處理
    			//vendor/k8s.io/client-go/tools/cache/controller.go:150
    			obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    			......
    		}
    	}

     

    	//client-go@v2.0.0-alpha.0.0.20180910083459-2cefa64ff137+incompatible/tools/cache/shared_informer.go:344
    
    	func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    		s.blockDeltas.Lock()
    		defer s.blockDeltas.Unlock()
    
    		// from oldest to newest
    		for _, d := range obj.(Deltas) {
    			switch d.Type {
    			//根據(jù)事件的類型進行相關(guān)的事件分類下發(fā)
    			case Sync, Added, Updated:
    				isSync := d.Type == Sync
    				s.cacheMutationDetector.AddObject(d.Object)
    
    				//到索引中先查詢是否有數(shù)據(jù)
    				if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
    					//若存在數(shù)據(jù),則更新索引數(shù)據(jù)
    					if err := s.indexer.Update(d.Object); err != nil {
    						return err
    					}
    					//給listener分發(fā)更新事件
    					s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
    				} else {
    					//若沒有數(shù)據(jù),則直接添加新數(shù)據(jù)到索引中去
    					if err := s.indexer.Add(d.Object); err != nil {
    						return err
    					}
    					//給listener分發(fā)添加事件
    					s.processor.distribute(addNotification{newObj: d.Object}, isSync)
    				}
    			case Deleted:
    				//若是刪除類型,則先刪除索引
    				if err := s.indexer.Delete(d.Object); err != nil {
    					return err
    				}
    				//給listener分發(fā)刪除事件
    				s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    			}
    		}
    		return nil
    	}


    以上需要注意一點,關(guān)于最后的刪除事件:若是一個刪除事件,在之前已經(jīng)刪除了索引中的數(shù)據(jù)了,因此無法再在自定義的Controller中,獲取到該數(shù)據(jù)的內(nèi)容了。因此雖然得到了刪除事件通知,但是卻無法通過該Key,查詢到事件內(nèi)容。因此當我們需要在刪除時,需要處理該數(shù)據(jù)的話,應(yīng)該添加finalizer阻止提前刪除,待處理完畢后,在刪除該標記即可。

    	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/shared_informer.go:453
    	func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    		......
    	   //這里通過分發(fā)到相應(yīng)的listener中
    		if sync {
    			for _, listener := range p.syncingListeners {
    				listener.add(obj)
    			}
    		} else {
    			for _, listener := range p.listeners {
    				listener.add(obj)
    			}
    		}
    	}
    	//觸發(fā)add的CHANEL,實現(xiàn)對接到用戶定義的ResourceEventHandler中
    	func (p *processorListener) add(notification interface{}) {
    		p.addCh <- notification
    	}


    這里需要注意,若要觸發(fā)用戶定義的ResourceEventHandler,則需要先讓用戶注冊才行。故以下代碼是用戶注冊ResourceEventHandler的部分。

    	//https://github.com/kubernetes/sample-controller/blob/master/controller.go#L116
    	//這里調(diào)用Informer的AddEventHandler方法進行注冊ResourceEventHandler,并添加入隊方式
    	fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    		AddFunc: controller.enqueueFoo,
    		UpdateFunc: func(old, new interface{}) {
    			//這里向工作隊列中加入數(shù)據(jù),同時這里可以做一些過濾操作
    			controller.enqueueFoo(new)
    		},
    	})
    	//加入到工作隊列中
    	func (c *Controller) enqueueFoo(obj interface{}) {
    		......
    		//把收到的Key加入到工作隊列中
    		c.workqueue.Add(key)
    	}

     

    	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/shared_informer.go:326
    	func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
    		s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
    	}
    
    	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/shared_informer.go:347
    
    	func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
    		......
    		//構(gòu)建ProcessListener對象
    		listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
    
    		if !s.started {
    			//注冊listener
    			s.processor.addListener(listener)
    			return
    		}
    		......
    
    		//若已經(jīng)處于啟動狀態(tài)下,則還需要添加事件消息給該listener,用于及時處理消息
    		s.processor.addListener(listener)
    		for _, item := range s.indexer.List() {
    			listener.add(addNotification{newObj: item})
    		}
    	}
    
    	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/shared_informer.go:437
    	func (p *sharedProcessor) addListener(listener *processorListener) {
    		......
    		if p.listenersStarted {
    			//運行l(wèi)istener開始處理收到的數(shù)據(jù),比如回調(diào)用戶定義的EventHandler
    			// 定時調(diào)用用戶的Handler進行處理
    			p.wg.Start(listener.run)
    			p.wg.Start(listener.pop)
    		}
    	}


    • 添加用戶的事件,并開始處理收到的數(shù)據(jù)

    • 自定義控制器中注冊ResourceEventHandler

    • 事件分發(fā)

    • HandleDelta對讀取到的事件進行處理

    • 通過processLoop讀取Delta中的數(shù)據(jù)

Custom Controller 代碼分析

以上client-go處理完畢后,會把數(shù)據(jù)通過用戶注冊的ResourceEventHandler調(diào)用相應(yīng)的方法。通過自定義的ResourceEventHandler進行預(yù)處理,并加入到工作隊列(這里不建議處理復(fù)雜邏輯,因為一旦該方法阻塞,會導致相應(yīng)的鏈路阻塞,而應(yīng)該把需要處理的事件放入到工作列表中,通過用戶側(cè)的協(xié)程進行處理)。

  • 自定義Controller定義定時器執(zhí)行業(yè)務(wù)

//https://github.com/kubernetes/sample-controller/blob/7e92736cc38f37632d2b53e31b9a966e7a91c24a/controller.go#L150
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	
	......
	// Wait for the caches to be synced before starting workers
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	klog.Info("Starting workers")
	// Launch two workers to process Foo resources
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

}
	
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
	//死循環(huán),執(zhí)行業(yè)務(wù)邏輯
	for c.processNextWorkItem() {
	}
}	
  • 從隊列中取出數(shù)據(jù),并進行調(diào)用syncHandler方法進行處理,處理完畢后從工作隊列中刪除

//https://github.com/kubernetes/sample-controller/blob/7e92736cc38f37632d2b53e31b9a966e7a91c24a/controller.go#L186
func (c *Controller) processNextWorkItem() bool {
	//讀取工作隊列中的數(shù)據(jù),在之前通過用戶定義的ResourceEventHandler已經(jīng)加入到了工作隊列中,這里區(qū)出做處理
	obj, shutdown := c.workqueue.Get()

	if shutdown {
		return false
	}

	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		// We call Done here so the workqueue knows we have finished
		// processing this item. We also must remember to call Forget if we
		// do not want this work item being re-queued. For example, we do
		// not call Forget if a transient error occurs, instead the item is
		// put back on the workqueue and attempted again after a back-off
		// period.
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		// We expect strings to come off the workqueue. These are of the
		// form namespace/name. We do this as the delayed nature of the
		// workqueue means the items in the informer cache may actually be
		// more up to date that when the item was initially put onto the
		// workqueue.
		if key, ok = obj.(string); !ok {
			// As the item in the workqueue is actually invalid, we call
			// Forget here else we'd go into a loop of attempting to
			// process a work item that is invalid.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		// Run the syncHandler, passing it the namespace/name string of the
		// Foo resource to be synced.
		if err := c.syncHandler(key); err != nil {
			// Put the item back on the workqueue to handle any transient errors.
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		c.workqueue.Forget(obj)
		klog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		utilruntime.HandleError(err)
		return true
	}

	return true
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
	.....
}

至此,已經(jīng)全部分析完畢。

對Kubernetes做擴展開發(fā),一般都是采用自定義的Controller方式。借助官方提供的client-go組件,已經(jīng)實現(xiàn)了Informer機制。而我們只需要注冊ResourceEventHandler事件,并實現(xiàn)自定義的Controller即可完成擴展。

步驟回顧:

  • main方法中構(gòu)建Informer對象并啟動,同時啟動自己的Controller,主要邏輯為輪詢?nèi)スぷ麝犃兄腥?shù)據(jù),并做處理,若無數(shù)據(jù),則會阻塞在取數(shù)據(jù)的地方。

  • Informer構(gòu)建,主要步驟如下

    • 調(diào)用reflector進行ListAndWatch,主要是首次獲取全量的數(shù)據(jù)(List)及監(jiān)聽所有需要關(guān)注資源的最新版本(Watch)存儲到Delta FIFO Queue中。

    • 調(diào)用內(nèi)置controller從Delta中取出數(shù)據(jù)并構(gòu)建數(shù)據(jù)索引及分發(fā)消息給用戶注冊的ResourceEventHandler中;

  • 自定義ResourceEventHandler中根據(jù)事件類型進行處理(如過濾)后,再加入到自定義Controller的工作隊列中;

  • 當加入到工作隊列中后,自定義Controller中的輪詢?nèi)?shù)據(jù)的地方就會繼續(xù),取出數(shù)據(jù),處理,成功后刪除該數(shù)據(jù)。

以上就是如何通過源碼分析Informer機制,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI