SuperEdge service group uses application-grid-wrapper to implement topology awareness, enabling closed-loop access to services within the same NodeUnit.
Before diving into application-grid-wrapper, this section briefly introduces the topology awareness feature natively supported by the Kubernetes community[1].
The Kubernetes service topology awareness feature was released as alpha in v1.17 to implement topology-aware routing and proximity-based access. Users add a topologyKeys field to a service to designate the topology key types; only endpoints within the same topology domain as the client are accessed. Three topologyKeys are currently available (a sketch of such a service follows this list):

- "kubernetes.io/hostname"
- "topology.kubernetes.io/zone"
- "topology.kubernetes.io/region"
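For reference, a Service using this alpha feature might look like the following sketch; the topologyKeys field and the key values follow the feature described above, while the service name, selector, and port are hypothetical:

```yaml
# Illustrative sketch of the community ServiceTopology alpha feature (v1.17).
# The keys are checked against node labels: only endpoints whose node shares
# the same label value as the client's node are routed to.
# Name, selector, and port below are hypothetical.
apiVersion: v1
kind: Service
metadata:
  name: echo-service
spec:
  selector:
    app: echo
  ports:
  - port: 80
    protocol: TCP
  topologyKeys:
  - "kubernetes.io/hostname"
  - "topology.kubernetes.io/zone"
```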
Having introduced the topology awareness implemented by service group, we now dig into the source code for the implementation details. As before, the analysis starts from a usage example, sketched below:
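A minimal ServiceGrid CR might look like this; spec.gridUniqKey mirrors serviceGrid.Spec.GridUniqKey referenced in the next paragraph, while the apiVersion, names, selector, and port are assumptions for illustration:

```yaml
# Hypothetical minimal ServiceGrid CR; only gridUniqKey is taken from the
# analysis below, the remaining fields are illustrative assumptions.
apiVersion: superedge.io/v1
kind: ServiceGrid
metadata:
  name: servicegrid-demo
  namespace: default
spec:
  gridUniqKey: zone
  template:
    selector:
      appGrid: echo
    ports:
    - protocol: TCP
      port: 80
```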
After the ServiceGrid CR is created, the ServiceGrid Controller generates the corresponding service from it (including topologyKeys annotations built from serviceGrid.Spec.GridUniqKey); application-grid-wrapper then implements topology awareness based on that service. Both are analyzed in turn below.
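The generated service might look like the sketch below; the annotation key name topologyKeys is an assumption (it corresponds to the TopologyAnnotationsKey constant that getTopologyKeys reads later), and the JSON-array value format follows the json.Unmarshal into []string there:

```yaml
# Hypothetical service generated by ServiceGrid Controller; the annotation
# key name is an assumption, the JSON-array value matches getTopologyKeys.
apiVersion: v1
kind: Service
metadata:
  name: servicegrid-demo-svc
  annotations:
    topologyKeys: '["zone"]'
spec:
  selector:
    appGrid: echo
  ports:
  - protocol: TCP
    port: 80
```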
Since the controller logic is similar to DeploymentGrid, the details are not expanded here; the focus is on the application-grid-wrapper part.
After the ServiceGrid Controller has created the service, application-grid-wrapper comes into play:
To keep the implementation non-intrusive to Kubernetes, a wrapper layer is added between kube-proxy and the apiserver; the architecture is:

kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver

application-grid-wrapper is an http server that serves requests from kube-proxy while maintaining a resource cache. Its handlers, from outer to inner, are:
- debug: accepts debug requests and returns wrapper pprof runtime information
- node: accepts kube-proxy node GET (/api/v1/nodes/{node}) requests and returns node information
- event: accepts kube-proxy events POST (/events) requests and forwards them to lite-apiserver
```go
func (s *interceptorServer) interceptEventRequest(handler http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost || !strings.HasSuffix(r.URL.Path, "/events") {
            handler.ServeHTTP(w, r)
            return
        }

        targetURL, _ := url.Parse(s.restConfig.Host)
        reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)
        reverseProxy.Transport, _ = rest.TransportFor(s.restConfig)
        reverseProxy.ServeHTTP(w, r)
    })
}
```
- service: accepts kube-proxy service List&Watch (/api/v1/services) requests and responds from the storageCache contents (GetServices)
- endpoint: accepts kube-proxy endpoint List&Watch (/api/v1/endpoints) requests and responds from the storageCache contents (GetEndpoints)
Below we first focus on the cache logic, then return to the concrete List&Watch handling in the http handlers.
To implement topology awareness, the wrapper maintains its own cache covering node, service, and endpoint resources. The handlers for these three resource types are registered in setupInformers:
```go
type storageCache struct {
    // hostName is the nodeName of node which application-grid-wrapper deploys on
    hostName         string
    wrapperInCluster bool

    // mu lock protect the following map structure
    mu           sync.RWMutex
    servicesMap  map[types.NamespacedName]*serviceContainer
    endpointsMap map[types.NamespacedName]*endpointsContainer
    nodesMap     map[types.NamespacedName]*nodeContainer

    // service watch channel
    serviceChan chan<- watch.Event
    // endpoints watch channel
    endpointsChan chan<- watch.Event
}

...

func NewStorageCache(hostName string, wrapperInCluster bool, serviceNotifier, endpointsNotifier chan watch.Event) *storageCache {
    msc := &storageCache{
        hostName:         hostName,
        wrapperInCluster: wrapperInCluster,
        servicesMap:      make(map[types.NamespacedName]*serviceContainer),
        endpointsMap:     make(map[types.NamespacedName]*endpointsContainer),
        nodesMap:         make(map[types.NamespacedName]*nodeContainer),
        serviceChan:      serviceNotifier,
        endpointsChan:    endpointsNotifier,
    }
    return msc
}

...

func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {
    ...
    if err := s.setupInformers(ctx.Done()); err != nil {
        return err
    }

    klog.Infof("Start to run interceptor server")
    /* filter */
    server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}
    ...
    return server.ListenAndServeTLS("", "")
}

func (s *interceptorServer) setupInformers(stop <-chan struct{}) error {
    klog.Infof("Start to run service and endpoints informers")
    noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
    if err != nil {
        klog.Errorf("can't parse proxy label, %v", err)
        return err
    }

    noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
    if err != nil {
        klog.Errorf("can't parse headless label, %v", err)
        return err
    }

    labelSelector := labels.NewSelector()
    labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)

    resyncPeriod := time.Minute * 5
    client := kubernetes.NewForConfigOrDie(s.restConfig)
    nodeInformerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
    informerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
        informers.WithTweakListOptions(func(options *metav1.ListOptions) {
            options.LabelSelector = labelSelector.String()
        }))

    nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()
    serviceInformer := informerFactory.Core().V1().Services().Informer()
    endpointsInformer := informerFactory.Core().V1().Endpoints().Informer()

    /* */
    nodeInformer.AddEventHandlerWithResyncPeriod(s.cache.NodeEventHandler(), resyncPeriod)
    serviceInformer.AddEventHandlerWithResyncPeriod(s.cache.ServiceEventHandler(), resyncPeriod)
    endpointsInformer.AddEventHandlerWithResyncPeriod(s.cache.EndpointsEventHandler(), resyncPeriod)

    go nodeInformer.Run(stop)
    go serviceInformer.Run(stop)
    go endpointsInformer.Run(stop)

    if !cache.WaitForNamedCacheSync("node", stop,
        nodeInformer.HasSynced,
        serviceInformer.HasSynced,
        endpointsInformer.HasSynced) {
        return fmt.Errorf("can't sync informers")
    }

    return nil
}

func (sc *storageCache) NodeEventHandler() cache.ResourceEventHandler {
    return &nodeHandler{cache: sc}
}

func (sc *storageCache) ServiceEventHandler() cache.ResourceEventHandler {
    return &serviceHandler{cache: sc}
}

func (sc *storageCache) EndpointsEventHandler() cache.ResourceEventHandler {
    return &endpointsHandler{cache: sc}
}
```
NodeEventHandler, ServiceEventHandler, and EndpointsEventHandler are analyzed in turn below:
1. NodeEventHandler
NodeEventHandler subscribes to node resource events and adds each node together with its labels to storageCache.nodesMap (key: nodeName; value: the node and its labels).
```go
func (nh *nodeHandler) add(node *v1.Node) {
    sc := nh.cache

    sc.mu.Lock()

    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}
    klog.Infof("Adding node %v", nodeKey)
    sc.nodesMap[nodeKey] = &nodeContainer{
        node:   node,
        labels: node.Labels,
    }
    // update endpoints
    changedEps := sc.rebuildEndpointsMap()

    sc.mu.Unlock()

    for _, eps := range changedEps {
        sc.endpointsChan <- eps
    }
}

func (nh *nodeHandler) update(node *v1.Node) {
    sc := nh.cache

    sc.mu.Lock()

    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}
    klog.Infof("Updating node %v", nodeKey)
    nodeContainer, found := sc.nodesMap[nodeKey]
    if !found {
        sc.mu.Unlock()
        klog.Errorf("Updating non-existed node %v", nodeKey)
        return
    }

    nodeContainer.node = node
    // return directly when labels of node stay unchanged
    if reflect.DeepEqual(node.Labels, nodeContainer.labels) {
        sc.mu.Unlock()
        return
    }
    nodeContainer.labels = node.Labels

    // update endpoints
    changedEps := sc.rebuildEndpointsMap()

    sc.mu.Unlock()

    for _, eps := range changedEps {
        sc.endpointsChan <- eps
    }
}
...
```
Because node changes affect endpoints, rebuildEndpointsMap is also called to refresh storageCache.endpointsMap.
```go
// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {
    evts := make([]watch.Event, 0)
    for name, endpointsContainer := range sc.endpointsMap {
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)
        if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {
            continue
        }
        sc.endpointsMap[name].modified = newEps
        evts = append(evts, watch.Event{
            Type:   watch.Modified,
            Object: newEps,
        })
    }
    return evts
}
```
rebuildEndpointsMap is the core function of the cache and also the implementation of the topology awareness algorithm:
```go
// pruneEndpoints filters endpoints using serviceTopology rules combined by services topologyKeys and node labels
func pruneEndpoints(hostName string,
    nodes map[types.NamespacedName]*nodeContainer,
    services map[types.NamespacedName]*serviceContainer,
    eps *v1.Endpoints, wrapperInCluster bool) *v1.Endpoints {

    epsKey := types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}

    if wrapperInCluster {
        eps = genLocalEndpoints(eps)
    }

    // dangling endpoints
    svc, ok := services[epsKey]
    if !ok {
        klog.V(4).Infof("Dangling endpoints %s, %+#v", eps.Name, eps.Subsets)
        return eps
    }

    // normal service
    if len(svc.keys) == 0 {
        klog.V(4).Infof("Normal endpoints %s, %+#v", eps.Name, eps.Subsets)
        return eps
    }

    // topology endpoints
    newEps := eps.DeepCopy()
    for si := range newEps.Subsets {
        subnet := &newEps.Subsets[si]
        subnet.Addresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.Addresses)
        subnet.NotReadyAddresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.NotReadyAddresses)
    }
    klog.V(4).Infof("Topology endpoints %s: subnets from %+#v to %+#v", eps.Name, eps.Subsets, newEps.Subsets)

    return newEps
}

// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
func filterConcernedAddresses(topologyKeys []string, hostName string,
    nodes map[types.NamespacedName]*nodeContainer,
    addresses []v1.EndpointAddress) []v1.EndpointAddress {

    hostNode, found := nodes[types.NamespacedName{Name: hostName}]
    if !found {
        return nil
    }

    filteredEndpointAddresses := make([]v1.EndpointAddress, 0)
    for i := range addresses {
        addr := addresses[i]
        if nodeName := addr.NodeName; nodeName != nil {
            epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]
            if !found {
                continue
            }
            if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {
                filteredEndpointAddresses = append(filteredEndpointAddresses, addr)
            }
        }
    }

    return filteredEndpointAddresses
}

func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {
    if n1 == nil || n2 == nil {
        return false
    }

    for _, key := range keys {
        val1, v1found := n1[key]
        val2, v2found := n2[key]

        if v1found && v2found && val1 == val2 {
            return true
        }
    }

    return false
}
```
The algorithm logic is as follows:
1. Check whether the endpoints belong to the default kubernetes service; if so, convert them to the address (127.0.0.1) and port (51003) of the lite-apiserver on the edge node where the wrapper runs:

```yaml
apiVersion: v1
kind: Endpoints
metadata:
  annotations:
    superedge.io/local-endpoint: 127.0.0.1
    superedge.io/local-port: "51003"
  name: kubernetes
  namespace: default
subsets:
- addresses:
  - ip: 172.31.0.60
  ports:
  - name: https
    port: xxx
    protocol: TCP
```
```go
func genLocalEndpoints(eps *v1.Endpoints) *v1.Endpoints {
    if eps.Namespace != metav1.NamespaceDefault || eps.Name != MasterEndpointName {
        return eps
    }

    klog.V(4).Infof("begin to gen local ep %v", eps)
    ipAddress, e := eps.Annotations[EdgeLocalEndpoint]
    if !e {
        return eps
    }

    portStr, e := eps.Annotations[EdgeLocalPort]
    if !e {
        return eps
    }

    klog.V(4).Infof("get local endpoint %s:%s", ipAddress, portStr)
    port, err := strconv.ParseInt(portStr, 10, 32)
    if err != nil {
        klog.Errorf("parse int %s err %v", portStr, err)
        return eps
    }

    ip := net.ParseIP(ipAddress)
    if ip == nil {
        klog.Warningf("parse ip %s nil", ipAddress)
        return eps
    }

    nep := eps.DeepCopy()
    nep.Subsets = []v1.EndpointSubset{
        {
            Addresses: []v1.EndpointAddress{
                {
                    IP: ipAddress,
                },
            },
            Ports: []v1.EndpointPort{
                {
                    Protocol: v1.ProtocolTCP,
                    Port:     int32(port),
                    Name:     "https",
                },
            },
        },
    }

    klog.V(4).Infof("gen new endpoint complete %v", nep)
    return nep
}
```
The purpose is that services on edge nodes accessing the apiserver in InCluster mode reach the local lite-apiserver rather than the cloud apiserver.
2. Next, take the topologyKeys of the service that the endpoints belong to; these are parsed from the service annotations by getTopologyKeys:

```go
func getTopologyKeys(objectMeta *metav1.ObjectMeta) []string {
    if !hasTopologyKey(objectMeta) {
        return nil
    }

    var keys []string
    keyData := objectMeta.Annotations[TopologyAnnotationsKey]
    if err := json.Unmarshal([]byte(keyData), &keys); err != nil {
        klog.Errorf("can't parse topology keys %s, %v", keyData, err)
        return nil
    }

    return keys
}
```
3. Finally, filter the endpoint addresses, keeping only those whose node shares a topologyKey label value with the wrapper's node (filterConcernedAddresses, shown again here together with hasIntersectionLabel):

```go
// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
func filterConcernedAddresses(topologyKeys []string, hostName string,
    nodes map[types.NamespacedName]*nodeContainer,
    addresses []v1.EndpointAddress) []v1.EndpointAddress {

    hostNode, found := nodes[types.NamespacedName{Name: hostName}]
    if !found {
        return nil
    }

    filteredEndpointAddresses := make([]v1.EndpointAddress, 0)
    for i := range addresses {
        addr := addresses[i]
        if nodeName := addr.NodeName; nodeName != nil {
            epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]
            if !found {
                continue
            }
            if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {
                filteredEndpointAddresses = append(filteredEndpointAddresses, addr)
            }
        }
    }

    return filteredEndpointAddresses
}

func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {
    if n1 == nil || n2 == nil {
        return false
    }

    for _, key := range keys {
        val1, v1found := n1[key]
        val2, v2found := n2[key]

        if v1found && v2found && val1 == val2 {
            return true
        }
    }

    return false
}
```
Note: if the edge node on which the wrapper runs carries none of the service's topologyKeys labels, that service cannot be accessed from the node either; see the illustration below.
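As a concrete illustration of the label-intersection check (node names and the zone label below are hypothetical): with topologyKeys ["zone"], addresses on edge-node-2 survive the filter only because both nodes carry the same zone value; were edge-node-1 missing the zone label entirely, hasIntersectionLabel would return false for every address and the service would be unreachable from it:

```yaml
# Hypothetical nodes illustrating the check in
# filterConcernedAddresses / hasIntersectionLabel.
apiVersion: v1
kind: Node
metadata:
  name: edge-node-1      # node the wrapper (and kube-proxy) runs on
  labels:
    zone: zone-1
---
apiVersion: v1
kind: Node
metadata:
  name: edge-node-2      # node hosting an endpoint address
  labels:
    zone: zone-1         # same value as edge-node-1 => address kept
```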
Back in rebuildEndpointsMap, after pruneEndpoints has refreshed the endpoints within the same topology domain, the modified endpoints are assigned to storageCache.endpointsMap[endpoint].modified (this field records the endpoints after the topology-aware modification).
```go
func (nh *nodeHandler) add(node *v1.Node) {
    sc := nh.cache

    sc.mu.Lock()

    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}
    klog.Infof("Adding node %v", nodeKey)
    sc.nodesMap[nodeKey] = &nodeContainer{
        node:   node,
        labels: node.Labels,
    }
    // update endpoints
    changedEps := sc.rebuildEndpointsMap()

    sc.mu.Unlock()

    for _, eps := range changedEps {
        sc.endpointsChan <- eps
    }
}

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {
    evts := make([]watch.Event, 0)
    for name, endpointsContainer := range sc.endpointsMap {
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)
        if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {
            continue
        }
        sc.endpointsMap[name].modified = newEps
        evts = append(evts, watch.Event{
            Type:   watch.Modified,
            Object: newEps,
        })
    }
    return evts
}
```
Additionally, if the (topology-modified) endpoints change, a watch event is constructed and passed to the endpoints handler (interceptEndpointsRequest) for processing.
2. ServiceEventHandler
The keys of storageCache.servicesMap are service names (namespace/name); each value is a serviceContainer holding:

- svc: the service object itself (as used by the handler code below)
- keys: the service topologyKeys

For changes to service resources, the Update event illustrates the handling:
```go
func (sh *serviceHandler) update(service *v1.Service) {
    sc := sh.cache

    sc.mu.Lock()
    serviceKey := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
    klog.Infof("Updating service %v", serviceKey)

    newTopologyKeys := getTopologyKeys(&service.ObjectMeta)
    serviceContainer, found := sc.servicesMap[serviceKey]
    if !found {
        sc.mu.Unlock()
        klog.Errorf("update non-existed service, %v", serviceKey)
        return
    }

    sc.serviceChan <- watch.Event{
        Type:   watch.Modified,
        Object: service,
    }

    serviceContainer.svc = service
    // return directly when topologyKeys of service stay unchanged
    if reflect.DeepEqual(serviceContainer.keys, newTopologyKeys) {
        sc.mu.Unlock()
        return
    }

    serviceContainer.keys = newTopologyKeys

    // update endpoints
    changedEps := sc.rebuildEndpointsMap()
    sc.mu.Unlock()

    for _, eps := range changedEps {
        sc.endpointsChan <- eps
    }
}
```
The update handler:

- constructs a service watch.Modified event
- compares the new service topologyKeys against the stored ones
- if they differ, updates the stored topologyKeys and calls rebuildEndpointsMap to refresh the endpoints corresponding to the service; if any endpoints change, endpoints watch events are constructed and passed to the endpoints handler (interceptEndpointsRequest)

3. EndpointsEventHandler
The keys of storageCache.endpointsMap are endpoints names (namespace/name); each value is an endpointsContainer holding:

- endpoints: the endpoints before topology modification
- modified: the endpoints after topology modification

For changes to endpoints resources, the Update event illustrates the handling:
```go
func (eh *endpointsHandler) update(endpoints *v1.Endpoints) {
    sc := eh.cache

    sc.mu.Lock()
    endpointsKey := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
    klog.Infof("Updating endpoints %v", endpointsKey)

    endpointsContainer, found := sc.endpointsMap[endpointsKey]
    if !found {
        sc.mu.Unlock()
        klog.Errorf("Updating non-existed endpoints %v", endpointsKey)
        return
    }
    endpointsContainer.endpoints = endpoints
    newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.wrapperInCluster)
    changed := !apiequality.Semantic.DeepEqual(endpointsContainer.modified, newEps)
    if changed {
        endpointsContainer.modified = newEps
    }
    sc.mu.Unlock()

    if changed {
        sc.endpointsChan <- watch.Event{
            Type:   watch.Modified,
            Object: newEps,
        }
    }
}
```
The update handler:

- updates endpointsContainer.endpoints to the new endpoints object
- calls pruneEndpoints to obtain the topology-refreshed endpoints
- compares endpointsContainer.modified with the refreshed endpoints; if they differ, updates endpointsContainer.modified and constructs an endpoints watch event, which is passed to the endpoints handler (interceptEndpointsRequest)

Having analyzed NodeEventHandler, ServiceEventHandler, and EndpointsEventHandler, we now return to the List&Watch handling in the concrete http handlers, taking endpoints as the example:
```go
func (s *interceptorServer) interceptEndpointsRequest(handler http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodGet || !strings.HasPrefix(r.URL.Path, "/api/v1/endpoints") {
            handler.ServeHTTP(w, r)
            return
        }

        queries := r.URL.Query()
        acceptType := r.Header.Get("Accept")
        info, found := s.parseAccept(acceptType, s.mediaSerializer)
        if !found {
            klog.Errorf("can't find %s serializer", acceptType)
            w.WriteHeader(http.StatusBadRequest)
            return
        }

        encoder := scheme.Codecs.EncoderForVersion(info.Serializer, v1.SchemeGroupVersion)
        // list request
        if queries.Get("watch") == "" {
            w.Header().Set("Content-Type", info.MediaType)
            allEndpoints := s.cache.GetEndpoints()
            epsItems := make([]v1.Endpoints, 0, len(allEndpoints))
            for _, eps := range allEndpoints {
                epsItems = append(epsItems, *eps)
            }

            epsList := &v1.EndpointsList{
                Items: epsItems,
            }

            err := encoder.Encode(epsList, w)
            if err != nil {
                klog.Errorf("can't marshal endpoints list, %v", err)
                w.WriteHeader(http.StatusInternalServerError)
                return
            }
            return
        }

        // watch request
        timeoutSecondsStr := r.URL.Query().Get("timeoutSeconds")
        timeout := time.Minute
        if timeoutSecondsStr != "" {
            timeout, _ = time.ParseDuration(fmt.Sprintf("%ss", timeoutSecondsStr))
        }

        timer := time.NewTimer(timeout)
        defer timer.Stop()

        flusher, ok := w.(http.Flusher)
        if !ok {
            klog.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
            w.WriteHeader(http.StatusMethodNotAllowed)
            return
        }

        e := restclientwatch.NewEncoder(
            streaming.NewEncoder(info.StreamSerializer.Framer.NewFrameWriter(w),
                scheme.Codecs.EncoderForVersion(info.StreamSerializer, v1.SchemeGroupVersion)),
            encoder)
        if info.MediaType == runtime.ContentTypeProtobuf {
            w.Header().Set("Content-Type", runtime.ContentTypeProtobuf+";stream=watch")
        } else {
            w.Header().Set("Content-Type", runtime.ContentTypeJSON)
        }
        w.Header().Set("Transfer-Encoding", "chunked")
        w.WriteHeader(http.StatusOK)
        flusher.Flush()
        for {
            select {
            case <-r.Context().Done():
                return
            case <-timer.C:
                return
            case evt := <-s.endpointsWatchCh:
                klog.V(4).Infof("Send endpoint watch event: %+#v", evt)
                err := e.Encode(&evt)
                if err != nil {
                    klog.Errorf("can't encode watch event, %v", err)
                    return
                }

                if len(s.endpointsWatchCh) == 0 {
                    flusher.Flush()
                }
            }
        }
    })
}
```
The logic is as follows:

- For a List request, GetEndpoints is called to collect the topology-modified endpoints from the cache and return them as an EndpointsList
- For a Watch request, watch events are continuously received from the storageCache.endpointsWatchCh channel and streamed back to kube-proxy

The service handling follows the same pattern. GetEndpoints itself simply returns the modified endpoints recorded in the cache:
```go
func (sc *storageCache) GetEndpoints() []*v1.Endpoints {
    sc.mu.RLock()
    defer sc.mu.RUnlock()

    epList := make([]*v1.Endpoints, 0, len(sc.endpointsMap))
    for _, v := range sc.endpointsMap {
        epList = append(epList, v.modified)
    }
    return epList
}
```
Summary

SuperEdge service group uses application-grid-wrapper to implement topology awareness, enabling closed-loop access to services within the same NodeUnit.

The topology awareness implemented by service group differs from the native Kubernetes community implementation as follows:

- The service group topology key can be customized (gridUniqKey), which is more flexible to use; the community implementation currently offers only three choices: "kubernetes.io/hostname", "topology.kubernetes.io/zone", and "topology.kubernetes.io/region"
- service group accepts only one topology key, so only endpoints valid within the local topology domain can be accessed, never endpoints in other topology domains; the community implementation allows falling back to endpoints in other topology domains through the topologyKey list and "*"

ServiceGrid Controller generates the corresponding service from each ServiceGrid (including topologyKeys annotations built from serviceGrid.Spec.GridUniqKey); its logic is essentially the same as DeploymentGrid Controller:

- creates and maintains the CRDs required by service group (including ServiceGrid)
- watches ServiceGrid events and enqueues ServiceGrids into a work queue; in a loop, dequeues and parses each ServiceGrid, then creates and maintains the corresponding service
- watches service events and enqueues the related ServiceGrids for the processing above, so that the overall reconcile logic converges

To keep the implementation non-intrusive to Kubernetes, a wrapper layer is added between kube-proxy and the apiserver, with the call chain:

kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver

application-grid-wrapper is an http server that accepts requests from kube-proxy while maintaining a resource cache. Its handlers, from outer to inner:

- debug: accepts debug requests and returns wrapper pprof runtime information
- node: accepts kube-proxy node GET (/api/v1/nodes/{node}) requests and returns node information
- event: accepts kube-proxy events POST (/events) requests and forwards them to lite-apiserver
- service: accepts kube-proxy service List&Watch (/api/v1/services) requests and responds from the storageCache contents (GetServices)
- endpoint: accepts kube-proxy endpoint List&Watch (/api/v1/endpoints) requests and responds from the storageCache contents (GetEndpoints)

To implement topology awareness, the wrapper maintains a resource cache of node, service, and endpoint and registers the corresponding event handlers. The core topology algorithm: filterConcernedAddresses filters endpoint.Subsets Addresses and NotReadyAddresses, keeping only endpoints that share a service topologyKey label value with the wrapper's node. Also, if the edge node the wrapper runs on carries none of a service's topologyKeys labels, that service cannot be accessed from the node either.

The wrapper accepts List&Watch requests from kube-proxy for endpoints and services. Taking endpoints as the example: a List request calls GetEndpoints to obtain and return the topology-modified endpoints list; a Watch request continuously receives watch events from the storageCache.endpointsWatchCh channel and returns them. The service logic is identical to that of endpoints.