溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Istio Pilot代碼是什么

發(fā)布時間:2021-12-24 10:01:20 來源:億速云 閱讀:124 作者:iii 欄目:云計算

本篇內(nèi)容介紹了“Istio Pilot代碼是什么”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

Istio Pilot 組件介紹

在Istio架構(gòu)中,Pilot組件屬于最核心的組件,負責(zé)了服務(wù)網(wǎng)格中的流量管理以及控制面和數(shù)據(jù)面之間的配置下發(fā)。Pilot內(nèi)部的代碼結(jié)構(gòu)比較復(fù)雜,本文中我們將通過對Pilot的代碼的深入分析來了解Pilot實現(xiàn)原理。

首先我們來看一下Pilot在Istio中的功能定位,Pilot將服務(wù)信息和配置數(shù)據(jù)轉(zhuǎn)換為xDS接口的標準數(shù)據(jù)結(jié)構(gòu),通過gRPC下發(fā)到數(shù)據(jù)面的Envoy。如果把Pilot看成一個處理數(shù)據(jù)的黑盒,則其有兩個輸入,一個輸出:

目前Pilot的輸入包括兩部分數(shù)據(jù)來源:

  • 服務(wù)數(shù)據(jù): 來源于各個服務(wù)注冊表(Service Registry),例如Kubernetes中注冊的Service,Consul Catalog中的服務(wù)等。

  • 配置規(guī)則: 各種配置規(guī)則,包括路由規(guī)則及流量管理規(guī)則等,通過Kubernetes CRD(Custom Resources Definition)形式定義并存儲在Kubernetes中。

Pilot的輸出為符合xDS接口的數(shù)據(jù)面配置數(shù)據(jù),并通過gRPC Streaming接口將配置數(shù)據(jù)推送到數(shù)據(jù)面的Envoy中。

備注:Istio代碼庫在不停變化更新中,本文分析所基于的代碼commit為: d539abe00c2599d80c6d64296f78d3bb8ab4b033

Pilot-Discovery 代碼結(jié)構(gòu)

Istio Pilot的代碼分為Pilot-Discovery和Pilot-Agent,其中Pilot-Agent用于在數(shù)據(jù)面負責(zé)Envoy的生命周期管理,Pilot-Discovery才是控制面進行流量管理的組件,本文將重點分析控制面部分,即Pilot-Discovery的代碼。

Pilot-Discovery的入口函數(shù)為:pilot/cmd/pilot-discovery/main.go中的main方法。main方法中創(chuàng)建了Discovery Server,Discovery Server中主要包含三部分邏輯:

Config Controller

Config Controller用于管理各種配置數(shù)據(jù),包括用戶創(chuàng)建的流量管理規(guī)則和策略。Istio目前支持三種類型的Config Controller:

  • Kubernetes:使用Kubernetes來作為配置數(shù)據(jù)的存儲,該方式直接依附于Kubernetes強大的CRD機制來存儲配置數(shù)據(jù),簡單方便,是Istio最開始使用的配置存儲方案。

  • MCP (Mesh Configuration Protocol):使用Kubernetes來存儲配置數(shù)據(jù)導(dǎo)致了Istio和Kubernetes的耦合,限制了Istio在非Kubernetes環(huán)境下的運用。為了解決該耦合,Istio社區(qū)提出了MCP,MCP定義了一個向Istio控制面下發(fā)配置數(shù)據(jù)的標準協(xié)議,Istio Pilot作為MCP Client,任何實現(xiàn)了MCP協(xié)議的Server都可以通過MCP協(xié)議向Pilot下發(fā)配置,從而解除了Istio和Kubernetes的耦合。如果想要了解更多關(guān)于MCP的內(nèi)容,請參考文后的鏈接。

  • Memory:一個在內(nèi)存中的Config Controller實現(xiàn),主要用于測試。

目前Istio的配置包括:

  • Virtual Service: 定義流量路由規(guī)則。

  • Destination Rule: 定義和一個服務(wù)或者subset相關(guān)的流量處理規(guī)則,包括負載均衡策略,連接池大小,斷路器設(shè)置,subset定義等等。

  • Gateway: 定義入口網(wǎng)關(guān)上對外暴露的服務(wù)。

  • Service Entry: 通過定義一個Service Entry可以將一個外部服務(wù)手動添加到服務(wù)網(wǎng)格中。

  • Envoy Filter: 通過Pilot在Envoy的配置中添加一個自定義的Filter。

Service Controller

Service Controller用于管理各種Service Registry,提出服務(wù)發(fā)現(xiàn)數(shù)據(jù),目前Istio支持的Service Registry包括:

  • Kubernetes:對接Kubernetes Registry,可以將Kubernetes中定義的Service和Instance采集到Istio中。

  • Consul: 對接Consul Catalog,將Consul中定義的Service采集到Istio中。

  • MCP: 和MCP config controller類似,從MCP Server中獲取Service和Service Instance。

  • Memory: 一個內(nèi)存中的Service Controller實現(xiàn),主要用于測試。

Discovery Service

Discovery Service中主要包含下述邏輯:

  • 啟動gRPC Server并接收來自Envoy端的連接請求。

  • 接收Envoy端的xDS請求,從Config Controller和Service Controller中獲取配置和服務(wù)信息,生成響應(yīng)消息發(fā)送給Envoy。

  • 監(jiān)聽來自Config Controller的配置變化消息和來自Service Controller的服務(wù)變化消息,并將配置和服務(wù)變化內(nèi)容通過xDS接口推送到Envoy。(備注:目前Pilot未實現(xiàn)增量變化推送,每次變化推送的是全量配置,在網(wǎng)格中服務(wù)較多的情況下可能會有性能問題)。

Pilot-Discovery 業(yè)務(wù)流程

Pilot-Disocvery包括以下主要的幾個業(yè)務(wù)流程:

初始化Pilot-Discovery的各個主要組件

Pilot-Discovery命令的入口為pilot/cmd/pilot-discovery/main.go中的main方法,在該方法中創(chuàng)建Pilot Server,Server代碼位于文件pilot/pkg/bootstrap/server.go中。Server主要做了下面一些初始化工作:

  • 創(chuàng)建并初始化Config Controller。

  • 創(chuàng)建并初始化Service Controller。

  • 創(chuàng)建并初始化Discovery Server,Pilot中創(chuàng)建了基于Envoy V1 API的HTTP Discovery Server和基于Envoy V2 API的GPRC Discovery Server。由于V1已經(jīng)被廢棄,本文將主要分析V2 API的gRPC Discovery Server。

  • 將Discovery Server注冊為Config Controller和Service Controller的Event Handler,監(jiān)聽配置和服務(wù)變化消息。

創(chuàng)建gRPC Server并接收Envoy的連接請求

Pilot Server創(chuàng)建了一個gRPC Server,用于監(jiān)聽和接收來自Envoy的xDS請求。pilot/pkg/proxy/envoy/v2/ads.go 中的 DiscoveryServer.StreamAggregatedResources方法被注冊為gRPC Server的服務(wù)處理方法。

當(dāng)gRPC Server收到來自Envoy的連接時,會調(diào)用DiscoveryServer.StreamAggregatedResources方法,在該方法中創(chuàng)建一個XdsConnection對象,并開啟一個goroutine從該connection中接收客戶端的xDS請求并進行處理;如果控制面的配置發(fā)生變化,Pilot也會通過該connection把配置變化主動推送到Envoy端。

配置變化后向Envoy推送更新

這是Pilot中最復(fù)雜的一個業(yè)務(wù)流程,主要是因為代碼中采用了多個channel和queue對變化消息進行合并和轉(zhuǎn)發(fā)。該業(yè)務(wù)流程如下:

  1. Config Controller或者Service Controller在配置或服務(wù)發(fā)生變化時通過回調(diào)方法通知Discovery Server,Discovery Server將變化消息放入到Push Channel中。

  2. Discovery Server通過一個goroutine從Push Channel中接收變化消息,將一段時間內(nèi)連續(xù)發(fā)生的變化消息進行合并。如果超過指定時間沒有新的變化消息,則將合并后的消息加入到一個隊列Push Queue中。

  3. 另一個goroutine從Push Queue中取出變化消息,生成XdsEvent,發(fā)送到每個客戶端連接的Push Channel中。

  4. 在DiscoveryServer.StreamAggregatedResources方法中從Push Channel中取出XdsEvent,然后根據(jù)上下文生成符合xDS接口規(guī)范的DiscoveryResponse,通過gRPC推送給Envoy端。(gRPC會為每個client連接單獨分配一個goroutine來進行處理,因此不同客戶端連接的StreamAggregatedResources處理方法是在不同goroutine中處理的)

響應(yīng)Envoy主動發(fā)起的xDS請求

Pilot和Envoy之間建立的是一個雙向的Streaming gRPC服務(wù)調(diào)用,因此Pilot可以在配置變化時向Envoy推送,Envoy也可以主動發(fā)起xDS調(diào)用請求獲取配置。Envoy主動發(fā)起xDS請求的流程如下:

  1. Envoy通過創(chuàng)建好的gRPC連接發(fā)送一個DiscoveryRequest

  2. Discovery Server通過一個goroutine從XdsConnection中接收來自Envoy的DiscoveryRequest,并將請求發(fā)送到ReqChannel中

  3. Discovery Server的另一個goroutine從ReqChannel中接收DiscoveryRequest,根據(jù)上下文生成符合xDS接口規(guī)范的DiscoveryResponse,然后返回給Envoy。

Discovery Server業(yè)務(wù)處理關(guān)鍵代碼片段

下面是Discovery Server的關(guān)鍵代碼片段和對應(yīng)的業(yè)務(wù)邏輯注解,為方便閱讀,代碼中只保留了邏輯主干,去掉了一些不重要的細節(jié)。

處理xDS請求和推送的關(guān)鍵代碼

該部分關(guān)鍵代碼位于 istio.io/istio/pilot/pkg/proxy/envoy/v2/ads.go 文件的StreamAggregatedResources 方法中。StreamAggregatedResources方法被注冊為gRPC Server的handler,對于每一個客戶端連接,gRPC Server會啟動一個goroutine來進行處理。

代碼中主要包含以下業(yè)務(wù)邏輯:

  • 從gRPC連接中接收來自Envoy的xDS 請求,并放到一個channel reqChannel中。

  • 從reqChannel中接收xDS請求,根據(jù)xDS請求的類型構(gòu)造響應(yīng)并發(fā)送給Envoy。

  • 從connection的pushChannel中接收Service或者Config變化后的通知,構(gòu)造xDS響應(yīng)消息,將變化內(nèi)容推送到Envoy端。

// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
        
    ......

    //創(chuàng)建一個goroutine來接收來自Envoy的xDS請求,并將請求放到reqChannel中
    con := newXdsConnection(peerAddr, stream)
    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    go receiveThread(con, reqChannel, &receiveError)

     ......
    
    for {
        select{
        //從reqChannel接收Envoy端主動發(fā)起的xDS請求
        case discReq, ok := <-reqChannel:        
            //根據(jù)請求的類型構(gòu)造相應(yīng)的xDS Response并發(fā)送到Envoy端
            switch discReq.TypeUrl {
            case ClusterType:
                err := s.pushCds(con, s.globalPushContext(), versionInfo())
            case ListenerType:
                err := s.pushLds(con, s.globalPushContext(), versionInfo())
            case RouteType:
                err := s.pushRoute(con, s.globalPushContext(), versionInfo())
            case EndpointType:
                err := s.pushEds(s.globalPushContext(), con, versionInfo(), nil)
            }

        //從PushChannel接收Service或者Config變化后的通知
        case pushEv := <-con.pushChannel:
            //將變化內(nèi)容推送到Envoy端
            err := s.pushConnection(con, pushEv)   
        }            
    }
}
處理服務(wù)和配置變化的關(guān)鍵代碼

該部分關(guān)鍵代碼位于 istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go 文件中,用于監(jiān)聽服務(wù)和配置變化消息,并將變化消息合并后通過Channel發(fā)送給前面提到的 StreamAggregatedResources 方法進行處理。

ConfigUpdate是處理服務(wù)和配置變化的回調(diào)函數(shù),service controller和config controller在發(fā)生變化時會調(diào)用該方法通知Discovery Server。

func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
  inboundConfigUpdates.Increment()

  //服務(wù)或配置變化后,將一個PushRequest發(fā)送到pushChannel中
  s.pushChannel <- req
}

在debounce方法中將連續(xù)發(fā)生的PushRequest進行合并,如果一段時間內(nèi)沒有收到新的PushRequest,再發(fā)起推送;以避免由于服務(wù)和配置頻繁變化給系統(tǒng)帶來較大壓力。

// The debounce helper function is implemented to enable mocking
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) {

    ......

    pushWorker := func() {
        eventDelay := time.Since(startDebounce)
        quietTime := time.Since(lastConfigUpdateTime)

        // it has been too long or quiet enough
        //一段時間內(nèi)沒有收到新的PushRequest,再發(fā)起推送
        if eventDelay >= DebounceMax || quietTime >= DebounceAfter {
            if req != nil {
                pushCounter++
                adsLog.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push, full=%v",
                pushCounter, debouncedEvents,
                quietTime, eventDelay, req.Full)

                free = false
                go push(req)
                req = nil
                debouncedEvents = 0
            }
        } else {
           timeChan = time.After(DebounceAfter - quietTime)
        }
    }
    for {
        select {
        ......

        case r := <-ch:
            lastConfigUpdateTime = time.Now()
            if debouncedEvents == 0 {
                timeChan = time.After(DebounceAfter)
                startDebounce = lastConfigUpdateTime
            }
            debouncedEvents++
            //合并連續(xù)發(fā)生的多個PushRequest
            req = req.Merge(r)
        case <-timeChan:
           if free {
               pushWorker()
            }
        case <-stopCh:
            return
    }
  }
}

“Istio Pilot代碼是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI