溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

如何進(jìn)行Pilot-agent作用及其源碼的分析

發(fā)布時(shí)間:2022-01-06 17:46:43 來(lái)源:億速云 閱讀:122 作者:柒染 欄目:云計(jì)算

本篇文章為大家展示了如何進(jìn)行Pilot-agent作用及其源碼的分析,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

小編使用的Istio源碼是 release 1.5。

介紹

Sidecar在注入的時(shí)候會(huì)注入istio-init和istio-proxy兩個(gè)容器。Pilot-agent就是啟動(dòng)istio-proxy的入口。通過(guò)kubectl命令我們可以看到啟動(dòng)命令:

[root@localhost ~]# kubectl exec -it details-v1-6c9f8bcbcb-shltm -c istio-proxy -- ps -efww
UID         PID   PPID  C STIME TTY          TIME CMD

istio-p+      1      0  0 08:52 ?        00:00:13 /usr/local/bin/pilot-agent proxy sidecar --domain default.svc.cluster.local --configPath /etc/istio/proxy --binaryPath /usr/local/bin/envoy --serviceCluster details.default --drainDuration 45s --parentShutdownDuration 1m0s --discoveryAddress istiod.istio-system.svc:15012 --zipkinAddress zipkin.istio-system:9411 --proxyLogLevel=warning --proxyComponentLogLevel=misc:error --connectTimeout 10s --proxyAdminPort 15000 --concurrency 2 --controlPlaneAuthPolicy NONE --dnsRefreshRate 300s --statusPort 15020 --trust-domain=cluster.local --controlPlaneBootstrap=false

istio-p+     18      1  0 08:52 ?        00:01:11 /usr/local/bin/envoy -c /etc/istio/proxy/envoy-rev0.json --restart-epoch 0 --drain-time-s 45 --parent-shutdown-time-s 60 --service-cluster details.default --service-node sidecar~172.20.0.14~details-v1-6c9f8bcbcb-shltm.default~default.svc.cluster.local --max-obj-name-len 189 --local-address-ip-version v4 --log-format [Envoy (Epoch 0)] [%Y-%m-%d %T.%e][%t][%l][%n] %v -l warning --component-log-level misc:error --concurrency 2

Pilot-agent除了啟動(dòng)istio-proxy以外還有以下能力:

  • 生成Envoy的Bootstrap配置文件;

  • 健康檢查;

  • 監(jiān)視證書(shū)的變化,通知Envoy進(jìn)程熱重啟,實(shí)現(xiàn)證書(shū)的熱加載;

  • 提供Envoy守護(hù)功能,當(dāng)Envoy異常退出的時(shí)候重啟Envoy;

  • 通知Envoy優(yōu)雅退出;

代碼執(zhí)行流程分析

	proxyCmd = &cobra.Command{
		Use:   "proxy",
		Short: "Envoy proxy agent",
		FParseErrWhitelist: cobra.FParseErrWhitelist{ 
			UnknownFlags: true,
		},
		RunE: func(c *cobra.Command, args []string) error {
			...
			// 用于設(shè)置默認(rèn)配置文件的默認(rèn)配置相關(guān)參數(shù)
			proxyConfig := mesh.DefaultProxyConfig()

			// set all flags
			proxyConfig.CustomConfigFile = customConfigFile
			proxyConfig.ProxyBootstrapTemplatePath = templateFile
			proxyConfig.ConfigPath = configPath
			proxyConfig.BinaryPath = binaryPath
			proxyConfig.ServiceCluster = serviceCluster
			proxyConfig.DrainDuration = types.DurationProto(drainDuration)
			proxyConfig.ParentShutdownDuration = types.DurationProto(parentShutdownDuration)
			proxyConfig.DiscoveryAddress = discoveryAddress
			proxyConfig.ConnectTimeout = types.DurationProto(connectTimeout)
			proxyConfig.StatsdUdpAddress = statsdUDPAddress
			...
			ctx, cancel := context.WithCancel(context.Background()) 
			// 啟動(dòng) status server
			if statusPort > 0 {
				localHostAddr := localHostIPv4
				if proxyIPv6 {
					localHostAddr = localHostIPv6
				}
				prober := kubeAppProberNameVar.Get()
				//健康探測(cè)
				statusServer, err := status.NewServer(status.Config{
					LocalHostAddr:  localHostAddr,
					AdminPort:      proxyAdminPort,
                    //通過(guò)參數(shù)--statusPort 15020設(shè)置
					StatusPort:     statusPort,
					KubeAppProbers: prober,
					NodeType:       role.Type,
				})
				if err != nil {
					cancel()
					return err
				}
				go waitForCompletion(ctx, statusServer.Run)
			} 
			... 
			//構(gòu)造Proxy實(shí)例,包括配置,啟動(dòng)參數(shù)等
			envoyProxy := envoy.NewProxy(envoy.ProxyConfig{
				Config:              proxyConfig,
				Node:                role.ServiceNode(),
				LogLevel:            proxyLogLevel,
				ComponentLogLevel:   proxyComponentLogLevel,
				PilotSubjectAltName: pilotSAN,
				MixerSubjectAltName: mixerSAN,
				NodeIPs:             role.IPAddresses,
				DNSRefreshRate:      dnsRefreshRate,
				PodName:             podName,
				PodNamespace:        podNamespace,
				PodIP:               podIP,
				SDSUDSPath:          sdsUDSPath,
				SDSTokenPath:        sdsTokenPath,
				STSPort:             stsPort,
				ControlPlaneAuth:    controlPlaneAuthEnabled,
				DisableReportCalls:  disableInternalTelemetry,
				OutlierLogPath:      outlierLogPath,
				PilotCertProvider:   pilotCertProvider,
			})
			//構(gòu)造agent實(shí)例,實(shí)現(xiàn)了Agent接口
			agent := envoy.NewAgent(envoyProxy, features.TerminationDrainDuration())

			if nodeAgentSDSEnabled {
				tlsCertsToWatch = []string{}
			}
			//構(gòu)造watcher實(shí)例
			watcher := envoy.NewWatcher(tlsCertsToWatch, agent.Restart)
			//啟動(dòng) watcher
			go watcher.Run(ctx)

			// 優(yōu)雅退出
			go cmd.WaitSignalFunc(cancel)
			//啟動(dòng) agent
			return agent.Run(ctx)
		},
	}

執(zhí)行流程大概分成這么幾步:

  1. 用于設(shè)置默認(rèn)配置文件的默認(rèn)配置相關(guān)參數(shù);

  2. 啟動(dòng) status server進(jìn)行健康檢測(cè);

  3. 構(gòu)造Proxy實(shí)例,包括配置,啟動(dòng)參數(shù),并構(gòu)造構(gòu)造agent實(shí)例;

  4. 構(gòu)造watcher實(shí)例,并啟動(dòng);

  5. 開(kāi)啟線(xiàn)程監(jiān)聽(tīng)信號(hào),進(jìn)行優(yōu)雅退出;

  6. 啟動(dòng) agent;

默認(rèn)配置相關(guān)參數(shù)

kubectl exec -it details-v1-6c9f8bcbcb-shltm -c istio-proxy -- /usr/local/bin/pilot-agent proxy --help
Envoy proxy agent

Usage:
	pilot-agent proxy [flags]

Flags:
		--binaryPath string                 Path to the proxy binary (default "/usr/local/bin/envoy")
		--concurrency int                   number of worker threads to run
		--configPath string                 Path to the generated configuration file directory (default "/etc/istio/proxy")
		--connectTimeout duration           Connection timeout used by Envoy for supporting services (default 1s)
		--controlPlaneAuthPolicy string     Control Plane Authentication Policy (default "NONE")
		--controlPlaneBootstrap             Process bootstrap provided via templateFile to be used by control plane components. (default true)
		--customConfigFile string           Path to the custom configuration file
		--datadogAgentAddress string        Address of the Datadog Agent
		--disableInternalTelemetry          Disable internal telemetry
		--discoveryAddress string           Address of the discovery service exposing xDS (e.g. istio-pilot:8080) (default "istio-pilot:15010")
		--dnsRefreshRate string             The dns_refresh_rate for bootstrap STRICT_DNS clusters (default "300s")
		--domain string                     DNS domain suffix. If not provided uses ${POD_NAMESPACE}.svc.cluster.local
		--drainDuration duration            The time in seconds that Envoy will drain connections during a hot restart (default 45s)
		--envoyAccessLogService string      Settings of an Envoy gRPC Access Log Service API implementation
		--envoyMetricsService string        Settings of an Envoy gRPC Metrics Service API implementation
	-h, --help                              help for proxy
		--id string                         Proxy unique ID. If not provided uses ${POD_NAME}.${POD_NAMESPACE} from environment variables
		--ip string                         Proxy IP address. If not provided uses ${INSTANCE_IP} environment variable.
		--lightstepAccessToken string       Access Token for LightStep Satellite pool
		--lightstepAddress string           Address of the LightStep Satellite pool
		--lightstepCacertPath string        Path to the trusted cacert used to authenticate the pool
		--lightstepSecure                   Should connection to the LightStep Satellite pool be secure
		--mixerIdentity string              The identity used as the suffix for mixer's spiffe SAN. This would only be used by pilot all other proxy would get this value from pilot
		--outlierLogPath string             The log path for outlier detection
		--parentShutdownDuration duration   The time in seconds that Envoy will wait before shutting down the parent process during a hot restart (default 1m0s)
		--pilotIdentity string              The identity used as the suffix for pilot's spiffe SAN
		--proxyAdminPort uint16             Port on which Envoy should listen for administrative commands (default 15000)
		--proxyComponentLogLevel string     The component log level used to start the Envoy proxy (default "misc:error")
		--proxyLogLevel string              The log level used to start the Envoy proxy (choose from {trace, debug, info, warning, error, critical, off}) (default "warning")
		--serviceCluster string             Service cluster (default "istio-proxy")
		--serviceregistry string            Select the platform for service registry, options are {Kubernetes, Consul, Mock} (default "Kubernetes")
		--statsdUdpAddress string           IP Address and Port of a statsd UDP listener (e.g. 10.75.241.127:9125)
		--statusPort uint16                 HTTP Port on which to serve pilot agent status. If zero, agent status will not be provided.
		--stsPort int                       HTTP Port on which to serve Security Token Service (STS). If zero, STS service will not be provided.
		--templateFile string               Go template bootstrap config
		--tokenManagerPlugin string         Token provider specific plugin name. (default "GoogleTokenExchange")
		--trust-domain string               The domain to use for identities
		--zipkinAddress string              Address of the Zipkin service (e.g. zipkin:9411)

從上面輸出我們也可以看到proxy參數(shù)的含義以及對(duì)應(yīng)的默認(rèn)值。

func DefaultProxyConfig() meshconfig.ProxyConfig {
	return meshconfig.ProxyConfig{
		ConfigPath:             constants.ConfigPathDir,
		BinaryPath:             constants.BinaryPathFilename,
		ServiceCluster:         constants.ServiceClusterName,
		DrainDuration:          types.DurationProto(45 * time.Second),
		ParentShutdownDuration: types.DurationProto(60 * time.Second),
		DiscoveryAddress:       constants.DiscoveryPlainAddress,
		ConnectTimeout:         types.DurationProto(1 * time.Second),
		StatsdUdpAddress:       "",
		EnvoyMetricsService:    &meshconfig.RemoteService{Address: ""},
		EnvoyAccessLogService:  &meshconfig.RemoteService{Address: ""},
		ProxyAdminPort:         15000,
		ControlPlaneAuthPolicy: meshconfig.AuthenticationPolicy_NONE,
		CustomConfigFile:       "",
		Concurrency:            0,
		StatNameLength:         189,
		Tracing:                nil,
	}
}

默認(rèn)的啟動(dòng)參數(shù)都在DefaultProxyConfig方法中設(shè)置,默認(rèn)的啟動(dòng)配置如下所示:

  • ConfigPath:/etc/istio/proxy

  • BinaryPath:/usr/local/bin/envoy

  • ServiceCluster:istio-proxy

  • DrainDuration:45s

  • ParentShutdownDuration:60s

  • DiscoveryAddress:istio-pilot:15010

  • ConnectTimeout:1s

  • StatsdUdpAddress:""

  • EnvoyMetricsService:meshconfig.RemoteService

  • EnvoyAccessLogService:meshconfig.RemoteService

  • ProxyAdminPort:15000

  • ControlPlaneAuthPolicy:0

  • CustomConfigFile:""

  • Concurrency:0

  • StatNameLength:189

  • Tracing:nil

status server健康檢查

初始化status server:

func NewServer(config Config) (*Server, error) {
	s := &Server{
		statusPort: config.StatusPort,
		ready: &ready.Probe{
			LocalHostAddr: config.LocalHostAddr,
			AdminPort:     config.AdminPort,
			NodeType:      config.NodeType,
		},
	}
	...
	return s, nil
}

初始化完成之后會(huì)開(kāi)啟一個(gè)線(xiàn)程調(diào)用statusServer的 Run方法:

go waitForCompletion(ctx, statusServer.Run)


func (s *Server) Run(ctx context.Context) {
	log.Infof("Opening status port %d\n", s.statusPort)

	mux := http.NewServeMux()

	// Add the handler for ready probes.
	// 初始化探針的回調(diào)處理器
	// /healthz/ready
	mux.HandleFunc(readyPath, s.handleReadyProbe)
	mux.HandleFunc(quitPath, s.handleQuit)
	//應(yīng)用端口檢查
	mux.HandleFunc("/app-health/", s.handleAppProbe)
	//端口通過(guò)參數(shù)--statusPort 15020設(shè)置
	l, err := net.Listen("tcp", fmt.Sprintf(":%d", s.statusPort))
	if err != nil {
		log.Errorf("Error listening on status port: %v", err.Error())
		return
	}
	...
	defer l.Close()
	//開(kāi)啟監(jiān)聽(tīng)
	go func() {
		if err := http.Serve(l, mux); err != nil {
			log.Errora(err) 
			notifyExit()
		}
	}()
 
	<-ctx.Done()
	log.Info("Status server has successfully terminated")
}

Run方法會(huì)開(kāi)啟一個(gè)線(xiàn)程并監(jiān)聽(tīng)15020端口,調(diào)用路徑為 /healthz/ready,并通過(guò)調(diào)用handleReadyProbe處理器來(lái)調(diào)用Envoy的15000端口判斷Envoy是否已經(jīng) ready 接受相對(duì)應(yīng)的流量。調(diào)用過(guò)程如下:

如何進(jìn)行Pilot-agent作用及其源碼的分析

watcher監(jiān)控管理

在進(jìn)行watcher監(jiān)控之前會(huì)通過(guò)NewAgent生成agent實(shí)例:

func NewAgent(proxy Proxy, terminationDrainDuration time.Duration) Agent {
	return &agent{
		proxy:                    proxy,
		//用于管理啟動(dòng) Envoy 后的狀態(tài)通道,用于監(jiān)視 Envoy 進(jìn)程的狀態(tài)
		statusCh:                 make(chan exitStatus),
		//活躍的Epoch 集合
		activeEpochs:             map[int]chan error{},
		//默認(rèn)5s
		terminationDrainDuration: terminationDrainDuration,
		//當(dāng)前的Epoch
		currentEpoch:             -1,
	}
}

然后構(gòu)建watcher實(shí)例:

//構(gòu)造watcher實(shí)例
watcher := envoy.NewWatcher(tlsCertsToWatch, agent.Restart)

type watcher struct {
	//證書(shū)列表
	certs   []string
	//envoy 重啟函數(shù)
	updates func(interface{})
}
 
func NewWatcher(certs []string, updates func(interface{})) Watcher {
	return &watcher{
		certs:   certs,
		updates: updates,
	}
}

watcher里面總共就兩個(gè)參數(shù)certs是監(jiān)聽(tīng)的證書(shū)列表,updates是envoy 重啟函數(shù),如果證書(shū)文件發(fā)生變化則調(diào)用updates來(lái)reload envoy。

啟動(dòng)watcher:

go watcher.Run(ctx)

func (w *watcher) Run(ctx context.Context) { 
	//啟動(dòng)envoy
	w.SendConfig()
 
	//監(jiān)聽(tīng)證書(shū)變化
	go watchCerts(ctx, w.certs, watchFileEvents, defaultMinDelay, w.SendConfig)

	<-ctx.Done()
	log.Info("Watcher has successfully terminated")
}

watcher的Run方法首先會(huì)調(diào)用SendConfig啟動(dòng)Envoy,然后啟動(dòng)一個(gè)線(xiàn)程監(jiān)聽(tīng)證書(shū)的變化。

func (w *watcher) SendConfig() {
	h := sha256.New()
	generateCertHash(h, w.certs)
	w.updates(h.Sum(nil))
}

SendConfig方法會(huì)獲取當(dāng)前的證書(shū)集合hash之后傳入到updates方法中,updates方法就是在初始化NewWatcher的時(shí)候傳入的,這里是會(huì)調(diào)用到agent的Restart方法的:

func (a *agent) Restart(config interface{}) { 
	a.restartMutex.Lock()
	defer a.restartMutex.Unlock()
 
	a.mutex.Lock()
	//校驗(yàn)傳入的參數(shù)是否產(chǎn)生了變化
	if reflect.DeepEqual(a.currentConfig, config) {
		// Same configuration - nothing to do.
		a.mutex.Unlock()
		return
	}
	//活躍的Epoch
	hasActiveEpoch := len(a.activeEpochs) > 0
	//獲取當(dāng)前的Epoch
	activeEpoch := a.currentEpoch
 
	//因?yàn)榕渲米兞?所以Epoch加1
	epoch := a.currentEpoch + 1
	log.Infof("Received new config, creating new Envoy epoch %d", epoch)
	//更新當(dāng)前的配置以及Epoch
	a.currentEpoch = epoch
	a.currentConfig = config
 
	// 用來(lái)做做主動(dòng)退出
	abortCh := make(chan error, 1)
    // 設(shè)置當(dāng)前活躍Epoch的abortCh管道,用于優(yōu)雅關(guān)閉
	a.activeEpochs[a.currentEpoch] = abortCh
 
	a.mutex.Unlock()
 
	if hasActiveEpoch {
		a.waitUntilLive(activeEpoch)
	}
	//啟動(dòng)envoy,會(huì)將結(jié)果放入到statusCh管道中
	go a.runWait(config, epoch, abortCh)
}

Restart方法會(huì)判斷傳入的配置是否和當(dāng)前的配置一致,如果不一致,那么設(shè)置好當(dāng)前的配置后調(diào)用runWait方法啟動(dòng)Envoy,并將啟動(dòng)結(jié)果放入到statusCh管道中:

func (a *agent) runWait(config interface{}, epoch int, abortCh <-chan error) {
	log.Infof("Epoch %d starting", epoch)
	//啟動(dòng)envoy
	err := a.proxy.Run(config, epoch, abortCh)
	//刪除當(dāng)前 epoch 對(duì)應(yīng)的配置文件
	a.proxy.Cleanup(epoch)
	a.statusCh <- exitStatus{epoch: epoch, err: err}
}

envoy啟動(dòng)流程

如何進(jìn)行Pilot-agent作用及其源碼的分析

在上面講了,envoy的啟動(dòng)會(huì)在runWait方法中進(jìn)行,通過(guò)調(diào)用proxy的Run方法會(huì)通過(guò)模板文件創(chuàng)建/etc/istio/proxy/envoy-rev0.json配置文件,然會(huì)直接使用exec包調(diào)用envoy啟動(dòng)命令啟動(dòng)envoy。

func (e *envoy) Run(config interface{}, epoch int, abort <-chan error) error {
	var fname string
	//如果指定了模板文件,則使用用戶(hù)指定的,否則則使用默認(rèn)的
	if len(e.Config.CustomConfigFile) > 0 { 
		fname = e.Config.CustomConfigFile
	} else {
		out, err := bootstrap.New(bootstrap.Config{
			Node:                e.Node,
			DNSRefreshRate:      e.DNSRefreshRate,
			Proxy:               &e.Config,
			PilotSubjectAltName: e.PilotSubjectAltName,
			MixerSubjectAltName: e.MixerSubjectAltName,
			LocalEnv:            os.Environ(),
			NodeIPs:             e.NodeIPs,
			PodName:             e.PodName,
			PodNamespace:        e.PodNamespace,
			PodIP:               e.PodIP,
			SDSUDSPath:          e.SDSUDSPath,
			SDSTokenPath:        e.SDSTokenPath,
			STSPort:             e.STSPort,
			ControlPlaneAuth:    e.ControlPlaneAuth,
			DisableReportCalls:  e.DisableReportCalls,
			OutlierLogPath:      e.OutlierLogPath,
			PilotCertProvider:   e.PilotCertProvider,
		}).CreateFileForEpoch(epoch)
		if err != nil {
			log.Errora("Failed to generate bootstrap config: ", err)
			os.Exit(1) // Prevent infinite loop attempting to write the file, let k8s/systemd report
		}
		fname = out
	} 
	//設(shè)置啟動(dòng)參數(shù)
	args := e.args(fname, epoch, istioBootstrapOverrideVar.Get())
	log.Infof("Envoy command: %v", args)

	//直接使用exec包調(diào)用envoy啟動(dòng)命令
	cmd := exec.Command(e.Config.BinaryPath, args...)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Start(); err != nil {
		return err
	}
	done := make(chan error, 1)
	go func() {
		done <- cmd.Wait()
	}()
	//等待 abort channel 和 done,用于結(jié)束 Envoy 和正確返回當(dāng)前的啟動(dòng)狀態(tài)
	select {
    //用于優(yōu)雅關(guān)閉,后面會(huì)講到
	case err := <-abort:
		log.Warnf("Aborting epoch %d", epoch)
		if errKill := cmd.Process.Kill(); errKill != nil {
			log.Warnf("killing epoch %d caused an error %v", epoch, errKill)
		}
		return err
	case err := <-done:
		return err
	}
}

Run方法會(huì)通過(guò)調(diào)用CreateFileForEpoch方法獲取到模板文件:/var/lib/istio/envoy/envoy_bootstrap_tmpl.json,然后生成/etc/istio/proxy/envoy-rev0.json文件并返回路徑;通過(guò)調(diào)用args方法來(lái)配置envoy的啟動(dòng)參數(shù),然后調(diào)用exec.Command啟動(dòng)envoy,BinaryPath為/usr/local/bin/envoy。

最后異步獲取cmd的返回結(jié)果,存入到done管道中作為方法的參數(shù)返回。返回的參數(shù)在runWait方法中會(huì)被接受到,存入到statusCh管道中。

在調(diào)用agent的run方法的時(shí)候會(huì)監(jiān)聽(tīng)statusCh管道中的數(shù)據(jù):

agent.Run(ctx)

func (a *agent) Run(ctx context.Context) error {
	log.Info("Starting proxy agent")
	for {
		select {
		//如果 proxy-Envoy 的狀態(tài)發(fā)生了變化
		case status := <-a.statusCh:
			a.mutex.Lock()
			if status.err != nil {
				if status.err.Error() == errOutOfMemory {
					log.Warnf("Envoy may have been out of memory killed. Check memory usage and limits.")
				}
				log.Errorf("Epoch %d exited with error: %v", status.epoch, status.err)
			} else {
				//正常退出
				log.Infof("Epoch %d exited normally", status.epoch)
			}
			//刪除當(dāng)前 epoch 對(duì)應(yīng)的配置文件
			delete(a.activeEpochs, status.epoch)

			active := len(a.activeEpochs)
			a.mutex.Unlock()

			if active == 0 {
				log.Infof("No more active epochs, terminating")
				return nil
			} 
		...
	}
}

優(yōu)雅退出

pilot-agent會(huì)開(kāi)啟一個(gè)線(xiàn)程調(diào)用WaitSignalFunc方法監(jiān)聽(tīng)syscall.SIGINT、syscall.SIGTERM信號(hào),然后調(diào)用context的cancel來(lái)實(shí)現(xiàn)優(yōu)化關(guān)閉的效果:

func WaitSignalFunc(cancel func()) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs
	cancel()
	_ = log.Sync()
}

當(dāng)context的cancel方法被調(diào)用的時(shí)候,agent的Run方法里面select監(jiān)聽(tīng)的ctx.Done()方法也會(huì)立即返回,調(diào)用terminate方法:

func (a *agent) Run(ctx context.Context) error { 
	for {
		select {
		//如果 proxy-Envoy 的狀態(tài)發(fā)生了變化
		case status := <-a.statusCh:
			... 
		case <-ctx.Done():
			a.terminate()
			log.Info("Agent has successfully terminated")
			return nil
		}
	}
}

func (a *agent) terminate() {
	log.Infof("Agent draining Proxy")
	e := a.proxy.Drain()
	if e != nil {
		log.Warnf("Error in invoking drain listeners endpoint %v", e)
	}
	log.Infof("Graceful termination period is %v, starting...", a.terminationDrainDuration)
	//睡眠5s
	time.Sleep(a.terminationDrainDuration)
	log.Infof("Graceful termination period complete, terminating remaining proxies.")
	a.abortAll()
}

terminate方法會(huì)調(diào)用sleep休眠5s,然后調(diào)用abortAll通知所有活躍Epoch進(jìn)行優(yōu)雅關(guān)閉。

var errAbort = errors.New("epoch aborted")

func (a *agent) abortAll() {
	a.mutex.Lock()
	defer a.mutex.Unlock()
	for epoch, abortCh := range a.activeEpochs {
		log.Warnf("Aborting epoch %d...", epoch)
		abortCh <- errAbort
	}
	log.Warnf("Aborted all epochs")
}

abortAll會(huì)獲取到所有活躍的Epoch對(duì)應(yīng)的abortCh管道,并插入一條數(shù)據(jù)。如果這個(gè)時(shí)候有活躍的Epoch正在等待cmd返回結(jié)果,那么會(huì)直接調(diào)用kill方法將進(jìn)程殺死:

func (e *envoy) Run(config interface{}, epoch int, abort <-chan error) error {
	...
	//等待 abort channel 和 done,用于結(jié)束 Envoy 和正確返回當(dāng)前的啟動(dòng)狀態(tài)
	select {
    //用于優(yōu)雅關(guān)閉,后面會(huì)講到
	case err := <-abort:
		log.Warnf("Aborting epoch %d", epoch)
		if errKill := cmd.Process.Kill(); errKill != nil {
			log.Warnf("killing epoch %d caused an error %v", epoch, errKill)
		}
		return err
	case err := <-done:
		return err
	}
}

上述內(nèi)容就是如何進(jìn)行Pilot-agent作用及其源碼的分析,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI