How the Kubernetes Job Controller Is Constructed

Published: 2021-12-20 09:54:36 Source: Yisu Cloud Views: 115 Author: iii Category: Cloud Computing

This article explains how the Kubernetes Job Controller is constructed. It walks through the relevant source code step by step: constructing the controller, running it, syncJob, and manageJob.

Implementation flowchart

Without further ado, here is the complete flow.

[Figure: Job Controller implementation flowchart]

New JobController

type JobController struct {
	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// To allow injection of updateJobStatus for testing.
	updateHandler func(job *batch.Job) error
	syncHandler   func(jobKey string) (bool, error)
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced cache.InformerSynced
	// jobStoreSynced returns true if the job store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	jobStoreSynced cache.InformerSynced

	// A TTLCache of pod creates/deletes each rc expects to see
	expectations controller.ControllerExpectationsInterface

	// A store of jobs
	jobLister batchv1listers.JobLister

	// A store of pods, populated by the podController
	podStore corelisters.PodLister

	// Jobs that need to be updated
	queue workqueue.RateLimitingInterface

	recorder record.EventRecorder
}


func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	// TODO: remove the wrapper when every clients have moved to use the clientset.
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})

	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

	jm := &JobController{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
		},
		expectations: controller.NewControllerExpectations(),
		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
		recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
	}

	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jm.enqueueController,
		UpdateFunc: jm.updateJob,
		DeleteFunc: jm.enqueueController,
	})
	jm.jobLister = jobInformer.Lister()
	jm.jobStoreSynced = jobInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jm.addPod,
		UpdateFunc: jm.updatePod,
		DeleteFunc: jm.deletePod,
	})
	jm.podStore = podInformer.Lister()
	jm.podStoreSynced = podInformer.Informer().HasSynced

	jm.updateHandler = jm.updateJobStatus
	jm.syncHandler = jm.syncJob

	return jm
}
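  • Construct the JobController and initialize its data, such as the rate-limited work queue;

  • Watch Pod and Job objects through the informers;

  • Register the add/update/delete EventHandlers on podInformer (addPod, updatePod, deletePod);

  • Register the add/update/delete EventHandlers on jobInformer (enqueueController, updateJob, enqueueController);

  • Set updateHandler to updateJobStatus, which updates the Job status;

  • Set syncHandler to syncJob, which processes the Jobs in the queue.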

JobController Run

// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer jm.queue.ShutDown()

	glog.Infof("Starting job controller")
	defer glog.Infof("Shutting down job controller")

	if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(jm.worker, time.Second, stopCh)
	}

	<-stopCh
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() {
	for jm.processNextWorkItem() {
	}
}

func (jm *JobController) processNextWorkItem() bool {
	key, quit := jm.queue.Get()
	if quit {
		return false
	}
	defer jm.queue.Done(key)

	forget, err := jm.syncHandler(key.(string))
	if err == nil {
		if forget {
			jm.queue.Forget(key)
		}
		return true
	}

	utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
	jm.queue.AddRateLimited(key)

	return true
}
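  • WaitForCacheSync waits until the Pod and Job caches of the JobController have synced;

  • Start workers goroutines (5 in the setup described here), each running worker through wait.Until: whenever worker returns, it is started again after 1s, and this repeats until stopCh is closed;

  • worker repeatedly gets a Job key from the queue and calls syncJob on it. If syncJob returns no error and forget is true, the key is forgotten (that is, the rate limiter stops tracking it). If syncJob returns an error, the key is added back to the queue with AddRateLimited and waits for the next sync.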
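
The worker loop described above can be sketched without any Kubernetes dependencies. This is a simplified illustration, not the controller's code: miniQueue and flaky are hypothetical types, the queue applies no delay when requeueing, and an empty queue stands in for the shutdown signal.

```go
package main

import (
	"fmt"
	"sync"
)

// miniQueue is a hypothetical, simplified stand-in for the rate-limiting work
// queue: it tracks pending keys and per-key requeue counts, with no delays.
type miniQueue struct {
	mu       sync.Mutex
	items    []string
	requeues map[string]int
}

func newMiniQueue(keys ...string) *miniQueue {
	return &miniQueue{items: keys, requeues: map[string]int{}}
}

// Get pops the next key; ok is false when the queue is empty.
func (q *miniQueue) Get() (key string, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return "", false
	}
	key, q.items = q.items[0], q.items[1:]
	return key, true
}

// AddRateLimited re-adds a key and counts the retry.
func (q *miniQueue) AddRateLimited(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.requeues[key]++
	q.items = append(q.items, key)
}

// Forget clears the retry count for a key.
func (q *miniQueue) Forget(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.requeues, key)
}

func (q *miniQueue) NumRequeues(key string) int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.requeues[key]
}

// flaky is a hypothetical sync handler that fails the first failTimes[key] calls.
type flaky struct {
	mu        sync.Mutex
	attempts  map[string]int
	failTimes map[string]int
}

func (f *flaky) sync(key string) (bool, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.attempts[key]++
	if f.attempts[key] <= f.failTimes[key] {
		return false, fmt.Errorf("transient failure syncing %q", key)
	}
	return true, nil
}

// processNextWorkItem mirrors the control flow of the method shown above.
func processNextWorkItem(q *miniQueue, syncHandler func(string) (bool, error)) bool {
	key, ok := q.Get()
	if !ok {
		return false
	}
	forget, err := syncHandler(key)
	if err == nil {
		if forget {
			q.Forget(key)
		}
		return true
	}
	q.AddRateLimited(key)
	return true
}

// runWorkers starts n workers and waits until all of them find the queue empty.
func runWorkers(q *miniQueue, n int, syncHandler func(string) (bool, error)) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for processNextWorkItem(q, syncHandler) {
			}
		}()
	}
	wg.Wait()
}

func main() {
	q := newMiniQueue("default/ok", "default/flaky")
	f := &flaky{attempts: map[string]int{}, failTimes: map[string]int{"default/flaky": 2}}
	runWorkers(q, 3, f.sync)
	fmt.Println(f.attempts["default/ok"], f.attempts["default/flaky"])
}
```

A key is only requeued by the worker that just processed it, and that worker then loops again, so every key is eventually synced successfully even if other workers have already exited.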

syncJob

// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *JobController) syncJob(key string) (bool, error) {
	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
	}()

	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return false, err
	}
	if len(ns) == 0 || len(name) == 0 {
		return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
	}
	sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			glog.V(4).Infof("Job has been deleted: %v", key)
			jm.expectations.DeleteExpectations(key)
			return true, nil
		}
		return false, err
	}
	job := *sharedJob

	// if job was finished previously, we don't want to redo the termination
	if IsJobFinished(&job) {
		return true, nil
	}

	// retrieve the previous number of retry
	previousRetry := jm.queue.NumRequeues(key)

	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
	// the store after we've checked the expectation, the job sync is just deferred till the next relist.
	jobNeedsSync := jm.expectations.SatisfiedExpectations(key)

	pods, err := jm.getPodsForJob(&job)
	if err != nil {
		return false, err
	}

	activePods := controller.FilterActivePods(pods)
	active := int32(len(activePods))
	succeeded, failed := getStatus(pods)
	conditions := len(job.Status.Conditions)
	// job first start
	if job.Status.StartTime == nil {
		now := metav1.Now()
		job.Status.StartTime = &now
		// enqueue a sync to check if job past ActiveDeadlineSeconds
		if job.Spec.ActiveDeadlineSeconds != nil {
			glog.V(4).Infof("Job %s have ActiveDeadlineSeconds will sync after %d seconds",
				key, *job.Spec.ActiveDeadlineSeconds)
			jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
		}
	}

	var manageJobErr error
	jobFailed := false
	var failureReason string
	var failureMessage string

	jobHaveNewFailure := failed > job.Status.Failed

	// check if the number of failed jobs increased since the last syncJob
	if jobHaveNewFailure && (int32(previousRetry)+1 > *job.Spec.BackoffLimit) {
		jobFailed = true
		failureReason = "BackoffLimitExceeded"
		failureMessage = "Job has reach the specified backoff limit"
	} else if pastActiveDeadline(&job) {
		jobFailed = true
		failureReason = "DeadlineExceeded"
		failureMessage = "Job was active longer than specified deadline"
	}

	if jobFailed {
		errCh := make(chan error, active)
		jm.deleteJobPods(&job, activePods, errCh)
		select {
		case manageJobErr = <-errCh:
			if manageJobErr != nil {
				break
			}
		default:
		}

		// update status values accordingly
		failed += active
		active = 0
		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
		jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
	} else {
		if jobNeedsSync && job.DeletionTimestamp == nil {
			active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
		}
		completions := succeeded
		complete := false
		if job.Spec.Completions == nil {
			// This type of job is complete when any pod exits with success.
			// Each pod is capable of
			// determining whether or not the entire Job is done.  Subsequent pods are
			// not expected to fail, but if they do, the failure is ignored.  Once any
			// pod succeeds, the controller waits for remaining pods to finish, and
			// then the job is complete.
			if succeeded > 0 && active == 0 {
				complete = true
			}
		} else {
			// Job specifies a number of completions.  This type of job signals
			// success by having that number of successes.  Since we do not
			// start more pods than there are remaining completions, there should
			// not be any remaining active pods once this count is reached.
			if completions >= *job.Spec.Completions {
				complete = true
				if active > 0 {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
				}
				if completions > *job.Spec.Completions {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
				}
			}
		}
		if complete {
			job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
			now := metav1.Now()
			job.Status.CompletionTime = &now
		}
	}

	forget := false
	// no need to update the job if the status hasn't changed since last time
	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
		job.Status.Active = active
		job.Status.Succeeded = succeeded
		job.Status.Failed = failed

		if err := jm.updateHandler(&job); err != nil {
			return false, err
		}

		if jobHaveNewFailure && !IsJobFinished(&job) {
			// returning an error will re-enqueue Job after the backoff period
			return false, fmt.Errorf("failed pod(s) detected for job key %q", key)
		}

		forget = true
	}

	return forget, manageJobErr
}
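  • Look up the Job in the Indexer. If it does not exist, delete its entry from expectations and return true, which ends the flow. Otherwise continue.

  • Check the JobCondition (Complete or Failed) to decide whether the Job is finished. If it is finished, return true and end the flow. Otherwise continue.

  • Call SatisfiedExpectations. If the pending adds and dels in the Job's ControlleeExpectations are both <= 0, or the expectations have not been updated for more than 5 minutes, it returns jobNeedsSync=true, meaning a manageJob pass is needed.

  • For Jobs that are starting for the first time (StartTime == nil), set StartTime. If ActiveDeadlineSeconds is not nil, add the Job to the queue again after ActiveDeadlineSeconds so that it is synced when the deadline passes.

  • Get all Pods managed by the Job, filter out the activePods, and count the active, succeeded, and failed Pods. If failed > job.Status.Failed, the Job has newly failed Pods, so jobHaveNewFailure is true.

  • If jobHaveNewFailure is true and the retry count recorded by the queue for this Job, plus 1, is greater than job.Spec.BackoffLimit (6 by default), the Job is BackoffLimitExceeded and jobFailed is set. Otherwise, if the time elapsed since the Job's StartTime is >= ActiveDeadlineSeconds, the Job is DeadlineExceeded and jobFailed is set.

  • If jobFailed, delete all the activePods filtered earlier concurrently, waiting on a sync.WaitGroup. Then set failed += active and active = 0, and append a Failed condition. In the code shown, these status values are updated even if a deletion returned an error; that error is kept in manageJobErr and returned at the end.

  • If the Job has not failed, jobNeedsSync is true, and the Job's DeletionTimestamp is nil (it is not marked for deletion), call manageJob to add or delete the Job's Pods according to its parallelism and completions.

  • If the Job has not failed and job.Spec.Completions is nil, this type of job is complete when any pod exits with success. So if succeeded > 0 && active == 0, the Job is completed.

  • If the Job has not failed and job.Spec.Completions is not nil, this type of job signals success by having that number of successes. So if succeeded >= job.Spec.Completions, the Job is completed.

  • If the Job is completed, append a Complete condition and set CompletionTime.

  • Finally, if active, succeeded, failed, or the number of conditions changed, invoke updateJobStatus to persist the Job status through the API server. If the update fails, return false with the error, and the Job is added to the queue again. If jobHaveNewFailure is true and the Job's conditions show it is not finished, also return false with an error so that the Job is re-queued after the backoff period. Otherwise forget is set to true.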

manageJob

// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>.
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
	var activeLock sync.Mutex
	active := int32(len(activePods))
	parallelism := *job.Spec.Parallelism
	jobKey, err := controller.KeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
		return 0, nil
	}

	var errCh chan error
	if active > parallelism {
		diff := active - parallelism
		errCh = make(chan error, diff)
		jm.expectations.ExpectDeletions(jobKey, int(diff))
		glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(activePods))

		active -= diff
		wait := sync.WaitGroup{}
		wait.Add(int(diff))
		for i := int32(0); i < diff; i++ {
			go func(ix int32) {
				defer wait.Done()
				if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
					defer utilruntime.HandleError(err)
					// Decrement the expected number of deletes because the informer won't observe this deletion
					glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
					jm.expectations.DeletionObserved(jobKey)
					activeLock.Lock()
					active++
					activeLock.Unlock()
					errCh <- err
				}
			}(i)
		}
		wait.Wait()

	} else if active < parallelism {
		wantActive := int32(0)
		if job.Spec.Completions == nil {
			// Job does not specify a number of completions.  Therefore, number active
			// should be equal to parallelism, unless the job has seen at least
			// once success, in which leave whatever is running, running.
			if succeeded > 0 {
				wantActive = active
			} else {
				wantActive = parallelism
			}
		} else {
			// Job specifies a specific number of completions.  Therefore, number
			// active should not ever exceed number of remaining completions.
			wantActive = *job.Spec.Completions - succeeded
			if wantActive > parallelism {
				wantActive = parallelism
			}
		}
		diff := wantActive - active
		if diff < 0 {
			utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
			diff = 0
		}
		jm.expectations.ExpectCreations(jobKey, int(diff))
		errCh = make(chan error, diff)
		glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

		active += diff
		wait := sync.WaitGroup{}

		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
		// and double with each successful iteration in a kind of "slow start".
		// This handles attempts to start large numbers of pods that would
		// likely all fail with the same error. For example a project with a
		// low quota that attempts to create a large number of pods will be
		// prevented from spamming the API service with the pod create requests
		// after one of its pods fails.  Conveniently, this also prevents the
		// event spam that those failures would generate.
		for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
			errorCount := len(errCh)
			wait.Add(int(batchSize))
			for i := int32(0); i < batchSize; i++ {
				go func() {
					defer wait.Done()
					err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
					if err != nil && errors.IsTimeout(err) {
						// Pod is created but its initialization has timed out.
						// If the initialization is successful eventually, the
						// controller will observe the creation via the informer.
						// If the initialization fails, or if the pod keeps
						// uninitialized for a long time, the informer will not
						// receive any update, and the controller will create a new
						// pod when the expectation expires.
						return
					}
					if err != nil {
						defer utilruntime.HandleError(err)
						// Decrement the expected number of creates because the informer won't observe this pod
						glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
						jm.expectations.CreationObserved(jobKey)
						activeLock.Lock()
						active--
						activeLock.Unlock()
						errCh <- err
					}
				}()
			}
			wait.Wait()
			// any skipped pods that we never attempted to start shouldn't be expected.
			skippedPods := diff - batchSize
			if errorCount < len(errCh) && skippedPods > 0 {
				glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
				active -= skippedPods
				for i := int32(0); i < skippedPods; i++ {
					// Decrement the expected number of creates because the informer won't observe this pod
					jm.expectations.CreationObserved(jobKey)
				}
				// The skipped pods will be retried later. The next controller resync will
				// retry the slow start process.
				break
			}
			diff -= batchSize
		}
	}

	select {
	case err := <-errCh:
		// all errors have been reported before, we only need to inform the controller that there was an error and it should re-try this job once more next time.
		if err != nil {
			return active, err
		}
	default:
	}

	return active, nil
}
  • If active > job.Spec.Parallelism, the Job must scale down:

    • Compute diff, the difference between active and parallelism, and set the Job's dels in ControllerExpectations to diff, meaning diff Pods are to be deleted.

    • Sort the Pods in activePods so that not-ready < ready, unscheduled < scheduled, and pending < running, which ensures that Pods in earlier stages are deleted first.

    • Update active (subtract diff) and delete those Pods concurrently, waiting on a sync.WaitGroup. If deleting a Pod fails, active is incremented by 1 and dels in expectations is decremented by 1.

    • Return active.

  • If active < job.Spec.Parallelism, the Job must scale up:

    • If job.Spec.Completions is nil and succeeded is greater than 0, diff is 0. If job.Spec.Completions is nil and succeeded == 0, diff is parallelism - active. If job.Spec.Completions is not nil, diff is min(job.Spec.Completions - succeeded, parallelism) - active. A negative diff is treated as 0.

    • Set the Job's adds in ControllerExpectations to diff, meaning diff Pods are to be created.

    • Update active (add diff) and create the Pods in batches, waiting on a sync.WaitGroup. The first batch creates 1 Pod (SlowStartInitialBatchSize = 1 is hard-coded), the second 2, then 4, 8, 16, and so on, but a batch never exceeds the remaining diff. After each batch, diff is reduced by the batch size. If any creation in a batch fails, active and adds in expectations are adjusted, and the remaining batches are not started.

  • If active == job.Spec.Parallelism, return active unchanged.
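
The arithmetic in manageJob can be checked in isolation. The sketch below is an illustration under assumptions, not the controller's code: podDelta and slowStartBatches are hypothetical helpers, and slowStartBatches assumes every Pod creation succeeds, so the early-exit path on failure is not modeled.

```go
package main

import "fmt"

func minInt32(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}

// podDelta returns how many pods to create (positive) or delete (negative)
// for the given counts, following the rules described for manageJob.
func podDelta(active, succeeded, parallelism int32, completions *int32) int32 {
	if active > parallelism {
		return parallelism - active // scale down
	}
	if active == parallelism {
		return 0
	}
	wantActive := parallelism
	if completions == nil {
		// After the first success, leave whatever is running as it is.
		if succeeded > 0 {
			wantActive = active
		}
	} else {
		// Never run more pods than there are remaining completions.
		wantActive = minInt32(*completions-succeeded, parallelism)
	}
	if wantActive < active {
		return 0 // more active than wanted is treated as nothing to create
	}
	return wantActive - active
}

// slowStartBatches lists the batch sizes used to create diff pods when each
// batch succeeds: start at initial and double, never exceeding what remains.
func slowStartBatches(diff, initial int32) []int32 {
	if initial < 1 {
		initial = 1
	}
	var batches []int32
	for batch := minInt32(diff, initial); diff > 0; batch = minInt32(2*batch, diff) {
		batches = append(batches, batch)
		diff -= batch
	}
	return batches
}

func main() {
	five := int32(5)
	fmt.Println(podDelta(5, 0, 3, nil))   // too many active pods
	fmt.Println(podDelta(0, 4, 3, &five)) // one completion remaining
	fmt.Println(slowStartBatches(10, 1))
}
```

For 10 Pods and an initial batch size of 1, the batches are 1, 2, 4, and then the remaining 3, which is why a quota error on the very first Pod stops the controller from sending the other nine create requests.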

Thanks for reading. That covers how the Kubernetes Job Controller is constructed. The details of actual behavior should still be verified in practice.
