溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何進(jìn)行main方法與Leader選舉分析

發(fā)布時(shí)間:2022-01-12 16:03:42 來源:億速云 閱讀:148 作者:柒染 欄目:云計(jì)算

如何進(jìn)行main方法與Leader選舉分析,相信很多沒有經(jīng)驗(yàn)的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。

main方法與Leader選舉分析

1.main方法分析

主要對main方法的主要邏輯進(jìn)行分析,以及分析下組件的EventHandler,看該組件list/watch哪些對象,對象事件來了怎么處理,以及claimQueue與volumeQueue的對象來源。

main方法主要邏輯分析

main方法主要邏輯:
(1)解析啟動(dòng)參數(shù);
(2)根據(jù)配置建立clientset;
(3)建立grpcclient;
(4)進(jìn)行g(shù)rpc探測(探測cephcsi-rbd服務(wù)是否準(zhǔn)備好),直至探測成功;
(5)通過grpc獲取driver名稱與能力;
(6)根據(jù)clientset建立informers;
(7)構(gòu)建provisionController對象;
(8)定義run方法(包括了provisionController.Run);
(9)根據(jù)--enable-leader-election組件啟動(dòng)參數(shù)配置決定是否開啟Leader 選舉,當(dāng)不開啟時(shí),直接運(yùn)行run方法,開啟時(shí)調(diào)用le.Run()。

func main() {
	var config *rest.Config
	var err error

	flag.Var(utilflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
		"Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n"))

	klog.InitFlags(nil)
	flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
	flag.Set("logtostderr", "true")
	flag.Parse()

	if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil {
		klog.Fatal(err)
	}

	if *showVersion {
		fmt.Println(os.Args[0], version)
		os.Exit(0)
	}
	klog.Infof("Version: %s", version)

	// get the KUBECONFIG from env if specified (useful for local/debug cluster)
	kubeconfigEnv := os.Getenv("KUBECONFIG")

	if kubeconfigEnv != "" {
		klog.Infof("Found KUBECONFIG environment variable set, using that..")
		kubeconfig = &kubeconfigEnv
	}

	if *master != "" || *kubeconfig != "" {
		klog.Infof("Either master or kubeconfig specified. building kube config from that..")
		config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
	} else {
		klog.Infof("Building kube configs for running in cluster...")
		config, err = rest.InClusterConfig()
	}
	if err != nil {
		klog.Fatalf("Failed to create config: %v", err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create client: %v", err)
	}
	// snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1beta1Client
	snapClient, err := snapclientset.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create snapshot client: %v", err)
	}

	// The controller needs to know what the server version is because out-of-tree
	// provisioners aren't officially supported until 1.5
	serverVersion, err := clientset.Discovery().ServerVersion()
	if err != nil {
		klog.Fatalf("Error getting server version: %v", err)
	}

	metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)

	grpcClient, err := ctrl.Connect(*csiEndpoint, metricsManager)
	if err != nil {
		klog.Error(err.Error())
		os.Exit(1)
	}   
	
	// 循環(huán)探測,直至CSI driver即cephcsi-rbd服務(wù)準(zhǔn)備好
	err = ctrl.Probe(grpcClient, *operationTimeout)
	if err != nil {
		klog.Error(err.Error())
		os.Exit(1)
	}

	// 從ceph-csi組件中獲取driver name
	provisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout)
	if err != nil {
		klog.Fatalf("Error getting CSI driver name: %s", err)
	}
	klog.V(2).Infof("Detected CSI driver %s", provisionerName)
	metricsManager.SetDriverName(provisionerName)
	metricsManager.StartMetricsEndpoint(*metricsAddress, *metricsPath)
    
    // 從ceph-csi組件中獲取driver能力
	pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
	if err != nil {
		klog.Fatalf("Error getting CSI driver capabilities: %s", err)
	}

	// Generate a unique ID for this provisioner
	timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
	identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName
    
    // 開始構(gòu)建infomer
	factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)

	// -------------------------------
	// Listers
	// Create informer to prevent hit the API server for all resource request
	scLister := factory.Storage().V1().StorageClasses().Lister()
	claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()

	var csiNodeLister storagelistersv1beta1.CSINodeLister
	var nodeLister v1.NodeLister
	if ctrl.SupportsTopology(pluginCapabilities) {
		csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
		nodeLister = factory.Core().V1().Nodes().Lister()
	}

	// -------------------------------
	// PersistentVolumeClaims informer
	rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
	claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
	claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

	// Setup options
	provisionerOptions := []func(*controller.ProvisionController) error{
		controller.LeaderElection(false), // Always disable leader election in provisioner lib. Leader election should be done here in the CSI provisioner level instead.
		controller.FailedProvisionThreshold(0),
		controller.FailedDeleteThreshold(0),
		controller.RateLimiter(rateLimiter),
		controller.Threadiness(int(*workerThreads)),
		controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),
		controller.ClaimsInformer(claimInformer),
	}

	translator := csitrans.New()

	supportsMigrationFromInTreePluginName := ""
	if translator.IsMigratedCSIDriverByName(provisionerName) {
		supportsMigrationFromInTreePluginName, err = translator.GetInTreeNameFromCSIName(provisionerName)
		if err != nil {
			klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %v", provisionerName, err)
		}
		klog.V(2).Infof("Supports migration from in-tree plugin: %s", supportsMigrationFromInTreePluginName)
		provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
	}

	// Create the provisioner: it implements the Provisioner interface expected by
	// the controller
	csiProvisioner := ctrl.NewCSIProvisioner(
		clientset,
		*operationTimeout,
		identity,
		*volumeNamePrefix,
		*volumeNameUUIDLength,
		grpcClient,
		snapClient,
		provisionerName,
		pluginCapabilities,
		controllerCapabilities,
		supportsMigrationFromInTreePluginName,
		*strictTopology,
		translator,
		scLister,
		csiNodeLister,
		nodeLister,
		claimLister,
		*extraCreateMetadata,
	)

	provisionController = controller.NewProvisionController(
		clientset,
		provisionerName,
		csiProvisioner,
		serverVersion.GitVersion,
		provisionerOptions...,
	)

	csiClaimController := ctrl.NewCloningProtectionController(
		clientset,
		claimLister,
		claimInformer,
		claimQueue,
	)
    
    // 主循環(huán)函數(shù)
	run := func(context.Context) {
		stopCh := context.Background().Done()
		factory.Start(stopCh)
		cacheSyncResult := factory.WaitForCacheSync(stopCh)
		for _, v := range cacheSyncResult {
			if !v {
				klog.Fatalf("Failed to sync Informers!")
			}
		}
        
        // 跑兩個(gè)controller,后面主要分析provisionController
		go csiClaimController.Run(int(*finalizerThreads), stopCh)
		provisionController.Run(wait.NeverStop)
	}
    
    // Leader 選舉相關(guān)
	if !*enableLeaderElection {
		run(context.TODO())
	} else {
		// this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/v5/controller
		// to preserve backwards compatibility
		lockName := strings.Replace(provisionerName, "/", "-", -1)
        
        // 使用endpoints或leases資源對象來選leader
		var le leaderElection
		if *leaderElectionType == "endpoints" {
			klog.Warning("The 'endpoints' leader election type is deprecated and will be removed in a future release. Use '--leader-election-type=leases' instead.")
			le = leaderelection.NewLeaderElectionWithEndpoints(clientset, lockName, run)
		} else if *leaderElectionType == "leases" {
			le = leaderelection.NewLeaderElection(clientset, lockName, run)
		} else {
			klog.Error("--leader-election-type must be either 'endpoints' or 'leases'")
			os.Exit(1)
		}

		if *leaderElectionNamespace != "" {
			le.WithNamespace(*leaderElectionNamespace)
		}
        
        // 處理Leader 選舉邏輯的方法
		if err := le.Run(); err != nil {
			klog.Fatalf("failed to initialize leader election: %v", err)
		}
	}

}
controller.NewProvisionController

主要看到EventHandler,定義了該組件list/watch的對象,對象事件來了怎么處理,以及claimQueue與volumeQueue的對象來源。

claimHandler

可以看到,claimQueue的來源是pvc對象的新增、更新事件(對claimQueue的處理已在external-provisioner源碼分析(1)-主體處理邏輯分析中講過,忘了的話可以回頭看下)。

    ...
	// PersistentVolumeClaims

	claimHandler := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { controller.enqueueClaim(obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueClaim(newObj) },
		DeleteFunc: func(obj interface{}) {
			// NOOP. The claim is either in claimsInProgress and in the queue, so it will be processed as usual
			// or it's not in claimsInProgress and then we don't care
		},
	}
	
	if controller.claimInformer != nil {
		controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
	} else {
		controller.claimInformer = informer.Core().V1().PersistentVolumeClaims().Informer()
		controller.claimInformer.AddEventHandler(claimHandler)
	}
	...
// enqueueClaim takes an obj and converts it into UID that is then put onto claim work queue.
func (ctrl *ProvisionController) enqueueClaim(obj interface{}) {
	uid, err := getObjectUID(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	if ctrl.claimQueue.NumRequeues(uid) == 0 {
		ctrl.claimQueue.Add(uid)
	}
}
volumeHandler

可以看到,volumeQueue的來源是pv對象的新增、更新事件(對volumeQueue的處理已在external-provisioner源碼分析(1)-主體處理邏輯分析中講過,忘了的話可以回頭看下)。

    ...
	// PersistentVolumes
	volumeHandler := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { controller.enqueueVolume(obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueVolume(newObj) },
		DeleteFunc: func(obj interface{}) { controller.forgetVolume(obj) },
	}
	if controller.volumeInformer != nil {
		controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
	} else {
		controller.volumeInformer = informer.Core().V1().PersistentVolumes().Informer()
		controller.volumeInformer.AddEventHandler(volumeHandler)
	}
	...
// enqueueVolume takes an obj and converts it into a namespace/name string which
// is then put onto the given work queue.
func (ctrl *ProvisionController) enqueueVolume(obj interface{}) {
	var key string
	var err error
	if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
		utilruntime.HandleError(err)
		return
	}
	// Re-Adding is harmless but try to add it to the queue only if it is not
	// already there, because if it is already there we *must* be retrying it
	if ctrl.volumeQueue.NumRequeues(key) == 0 {
		ctrl.volumeQueue.Add(key)
	}
}
// forgetVolume Forgets an obj from the given work queue, telling the queue to
// stop tracking its retries because e.g. the obj was deleted
func (ctrl *ProvisionController) forgetVolume(obj interface{}) {
	var key string
	var err error
	if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
		utilruntime.HandleError(err)
		return
	}
	ctrl.volumeQueue.Forget(key)
	ctrl.volumeQueue.Done(key)
}

2.Leader 選舉分析

在 Golang 中,k8s client-go 這個(gè)package 針對 Leader 相關(guān)功能進(jìn)行了封裝,支持3種鎖資源,endpoint,configmap,lease,方便使用。

Leader 選舉基本原理

Leader 選舉基本原理其實(shí)就是利用通過Kubernetes中 configmap , endpoints 或者 lease 資源實(shí)現(xiàn)一個(gè)分布式鎖,搶(acqure)到鎖的節(jié)點(diǎn)成為leader,并且定期更新(renew)。其他進(jìn)程也在不斷的嘗試進(jìn)行搶占,搶占不到則繼續(xù)等待下次循環(huán)。當(dāng)leader節(jié)點(diǎn)掛掉之后,租約到期,其他節(jié)點(diǎn)就成為新的leader。

搶到鎖其實(shí)就是成功把該進(jìn)程的相關(guān)信息(如進(jìn)程唯一標(biāo)識)寫入configmap、endpoints 或者 lease 資源對象中;而定期更新其實(shí)就是定期更新該資源的鎖更新時(shí)間,以延續(xù)租期。

多個(gè)進(jìn)程同時(shí)獲取鎖(更新鎖資源)時(shí),由apiserver來保證鎖資源update的原子操作,通過對比resourceVersion版本號(resourceVersion的取值最終來源于etcd的modifiedindex),保證只有一個(gè)進(jìn)程能修改成功,也即只有一個(gè)進(jìn)程能成功獲取到鎖。

鎖示例如下:

apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2020-08-21T11:56:46Z"
  name: rbd-csi-ceph-com
  namespace: default
  resourceVersion: "69642798"
  selfLink: /apis/coordination.k8s.io/v1/namespaces/default/leases/rbd-csi-ceph-com
  uid: c9a7ea00-c000-4c5c-b90f-d0e7c85240ca
spec:
  acquireTime: "2020-08-21T11:56:46.907075Z"
  holderIdentity: cld-dnode3-1091-i-nease-net
  leaseDurationSeconds: 15
  leaseTransitions: 0
  renewTime: "2020-09-07T02:38:24.587170Z"

其中holderIdentity記錄了獲取到鎖的進(jìn)程信息,renewTime記錄了鎖更新時(shí)間。

external-provisioner的Leader 選舉

從main方法代碼中可以看出,在external-provisioner組件中,僅支持endpoint與lease兩種鎖資源,且endpoints鎖會(huì)在后續(xù)被棄用,所以建議使用lease鎖。

external-provisioner組件的高可用選主邏輯與k8s中的kube-controller-manager、kube-scheduler等組件的高可用選主邏輯類似。

概要過程:
(1)組件啟動(dòng)時(shí),定期循環(huán)的去獲取lease鎖,獲取成功則成為leader且返回,否則一直阻塞;
(2)獲取lease鎖成功,則競選leader成功,然后運(yùn)行external-provisioner組件的主體處理邏輯;
(3)競選leader成功后,繼續(xù)定期循環(huán)續(xù)約,以保證leader的長久性。

下面進(jìn)行代碼的分析。

le.Run()

當(dāng)--enable-leader-election組件啟動(dòng)參數(shù)為true時(shí),運(yùn)行該方法,主要邏輯為:
(1)定義leaderConfig結(jié)構(gòu)體;
(2)調(diào)用leaderelection.RunOrDie做進(jìn)一步的選主邏輯處理。

func (l *leaderElection) Run() error {
	if l.identity == "" {
		id, err := defaultLeaderElectionIdentity()
		if err != nil {
			return fmt.Errorf("error getting the default leader identity: %v", err)
		}

		l.identity = id
	}

	if l.namespace == "" {
		l.namespace = inClusterNamespace()
	}

	broadcaster := record.NewBroadcaster()
	broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: l.clientset.CoreV1().Events(l.namespace)})
	eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s/%s", l.lockName, string(l.identity))})

	rlConfig := resourcelock.ResourceLockConfig{
		Identity:      sanitizeName(l.identity),
		EventRecorder: eventRecorder,
	}

	lock, err := resourcelock.New(l.resourceLock, l.namespace, sanitizeName(l.lockName), l.clientset.CoreV1(), l.clientset.CoordinationV1(), rlConfig)
	if err != nil {
		return err
	}

	leaderConfig := leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: l.leaseDuration,
		RenewDeadline: l.renewDeadline,
		RetryPeriod:   l.retryPeriod,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				klog.V(2).Info("became leader, starting")
				l.runFunc(ctx)
			},
			OnStoppedLeading: func() {
				klog.Fatal("stopped leading")
			},
			OnNewLeader: func(identity string) {
				klog.V(3).Infof("new leader detected, current leader: %s", identity)
			},
		},
	}

	leaderelection.RunOrDie(context.TODO(), leaderConfig)
	return nil // should never reach here
}
leaderelection.RunOrDie()

主要邏輯:
(1)調(diào)用le.acquire()方法來嘗試競選為leader(acquire方法會(huì)定期循環(huán)的去獲取lease鎖,獲取成功則成為leader且返回,否則一直阻塞);
(2)競選leader成功,運(yùn)行run方法;
(3)調(diào)用le.renew()續(xù)約方法,定期循環(huán)續(xù)約。

// RunOrDie starts a client with the provided config or panics if the config
// fails to validate.
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
	le, err := NewLeaderElector(lec)
	if err != nil {
		panic(err)
	}
	if lec.WatchDog != nil {
		lec.WatchDog.SetLeaderElection(le)
	}
	le.Run(ctx)
}

// Run starts the leader election loop
func (le *LeaderElector) Run(ctx context.Context) {
	defer func() {
		runtime.HandleCrash()
		le.config.Callbacks.OnStoppedLeading()
	}()
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)
}

// acquire會(huì)不斷循環(huán)的去獲取lease鎖,獲取成功則成為leader且返回
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	desc := le.config.Lock.Describe()
	klog.Infof("attempting to acquire leader lease  %v...", desc)
	wait.JitterUntil(func() {
		succeeded = le.tryAcquireOrRenew()
		le.maybeReportTransition()
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("became leader")
		le.metrics.leaderOn(le.config.Name)
		klog.Infof("successfully acquired lease %v", desc)
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}

// 續(xù)約方法,不斷循環(huán)續(xù)約
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	wait.Until(func() {
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			done := make(chan bool, 1)
			go func() {
				defer close(done)
				done <- le.tryAcquireOrRenew()
			}()

			select {
			case <-timeoutCtx.Done():
				return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
			case result := <-done:
				return result, nil
			}
		}, timeoutCtx.Done())

		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if err == nil {
			klog.V(5).Infof("successfully renewed lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("stopped leading")
		le.metrics.leaderOff(le.config.Name)
		klog.Infof("failed to renew lease %v: %v", desc, err)
		cancel()
	}, le.config.RetryPeriod, ctx.Done())

	// if we hold the lease, give it up
	if le.config.ReleaseOnCancel {
		le.release()
	}
}

看完上述內(nèi)容,你們掌握如何進(jìn)行main方法與Leader選舉分析的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI