溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何分析SuperEdge 拓?fù)渌惴?/h1>
發(fā)布時(shí)間:2022-01-17 10:53:49 來(lái)源:億速云 閱讀:117 作者:柒染 欄目:云計(jì)算

這篇文章主要為大家分析了如何分析SuperEdge 拓?fù)渌惴ǖ南嚓P(guān)知識(shí)點(diǎn),內(nèi)容詳細(xì)易懂,操作細(xì)節(jié)合理,具有一定參考價(jià)值。如果感興趣的話,不妨跟著跟隨小編一起來(lái)看看,下面跟著小編一起深入學(xué)習(xí)“如何分析SuperEdge 拓?fù)渌惴ā钡闹R(shí)吧。


前言

 

SuperEdge 介紹

SuperEdge 是基于原生 Kubernetes 的邊緣容器管理系統(tǒng)。該系統(tǒng)把云原生能力擴(kuò)展到邊緣側(cè),很好的實(shí)現(xiàn)了云端對(duì)邊緣端的管理和控制。同時(shí) superedge 自研了 service group 實(shí)現(xiàn)了基于邊緣計(jì)算的服務(wù)訪問(wèn)控制,極大簡(jiǎn)化了應(yīng)用從云端部署到邊緣端的過(guò)程。  

SuperEdge service group拓?fù)涓兄匦?/h4>

SuperEdge service group 利用 application-grid-wrapper 實(shí)現(xiàn)拓?fù)涓兄?,完成了同一個(gè) nodeunit 內(nèi)服務(wù)的閉環(huán)訪問(wèn)。

在深入分析 application-grid-wrapper 之前,這里先簡(jiǎn)單介紹一下社區(qū) Kubernetes 原生支持的拓?fù)涓兄匦?sup>[1]

Kubernetes service topology awareness 特性于v1.17發(fā)布 alpha 版本,用于實(shí)現(xiàn)路由拓?fù)湟约熬徒L問(wèn)特性。用戶需要在service 中添加 topologyKeys 字段標(biāo)示拓?fù)鋕ey類型,只有具有相同拓?fù)溆虻?endpoint 會(huì)被訪問(wèn)到,目前有三種 topologyKeys 可供選擇:

  • "kubernetes.io/hostname":訪問(wèn)本節(jié)點(diǎn)內(nèi)(     kubernetes.io/hostname      label value相同)的endpoint,如果沒有則service訪問(wèn)失敗
  • "topology.kubernetes.io/zone":訪問(wèn)相同zone域內(nèi)(     topology.kubernetes.io/zone      label value相同)的endpoint,如果沒有則service訪問(wèn)失敗
  • "topology.kubernetes.io/region":訪問(wèn)相同region域內(nèi)(     topology.kubernetes.io/region      label value 相同)的 endpoint,如果沒有則 service 訪問(wèn)失敗

除了單獨(dú)填寫如上某一個(gè)拓?fù)鋕ey之外,還可以將這些key構(gòu)造成列表進(jìn)行填寫,例如:["kubernetes.io/hostname", "topology.kubernetes.io/zone", "topology.kubernetes.io/region"],這表示:優(yōu)先訪問(wèn)本節(jié)點(diǎn)內(nèi)的 endpoint;如果不存在,則訪問(wèn)同一個(gè) zone 內(nèi)的 endpoint;如果再不存在,則訪問(wèn)同一個(gè) region 內(nèi)的 endpoint,如果都不存在則訪問(wèn)失敗。

另外,還可以在列表最后(只能最后一項(xiàng))添加"*"表示:如果前面拓?fù)溆蚨际?,則訪問(wèn)任何有效的 endpoint,也即沒有限制拓?fù)淞?,示例如下?  
# A Service that prefers node local, zonal, then regional endpoints but falls back to cluster wide endpoints.
apiVersion: v1
kind: Service
metadata:  
  name: my-service
spec:  
  selector:    
    app: my-app  
  ports:    
    - protocol: TCP      
      port: 80      
      targetPort: 9376  
  topologyKeys:    
    - "kubernetes.io/hostname"
    - "topology.kubernetes.io/zone" 
    - "topology.kubernetes.io/region"
    - "*"
 

而service group 實(shí)現(xiàn)的拓?fù)涓兄蜕鐓^(qū)對(duì)比,有如下區(qū)別:

  • service group 拓?fù)?key 可以自定義,也即為 gridUniqKey,使用起來(lái)更加靈活;而社區(qū)實(shí)現(xiàn)目前只有三種選擇:"kubernetes.io/hostname","topology.kubernetes.io/zone"以及"topology.kubernetes.io/region"。
  • service group 只能填寫一個(gè)拓?fù)?key,也即只能訪問(wèn)本拓?fù)溆騼?nèi)有效的 endpoint,無(wú)法訪問(wèn)其它拓?fù)溆虻?endpoint;而社區(qū)可以通過(guò) topologyKey 列表以及"*"實(shí)現(xiàn)其它備選拓?fù)溆?endpoint 的訪問(wèn)。

service group 實(shí)現(xiàn)的拓?fù)涓兄?,service 配置如下:

# A Service that only prefers node zone1al endpoints.
apiVersion: v1
kind: Service
metadata:  
  annotations:    
    topologyKeys: '["zone1"]'  
  labels:    
    superedge.io/grid-selector: servicegrid-demo  
  name: servicegrid-demo-svc
spec:  
  ports:  
  - port: 80    
    protocol: TCP    
    targetPort: 8080  
  selector:    
    appGrid: echo
 

在介紹完 service group 實(shí)現(xiàn)的拓?fù)涓兄螅覀兩钊氲皆创a分析實(shí)現(xiàn)細(xì)節(jié)。同樣的,這里以一個(gè)使用示例開始分析:

# step1: labels edge nodes
$ kubectl  get nodes
NAME    STATUS   ROLES    AGE   VERSIO
Nnode0   Ready    <none>   16d   v1.16.7
node1    Ready    <none>   16d   v1.16.7
node2    Ready    <none>   16d   v1.16.7
# nodeunit1(nodegroup and servicegroup zone1)
$ kubectl --kubeconfig config label nodes node0 zone1=nodeunit1  
# nodeunit2(nodegroup and servicegroup zone1)
$ kubectl --kubeconfig config label nodes node1 zone1=nodeunit2
$ kubectl --kubeconfig config label nodes node2 zone1=nodeunit2

...

# step3: deploy echo ServiceGrid
$ cat <<EOF | kubectl --kubeconfig config apply -f -
apiVersion: superedge.io/v1
kind: ServiceGrid
metadata:  
  name: servicegrid-demo  
  namespace: default
spec:  
  gridUniqKey: zone1  
  template:    
    selector:      
      appGrid: echo    
    ports:    
    - protocol: TCP      
      port: 80      
      targetPort: 8080
EOF
servicegrid.superedge.io/servicegrid-demo created
# note that there is only one relevant service generated
$ kubectl  get svc
NAME                TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)   AGE
kubernetes          ClusterIP   192.168.0.1       <none>        443/TCP   16d
servicegrid-demo-svc   ClusterIP   192.168.6.139     <none>        80/TCP    10m
    
# step4: access servicegrid-demo-svc(service topology and closed-looped)
# execute on node0
$ curl 192.168.6.139|grep "node name"        node name:      node0
# execute on node1 and node2
$ curl 192.168.6.139|grep "node name" 
       node name:      node2
$ curl 192.168.6.139|grep "node name"
       node name:      node1
 

在創(chuàng)建完 ServiceGrid CR 后,ServiceGrid Controller 負(fù)責(zé)根據(jù) ServiceGrid產(chǎn)生對(duì)應(yīng)的 service (包含由 serviceGrid.Spec.GridUniqKey 構(gòu)成的 topologyKeys annotations);而 application-grid-wrapper 根據(jù) service 實(shí)現(xiàn)拓?fù)涓兄?,下面依次分析?/p> 

ServiceGrid Controller 分析

ServiceGrid Controller 邏輯和 DeploymentGrid Controller 整體一致,如下:

  • 1、創(chuàng)建并維護(hù) service group 需要的若干 CRDs(包括:ServiceGrid)
  • 2、監(jiān)聽 ServiceGrid event,并填充 ServiceGrid 到工作隊(duì)列中;循環(huán)從隊(duì)列中取出 ServiceGrid 進(jìn)行解析,創(chuàng)建并且維護(hù)對(duì)應(yīng)的 service
  • 3、監(jiān)聽 service event,并將相關(guān)的  ServiceGrid 塞到工作隊(duì)列中進(jìn)行上述處理,協(xié)助上述邏輯達(dá)到整體 reconcile 邏輯

注意這里區(qū)別于 DeploymentGrid Controller:

  • 一個(gè) ServiceGrid 對(duì)象只產(chǎn)生一個(gè) service
  • 只需額外監(jiān)聽 service event,無(wú)需監(jiān)聽 node 事件。因?yàn)?node 的CRUD與 ServiceGrid 無(wú)關(guān)
  • ServiceGrid 對(duì)應(yīng)產(chǎn)生的 service,命名為:     {ServiceGrid}-svc
func (sgc *ServiceGridController) syncServiceGrid(key string) error {    
    startTime := time.Now()    
    klog.V(4).Infof("Started syncing service grid %q (%v)", key, startTime)    
    defer func() {        
      klog.V(4).Infof("Finished syncing service grid %q (%v)", key, time.Since(startTime))    
    }()    
    
    namespace, name, err := cache.SplitMetaNamespaceKey(key)    
    if err != nil {        
      return err    
    }    
    
    sg, err := sgc.svcGridLister.ServiceGrids(namespace).Get(name)    
    if errors.IsNotFound(err) {              
        klog.V(2).Infof("service grid %v has been deleted", key)        
        return nil    
    }    
    if err != nil {        
        return err    
    }    
    
    if sg.Spec.GridUniqKey == "" {           
    sgc.eventRecorder.Eventf(sg, corev1.EventTypeWarning, "Empty", "This service grid has an empty grid key")        
        return nil    
    }    
    
    // get service workload list of this grid    
    svcList, err := sgc.getServiceForGrid(sg)    
    if err != nil {        
        return err    
    }    
    
    if sg.DeletionTimestamp != nil {        
        return nil    
    }    
    
    // sync service grid relevant services workload    
    return sgc.reconcile(sg, svcList)
    }
    
func (sgc *ServiceGridController) getServiceForGrid(sg *crdv1.ServiceGrid) ([]*corev1.Service, error) {    
  svcList, err := sgc.svcLister.Services(sg.Namespace).List(labels.Everything())    
  if err != nil {        
    return nil, err    
  }    
  
  labelSelector, err := common.GetDefaultSelector(sg.Name)    
  if err != nil {        
      return nil, err    
  }   
     
  canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error)
  {        
         fresh, err := 
  sgc.crdClient.SuperedgeV1().ServiceGrids(sg.Namespace).Get(context.TODO(), sg.Name, metav1.GetOptions{})        
      if err != nil {            
        return nil, err        
      }        
      if fresh.UID != sg.UID {           
           return nil, fmt.Errorf("orignal service grid %v/%v is gone: got uid %v, wanted %v", sg.Namespace,                  
              sg.Name, fresh.UID, sg.UID)        
      }        
      return fresh, nil    
    })    
      
    cm := controller.NewServiceControllerRefManager(sgc.svcClient, sg, labelSelector, util.ControllerKind, canAdoptFunc)    
    return cm.ClaimService(svcList)
}
    
func (sgc *ServiceGridController) reconcile(g *crdv1.ServiceGrid, svcList 
[]*corev1.Service) error {    
    var (        
        adds    []*corev1.Service     
        updates []*corev1.Service     
        deletes []*corev1.Service    
    )    
    
    sgTargetSvcName := util.GetServiceName(g)    
    isExistingSvc := false    
    for _, svc := range svcList {     
        if svc.Name == sgTargetSvcName {            
            isExistingSvc = true     
            template := util.KeepConsistence(g, svc)            
            if !apiequality.Semantic.DeepEqual(template, svc) {           
                updates = append(updates, template)            
            }        
        } else {            
            deletes = append(deletes, svc)        
        }    
    }    
    
    if !isExistingSvc {        
        adds = append(adds, util.CreateService(g))    
    }    
    
    return sgc.syncService(adds, updates, deletes)
}

func CreateService(sg *crdv1.ServiceGrid) *corev1.Service {    
    svc := &corev1.Service{        
        ObjectMeta: metav1.ObjectMeta{            
          Name:      GetServiceName(sg),            
          Namespace: sg.Namespace,   
          // Append existed ServiceGrid labels to service to be created  
          Labels: func() map[string]string {                
              if sg.Labels != nil { 
                  newLabels := sg.Labels                    
                  newLabels[common.GridSelectorName] = sg.Name             
                  newLabels[common.GridSelectorUniqKeyName] = sg.Spec.GridUniqKey                    
                  return newLabels              
             } else {               
                  return map[string]string{                        
                      common.GridSelectorName:        sg.Name,             
                      common.GridSelectorUniqKeyName: sg.Spec.GridUniqKey, 
              }               
          }            
      }(),            
      Annotations: make(map[string]string),       
    },        
    Spec: sg.Spec.Template,   
  }    
  
  keys := make([]string, 1)    
  keys[0] = sg.Spec.GridUniqKey    
  keyData, _ := json.Marshal(keys)     
  svc.Annotations[common.TopologyAnnotationsKey] = string(keyData)    
  
  return svc
}
 

由于邏輯與 DeploymentGrid 類似,這里不展開細(xì)節(jié),重點(diǎn)關(guān)注 application-grid-wrapper 部分。

 

application-grid-wrapper 分析

在 ServiceGrid Controller 創(chuàng)建完 service 之后,application-grid-wrapper 的作用就開始啟動(dòng)了:

apiVersion: v1
kind: Service
metadata:  
  annotations:    
    topologyKeys: '["zone1"]'  
  creationTimestamp: "2021-03-03T07:33:30Z"  
  labels:    
    superedge.io/grid-selector: servicegrid-demo  
    name: servicegrid-demo-svc  
    namespace: default  
    ownerReferences:  
    - apiVersion: superedge.io/v1    
      blockOwnerDeletion: true    
      controller: true    
      kind: ServiceGrid    
      name: servicegrid-demo    
      uid: 78c74d3c-72ac-4e68-8c79-f1396af5a581  
    resourceVersion: "127987090"  
    selfLink: /api/v1/namespaces/default/services/servicegrid-demo-svc  
    uid: 8130ba7b-c27e-4c3a-8ceb-4f6dd0178dfc
spec:  
    clusterIP: 192.168.161.1  
    ports:  
    - port: 80    
    protocol: TCP    
    targetPort: 8080  
  selector:    
    appGrid: echo  
  sessionAffinity: None  
  type: ClusterIP
status:  
  loadBalancer: {}
 

為了實(shí)現(xiàn) Kubernetes 零侵入,需要在 kube-proxy 與 apiserver 通信之間添加一層 wrapper,架構(gòu)如下:

如何分析SuperEdge 拓?fù)渌惴?> 
  </figure><p>調(diào)用鏈路如下:</p><pre>kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver</pre> 
  <p>因此 application-grid-wrapper 會(huì)起服務(wù),接受來(lái)自 kube-proxy 的請(qǐng)求,如下:</p><pre><code>func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {    <br/>    ...    <br/>    klog.Infof("Start to run interceptor server")    <br/>    /* filter     <br/>    */    <br/>    server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}    <br/>    <br/>    if insecure {        <br/>        return server.ListenAndServe()    <br/>    }    <br/>    ...    <br/>    server.TLSConfig = tlsConfig    <br/>    return server.ListenAndServeTLS("", "")<br/>}<br/><br/>func (s *interceptorServer) buildFilterChains(debug bool) http.Handler {   <br/>    handler := http.Handler(http.NewServeMux())    <br/>    <br/>    handler = s.interceptEndpointsRequest(handler)    <br/>    handler = s.interceptServiceRequest(handler)    <br/>    handler = s.interceptEventRequest(handler)    <br/>    handler = s.interceptNodeRequest(handler)    <br/>    handler = s.logger(handler)    <br/>    <br/>  if debug {        <br/>      handler = s.debugger(handler)   <br/>  }    <br/>  <br/>  return handler<br/>}</code></pre> 
  <section>這里會(huì)首先創(chuàng)建 interceptorServer,然后注冊(cè)處理函數(shù),由外到內(nèi)依次如下: 
   <br/> 
  </section><ul data-tool=
  • debug:接受 debug 請(qǐng)求,返回 wrapper pprof 運(yùn)行信息

  • logger:打印請(qǐng)求日志

  • node:接受 kube-proxy node GET(/api/v1/nodes/{node})請(qǐng)求,并返回 node信息

  • event:接受 kube-proxy events POST(/events)請(qǐng)求,并將請(qǐng)求轉(zhuǎn)發(fā)給 lite-apiserver

  • func (s *interceptorServer) interceptEventRequest(handler http.Handler) http.Handler {  
       return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {           if r.Method != http.MethodPost || !strings.HasSuffix(r.URL.Path, "/events") {          
             handler.ServeHTTP(w, r)     
             return      
         }      
         
         targetURL, _ := url.Parse(s.restConfig.Host)      
         reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)      
         reverseProxy.Transport, _ = rest.TransportFor(s.restConfig)      
         reverseProxy.ServeHTTP(w, r) 
        })
      }
     
    • service:接受 kube-proxy service List&Watch(/api/v1/services)請(qǐng)求,并根據(jù) storageCache 內(nèi)容返回(GetServices)

    • endpoint:接受 kube-proxy endpoint List&Watch (/api/v1/endpoints)請(qǐng)求,并根據(jù) storageCache 內(nèi)容返回(GetEndpoints)

    下面先重點(diǎn)分析 cache 部分的邏輯,然后再回過(guò)頭來(lái)分析具體的 http handler List&Watch 處理邏輯。

    wrapper 為了實(shí)現(xiàn)拓?fù)涓兄?,自己維護(hù)了一個(gè) cache,包括:node,service,endpoint。可以看到在 setupInformers 中注冊(cè)了這三類資源的處理函數(shù):

    type storageCache struct {    
        // hostName is the nodeName of node which application-grid-wrapper deploys on    
        hostName         string    
        wrapperInCluster bool    
        
        // mu lock protect the following map structure    
        mu           sync.RWMutex    
        servicesMap  map[types.NamespacedName]*serviceContainer    
        endpointsMap map[types.NamespacedName]*endpointsContainer    
        nodesMap     map[types.NamespacedName]*nodeContainer    
        
        // service watch channel    
        serviceChan chan<- watch.Event   
        // endpoints watch channel    
        endpointsChan chan<- watch.Event
    }
    ...
    func NewStorageCache(hostName string, wrapperInCluster bool, serviceNotifier, endpointsNotifier chan watch.Event) *storageCache {    
       msc := &storageCache{        
           hostName:         hostName,   
           wrapperInCluster: wrapperInCluster,        
           servicesMap:      make(map[types.NamespacedName]*serviceContainer), 
           endpointsMap:     make(map[types.NamespacedName]*endpointsContainer),        
           nodesMap:         make(map[types.NamespacedName]*nodeContainer),  
           serviceChan:      serviceNotifier,        
           endpointsChan:    endpointsNotifier,    
        }    
        
        return msc
    }
    ...
    func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {    
        ...    
        if err := s.setupInformers(ctx.Done()); err != nil {        
            return err    
       }    
      
       klog.Infof("Start to run interceptor server")    
       /* filter     
       */    
      server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}    
      ...    
      return server.ListenAndServeTLS("", "")
    }

    func (s *interceptorServer) setupInformers(stop <-chan struct{}) error {   
        klog.Infof("Start to run service and endpoints informers")    
        noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)    
        if err != nil {        
            klog.Errorf("can't parse proxy label, %v", err)        
            return err    
        }    
        
        noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)    
        if err != nil {        
            klog.Errorf("can't parse headless label, %v", err)        
            return err    
        }    
            
        labelSelector := labels.NewSelector()    
        labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints) 
        
        resyncPeriod := time.Minute * 5   
        client := kubernetes.NewForConfigOrDie(s.restConfig)    
        nodeInformerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)    
        informerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,        
          informers.WithTweakListOptions(func(options *metav1.ListOptions) {          
              options.LabelSelector = labelSelector.String()        
           }))    
             
        nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()    
        serviceInformer := informerFactory.Core().V1().Services().Informer()    
        endpointsInformer := informerFactory.Core().V1().Endpoints().Informer()   
        
        /*    
        */    
        nodeInformer.AddEventHandlerWithResyncPeriod(s.cache.NodeEventHandler(), resyncPeriod)    
        serviceInformer.AddEventHandlerWithResyncPeriod(s.cache.ServiceEventHandler(), resyncPeriod)    
       
        endpointsInformer.AddEventHandlerWithResyncPeriod(s.cache.EndpointsEventHandler(), resyncPeriod)    
        
        go nodeInformer.Run(stop)    
        go serviceInformer.Run(stop)    
        go endpointsInformer.Run(stop)    
        
        if !cache.WaitForNamedCacheSync("node", stop,        
            nodeInformer.HasSynced,        
            serviceInformer.HasSynced,     
            endpointsInformer.HasSynced) {      
            return fmt.Errorf("can't sync informers")    
        }    
        
        return nil
    }

    func (sc *storageCache) NodeEventHandler() cache.ResourceEventHandler {   
         return &nodeHandler{cache: sc}
    }

    func (sc *storageCache) ServiceEventHandler() cache.ResourceEventHandler {    
        return &serviceHandler{cache: sc}

    }
    func (sc *storageCache) EndpointsEventHandler() cache.ResourceEventHandler {    
        return &endpointsHandler{cache: sc}
    }
     
    這里依次分析 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler,如下:  
     

    1、NodeEventHandler

    NodeEventHandler 負(fù)責(zé)監(jiān)聽 node 資源相關(guān) event,并將 node 以及 node Labels 添加到 storageCache.nodesMap 中(key為nodeName,value為node以及node labels)。

    func (nh *nodeHandler) add(node *v1.Node) {    
        sc := nh.cache    
        
        sc.mu.Lock()    
        
        nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}    
        klog.Infof("Adding node %v", nodeKey)    
        sc.nodesMap[nodeKey] = &nodeContainer{        
            node:   node,        
            labels: node.Labels,    
        }    
        // update endpoints    
        changedEps := sc.rebuildEndpointsMap()    
        
        sc.mu.Unlock()    
        
        for _, eps := range changedEps { 
            sc.endpointsChan <- eps    
            }
    }

    func (nh *nodeHandler) update(node *v1.Node) {    
        sc := nh.cache    
        
        sc.mu.Lock()    
            nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}    
        klog.Infof("Updating node %v", nodeKey)    
        nodeContainer, found := sc.nodesMap[nodeKey]    
        if !found {        
            sc.mu.Unlock()        
            klog.Errorf("Updating non-existed node %v", nodeKey)        
            return    
        }    
        
        nodeContainer.node = node    
        // return directly when labels of node stay unchanged    
        if reflect.DeepEqual(node.Labels, nodeContainer.labels) {        
            sc.mu.Unlock()        
            return    
        }    
        nodeContainer.labels = node.Labels    
        
        // update endpoints    
        changedEps := sc.rebuildEndpointsMap()    
        
        sc.mu.Unlock()    
        
        for _, eps := range changedEps {        
            sc.endpointsChan <- eps  
        }
    }
    ...
     

    同時(shí)由于 node 的改變會(huì)影響 endpoint,因此會(huì)調(diào)用 rebuildEndpointsMap 刷新 storageCache.endpointsMap。

    // rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
    func (sc *storageCache) rebuildEndpointsMap() []watch.Event {   
        evts := make([]watch.Event, 0)   
        for name, endpointsContainer := range sc.endpointsMap {       
            newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)   
            if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {            
                continue        
            }        
            sc.endpointsMap[name].modified = newEps        
            evts = append(evts, watch.Event{            
                Type:   watch.Modified,      
                Object: newEps,     
           })    
       }    
       return evts
    }
     

    rebuildEndpointsMap 是 cache 的核心函數(shù),同時(shí)也是拓?fù)涓兄乃惴▽?shí)現(xiàn):

    // pruneEndpoints filters endpoints using serviceTopology rules combined by services topologyKeys and node labels
    func pruneEndpoints(hostName string,
        nodes map[types.NamespacedName]*nodeContainer,    
        services map[types.NamespacedName]*serviceContainer,    
        eps *v1.Endpoints, wrapperInCluster bool) *v1.Endpoints {   
        
        epsKey := types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}  
        
        if wrapperInCluster {       
            eps = genLocalEndpoints(eps) 
        }   
        
        // dangling endpoints    
        svc, ok := services[epsKey] 
        if !ok {        
            klog.V(4).Infof("Dangling endpoints %s, %+#v", eps.Name, eps.Subsets)        
            return eps    
        }   
            
        // normal service    
        if len(svc.keys) == 0 {     
            klog.V(4).Infof("Normal endpoints %s, %+#v", eps.Name, eps.Subsets)    
            return eps   
        }    
        
        // topology endpoints    
        newEps := eps.DeepCopy()    
        for si := range newEps.Subsets { 
            subnet := &newEps.Subsets[si]
            subnet.Addresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.Addresses)        
            subnet.NotReadyAddresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.NotReadyAddresses)    
        }    
        klog.V(4).Infof("Topology endpoints %s: subnets from %+#v to %+#v", eps.Name, eps.Subsets, newEps.Subsets) 
        
        return newEps
    }

    // filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
    func filterConcernedAddresses(topologyKeys []string, hostName string, nodes 
    map[types.NamespacedName]*nodeContainer,    
        addresses []v1.EndpointAddress) []v1.EndpointAddress {    
        hostNode, found := nodes[types.NamespacedName{Name: hostName}]    
        if !found {        
            return nil    
        }    
        
        filteredEndpointAddresses := make([]v1.EndpointAddress, 0)    
        for i := range addresses {       
            addr := addresses[i]        
            if nodeName := addr.NodeName; nodeName != nil {            
                 epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]           
                 if !found {             
                     continue            
                 }            
                 if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {                
                     filteredEndpointAddresses = append(filteredEndpointAddresses, addr)            
                 }        
            }    
        }    
        
        return filteredEndpointAddresses 
    }

    func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool { 
        if n1 == nil || n2 == nil {      
            return false    
        }    
        
        for _, key := range keys {       
            val1, v1found := n1[key]     
            val2, v2found := n2[key]    
            
            if v1found && v2found && val1 == val2 {            
            return true        
            }    
        }    
        
        return false   
    }
     

    算法邏輯如下:

    • 判斷 endpoint 是否為 default kubernetes service,如果是,則將該 endpoint 轉(zhuǎn)化為 wrapper 所在邊緣節(jié)點(diǎn)的 lite-apiserver 地址(127.0.0.1)和端口(5100     3)。
    apiVersion: v1
    kind: Endpoints
    metadata:  
      annotations:    
        superedge.io/local-endpoint: 127.0.0.1    
        superedge.io/local-port: "51003" 
      name: kubernetes  
      namespace: default
    subsets:
    - addresses: 
      - ip: 172.31.0.60  
      ports:  
      - name: https    
      port: xxx    
      protocol: TCP
     
    func genLocalEndpoints(eps *v1.Endpoints) *v1.Endpoints {    
        if eps.Namespace != metav1.NamespaceDefault || eps.Name != MasterEndpointName {        
            return eps    
        }    
        
        klog.V(4).Infof("begin to gen local ep %v", eps)    
        ipAddress, e := eps.Annotations[EdgeLocalEndpoint]    
        if !e {        
            return eps    
        }    
            
        portStr, e := eps.Annotations[EdgeLocalPort]    
        if !e {        
            return eps    
        }    
        
        klog.V(4).Infof("get local endpoint %s:%s", ipAddress, portStr)    
        port, err := strconv.ParseInt(portStr, 10, 32)    
        if err != nil {        
            klog.Errorf("parse int %s err %v", portStr, err)        
            return eps    
        }  
        
        ip := net.ParseIP(ipAddress)   
        if ip == nil {        
            klog.Warningf("parse ip %s nil", ipAddress)        
            return eps    
        }    
        
        nep := eps.DeepCopy()    
        nep.Subsets = []v1.EndpointSubset{        
           {            
               Addresses: []v1.EndpointAddress{                
                  {                    
                       IP: ipAddress,   
                    },            
                },            
                Ports: []v1.EndpointPort{                
                     {                    
                         Protocol: v1.ProtocolTCP,                    
                         Port:     int32(port),                    
                        Name:     "https",                
                    },         
                 },      
             },   
        }  
          
        klog.V(4).Infof("gen new endpoint complete %v", nep)      
        return nep
    }
     
    這樣做的目的是使邊緣節(jié)點(diǎn)上的服務(wù)采用集群內(nèi) (InCluster) 方式訪問(wèn)的 apiserver 為本地的 lite-apiserver,而不是云端的 apiserver。  
     
    • 從 storageCache.servicesMap cache 中根據(jù) endpoint 名稱(namespace/name) 取出對(duì)應(yīng) service,如果該 service 沒有 topologyKeys 則無(wú)需做拓?fù)滢D(zhuǎn)化(非 service group)。

    func getTopologyKeys(objectMeta *metav1.ObjectMeta) []string {   
        if !hasTopologyKey(objectMeta) { 
            return nil    
        }   
        
        var keys []string   
        keyData := objectMeta.Annotations[TopologyAnnotationsKey]    
        if err := json.Unmarshal([]byte(keyData), &keys); err != nil {        
            klog.Errorf("can't parse topology keys %s, %v", keyData, err)       
            return nil    
        }    
        
        return keys
    }
     
    • 調(diào)用 filterConcernedAddresses 過(guò)濾 endpoint.Subsets Addresses 以及 NotReadyAddresses,只保留同一個(gè) service topologyKeys 中的 endpoint。

    // filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
    func filterConcernedAddresses(topologyKeys []string, hostName string, nodes 
    map[types.NamespacedName]*nodeContainer,    
        addresses []v1.EndpointAddress) []v1.EndpointAddress {    
        hostNode, found := nodes[types.NamespacedName{Name: hostName}]    
        if !found {        
            return nil   
        }
            
        filteredEndpointAddresses := make([]v1.EndpointAddress, 0)    
        for i := range addresses {       
            addr := addresses[i]        
            if nodeName := addr.NodeName; nodeName != nil {            
                epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]   
                if !found {                
                    continue            
                }           
                if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {                
                    filteredEndpointAddresses = append(filteredEndpointAddresses, addr)            
                }       
            }   
        }    
        
        return filteredEndpointAddresses
    }
        func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {   
            if n1 == nil || n2 == nil {       
                return false    
            }   
        
           for _, key := range keys {       
               val1, v1found := n1[key]       
               val2, v2found := n2[key]       
        
          if v1found && v2found && val1 == val2 {           
              return true        
           }    
       }    
      
       return false
    }
     
    注意:如果 wrapper 所在邊緣節(jié)點(diǎn)沒有 service topologyKeys 標(biāo)簽,則也無(wú)法訪問(wèn)該 service。  
    回到 rebuildEndpointsMap,在調(diào)用 pruneEndpoints 刷新了同一個(gè)拓?fù)溆騼?nèi)的 endpoint 后,會(huì)將修改后的 endpoints 賦值給 storageCache .endpointsMap [endpoint]. modified (該字段記錄了拓?fù)涓兄笮薷牡膃ndpoints)。  
    func (nh *nodeHandler) add(node *v1.Node) {    
        sc := nh.cache   
        
        sc.mu.Lock()    
        
        nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}    
        klog.Infof("Adding node %v", nodeKey)    
        sc.nodesMap[nodeKey] = &nodeContainer{        
            node:   node,        
            labels: node.Labels,    
        }    
        // update endpoints    
        changedEps := sc.rebuildEndpointsMap()    
        
        sc.mu.Unlock()    
        
        for _, eps := range changedEps { 
            sc.endpointsChan <- eps    
        }
    }

    // rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
    func (sc *storageCache) rebuildEndpointsMap() []watch.Event {    
        evts := make([]watch.Event, 0)  
        for name, endpointsContainer := range sc.endpointsMap {        
            newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)        
            if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {            
                continue        
            }        
            sc.endpointsMap[name].modified = newEps        
            evts = append(evts, watch.Event{            
                Type:   watch.Modified,   
                Object: newEps,        
            })    
        }    
        return evts
    }
     
    另外,如果 endpoints (拓?fù)涓兄笮薷牡?endpoints)發(fā)生改變,會(huì)構(gòu)建 watch event,傳遞給 endpoints handler (interceptEndpointsRequest)處理。  

    2、ServiceEventHandler

    storageCache.servicesMap 結(jié)構(gòu)體 key 為 service 名稱(namespace/name),value 為 serviceContainer,包含如下數(shù)據(jù):

    • svc:service對(duì)象
    • keys:service topologyKeys
    對(duì)于 service 資源的改動(dòng),這里用 Update event 說(shuō)明:  
    func (sh *serviceHandler) update(service *v1.Service) {    
        sc := sh.cache   
        
        sc.mu.Lock()    
        serviceKey := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}    
        klog.Infof("Updating service %v", serviceKey)    
        newTopologyKeys := getTopologyKeys(&service.ObjectMeta)    
        serviceContainer, found := sc.servicesMap[serviceKey]    
        if !found {        
            sc.mu.Unlock()        
            klog.Errorf("update non-existed service, %v", serviceKey)        
            return    
        }   
       
        sc.serviceChan <- watch.Event{   
            Type:   watch.Modified,       
            Object: service,    
        }    
        
        serviceContainer.svc = service   
        // return directly when topologyKeys of service stay unchanged    
        if reflect.DeepEqual(serviceContainer.keys, newTopologyKeys) {        
            sc.mu.Unlock()        
            return    
        }    
        
        serviceContainer.keys = newTopologyKeys    
        
        // update endpoints    
        changedEps := sc.rebuildEndpointsMap()    
        sc.mu.Unlock()    
        
        for _, eps := range changedEps { 
            sc.endpointsChan <- eps    
        }
    }
     
    邏輯如下:  
     
    • 獲取 service topologyKeys
    • 構(gòu)建 service event.Modified event
    • 比較 service topologyKeys 與已經(jīng)存在的是否有差異
    • 如果有差異則更新 topologyKeys,且調(diào)用 rebuildEndpointsMap 刷新該 service 對(duì)應(yīng)的endpoints,如果 endpoints 發(fā)生變化,則構(gòu)建 endpoints watch event,傳遞給 endpoints handler (interceptEndpointsRequest)處理。

    3、EndpointsEventHandler

    storageCache.endpointsMap 結(jié)構(gòu)體 key 為 endpoints 名稱(namespace/name),value 為 endpointsContainer,包含如下數(shù)據(jù):

    • endpoints:拓?fù)湫薷那暗?endpoints
    • modified:拓?fù)湫薷暮蟮?endpoints
    對(duì)于 endpoints 資源的改動(dòng),這里用 Update event 說(shuō)   明:  
    func (eh *endpointsHandler) update(endpoints *v1.Endpoints) {    
        sc := eh.cache   
         
        sc.mu.Lock()   
        endpointsKey := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}    
        klog.Infof("Updating endpoints %v", endpointsKey)    
        
        endpointsContainer, found := sc.endpointsMap[endpointsKey]    
        if !found {        
            sc.mu.Unlock()       
            klog.Errorf("Updating non-existed endpoints %v", endpointsKey)    
            return    
        }    
        endpointsContainer.endpoints = endpoints    
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.wrapperInCluster)    
        changed := !apiequality.Semantic.DeepEqual(endpointsContainer.modified, newEps)    
        if changed {        
            endpointsContainer.modified = newEps   
        }    
        sc.mu.Unlock()    
        
        if changed {       
            sc.endpointsChan <- watch.Event{            
                Type:   watch.Modified,   
                Object: newEps,       
            }   
        }
    }
     
    邏輯如下:  
     
    • 更新 endpointsContainer.endpoint 為新的 endpoints 對(duì)象
    • 調(diào)用 pruneEndpoints 獲取拓?fù)渌⑿潞蟮?endpoints
    • 比較 endpointsContainer.modified 與新刷新后的 endpoints
    • 如果有差異則更新 endpointsContainer.modified,則構(gòu)建 endpoints watch event,傳遞給 endpoints handler (interceptEndpointsRequest)處理。
    在分析完 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler 之后,我們回到具體的 http handler List&Watch 處理邏輯上,這里以 endpoints 為例:      
    func (s *interceptorServer) interceptEndpointsRequest(handler http.Handler) http.Handler {    
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 
            if r.Method != http.MethodGet || 
    !strings.HasPrefix(r.URL.Path, "/api/v1/endpoints") {            
                handler.ServeHTTP(w, r)   
                return       
            }        
            
            queries := r.URL.Query()     
            acceptType := r.Header.Get("Accept")        
            info, found := s.parseAccept(acceptType, s.mediaSerializer)       
            if !found {            
                klog.Errorf("can't find %s serializer", acceptType)           
                w.WriteHeader(http.StatusBadRequest)            
                return        
            }        
            
            encoder := 
    scheme.Codecs.EncoderForVersion(info.Serializer, v1.SchemeGroupVersion)   
            // list request        
            if queries.Get("watch") == "" {           
                w.Header().Set("Content-Type", info.MediaType)            
                allEndpoints := s.cache.GetEndpoints()           
                epsItems := make([]v1.Endpoints, 0, len(allEndpoints))        
                for _, eps := range allEndpoints {                
                    epsItems = append(epsItems, *eps)            
                }            
                
                epsList := &v1.EndpointsList{                
                    Items: epsItems,     
                 }           
                 
              err := encoder.Encode(epsList, w)            
              if err != nil {             
                  klog.Errorf("can't marshal endpoints list, %v", err)        
                 w.WriteHeader(http.StatusInternalServerError)               
                  return            
              }    
                
                  return       
          }       
             
          // watch request       
          timeoutSecondsStr := r.URL.Query().Get("timeoutSeconds")   
          timeout := time.Minute        
          if timeoutSecondsStr != "" {  
              timeout, _ = time.ParseDuration(fmt.Sprintf("%ss", timeoutSecondsStr))       
          }        
            
          timer := time.NewTimer(timeout)      
          defer timer.Stop()       
          
          flusher, ok := w.(http.Flusher)      
          if !ok {            
              klog.Errorf("unable to start watch - can't get http.Flusher: %#v", w)            
             w.WriteHeader(http.StatusMethodNotAllowed)            
              return        
            }    
            
            e := restclientwatch.NewEncoder(           
               streaming.NewEncoder(info.StreamSerializer.Framer.NewFrameWriter(w),       
                    scheme.Codecs.EncoderForVersion(info.StreamSerializer, v1.SchemeGroupVersion)),            
              encoder)        
          if info.MediaType == runtime.ContentTypeProtobuf {         
              w.Header().Set("Content-Type", 
        runtime.ContentTypeProtobuf+";stream=watch")       
          } else {           
              w.Header().Set("Content-Type", runtime.ContentTypeJSON)        
          }        
          w.Header().Set("Transfer-Encoding", "chunked")        
          w.WriteHeader(http.StatusOK)   
          flusher.Flush()        
          for {            
              select {            
              case <-r.Context().Done(): 
                  return            
              case <-timer.C:            
                  return           
              case evt := <-s.endpointsWatchCh:               
                  klog.V(4).Infof("Send endpoint watch event: %+#v", evt)     
                  err := e.Encode(&evt)   
                  if err != nil {         
                  klog.Errorf("can't encode watch event, %v", err)             
                  return                
              }               
              
              if len(s.endpointsWatchCh) == 0 {                    
                  flusher.Flush()         
                }            
             }         
          }    
      })
    }
     

    邏輯如下:

    • 如果為 List請(qǐng)求,則調(diào)用 GetEndpoints 獲取拓?fù)湫薷暮蟮?endpoints 列表,并返回

    func (sc *storageCache) GetEndpoints() []*v1.Endpoints {    
        sc.mu.RLock()   
        defer sc.mu.RUnlock()   
        
        epList := make([]*v1.Endpoints, 0, 
    len(sc.endpointsMap))    
        for _, v := range sc.endpointsMap {        
            epList = append(epList, v.modified)    
        }    
        return epList
    }
     
    • 如果為 Watch 請(qǐng)求,則不斷從 storageCache.endpointsWatchCh 管道中接受 watch event,并返回 interceptServiceRequest 邏輯與 interceptEndpointsRequest 一致,這里不再贅述 。

     

    總結(jié)

    • SuperEdge service group 利用 application-grid-wrapper 實(shí)現(xiàn)拓?fù)涓兄?,完成了同一個(gè) nodeunit 內(nèi)服務(wù)的閉環(huán)訪問(wèn)
    • service group 實(shí)現(xiàn)的拓?fù)涓兄?Kubernetes 社區(qū)原生實(shí)現(xiàn)對(duì)比,有如下區(qū)別:
      • service group 拓?fù)?key 可以自定義,也即為 gridUniqKey,使用起來(lái)更加靈活;而社區(qū)實(shí)現(xiàn)目前只有三種選擇:"kubernetes.io/hostname","topology.kubernetes.io/zone"以及"topology.kubernetes.io/region"
      • service group 只能填寫一個(gè)拓?fù)?key,也即只能訪問(wèn)本拓?fù)溆騼?nèi)有效的 endpoint,無(wú)法訪問(wèn)其它拓?fù)溆虻?endpoint;而社區(qū)可以通過(guò) topologyKey 列表以及"*"實(shí)現(xiàn)其它備選拓?fù)溆?endpoint的訪問(wèn)
    • ServiceGrid Controller 負(fù)責(zé)根據(jù) ServiceGrid 產(chǎn)生對(duì)應(yīng)的 service(包含由serviceGrid.Spec.GridUniqKey 構(gòu)成的 topologyKeys annotations),邏輯和 DeploymentGrid Controller 整體一致,如下:
      • 創(chuàng)建并維護(hù) service group 需要的若干CRDs(包括:ServiceGrid)
      • 監(jiān)聽 ServiceGrid event,并填充 ServiceGrid到工作隊(duì)列中;循環(huán)從隊(duì)列中取出 ServiceGrid 進(jìn)行解析,創(chuàng)建并且維護(hù)對(duì)應(yīng)的 service
      • 監(jiān)聽 service event,并將相關(guān)的 ServiceGrid 塞到工作隊(duì)列中進(jìn)行上述處理,協(xié)助上述邏輯達(dá)到整體 reconcile 邏輯
    • 為了實(shí)現(xiàn) Kubernetes 零侵入,需要在 kube-proxy 與 apiserver 通信之間添加一層 wrapper,調(diào)用鏈路如下:     kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
    • application-grid-wrapper 是一個(gè) http server,接受來(lái)自 kube-proxy 的請(qǐng)求,同時(shí)維護(hù)一個(gè)資源緩存,處理函數(shù)由外到內(nèi)依次如下:
      • debug:接受 debug 請(qǐng)求,返回 wrapper pprof 運(yùn)行信息
      • logger:打印請(qǐng)求日志
      • node:接受 kube-proxy node GET (/api/v1/nodes/{node}) 請(qǐng)求,并返回 node 信息
      • event:接受 kube-proxy events POST (/events) 請(qǐng)求,并將請(qǐng)求轉(zhuǎn)發(fā)給 lite-apiserver
      • service:接受 kube-proxy service List&Watch (/api/v1/services) 請(qǐng)求,并根據(jù) storageCache 內(nèi)容返回 (GetServices)
      • endpoint:接受 kube-proxy endpoint List&Watch (/api/v1/endpoints) 請(qǐng)求,并根據(jù) storageCache 內(nèi)容返回(GetEndpoints)
    • wrapper 為了實(shí)現(xiàn)拓?fù)涓兄?,維護(hù)了一個(gè)資源 cache,包括:node,service,endpoint,同時(shí)注冊(cè)了相關(guān) event 處理函數(shù)。核心拓?fù)渌惴ㄟ壿嫗椋赫{(diào)用 filterConcernedAddresses 過(guò)濾 endpoint.Subsets Addresses 以及 NotReadyAddresses,只保留同一個(gè) service topologyKeys 中的 endpoint。另外,如果 wrapper 所在邊緣節(jié)點(diǎn)沒有 service topologyKeys 標(biāo)簽,則也無(wú)法訪問(wèn)該service
    • wrapper 接受來(lái)自 kube-proxy 對(duì) endpoints 以及 service 的 List&Watch 請(qǐng)求,以endpoints 為例:如果為L(zhǎng)ist 請(qǐng)求,則調(diào)用 GetEndpoints 獲取拓?fù)湫薷暮蟮?endpoints 列表,并返回;如果為 Watch 請(qǐng)求,則不斷從storageCache.endpointsWatchCh 管道中接受 watch event,并返回。service 邏輯與 endpoints 一致

    關(guān)于“如何分析SuperEdge 拓?fù)渌惴ā本徒榻B到這了,更多相關(guān)內(nèi)容可以搜索億速云以前的文章,希望能夠幫助大家答疑解惑,請(qǐng)多多支持億速云網(wǎng)站!

    向AI問(wèn)一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI