溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Kubernetes Replication Controller的結(jié)構(gòu)定義是什么

發(fā)布時(shí)間:2021-12-20 10:14:45 來源:億速云 閱讀:139 作者:iii 欄目:云計(jì)算

本篇內(nèi)容主要講解“Kubernetes Replication Controller的結(jié)構(gòu)定義是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Kubernetes Replication Controller的結(jié)構(gòu)定義是什么”吧!

ReplicationManager

ReplicationManager就是ReplicationController控制器對(duì)象,方便在代碼中和ReplicationController Resource API Object進(jìn)行區(qū)分。下面代碼是ReplicationManager的結(jié)構(gòu)定義。

pkg/controller/replication/replication_controller.go:75

// ReplicationManager is responsible for synchronizing ReplicationController objects stored in the system with actual running pods.
type ReplicationManager struct {
	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// internalPodInformer is used to hold a personal informer.  If we're using
	// a normal shared informer, then the informer will be started for us.  If
	// we have a personal informer, we must start it ourselves.   If you start
	// the controller using NewReplicationManager(passing SharedInformer), this
	// will be null
	internalPodInformer cache.SharedIndexInformer

	// An rc is temporarily suspended after creating/deleting these many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int
	// To allow injection of syncReplicationController for testing.
	syncHandler func(rcKey string) error

	// A TTLCache of pod creates/deletes each rc expects to see.
	expectations *controller.UIDTrackingControllerExpectations

	// A store of replication controllers, populated by the rcController
	rcStore cache.StoreToReplicationControllerLister
	// Watches changes to all replication controllers
	rcController *cache.Controller
	// A store of pods, populated by the podController
	podStore cache.StoreToPodLister
	// Watches changes to all pods
	podController cache.ControllerInterface
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced func() bool

	lookupCache *controller.MatchingCache

	// Controllers that need to be synced
	queue workqueue.RateLimitingInterface

	// garbageCollectorEnabled denotes if the garbage collector is enabled. RC
	// manager behaves differently if GC is enabled.
	garbageCollectorEnabled bool
}

重點(diǎn)對(duì)下面?zhèn)€幾個(gè)對(duì)象介紹說明:

  • podControl: 提供Create/Delete Pod的操作接口。

  • burstReplicas: 每次批量Create/Delete Pods時(shí)允許并發(fā)的最大數(shù)量。

  • syncHandler: 真正執(zhí)行Replica Sync的函數(shù)。

  • expectation: 維護(hù)的期望狀態(tài)下的Pod的Uid Cache,并且提供了修正該Cache的接口。

  • rcStore: ReplicationController Resource對(duì)象的Indexer,數(shù)據(jù)由rcController提供和維護(hù)。

  • rcController: 用來watch 所有 ReplicationController Resource,watch到的change更新到rcStore中。

  • podStore: Pod的Indexer,數(shù)據(jù)由podController提供和維護(hù)。

  • podController: 用來watch所有Pod Resource,watch到的change更新到podStore中。

  • queue: 用來存放待sync的RC,是一個(gè)RateLimit類型的queue。

  • lookupCache: 提供Pod和RC匹配信息的cache,以提高查詢效率。

ReplicationController在何處啟動(dòng)的

看過我我的博文: Kubernetes ResourceQuota Controller內(nèi)部實(shí)現(xiàn)原理及源碼分析的可能有印象,里面也提到了controller manager是如何啟動(dòng)ResourceQuotaController的,ReplicationController也是一樣的。在kube-controller-manager調(diào)用newControllerInitializers進(jìn)行控制器初始化的時(shí)候,將startReplicationController注冊(cè)進(jìn)去了,用來啟動(dòng)ReplicationController控制器。

cmd/kube-controller-manager/app/controllermanager.go:224

func newControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefuleset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["certificatesigningrequests"] = startCSRController

	return controllers
}

代碼繼續(xù)跟到startReplicationController,很簡(jiǎn)單,啟動(dòng)一個(gè)goroutine,調(diào)用replicationcontroller.NewReplicationManager創(chuàng)建一個(gè)ReplicationManager并執(zhí)行其中Run方法開始工作。

cmd/kube-controller-manager/app/core.go:55

func startReplicationController(ctx ControllerContext) (bool, error) {
	go replicationcontroller.NewReplicationManager(
		ctx.InformerFactory.Pods().Informer(),
		ctx.ClientBuilder.ClientOrDie("replication-controller"),
		ResyncPeriod(&ctx.Options),
		replicationcontroller.BurstReplicas,
		int(ctx.Options.LookupCacheSizeForRC),
		ctx.Options.EnableGarbageCollector,
	).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
	return true, nil
}

創(chuàng)建ReplicationManager

上面分析到,controller-manager通過NewReplicationManager創(chuàng)建一個(gè)ReplicationManager對(duì)象,其實(shí)就是ReplicationController控制器。

pkg/controller/replication/replication_controller.go:122

// NewReplicationManager creates a replication manager
func NewReplicationManager(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	return newReplicationManager(
		eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}),
		podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
}



pkg/controller/replication/replication_controller.go:132
// newReplicationManager configures a replication manager with the specified event recorder
func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
	}

	rm := &ReplicationManager{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventRecorder,
		},
		burstReplicas: burstReplicas,
		expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
		garbageCollectorEnabled: garbageCollectorEnabled,
	}

	rm.rcStore.Indexer, rm.rcController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
			},
		},
		&v1.ReplicationController{},
		// TODO: Can we have much longer period here?
		FullControllerResyncPeriod,
		cache.ResourceEventHandlerFuncs{
			AddFunc:    rm.enqueueController,
			UpdateFunc: rm.updateRC,
			// This will enter the sync loop and no-op, because the controller has been deleted from the store.
			// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
			// way of achieving this is by performing a `stop` operation on the controller.
			DeleteFunc: rm.enqueueController,
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rm.addPod,
		// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
		// the most frequent pod update is status, and the associated rc will only list from local storage, so
		// it should be ok.
		UpdateFunc: rm.updatePod,
		DeleteFunc: rm.deletePod,
	})
	rm.podStore.Indexer = podInformer.GetIndexer()
	rm.podController = podInformer.GetController()

	rm.syncHandler = rm.syncReplicationController
	rm.podStoreSynced = rm.podController.HasSynced
	rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
	return rm
}

newReplicationManager中主要配置ReplicationManager,比如:

  • 通過workqueue.NewNamedRateLimitingQueue配置queue。

  • 通過controller.NewUIDTrackingControllerExpectations配置expectations。

  • 配置rcStore, podStore, rcController, podController。

  • 配置syncHandler為rm.syncReplicationController,這個(gè)很重要,所以我單獨(dú)列出來說。在后面會(huì)講到,syncReplicationController就是做核心工作的的方法,可以說Replica的自動(dòng)維護(hù)都是由它來完成的。

執(zhí)行ReplicationManger.Run開始工作

ReplicationManager創(chuàng)建好了,接下來得干活啦。Run方法就是干活的起步點(diǎn),開始進(jìn)行watching and syncing。

pkg/controller/replication/replication_controller.go:217

// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	glog.Infof("Starting RC Manager")
	go rm.rcController.Run(stopCh)
	go rm.podController.Run(stopCh)
	for i := 0; i < workers; i++ {
		go wait.Until(rm.worker, time.Second, stopCh)
	}

	if rm.internalPodInformer != nil {
		go rm.internalPodInformer.Run(stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down RC Manager")
	rm.queue.ShutDown()
}
  • watching

    • go rm.rcController.Run(stopCh)負(fù)責(zé)watch all rc。

    • go rm.podController.Run(stopCh)負(fù)責(zé)watch all pod。

  • syncing

    • 啟動(dòng)workers數(shù)量的goroutine。

    • 每個(gè)goroutine都不斷循環(huán)執(zhí)行rm.worker,每個(gè)循環(huán)之間停留1s。而rm.worker就是負(fù)責(zé)從queue中獲取rc并調(diào)用syncHandler進(jìn)行同步。

    • 每個(gè)goroutine直到收到stopCh信號(hào)才結(jié)束。

下面是rcController和podController的Run方法實(shí)現(xiàn),功能就是完成rc / pod的watch。

pkg/client/cache/controller.go:84

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	r.RunUntil(stopCh)

	wait.Until(c.processLoop, time.Second, stopCh)
}

sync的關(guān)鍵實(shí)現(xiàn),就在ReplicationManager的worker方法中,代碼如下。

pkg/controller/replication/replication_controller.go:488

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
	workFunc := func() bool {
		key, quit := rm.queue.Get()
		if quit {
			return true
		}
		defer rm.queue.Done(key)

		err := rm.syncHandler(key.(string))
		if err == nil {
			rm.queue.Forget(key)
			return false
		}

		rm.queue.AddRateLimited(key)
		utilruntime.HandleError(err)
		return false
	}
	for {
		if quit := workFunc(); quit {
			glog.Infof("replication controller worker shutting down")
			return
		}
	}
}

worker中的主要邏輯為:

  • 從rm的RateLimited Queue中獲取一個(gè)rc的key。

  • 調(diào)用syncHandler Interface,對(duì)該rc進(jìn)行sync。

在newReplicationManager時(shí),通過rm.syncHandler = rm.syncReplicationController注冊(cè)syncHandler為syncReplicationController了。因此sync rc的邏輯就在syncReplicationController中了。

pkg/controller/replication/replication_controller.go:639

// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked concurrently with the same key.

func (rm *ReplicationManager) syncReplicationController(key string) error {
	trace := util.NewTrace("syncReplicationController: " + key)
	defer trace.LogIfLong(250 * time.Millisecond)

	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
	}()

	if !rm.podStoreSynced() {
		// Sleep so we give the pod reflector goroutine a chance to run.
		time.Sleep(PodStoreSyncedPollPeriod)
		glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
		rm.queue.Add(key)
		return nil
	}

	obj, exists, err := rm.rcStore.Indexer.GetByKey(key)
	if !exists {
		glog.Infof("Replication Controller has been deleted %v", key)
		rm.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}
	rc := *obj.(*v1.ReplicationController)

	trace.Step("ReplicationController restored")
	rcNeedsSync := rm.expectations.SatisfiedExpectations(key)
	trace.Step("Expectations restored")

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	// TODO: Do the List and Filter in a single pass, or use an index.
	var filteredPods []*v1.Pod
	if rm.garbageCollectorEnabled {
		// list all pods to include the pods that don't match the rc's selector
		// anymore but has the stale controller ref.
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
		matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
		// Adopt pods only if this replication controller is not going to be deleted.
		if rc.DeletionTimestamp == nil {
			for _, pod := range matchesNeedsController {
				err := cm.AdoptPod(pod)
				// continue to next pod if adoption fails.
				if err != nil {
					// If the pod no longer exists, don't even log the error.
					if !errors.IsNotFound(err) {
						utilruntime.HandleError(err)
					}
				} else {
					matchesAndControlled = append(matchesAndControlled, pod)
				}
			}
		}
		filteredPods = matchesAndControlled
		// remove the controllerRef for the pods that no longer have matching labels
		var errlist []error
		for _, pod := range controlledDoesNotMatch {
			err := cm.ReleasePod(pod)
			if err != nil {
				errlist = append(errlist, err)
			}
		}
		if len(errlist) != 0 {
			aggregate := utilerrors.NewAggregate(errlist)
			// push the RC into work queue again. We need to try to free the
			// pods again otherwise they will stuck with the stale
			// controllerRef.
			rm.queue.Add(key)
			return aggregate
		}
	} else {
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		filteredPods = controller.FilterActivePods(pods)
	}

	var manageReplicasErr error
	if rcNeedsSync && rc.DeletionTimestamp == nil {
		manageReplicasErr = rm.manageReplicas(filteredPods, &rc)
	}
	trace.Step("manageReplicas done")

	newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)

	// Always updates status as pods come up or die.
	if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, newStatus); err != nil {
		// Multiple things could lead to this update failing.  Returning an error causes a requeue without forcing a hotloop
		return err
	}

	return manageReplicasErr
}

syncReplicationController的主要邏輯為:

  1. 如果podStore還沒有被同步過一次,則將該rc的key重新加入到queue中,以等待podStore同步,流程結(jié)束,否則繼續(xù)后面的流程。

  2. 根據(jù)該rc的key值,從rcStore中獲取對(duì)應(yīng)的rc object,如果不存在該rc object,則說明該rc已經(jīng)被刪除了,然后根據(jù)key從epectations中刪除該rc并返回,流程結(jié)束。如果存在該rc object,則繼續(xù)后面的流程。

  3. 檢測(cè)expectations中的add和del以及距離上一個(gè)時(shí)間戳是否超時(shí)5min,來判斷該rc是否需要sync。

  4. 如果啟動(dòng)了GC,則獲取podStore中整個(gè)namespace下的pods,然后將matchesAndControlled和matchesNeedsController的pods作為過濾后待同步的filteredPods。如果沒有啟動(dòng)GC,則直接獲取podStore中該namespace下匹配rc.Spec.Selector的Active狀態(tài)的pods作為過濾后待同步的filteredPods。(關(guān)于matchesAndControlled和matchesNeedsController的理解,請(qǐng)參考pkg/controller/controller_ref_manager.go:57中定義的PodControllerRefManager.Classify函數(shù))

  5. 如果第3步中檢測(cè)到該rc需要sync,并且DeletionTimestamp這個(gè)時(shí)間戳為nil,則調(diào)用manageReplicas方法,使得該rc管理的active狀態(tài)的pods數(shù)量和期望值一樣。

  6. 執(zhí)行完manageReplicas后,需要馬上重新計(jì)算一下rc的status,更新status中的Conditions,Replicas,F(xiàn)ullyLabeledReplicas,ReadyReplicas,AvailableReplicas信息。

  7. 通過updateReplicationControllerStatus方法調(diào)用kube-api-server的接口更新該rc的status為上一步重新計(jì)算后的新status,流程結(jié)束。

上面描述的syncReplicationController流程中,一個(gè)很關(guān)鍵的步驟是step 5中調(diào)用的manageReplicas方法,它負(fù)責(zé)rc對(duì)應(yīng)replicas的修復(fù)工作(add or delete)。

pkg/controller/replication/replication_controller.go:516

// manageReplicas checks and updates replicas for the given replication controller.
// Does NOT modify <filteredPods>.
func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.ReplicationController) error {
	diff := len(filteredPods) - int(*(rc.Spec.Replicas))
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		return err
	}
	if diff == 0 {
		return nil
	}

	if diff < 0 {
		diff *= -1
		if diff > rm.burstReplicas {
			diff = rm.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		errCh := make(chan error, diff)
		rm.expectations.ExpectCreations(rcKey, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
		for i := 0; i < diff; i++ {
			go func() {
				defer wg.Done()
				var err error
				if rm.garbageCollectorEnabled {
					var trueVar = true
					controllerRef := &metav1.OwnerReference{
						APIVersion: getRCKind().GroupVersion().String(),
						Kind:       getRCKind().Kind,
						Name:       rc.Name,
						UID:        rc.UID,
						Controller: &trueVar,
					}
					err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
				} else {
					err = rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc)
				}
				if err != nil {
					// Decrement the expected number of creates because the informer won't observe this pod
					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
					rm.expectations.CreationObserved(rcKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}()
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}

		return nil
	}

	if diff > rm.burstReplicas {
		diff = rm.burstReplicas
	}
	glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
	// No need to sort pods if we are about to delete all of them
	if *(rc.Spec.Replicas) != 0 {
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(filteredPods))
	}
	// Snapshot the UIDs (ns/name) of the pods we're expecting to see
	// deleted, so we know to record their expectations exactly once either
	// when we see it as an update of the deletion timestamp, or as a delete.
	// Note that if the labels on a pod/rc change in a way that the pod gets
	// orphaned, the rs will only wake up after the expectations have
	// expired even if other pods are deleted.
	deletedPodKeys := []string{}
	for i := 0; i < diff; i++ {
		deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
	}
	// We use pod namespace/name as a UID to wait for deletions, so if the
	// labels on a pod/rc change in a way that the pod gets orphaned, the
	// rc will only wake up after the expectation has expired.
	errCh := make(chan error, diff)
	rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
	var wg sync.WaitGroup
	wg.Add(diff)
	for i := 0; i < diff; i++ {
		go func(ix int) {
			defer wg.Done()
			if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
				// Decrement the expected number of deletes because the informer won't observe this deletion
				podKey := controller.PodKey(filteredPods[ix])
				glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
				rm.expectations.DeletionObserved(rcKey, podKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
		if err != nil {
			return err
		}
	default:
	}

	return nil

}

上面manageReplicas代碼的主要邏輯為:

  • 首先計(jì)算filteredPods中Pods數(shù)量和rc.Spec.Replicas中定義的期望數(shù)量的差值diff。

  • 如果差值diff為0,表示當(dāng)前狀態(tài)和期望狀態(tài)一樣,直接返回,流程結(jié)束。

  • 如果差值diff為負(fù)數(shù),表示當(dāng)前Active狀態(tài)的Pods數(shù)量不足,則啟動(dòng)下面流程:

    • 比較|diff|和burstReplicas的值,以保證這次最多只創(chuàng)建burstReplicas數(shù)量的pods。

    • 調(diào)用expectations.ExpectCreations接口設(shè)置expectations中的add大小為|diff|的值,表示要新創(chuàng)建|diff|數(shù)量的pods以達(dá)到期望狀態(tài)。

    • sync.WaitGroup啟動(dòng)|diff|數(shù)量的goroutine協(xié)程,每個(gè)goroutine分別負(fù)責(zé)調(diào)用podControl.CreatePods接口創(chuàng)建一個(gè)該namespace.rc管理的對(duì)應(yīng)spec Template的pod。

    • 待所有g(shù)oroutine都執(zhí)行完畢后,如果其中一個(gè)或者多個(gè)pod創(chuàng)建失敗,則返回err,否則返回nil,流程結(jié)束。

  • 如果差值diff為正數(shù),表示當(dāng)前Active狀態(tài)的Pods數(shù)量超過了期望值,則啟動(dòng)下面流程:

    • 比較|diff|和burstReplicas的值,以保證這次最多只刪除burstReplicas數(shù)量的pods。

    • 對(duì)filteredPods中的pods進(jìn)行排序,排序目的是:not-ready < ready, unscheduled < scheduled, and pending < running,讓stages越早的pods優(yōu)先被delete。

    • 排序完之后,挑選前面|diff|個(gè)pods作為待delete的Pods。

    • 調(diào)用expectations.ExpectDeletions接口設(shè)置expectations中的del大小為|diff|的值,表示要新刪除|diff|數(shù)量的pods以達(dá)到期望狀態(tài)。

    • sync.WaitGroup啟動(dòng)|diff|數(shù)量的goroutine協(xié)程,每個(gè)goroutine分別負(fù)責(zé)調(diào)用podControl.DeletePod接口刪除待delete Pods中的一個(gè)Pod。

    • 待所有g(shù)oroutine都執(zhí)行完畢后,如果其中一個(gè)或者多個(gè)pod刪除失敗,則返回err,否則返回nil,流程結(jié)束。

到此,相信大家對(duì)“Kubernetes Replication Controller的結(jié)構(gòu)定義是什么”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI