溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

kubernetes Volume有什么作用

發(fā)布時間:2021-12-20 10:08:46 來源:億速云 閱讀:146 作者:iii 欄目:云計算

本篇內(nèi)容主要講解“kubernetes Volume有什么作用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“kubernetes Volume有什么作用”吧!

VolumeBinder in Scheduler

VolumeBinder是Kubernetes default scheduler中的一個模塊。

pkg/scheduler/volumebinder/volume_binder.go:33

// VolumeBinder sets up the volume binding library and manages
// the volume binding operations with a queue.
type VolumeBinder struct {
	Binder    persistentvolume.SchedulerVolumeBinder
	BindQueue *workqueue.Type
}
  • 它維護(hù)著一個FIFO類型的BindQueue,BindQueue中存放著待Volume Bind的Pods;

  • Binder(persistentvolume.SchedulerVolumeBinder)是PV Controller內(nèi)的功能子模塊,用于提供給scheduler在調(diào)度時處理PV/PVC Binding和Dynamic Provisioning。

SchedulerVolumeBinder

SchedulerVolumeBinder用于調(diào)度時Volume Bind的考慮,以保證調(diào)度后的Node也滿足Pod所需的PV NodeAffinity需求,而不僅是Resource Request等其他Predicate Policies得到滿足。它實(shí)際上是基于StorageClass的VolumeBindingMode為WaitForFirstConsumer來決定要延遲Bind PV,然后schduler predicate時等待并確保Pod的all PVCs均成功Bind到滿足條件的PVs時,才會最終觸發(fā)Bind API完成Pod和Node的Bind。

pkg/controller/volume/persistentvolume/scheduler_binder.go:58

// SchedulerVolumeBinder is used by the scheduler to handle PVC/PV binding
// and dynamic provisioning.  The binding decisions are integrated into the pod scheduling
// workflow so that the PV NodeAffinity is also considered along with the pod's other
// scheduling requirements.
//
// This integrates into the existing default scheduler workflow as follows:
// 1. The scheduler takes a Pod off the scheduler queue and processes it serially:
//    a. Invokes all predicate functions, parallelized across nodes.  FindPodVolumes() is invoked here.
//    b. Invokes all priority functions.  Future/TBD
//    c. Selects the best node for the Pod.
//    d. Cache the node selection for the Pod. (Assume phase)
//       i.  If PVC binding is required, cache in-memory only:
//           * Updated PV objects for prebinding to the corresponding PVCs.
//           * For the pod, which PVs need API updates.
//           AssumePodVolumes() is invoked here.  Then BindPodVolumes() is called asynchronously by the
//           scheduler.  After BindPodVolumes() is complete, the Pod is added back to the scheduler queue
//           to be processed again until all PVCs are bound.
//       ii. If PVC binding is not required, cache the Pod->Node binding in the scheduler's pod cache,
//           and asynchronously bind the Pod to the Node.  This is handled in the scheduler and not here.
// 2. Once the assume operation is done, the scheduler processes the next Pod in the scheduler queue
//    while the actual binding operation occurs in the background.
type SchedulerVolumeBinder interface {
	// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the node.
	//
	// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
	// Otherwise, it tries to find an available PV to bind to the PVC.
	//
	// It returns true if all of the Pod's PVCs have matching PVs or can be dynamic provisioned,
	// and returns true if bound volumes satisfy the PV NodeAffinity.
	//
	// This function is called by the volume binding scheduler predicate and can be called in parallel
	FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisified, boundVolumesSatisfied bool, err error)

	// AssumePodVolumes will:
	// 1. Take the PV matches for unbound PVCs and update the PV cache assuming
	// that the PV is prebound to the PVC.
	// 2. Take the PVCs that need provisioning and update the PVC cache with related
	// annotations set.
	//
	// It returns true if all volumes are fully bound, and returns true if any volume binding/provisioning
	// API operation needs to be done afterwards.
	//
	// This function will modify assumedPod with the node name.
	// This function is called serially.
	AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, bindingRequired bool, err error)

	// BindPodVolumes will:
	// 1. Initiate the volume binding by making the API call to prebind the PV
	// to its matching PVC.
	// 2. Trigger the volume provisioning by making the API call to set related
	// annotations on the PVC
	//
	// This function can be called in parallel.
	BindPodVolumes(assumedPod *v1.Pod) error

	// GetBindingsCache returns the cache used (if any) to store volume binding decisions.
	GetBindingsCache() PodBindingCache
}

SchedulerVolumeBinder Interface包含如下三個方法:

  • FindPodVolumes:該方法被scheduler進(jìn)行VolumeBindingChecker predicate policy執(zhí)行時候調(diào)用,用于檢查Pod的PVCs是否都能被該Node滿足。如果PVC已經(jīng)Bound成功,會檢查是否對應(yīng)的PV的NodeAffinity是否與Node能匹配上。如果PVC還沒有Bound被Bound,將試圖從PV cache中查找是否有合適的PV能與該P(yáng)VC進(jìn)行Bound。返回值unboundVolumesSatisified,boundVolumesSatisfied分別表示:

    • unboundVolumesSatisified:bool,true表示Pod的所有PVCs都已經(jīng)成功Bound,或者可以Dynamic Provisioned(local volume目前只支持static provisioned),否則返回false。

    • boundVolumesSatisfied:bool,true表示已經(jīng)Bound的Volumes能滿足PV的NodeAffinity。

  • AssumePodVolumes:當(dāng)scheduler完成predicate和priority調(diào)度邏輯后,接著會執(zhí)行該方法。為Pod中那些還沒被Bound的PVCs尋找合適的PVs,并更新PV cache,完成PVs和PVCs的prebound操作(對于需要Dynamic Provisioning的PVC加上Annotation:"pv.kubernetes.io/bound-by-controller")。如果是需要Dynamic Provisioning的PVCs,那么更新PVC cache中這些PVCs的相關(guān)Annotations:"volume.alpha.kubernetes.io/selected-node=$nodeName",也相當(dāng)于prebound操作。返回值allFullyBound,bindingRequired分別表示:

    • allFullyBound:bool,true表示Pod對應(yīng)的所有PVCs都已經(jīng)完成Bound,否則返回false。

    • bindingRequired:bool,true表示還有volume binding/provisioning的API操作還需要進(jìn)行,否則返回false。

  • BindPodVolumes:根據(jù)podBindingCache中信息,調(diào)用API完成PV,PVC的PreBind,然后PV Controller watch到這一事件再去完成真正的Bound操作。

  • GetBindingsCache:返回PodBindingCache內(nèi)容。

Scheduler中VolumeBinder的初始化由volumebinder.NewVolumeBinder完成。

pkg/scheduler/volumebinder/volume_binder.go:39

// NewVolumeBinder sets up the volume binding library and binding queue
func NewVolumeBinder(
	client clientset.Interface,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	storageClassInformer storageinformers.StorageClassInformer) *VolumeBinder {

	return &VolumeBinder{
		Binder:    persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer),
		BindQueue: workqueue.NewNamed("podsToBind"),
	}
}

scheduler volumebinder.NewVolumeBinder負(fù)責(zé):

  • 調(diào)用persistentvolume.NewVolumeBinder完成Binder對象的初始化,需要pvInformer, pvcInformer,storageClassInformer。

  • 創(chuàng)建podsToBind BindQueue,用于存放待Bind的Pods FIFIO隊(duì)列。

在Scheduler NewConfigFactory中調(diào)用volumebinder.NewVolumeBinder完成其初始化,其中很重要的部分是完成pvcInformer, pvInformer, storageClassInformer的初始化,然后傳遞給persistentvolume.NewVolumeBinder完成Binder的創(chuàng)建。

pkg/scheduler/factory/factory.go:145

// NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(
	schedulerName string,
	client clientset.Interface,
	nodeInformer coreinformers.NodeInformer,
	podInformer coreinformers.PodInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	replicationControllerInformer coreinformers.ReplicationControllerInformer,
	replicaSetInformer extensionsinformers.ReplicaSetInformer,
	statefulSetInformer appsinformers.StatefulSetInformer,
	serviceInformer coreinformers.ServiceInformer,
	pdbInformer policyinformers.PodDisruptionBudgetInformer,
	storageClassInformer storageinformers.StorageClassInformer,
	hardPodAffinitySymmetricWeight int32,
	enableEquivalenceClassCache bool,
	disablePreemption bool,
) scheduler.Configurator {
	stopEverything := make(chan struct{})
	schedulerCache := schedulercache.New(30*time.Second, stopEverything)

	// storageClassInformer is only enabled through VolumeScheduling feature gate
	var storageClassLister storagelisters.StorageClassLister
	if storageClassInformer != nil {
		storageClassLister = storageClassInformer.Lister()
	}

	...
	
	// On add and delete of PVs, it will affect equivalence cache items
	// related to persistent volume
	pvInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
			AddFunc:    c.onPvAdd,
			UpdateFunc: c.onPvUpdate,
			DeleteFunc: c.onPvDelete,
		},
	)
	c.pVLister = pvInformer.Lister()

	// This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
	pvcInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onPvcAdd,
			UpdateFunc: c.onPvcUpdate,
			DeleteFunc: c.onPvcDelete,
		},
	)
	c.pVCLister = pvcInformer.Lister()

	...

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		// Setup volume binder
		c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer)

		storageClassInformer.Informer().AddEventHandler(
			cache.ResourceEventHandlerFuncs{
				AddFunc:    c.onStorageClassAdd,
				DeleteFunc: c.onStorageClassDelete,
			},
		)
	}

	...

	return c
}

scheduler volumebinder.NewVolumeBinder的調(diào)用前提是Enable VolumeScheduling Feature Gate。

volumeBinder in PV Controller

前面提到,scheduler volumebinder.NewVolumeBinder在初始化Binder時是通過persistentvolume.NewVolumeBinder完成的,因此這里我們將對persistentvolume.volumeBinder進(jìn)行分析。

PV Contorller中的volumeBinder就是前面提到的SchedulerVolumeBinder Interface的實(shí)現(xiàn),實(shí)現(xiàn)了其中的FindPodVolumes、AssumePodVolumes、BindPodVolumes、GetBindingsCache這些接口。

pkg/controller/volume/persistentvolume/scheduler_binder.go:96

type volumeBinder struct {
	ctrl *PersistentVolumeController

	pvcCache PVCAssumeCache
	pvCache  PVAssumeCache

	// Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
	// AssumePodVolumes modifies the bindings again for use in BindPodVolumes.
	podBindingCache PodBindingCache
}

pkg/controller/volume/persistentvolume/scheduler_binder.go:108
// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
func NewVolumeBinder(
	kubeClient clientset.Interface,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	storageClassInformer storageinformers.StorageClassInformer) SchedulerVolumeBinder {

	// TODO: find better way...
	ctrl := &PersistentVolumeController{
		kubeClient:  kubeClient,
		classLister: storageClassInformer.Lister(),
	}

	b := &volumeBinder{
		ctrl:            ctrl,
		pvcCache:        NewPVCAssumeCache(pvcInformer.Informer()),
		pvCache:         NewPVAssumeCache(pvInformer.Informer()),
		podBindingCache: NewPodBindingCache(),
	}

	return b
}

volumeBinder struct主要包含pvController實(shí)例、pvCache、pvcCache、podBindingCache。

podBindingCache結(jié)構(gòu)體是我們需要關(guān)注的:

pkg/controller/volume/persistentvolume/scheduler_binder_cache.go:48

type podBindingCache struct {
	mutex sync.Mutex

	// Key = pod name
	// Value = nodeDecisions
	bindingDecisions map[string]nodeDecisions
}

// Key = nodeName
// Value = bindings & provisioned PVCs of the node
type nodeDecisions map[string]nodeDecision

// A decision includes bindingInfo and provisioned PVCs of the node
type nodeDecision struct {
	bindings      []*bindingInfo
	provisionings []*v1.PersistentVolumeClaim
}

type bindingInfo struct {
	// Claim that needs to be bound
	pvc *v1.PersistentVolumeClaim

	// Proposed PV to bind to this claim
	pv *v1.PersistentVolume
}

kubernetes Volume有什么作用

VolumeBindingChecker Predicate

在Scheduler NewConfigFactory時完成VolumeBinder的創(chuàng)建,然后CheckVolumeBinding Predicate Policy注冊到default scheduler。注意默認(rèn)的所有predicate policies的執(zhí)行是有先后順序的:

predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
		GeneralPred, HostNamePred, PodFitsHostPortsPred,
		MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
		PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
		CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred,
		MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
		CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}

VolumeBindingChecker.predicate就是對應(yīng)的predicate實(shí)現(xiàn)。

pkg/scheduler/algorithm/predicates/predicates.go:1680

func (c *VolumeBindingChecker) predicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		return true, nil, nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	unboundSatisfied, boundSatisfied, err := c.binder.Binder.FindPodVolumes(pod, node)
	if err != nil {
		return false, nil, err
	}

	failReasons := []algorithm.PredicateFailureReason{}
	if !boundSatisfied {
		glog.V(5).Infof("Bound PVs not satisfied for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
		failReasons = append(failReasons, ErrVolumeNodeConflict)
	}

	if !unboundSatisfied {
		glog.V(5).Infof("Couldn't find matching PVs for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
		failReasons = append(failReasons, ErrVolumeBindConflict)
	}

	if len(failReasons) > 0 {
		return false, failReasons, nil
	}

	// All volumes bound or matching PVs found for all unbound PVCs
	glog.V(5).Infof("All PVCs found matches for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
	return true, nil, nil
}
  • 需要確認(rèn)VolumeScheduling Feature Gate Enabled。

  • 調(diào)用volumeBinder.FindPodVolumes檢查Pod的PVCs是否都能被該Node滿足。

調(diào)度時VolumeBindingChecker失敗會怎么樣

如果VolumeBindingChecker.predicate失敗,會怎么樣?熟悉scheduler邏輯的同學(xué)應(yīng)該知道,調(diào)度失敗,會觸發(fā)MakeDefaultErrorFunc。

func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error) {
	return func(pod *v1.Pod, err error) {
		
		...

		backoff.Gc()
		// Retry asynchronously.
		// Note that this is extremely rudimentary and we need a more real error handling path.
		go func() {
			defer runtime.HandleCrash()
			podID := types.NamespacedName{
				Namespace: pod.Namespace,
				Name:      pod.Name,
			}
			origPod := pod

			// When pod priority is enabled, we would like to place an unschedulable
			// pod in the unschedulable queue. This ensures that if the pod is nominated
			// to run on a node, scheduler takes the pod into account when running
			// predicates for the node.
			if !util.PodPriorityEnabled() {
				entry := backoff.GetEntry(podID)
				if !entry.TryWait(backoff.MaxDuration()) {
					glog.Warningf("Request for pod %v already in flight, abandoning", podID)
					return
				}
			}
			// Get the pod again; it may have changed/been scheduled already.
			getBackoff := initialGetBackoff
			for {
				pod, err := c.client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
				if err == nil {
					if len(pod.Spec.NodeName) == 0 {
						podQueue.AddUnschedulableIfNotPresent(pod)
					} else {
						if c.volumeBinder != nil {
							// Volume binder only wants to keep unassigned pods
							c.volumeBinder.DeletePodBindings(pod)
						}
					}
					break
				}
				if errors.IsNotFound(err) {
					glog.Warningf("A pod %v no longer exists", podID)

					if c.volumeBinder != nil {
						// Volume binder only wants to keep unassigned pods
						c.volumeBinder.DeletePodBindings(origPod)
					}
					return
			
				glog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)
				if getBackoff = getBackoff * 2; getBackoff > maximalGetBackoff {
					getBackoff = maximalGetBackoff
				}
				time.Sleep(getBackoff)
			}
		}()
	}
}

MakeDefaultErrorFunc會對調(diào)度失敗的Pod進(jìn)行異步重試:

  • 如果pod.Spec.NodeName不為空,并且volumeBinder不為空(意味著Enable VolumeScheduling Feature Gate),則調(diào)用podBindingCache.DeleteBindings將該pod對應(yīng)的bindingDecisions從podBindingCache中刪除,因?yàn)関olumeBinder僅處理unassigned pods。

  • 如果該P(yáng)od已經(jīng)被API刪除了,并且volumeBinder不為空(意味著Enable VolumeScheduling Feature Gate),同樣的調(diào)用podBindingCache.DeleteBindings將該pod對應(yīng)的bindingDecisions從podBindingCache中刪除,因?yàn)関olumeBinder僅處理unassigned pods。

調(diào)度時VolumeBindingChecker成功會如何?

NewConfigFactory中注冊了從unscheduled pod queue中刪除pod(意味著調(diào)度成功)的Event handler:deletePodFromSchedulingQueue。

pkg/scheduler/factory/factory.go:745

func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) {
	var pod *v1.Pod
	...
	if err := c.podQueue.Delete(pod); err != nil {
		runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
	}
	if c.volumeBinder != nil {
		// Volume binder only wants to keep unassigned pods
		c.volumeBinder.DeletePodBindings(pod)
	}
}

deletePodFromSchedulingQueue的處理邏輯,除了將pod從podQueue中刪除外,如果volumeBinder不為空(意味著Enable VolumeScheduling Feature Gate),還需要同MakeDefaultErrorFunc一樣,調(diào)用podBindingCache.DeleteBindings將該pod對應(yīng)的bindingDecisions從podBindingCache中刪除,因?yàn)関olumeBinder僅處理unassigned pods。

volumeBinder

接下來,我們看看volumeBinder的各個接口的實(shí)現(xiàn),及何時被調(diào)用。

FindPodVolumes

前面分析VolumeBindingChecker Predicate的時看到,其中調(diào)用了volumeBinder.FindPodVolumes。

FindPodVolumes用于檢查Pod的PVCs是否都能被該Node滿足。

  • 如果PVC已經(jīng)Bound成功,會檢查是否對應(yīng)的PV的NodeAffinity是否與Node能匹配上。

  • 如果PVC還沒有Bound被Bound,將試圖從PV cache中查找是否有合適的PV能與該P(yáng)VC進(jìn)行Bound。返回值unboundVolumesSatisified,boundVolumesSatisfied分別表示:

    • unboundVolumesSatisified:bool,true表示Pod的所有PVCs都已經(jīng)成功Bound,或者可以Dynamic Provisioned(local volume目前只支持static provisioned),否則返回false。

    • boundVolumesSatisfied:bool,true表示已經(jīng)Bound的Volumes能滿足PV的NodeAffinity。

pkg/controller/volume/persistentvolume/scheduler_binder.go:135

// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
	podName := getPodName(pod)

	// Warning: Below log needs high verbosity as it can be printed several times (#60933).
	glog.V(5).Infof("FindPodVolumes for pod %q, node %q", podName, node.Name)

	// Initialize to true for pods that don't have volumes
	unboundVolumesSatisfied = true
	boundVolumesSatisfied = true

	// The pod's volumes need to be processed in one call to avoid the race condition where
	// volumes can get bound/provisioned in between calls.
	boundClaims, claimsToBind, unboundClaimsImmediate, err := b.getPodVolumes(pod)
	if err != nil {
		return false, false, err
	}

	// Immediate claims should be bound
	if len(unboundClaimsImmediate) > 0 {
		return false, false, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
	}

	// Check PV node affinity on bound volumes
	if len(boundClaims) > 0 {
		boundVolumesSatisfied, err = b.checkBoundClaims(boundClaims, node, podName)
		if err != nil {
			return false, false, err
		}
	}

	if len(claimsToBind) > 0 {
		var claimsToProvision []*v1.PersistentVolumeClaim
		unboundVolumesSatisfied, claimsToProvision, err = b.findMatchingVolumes(pod, claimsToBind, node)
		if err != nil {
			return false, false, err
		}

		if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) {
			// Try to provision for unbound volumes
			if !unboundVolumesSatisfied {
				unboundVolumesSatisfied, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
				if err != nil {
					return false, false, err
				}
			}
		}
	}

	return unboundVolumesSatisfied, boundVolumesSatisfied, nil
}

FindPodVolumes中調(diào)用了三個重要的方法:

  • getPodVolumes:將PVCs分成boundClaims、unboundClaims、unboundClaimsImmediate。

  • checkBoundClaims:如果boundClaims不為空,則checkBoundClaims Bound的PV的NodeAffinity是否與Node Labels匹配,如果匹配成功,則boundVolumesSatisfied為true。

  • findMatchingVolumes:如果claimsToBind不為空,則調(diào)用findMatchingVolumes從pvcache中選擇匹配條件的size smallestPV,如果沒有匹配成功的,則調(diào)用checkVolumeProvisions檢查是否dynamic provision。

下面我們重點(diǎn)看getPodVolumes、findMatchingVolumes和checkVolumeProvisions。

getPodVolumes
pkg/controller/volume/persistentvolume/scheduler_binder.go:359

// getPodVolumes returns a pod's PVCs separated into bound (including prebound), unbound with delayed binding,
// and unbound with immediate binding
func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*bindingInfo, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
	boundClaims = []*v1.PersistentVolumeClaim{}
	unboundClaimsImmediate = []*v1.PersistentVolumeClaim{}
	unboundClaims = []*bindingInfo{}

	for _, vol := range pod.Spec.Volumes {
		volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol, false)
		if err != nil {
			return nil, nil, nil, err
		}
		if pvc == nil {
			continue
		}
		if volumeBound {
			boundClaims = append(boundClaims, pvc)
		} else {
			delayBinding, err := b.ctrl.shouldDelayBinding(pvc)
			if err != nil {
				return nil, nil, nil, err
			}
			if delayBinding {
				// Scheduler path
				unboundClaims = append(unboundClaims, &bindingInfo{pvc: pvc})
			} else {
				// Immediate binding should have already been bound
				unboundClaimsImmediate = append(unboundClaimsImmediate, pvc)
			}
		}
	}
	return boundClaims, unboundClaims, unboundClaimsImmediate, nil
}

getPodVolumes將pod的PVCs分成三類:

  • boundClaims:已經(jīng)Bound的PVCs,包括prebound;

  • unboundClaims:需要delay binding的unbound PVCs;

  • unboundClaimsImmediate:需要immediate binding的unbound PVCs;

那么什么樣的PVCs是delay binding的呢?我們看看shouldDelayBinding的邏輯:

func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		return false, nil
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) {
		// When feature DynamicProvisioningScheduling enabled,
		// Scheduler signal to the PV controller to start dynamic
		// provisioning by setting the "annSelectedNode" annotation
		// in the PVC
		if _, ok := claim.Annotations[annSelectedNode]; ok {
			return false, nil
		}
	}

	className := v1helper.GetPersistentVolumeClaimClass(claim)
	if className == "" {
		return false, nil
	}

	class, err := ctrl.classLister.Get(className)
	if err != nil {
		return false, nil
	}

	if class.VolumeBindingMode == nil {
		return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className)
	}

	return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}
  • 如果VolumeScheduling Feature Gate Disenable,則PVC都不屬于deley binding。

  • 如果DynamicProvisioningScheduling Feature Gate Enable,則檢查PVC的Annotation是否包含"volume.alpha.kubernetes.io/selected-node",如果包含該Annotation,則該P(yáng)VC不屬于delay binding。

  • 如果PVC對應(yīng)的storageClass為空或者該storageClass不存在,則該P(yáng)VC不屬于delay binding。

  • 如果PVC對應(yīng)的storageClass存在,且storageClass的VolumeBindingMode為空,則該P(yáng)VC不屬于delay binding。

  • 只有當(dāng)PVC對應(yīng)的storageClass存在,且storageClass的VolumeBindingMode為WaitForFirstConsumer,該P(yáng)VC才屬于delay binding。

findMatchingVolumes

如果getPodVolumes返回的claimsToBind不為空,則調(diào)用findMatchingVolumes從pvcache中選擇匹配條件的size smallestPV,如果沒有匹配成功的,則調(diào)用checkVolumeProvisions檢查是否dynamic provision。

pkg/controller/volume/persistentvolume/scheduler_binder.go:413

// findMatchingVolumes tries to find matching volumes for given claims,
// and return unbound claims for further provision.
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) {
	podName := getPodName(pod)
	// Sort all the claims by increasing size request to get the smallest fits
	sort.Sort(byPVCSize(claimsToBind))

	chosenPVs := map[string]*v1.PersistentVolume{}

	foundMatches = true
	matchedClaims := []*bindingInfo{}

	for _, bindingInfo := range claimsToBind {
		// Get storage class name from each PVC
		storageClassName := ""
		storageClass := bindingInfo.pvc.Spec.StorageClassName
		if storageClass != nil {
			storageClassName = *storageClass
		}
		allPVs := b.pvCache.ListPVs(storageClassName)

		// Find a matching PV
		bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true)
		if err != nil {
			return false, nil, err
		}
		if bindingInfo.pv == nil {
			glog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, getPVCName(bindingInfo.pvc), node.Name)
			unboundClaims = append(unboundClaims, bindingInfo.pvc)
			foundMatches = false
			continue
		}

		// matching PV needs to be excluded so we don't select it again
		chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv
		matchedClaims = append(matchedClaims, bindingInfo)
		glog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, getPVCName(bindingInfo.pvc), node.Name, podName)
	}

	// Mark cache with all the matches for each PVC for this node
	if len(matchedClaims) > 0 {
		b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
	}

	if foundMatches {
		glog.V(4).Infof("Found matching volumes for pod %q on node %q", podName, node.Name)
	}

	return
}
checkVolumeProvisions
pkg/controller/volume/persistentvolume/scheduler_binder.go:465

// checkVolumeProvisions checks given unbound claims (the claims have gone through func
// findMatchingVolumes, and do not have matching volumes for binding), and return true
// if all of the claims are eligible for dynamic provision.
func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, err error) {
	podName := getPodName(pod)
	provisionedClaims := []*v1.PersistentVolumeClaim{}

	for _, claim := range claimsToProvision {
		className := v1helper.GetPersistentVolumeClaimClass(claim)
		if className == "" {
			return false, fmt.Errorf("no class for claim %q", getPVCName(claim))
		}

		class, err := b.ctrl.classLister.Get(className)
		if err != nil {
			return false, fmt.Errorf("failed to find storage class %q", className)
		}
		provisioner := class.Provisioner
		if provisioner == "" || provisioner == notSupportedProvisioner {
			glog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, getPVCName(claim))
			return false, nil
		}

		// Check if the node can satisfy the topology requirement in the class
		if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) {
			glog.V(4).Infof("Node %q cannot satisfy provisioning topology requirements of claim %q", node.Name, getPVCName(claim))
			return false, nil
		}

		// TODO: Check if capacity of the node domain in the storage class
		// can satisfy resource requirement of given claim

		provisionedClaims = append(provisionedClaims, claim)

	}
	glog.V(4).Infof("Provisioning for claims of pod %q that has no matching volumes on node %q ...", podName, node.Name)

	// Mark cache with all the PVCs that need provisioning for this node
	b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)

	return true, nil
}
  • checkVolumeProvisions主要檢查對應(yīng)的PVC的storageClass的TopologySelectorTerm與Node Labels是否能匹配成功。

  • 如果匹配成功,則調(diào)用UpdateProvisionedPVCs更新podBindingCache的bindingDecisions。

AssumePodVolumes

volumeBinder的AssumePodVolumes啥時候被調(diào)用呢?我們看看scheduleOne的相關(guān)代碼:

scheduleOne invoke assumeAndBindVolumes
pkg/scheduler/scheduler.go:439

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
	pod := sched.config.NextPod()
	
	...
	
	suggestedHost, err := sched.schedule(pod)
	
	...
	
	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPod := pod.DeepCopy()

	// Assume volumes first before assuming the pod.
	//
	// If no volumes need binding, then nil is returned, and continue to assume the pod.
	//
	// Otherwise, error is returned and volume binding is started asynchronously for all of the pod's volumes.
	// scheduleOne() returns immediately on error, so that it doesn't continue to assume the pod.
	//
	// After the asynchronous volume binding updates are made, it will send the pod back through the scheduler for
	// subsequent passes until all volumes are fully bound.
	//
	// This function modifies 'assumedPod' if volume binding is required.
	err = sched.assumeAndBindVolumes(assumedPod, suggestedHost)
	if err != nil {
		return
	}

	// assume modifies `assumedPod` by setting NodeName=suggestedHost
	err = sched.assume(assumedPod, suggestedHost)
	...
	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		err := sched.bind(assumedPod, &v1.Binding{
			...
		}
	}()
}

在sched.schedule(pod)完成pod的predicate,priority后,先調(diào)用sched.assumeAndBindVolumes,然后再調(diào)用sched.assume進(jìn)行pod assume,最后調(diào)用sched.bind進(jìn)行Bind操作。

assumeAndBindVolumes add assume pod to BindQueue
pkg/scheduler/scheduler.go:268

// assumeAndBindVolumes will update the volume cache and then asynchronously bind volumes if required.
//
// If volume binding is required, then the bind volumes routine will update the pod to send it back through
// the scheduler.
//
// Otherwise, return nil error and continue to assume the pod.
//
// This function modifies assumed if volume binding is required.
func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error {
	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		allBound, bindingRequired, err := sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
		if err != nil {
			sched.config.Error(assumed, err)
			sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err)
			sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
				Type:    v1.PodScheduled,
				Status:  v1.ConditionFalse,
				Reason:  "SchedulerError",
				Message: err.Error(),
			})
			return err
		}
		if !allBound {
			err = fmt.Errorf("Volume binding started, waiting for completion")
			if bindingRequired {
				if sched.config.Ecache != nil {
					invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
					sched.config.Ecache.InvalidatePredicates(invalidPredicates)
				}

				// bindVolumesWorker() will update the Pod object to put it back in the scheduler queue
				sched.config.VolumeBinder.BindQueue.Add(assumed)
			} else {
				// We are just waiting for PV controller to finish binding, put it back in the
				// scheduler queue
				sched.config.Error(assumed, err)
				sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "FailedScheduling", "%v", err)
				sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
					Type:   v1.PodScheduled,
					Status: v1.ConditionFalse,
					Reason: "VolumeBindingWaiting",
				})
			}
			return err
		}
	}
	return nil
}

assumeAndBindVolumes調(diào)用volumeBinder.AssumePodVolumes。

pkg/controller/volume/persistentvolume/scheduler_binder.go:191

// AssumePodVolumes will take the cached matching PVs and PVCs to provision
// in podBindingCache for the chosen node, and:
// 1. Update the pvCache with the new prebound PV.
// 2. Update the pvcCache with the new PVCs with annotations set
// It will update podBindingCache again with the PVs and PVCs that need an API update.
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound, bindingRequired bool, err error) {
	podName := getPodName(assumedPod)

	glog.V(4).Infof("AssumePodVolumes for pod %q, node %q", podName, nodeName)

	if allBound := b.arePodVolumesBound(assumedPod); allBound {
		glog.V(4).Infof("AssumePodVolumes for pod %q, node %q: all PVCs bound and nothing to do", podName, nodeName)
		return true, false, nil
	}

	assumedPod.Spec.NodeName = nodeName
	// Assume PV
	claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName)
	newBindings := []*bindingInfo{}

	for _, binding := range claimsToBind {
		newPV, dirty, err := b.ctrl.getBindVolumeToClaim(binding.pv, binding.pvc)
		glog.V(5).Infof("AssumePodVolumes: getBindVolumeToClaim for pod %q, PV %q, PVC %q.  newPV %p, dirty %v, err: %v",
			podName,
			binding.pv.Name,
			binding.pvc.Name,
			newPV,
			dirty,
			err)
		if err != nil {
			b.revertAssumedPVs(newBindings)
			return false, true, err
		}
		if dirty {
			err = b.pvCache.Assume(newPV)
			if err != nil {
				b.revertAssumedPVs(newBindings)
				return false, true, err
			}

			newBindings = append(newBindings, &bindingInfo{pv: newPV, pvc: binding.pvc})
		}
	}

	// Don't update cached bindings if no API updates are needed.  This can happen if we
	// previously updated the PV object and are waiting for the PV controller to finish binding.
	if len(newBindings) != 0 {
		bindingRequired = true
		b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings)
	}

	// Assume PVCs
	claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, nodeName)

	newProvisionedPVCs := []*v1.PersistentVolumeClaim{}
	for _, claim := range claimsToProvision {
		// The claims from method args can be pointing to watcher cache. We must not
		// modify these, therefore create a copy.
		claimClone := claim.DeepCopy()
		metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, annSelectedNode, nodeName)
		err = b.pvcCache.Assume(claimClone)
		if err != nil {
			b.revertAssumedPVs(newBindings)
			b.revertAssumedPVCs(newProvisionedPVCs)
			return
		}

		newProvisionedPVCs = append(newProvisionedPVCs, claimClone)
	}

	if len(newProvisionedPVCs) != 0 {
		bindingRequired = true
		b.podBindingCache.UpdateProvisionedPVCs(assumedPod, nodeName, newProvisionedPVCs)
	}

	return
}

volumeBinder.AssumePodVolumes主要邏輯:

  • 為Pod中那些還沒被Bound的PVCs尋找合適的PVs,并更新PV cache,完成PVs和PVCs的prebound操作(對于需要Dynamic Provisioning的PVC加上Annotation:"pv.kubernetes.io/bound-by-controller")。

  • 如果是需要Dynamic Provisioning的PVCs,那么更新PVC cache中這些PVCs的相關(guān)Annotations:"volume.alpha.kubernetes.io/selected-node=$nodeName",也相當(dāng)于prebound操作。返回值allFullyBound,bindingRequired分別表示:

    • allFullyBound:bool,true表示Pod對應(yīng)的所有PVCs都已經(jīng)完成Bound,否則返回false。

    • bindingRequired:bool,true表示還有volume binding/provisioning的API操作還需要進(jìn)行,否則返回false。

  • 如果allFullyBound為false,并且bindingRequired為true,則將pod加入到volumeBinder的BindQueue。

BindQueue中的Pods由bindVolumesWorker進(jìn)行逐個處理,其中會調(diào)用volumeBinder.BindPodVolumes完成volume binding operation,下面我們看看bindVolumesWorker干了啥。

bindVolumesWorker

bindVolumesWorker負(fù)責(zé)循環(huán)處理volumeBinder中的BindQueue內(nèi)的Pods,完成volume bind。我們得先知道bindVolumesWorker在哪里啟動的。

pkg/scheduler/scheduler.go:174

// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
	if !sched.config.WaitForCacheSync() {
		return
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
	}

	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

在default scheduler啟動時,如果VolumeScheduling Feature Gate Enable,則會啟動bindVolumesWorker goroutine。

pkg/scheduler/scheduler.go:312

// bindVolumesWorker() processes pods queued in assumeAndBindVolumes() and tries to
// make the API update for volume binding.
// This function runs forever until the volume BindQueue is closed.
func (sched *Scheduler) bindVolumesWorker() {
	workFunc := func() bool {
		keyObj, quit := sched.config.VolumeBinder.BindQueue.Get()
		if quit {
			return true
		}
		defer sched.config.VolumeBinder.BindQueue.Done(keyObj)

		assumed, ok := keyObj.(*v1.Pod)
		if !ok {
			glog.V(4).Infof("Object is not a *v1.Pod")
			return false
		}

		// TODO: add metrics
		var reason string
		var eventType string

		glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)

		// The Pod is always sent back to the scheduler afterwards.
		err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
		if err != nil {
			glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
			reason = "VolumeBindingFailed"
			eventType = v1.EventTypeWarning
		} else {
			glog.V(4).Infof("Successfully bound volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
			reason = "VolumeBindingWaiting"
			eventType = v1.EventTypeNormal
			err = fmt.Errorf("Volume binding started, waiting for completion")
		}

		// Always fail scheduling regardless of binding success.
		// The Pod needs to be sent back through the scheduler to:
		// * Retry volume binding if it fails.
		// * Retry volume binding if dynamic provisioning fails.
		// * Bind the Pod to the Node once all volumes are bound.
		sched.config.Error(assumed, err)
		sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err)
		sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
			Type:   v1.PodScheduled,
			Status: v1.ConditionFalse,
			Reason: reason,
		})
		return false
	}

	for {
		if quit := workFunc(); quit {
			glog.V(4).Infof("bindVolumesWorker shutting down")
			break
		}
	}
}

bindVolumesWorker會調(diào)用volumeBinder.BindPodVolumes進(jìn)行podBindingCache中的volume binding operation。

BindPodVolumes

pkg/controller/volume/persistentvolume/scheduler_binder.go:266

// BindPodVolumes gets the cached bindings and PVCs to provision in podBindingCache
// and makes the API update for those PVs/PVCs.
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
	podName := getPodName(assumedPod)
	glog.V(4).Infof("BindPodVolumes for pod %q", podName)

	bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
	claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)

	// Do the actual prebinding. Let the PV controller take care of the rest
	// There is no API rollback if the actual binding fails
	for i, bindingInfo := range bindings {
		glog.V(5).Infof("BindPodVolumes: Pod %q, binding PV %q to PVC %q", podName, bindingInfo.pv.Name, bindingInfo.pvc.Name)
		_, err := b.ctrl.updateBindVolumeToClaim(bindingInfo.pv, bindingInfo.pvc, false)
		if err != nil {
			// only revert assumed cached updates for volumes we haven't successfully bound
			b.revertAssumedPVs(bindings[i:])
			// Revert all of the assumed cached updates for claims,
			// since no actual API update will be done
			b.revertAssumedPVCs(claimsToProvision)
			return err
		}
	}

	// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest
	// PV controller is expect to signal back by removing related annotations if actual provisioning fails
	for i, claim := range claimsToProvision {
		if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
			glog.V(4).Infof("updating PersistentVolumeClaim[%s] failed: %v", getPVCName(claim), err)
			// only revert assumed cached updates for claims we haven't successfully updated
			b.revertAssumedPVCs(claimsToProvision[i:])
			return err
		}
	}

	return nil
}
  • 根據(jù)podBindingCache中保存的信息,調(diào)用API完成PVC和PV的Binding,也就是preBound。PV Controller watch到這一事件后去執(zhí)行真正的Volume Bound操作。

  • 根據(jù)podBindingCache中保存的信息,調(diào)用API完成PVCs的claimsToProvision的更新,PV Controller watch到這一事件后會執(zhí)行Dynamic Volume Provisioning。

關(guān)鍵流程

kubernetes Volume有什么作用

到此,相信大家對“kubernetes Volume有什么作用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI