溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何實例化一個Taint Manager

發(fā)布時間:2021-12-20 09:55:21 來源:億速云 閱讀:85 作者:iii 欄目:云計算

本篇內(nèi)容主要講解“如何實例化一個Taint Manager”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“如何實例化一個Taint Manager”吧!

NewNoExecuteTaintManager

  • PodInformer添加Event Handler時,通過調(diào)用taintManager.PodUpdated(oldPod *v1.Pod, newPod *v1.Pod)往tc.podUpdateQueue添加updateItem。

  • NodeInformer添加Event Handler時,通過調(diào)用taintManager.NodeUpdated(oldNode *v1.Node, newNode *v1.Node)往tc.nodeUpdateQueue添加updateItem。

  • 當創(chuàng)建NodeController時,如果runTaintManager為true(通過kube-controller-manager的--enable-taint-manager中指定,默認為true),則會通過NewNoExecuteTaintManager來實例化一個Taint Manager。

pkg/controller/node/nodecontroller.go:195

func NewNodeController(..) (*NodeController, error) {
	...
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			...
			if nc.taintManager != nil {
				nc.taintManager.PodUpdated(nil, pod)
			}
		},
		...
	}
	...
	} else {
		nodeEventHandlerFuncs = cache.ResourceEventHandlerFuncs{
			AddFunc: func(originalObj interface{}) {
				...
				if nc.taintManager != nil {
					nc.taintManager.NodeUpdated(nil, node)
				}
			},
			...
		}
	}
	...
	if nc.runTaintManager {
		nc.taintManager = NewNoExecuteTaintManager(kubeClient)
	}

    ...

	return nc, nil
}

因此,創(chuàng)建NodeController時已經(jīng)配置了監(jiān)聽pod和node的事件,并會將相關數(shù)據(jù)發(fā)送到tc.podUpdateQueue和tc.nodeUpdateQueue,然后由Taint Manager從中取出數(shù)據(jù)進行處理。在此之前,我們先來看看NewNoExecuteTaintManager是如何實例化一個Taint Manager的。

pkg/controller/node/taint_controller.go:152

func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
	...

	tm := &NoExecuteTaintManager{
		client:            c,
		recorder:          recorder,
		
		// taintedNodes記錄每個Node對應的Taint信息。
		taintedNodes:      make(map[string][]v1.Taint),
		
		// nodeUpdateQueue中取出的updateItem會發(fā)送到nodeUpdateChannel,Tait Manager從該Channel中取出對應的node update info。
		nodeUpdateChannel: make(chan *nodeUpdateItem, nodeUpdateChannelSize),
		
		// podUpdateQueue中取出的updateItem會發(fā)送到podUpdateChannel,Tait Manager從該Channel中取出對應的pod update info。
		podUpdateChannel:  make(chan *podUpdateItem, podUpdateChannelSize),
        
        // Node Controller監(jiān)聽到的node update info會發(fā)送到nodeUpdateQueue。
		nodeUpdateQueue: workqueue.New(),
		
		// Node Controller監(jiān)聽到的pod update info會發(fā)送到podUpdateQueue。
		podUpdateQueue:  workqueue.New(),
	}
	
	// CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute deletePodHandler.
	tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))

	return tm
}

相關的代碼分析見里面的代碼注釋。需要強調(diào)的是,我們在這里給tm.taintEvictionQueue注冊了函數(shù)deletePodHandler,用來通過Taint Eviction時刪除pod時調(diào)用。Taint Manager Run的時候會通過tc.taintEvictionQueue.AddWork()時創(chuàng)建Worker來執(zhí)行deletePodHandler。

func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
	return func(args *WorkArgs) error {
		ns := args.NamespacedName.Namespace
		name := args.NamespacedName.Name
		glog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
		if emitEventFunc != nil {
			emitEventFunc(args.NamespacedName)
		}
		var err error
		
		// 按照失敗重試5次,每次間隔10s的重試機制,調(diào)用apiserver的api刪除對應的Pod。
		for i := 0; i < retries; i++ {
			err = c.Core().Pods(ns).Delete(name, &metav1.DeleteOptions{})
			if err == nil {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
		return err
	}
}

Run

在Kubernetes Node Controller源碼分析之執(zhí)行篇中提到,在Node Controller Run的時候,如果runTaintManager為true,則會調(diào)用nc.taintManager.Run啟動Taint Manager loop。

pkg/controller/node/nodecontroller.go:550

func (nc *NodeController) Run() {
	go func() {
		...

		if nc.runTaintManager {
			go nc.taintManager.Run(wait.NeverStop)
		}

		...
	}()
}

接下來,我們來看Taint Manager的Run方法。Node Controller啟動的Taint Manager實例其實就是NoExecuteTaintManager,其對應的Run方法代碼如下。

pkg/controller/node/taint_controller.go:179

// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	glog.V(0).Infof("Starting NoExecuteTaintManager")
	
	// Functions that are responsible for taking work items out of the workqueues and putting them into channels.
	// 從tc.nodeUpdateQueue中獲取updateItem,并發(fā)送到tc.nodeUpdateChannel。
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(*nodeUpdateItem)
			select {
			case <-stopCh:
				break
			case tc.nodeUpdateChannel <- nodeUpdate:
			}
		}
	}(stopCh)

    // 從tc.podUpdateQueue中獲取updateItem,并發(fā)送到tc.podUpdateChannel。
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			podUpdate := item.(*podUpdateItem)
			select {
			case <-stopCh:
				break
			case tc.podUpdateChannel <- podUpdate:
			}
		}
	}(stopCh)

	// When processing events we want to prioritize Node updates over Pod updates,
	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
	// we don't want user (or system) to wait until PodUpdate queue is drained before it can
	// start evicting Pods from tainted Nodes.
	for {
		select {
		case <-stopCh:
			break
			
		// 從tc.nodeUpdateChannel獲取nodeUpdate數(shù)據(jù),然后invoke tc.handleNodeUpdate進行處理。
		case nodeUpdate := <-tc.nodeUpdateChannel:
			tc.handleNodeUpdate(nodeUpdate)
			
		// 從tc.podUpdateChannel獲取podUpdate數(shù)據(jù),在invoke tc.handlePodUpdate進行處理之前,先確保tc.nodeUpdateQueue中的數(shù)據(jù)已經(jīng)被處理完。
		case podUpdate := <-tc.podUpdateChannel:
		
		// If we found a Pod update we need to empty Node queue first.
		priority:
			for {
				select {
				case nodeUpdate := <-tc.nodeUpdateChannel:
					tc.handleNodeUpdate(nodeUpdate)
				default:
					break priority
				}
			}
			
			// After Node queue is emptied we process podUpdate.
			tc.handlePodUpdate(podUpdate)
		}
	}
}

可見, Run方法中分別從對應的queue中取出數(shù)據(jù),然后調(diào)用tc.handleNodeUpdatetc.handlePodUpdate進行處理。

// pkg/controller/node/taint_controller.go:365

func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) {
	// Delete
	// 如果nodeUpdate.newNode == nil,則表明該Node被刪除了,那么將該Node的Taints信息從tc.taintedNodes緩存中刪除。
	if nodeUpdate.newNode == nil {
		node := nodeUpdate.oldNode
		glog.V(4).Infof("Noticed node deletion: %#v", node.Name)
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		delete(tc.taintedNodes, node.Name)
		return
	}
	
	// Create or Update
	// 如果是Node Create或者Node Update Event,則更新tc.taintedNodes緩存中記錄的該Node的Taints信息。
	glog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
	node := nodeUpdate.newNode
	taints := nodeUpdate.newTaints
	func() {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		glog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
		if len(taints) == 0 {
			delete(tc.taintedNodes, node.Name)
		} else {
			tc.taintedNodes[node.Name] = taints
		}
	}()
	
	// 然后,獲取該Node上所有pods list。
	pods, err := getPodsAssignedToNode(tc.client, node.Name)
	if err != nil {
		glog.Errorf(err.Error())
		return
	}
	if len(pods) == 0 {
		return
	}
	
	
	// Short circuit, to make this controller a bit faster.
	// 如果該Node上的Taints被刪除了,則取消所有該node上的pod evictions。
	if len(taints) == 0 {
		glog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
		for i := range pods {
			tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
		}
		return
	}

    // 否則,調(diào)用tc.processPodOnNode根據(jù)Node Taints info和Pod Tolerations info處理該Node上的Pod Eviction。
	now := time.Now()
	for i := range pods {
		pod := &pods[i]
		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
	}
}

handleNodeUpdate的邏輯為:

  • 如果nodeUpdate.newNode == nil,則表明該Node被刪除了,那么將該Node的Taints信息從tc.taintedNodes緩存中刪除。

  • 如果是Node Create或者Node Update Event,則更新tc.taintedNodes緩存中記錄的該Node的Taints信息。

    • 獲取該Node上所有pods list。

    • 如果該Node上的Taints被刪除了,則取消所有該node上的pod evictions。

    • 否則,遍歷pods list中的每個pod,分別調(diào)用tc.processPodOnNode根據(jù)Node Taints info和Pod Tolerations info處理該Node上的Pod Eviction。

// pkg/controller/node/taint_controller.go:334

func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) {
	// Delete
	// 如果podUpdate.newPod == nil,則表明該Pod被刪除了,那么取消該Pod Evictions。
	if podUpdate.newPod == nil {
		pod := podUpdate.oldPod
		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
		tc.cancelWorkWithEvent(podNamespacedName)
		return
	}
	
	// Create or Update
	// 如果是Pod Create或者Pod Update Event,則取出該pod的node上的Taints info。
	pod := podUpdate.newPod
	podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
	glog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
	nodeName := pod.Spec.NodeName
	if nodeName == "" {
		return
	}
	taints, ok := func() ([]v1.Taint, bool) {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		taints, ok := tc.taintedNodes[nodeName]
		return taints, ok
	}()
	// It's possible that Node was deleted, or Taints were removed before, which triggered
	// eviction cancelling if it was needed.
	if !ok {
		return
	}
	
	// 然后,調(diào)用tc.processPodOnNode根據(jù)Node Taints info和Pod Tolerations info處理該Node上的Pod Eviction。
	tc.processPodOnNode(podNamespacedName, nodeName, podUpdate.newTolerations, taints, time.Now())
}

handlePodUpdate的邏輯為:

  • 如果podUpdate.newPod == nil,則表明該Pod被刪除了,那么取消該Pod Evictions。

  • 如果是Pod Create或者Pod Update Event,則取出該pod的node上的Taints info。

    • 如果node上的Taints info信息為空,表明Taints info被刪除了或者Node被刪除了,那么就不需要處理該node上的pod eviction了,流程結束。

    • 否則,調(diào)用tc.processPodOnNode根據(jù)Node Taints info和Pod Tolerations info處理該Node上的Pod Eviction。

因此,不管是handlePodUpdate還是handleNodeUpdate,最終都是通過processPodOnNode來處理Pod Eviction的。

pkg/controller/node/taint_controller.go:295

func (tc *NoExecuteTaintManager) processPodOnNode(
	podNamespacedName types.NamespacedName,
	nodeName string,
	tolerations []v1.Toleration,
	taints []v1.Taint,
	now time.Time,
) {

    // 如果該node的taints info為空,則取消Taint Eviction Pods。
	if len(taints) == 0 {
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	
	// 對比node的taints info和pod tolerations info,判斷出node的taints是否都能被pod所能容忍。
	allTolerated, usedTolerations := v1.GetMatchingTolerations(taints, tolerations)
	
	// 如果不是全部都能容忍,那么調(diào)用立刻調(diào)用AddWork來創(chuàng)建worker,啟動tc.taintEvictionQueue注冊的deletePodHandler來刪除該pod。
	if !allTolerated {
		glog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
		// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
		tc.cancelWorkWithEvent(podNamespacedName)
		tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
		return
	}
	
	// 否則,取pod的所有tolerations的TolerationSeconds的最小值作為minTolerationTime。如果某個Toleration沒有設置TolerationSeconds,則表示0,如果設置的值為負數(shù),則用0替代。
	minTolerationTime := getMinTolerationTime(usedTolerations)
	// getMinTolerationTime returns negative value to denote infinite toleration.
	if minTolerationTime < 0 {
		glog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())
		return
	}

	startTime := now
	triggerTime := startTime.Add(minTolerationTime)
	
	// 從tc.taintEvictionQueue中獲取Worker-scheduledEviction
	scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
	
	// 如果獲取到不為空的scheduledEviction,則判斷worker創(chuàng)建時間加上minTolerationTime是否達到觸發(fā)時間要求,如果沒達到,則不進行Taint Pod Eviction,流程結束。
	if scheduledEviction != nil {
		startTime = scheduledEviction.CreatedAt
		if startTime.Add(minTolerationTime).Before(triggerTime) {
			return
		} else {
			tc.cancelWorkWithEvent(podNamespacedName)
		}
	}
	
	// 如果達到觸發(fā)時間要求,則取消worker,并立刻調(diào)用AddWork來創(chuàng)建worker,啟動tc.taintEvictionQueue注冊的deletePodHandler來刪除該pod。
	tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}

processPodOnNode的邏輯為:

  • 如果該node的taints info為空,則取消Taint Eviction Pods。

  • 對比node的taints info和pod tolerations info,判斷出node的taints是否都能被pod所能容忍。

  • 如果不是全部都能容忍,那么調(diào)用立刻調(diào)用AddWork來創(chuàng)建worker,啟動tc.taintEvictionQueue注冊的deletePodHandler來刪除該pod。

  • 否則,取pod的所有tolerations的TolerationSeconds的最小值作為minTolerationTime。如果某個Toleration沒有設置TolerationSeconds,表示不作驅逐。

    • 如果獲取到不為空的scheduledEviction,則判斷worker創(chuàng)建時間加上minTolerationTime是否達到觸發(fā)時間要求,如果沒達到,則不進行Taint Pod Eviction,流程結束。

    • 如果達到觸發(fā)時間要求,則取消worker,并立刻調(diào)用AddWork來創(chuàng)建worker,啟動tc.taintEvictionQueue注冊的deletePodHandler來刪除該pod。

    • 如果minTolerationTime小于0,則永遠容忍,流程結束。

    • 從tc.taintEvictionQueue中獲取Worker-scheduledEviction。

到此,相信大家對“如何實例化一個Taint Manager”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關內(nèi)容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI