溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎樣實(shí)現(xiàn)kubeproxy源碼分析

發(fā)布時(shí)間:2021-12-28 15:22:30 來源:億速云 閱讀:121 作者:柒染 欄目:云計(jì)算

本篇文章給大家分享的是有關(guān)怎樣實(shí)現(xiàn)kubeproxy源碼分析,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

kubernetes離線安裝包

kube-proxy源碼解析

ipvs相對(duì)于iptables模式具備較高的性能與穩(wěn)定性, 本文講以此模式的源碼解析為主,如果想去了解iptables模式的原理,可以去參考其實(shí)現(xiàn),架構(gòu)上無差別。

kube-proxy主要功能是監(jiān)聽service和endpoint的事件,然后下放代理策略到機(jī)器上。 底層調(diào)用docker/libnetwork, 而libnetwork最終調(diào)用了netlink 與netns來實(shí)現(xiàn)ipvs的創(chuàng)建等動(dòng)作

<!--more-->

初始化配置

代碼入口:cmd/kube-proxy/app/server.go Run() 函數(shù)

通過命令行參數(shù)去初始化proxyServer的配置

proxyServer, err := NewProxyServer(o)
type ProxyServer struct {
    // k8s client
	Client                 clientset.Interface
	EventClient            v1core.EventsGetter

    // ipvs 相關(guān)接口
	IptInterface           utiliptables.Interface
	IpvsInterface          utilipvs.Interface
	IpsetInterface         utilipset.Interface

    // 處理同步時(shí)的處理器
	Proxier                proxy.ProxyProvider

    // 代理模式,ipvs iptables userspace kernelspace(windows)四種
	ProxyMode              string
    // 配置同步周期
	ConfigSyncPeriod       time.Duration

    // service 與 endpoint 事件處理器
	ServiceEventHandler    config.ServiceHandler
	EndpointsEventHandler  config.EndpointsHandler
}

Proxier是主要入口,抽象了兩個(gè)函數(shù):

type ProxyProvider interface {
	// Sync immediately synchronizes the ProxyProvider's current state to iptables.
	Sync()
	// 定期執(zhí)行
	SyncLoop()
}

ipvs 的interface 這個(gè)很重要:

type Interface interface {
	// 刪除所有規(guī)則
	Flush() error
	// 增加一個(gè)virtual server
	AddVirtualServer(*VirtualServer) error

	UpdateVirtualServer(*VirtualServer) error
	DeleteVirtualServer(*VirtualServer) error
	GetVirtualServer(*VirtualServer) (*VirtualServer, error)
	GetVirtualServers() ([]*VirtualServer, error)

    // 給virtual server加個(gè)realserver, 如 VirtualServer就是一個(gè)clusterip realServer就是pod(或者自定義的endpoint)
	AddRealServer(*VirtualServer, *RealServer) error
	GetRealServers(*VirtualServer) ([]*RealServer, error)
	DeleteRealServer(*VirtualServer, *RealServer) error
}

我們?cè)谙挛脑僭敿?xì)看ipvs_linux是如何實(shí)現(xiàn)上面接口的

virtual server與realserver, 最重要的是ip:port,然后就是一些代理的模式如sessionAffinity等:

type VirtualServer struct {
	Address   net.IP
	Protocol  string
	Port      uint16
	Scheduler string
	Flags     ServiceFlags
	Timeout   uint32
}

type RealServer struct {
	Address net.IP
	Port    uint16
	Weight  int
}

創(chuàng)建apiserver client

client, eventClient, err := createClients(config.ClientConnection, master)

創(chuàng)建Proxier 這是僅僅關(guān)注ipvs模式的proxier

else if proxyMode == proxyModeIPVS {
		glog.V(0).Info("Using ipvs Proxier.")
		proxierIPVS, err := ipvs.NewProxier(
			iptInterface,
			ipvsInterface,
			ipsetInterface,
			utilsysctl.New(),
			execer,
			config.IPVS.SyncPeriod.Duration,
			config.IPVS.MinSyncPeriod.Duration,
			config.IPTables.MasqueradeAll,
			int(*config.IPTables.MasqueradeBit),
			config.ClusterCIDR,
			hostname,
			getNodeIP(client, hostname),
			recorder,
			healthzServer,
			config.IPVS.Scheduler,
		)
...
		proxier = proxierIPVS
		serviceEventHandler = proxierIPVS
		endpointsEventHandler = proxierIPVS

這個(gè)Proxier具備以下方法:

   +OnEndpointsAdd(endpoints *api.Endpoints)
   +OnEndpointsDelete(endpoints *api.Endpoints)
   +OnEndpointsSynced()
   +OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
   +OnServiceAdd(service *api.Service)
   +OnServiceDelete(service *api.Service)
   +OnServiceSynced()
   +OnServiceUpdate(oldService, service *api.Service)
   +Sync()
   +SyncLoop()

所以ipvs的這個(gè)Proxier實(shí)現(xiàn)了我們需要的絕大部分接口

小結(jié)一下:

     +-----------> endpointHandler
     |
     +-----------> serviceHandler
     |                ^
     |                | +-------------> sync 定期同步等
     |                | |
ProxyServer---------> Proxier --------> service 事件回調(diào)           
     |                  |                                                
     |                  +-------------> endpoint事件回調(diào)          
     |                                             |  觸發(fā)
     +-----> ipvs interface ipvs handler     <-----+

啟動(dòng)proxyServer

  1. 檢查是不是帶了clean up參數(shù),如果帶了那么清除所有規(guī)則退出

  2. OOM adjuster貌似沒實(shí)現(xiàn),忽略

  3. resouceContainer也沒實(shí)現(xiàn),忽略

  4. 啟動(dòng)metrics服務(wù)器,這個(gè)挺重要,比如我們想監(jiān)控時(shí)可以傳入這個(gè)參數(shù), 包含promethus的 metrics. metrics-bind-address參數(shù)

  5. 啟動(dòng)informer, 開始監(jiān)聽事件,分別啟動(dòng)協(xié)程處理。

1 2 3 4我們都不用太關(guān)注,細(xì)看5即可:

informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)

serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
// 注冊(cè) service handler并啟動(dòng)
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
// 這里面僅僅是把ServiceEventHandler賦值給informer回調(diào) 
go serviceConfig.Run(wait.NeverStop)

endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
// 注冊(cè)endpoint 
endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
go endpointsConfig.Run(wait.NeverStop)

go informerFactory.Start(wait.NeverStop)

serviceConfig.Run與endpointConfig.Run僅僅是給回調(diào)函數(shù)賦值, 所以注冊(cè)的handler就給了informer, informer監(jiān)聽到事件時(shí)就會(huì)回調(diào):

for i := range c.eventHandlers {
	glog.V(3).Infof("Calling handler.OnServiceSynced()")
	c.eventHandlers[i].OnServiceSynced()
}

那么問題來了,注冊(cè)進(jìn)去的這個(gè)handler是啥? 回顧一下上文的

		serviceEventHandler = proxierIPVS
		endpointsEventHandler = proxierIPVS

所以都是這個(gè)proxierIPVS

handler的回調(diào)函數(shù), informer會(huì)回調(diào)這幾個(gè)函數(shù),所以我們?cè)谧约洪_發(fā)時(shí)實(shí)現(xiàn)這個(gè)interface注冊(cè)進(jìn)去即可:

type ServiceHandler interface {
	// OnServiceAdd is called whenever creation of new service object
	// is observed.
	OnServiceAdd(service *api.Service)
	// OnServiceUpdate is called whenever modification of an existing
	// service object is observed.
	OnServiceUpdate(oldService, service *api.Service)
	// OnServiceDelete is called whenever deletion of an existing service
	// object is observed.
	OnServiceDelete(service *api.Service)
	// OnServiceSynced is called once all the initial even handlers were
	// called and the state is fully propagated to local cache.
	OnServiceSynced()
}

開始監(jiān)聽

go informerFactory.Start(wait.NeverStop)

這里執(zhí)行后,我們創(chuàng)建刪除service endpoint等動(dòng)作都會(huì)被監(jiān)聽到,然后回調(diào),回顧一下上面的圖,最終都是由Proxier去實(shí)現(xiàn),所以后面我們重點(diǎn)關(guān)注Proxier即可

s.Proxier.SyncLoop()

然后開始SyncLoop,下文開講

Proxier 實(shí)現(xiàn)

我們創(chuàng)建一個(gè)service時(shí)OnServiceAdd方法會(huì)被調(diào)用, 這里記錄一下之前的狀態(tài)與當(dāng)前狀態(tài)兩個(gè)東西,然后發(fā)個(gè)信號(hào)給syncRunner讓它去處理:

func (proxier *Proxier) OnServiceAdd(service *api.Service) {
	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
	if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
		proxier.syncRunner.Run()
	}
}

記錄service 信息,可以看到?jīng)]做什么事,就是把service存在map里, 如果沒變直接刪掉map信息不做任何處理:

change, exists := scm.items[*namespacedName]
if !exists {
	change = &serviceChange{}
    // 老的service信息
	change.previous = serviceToServiceMap(previous)
	scm.items[*namespacedName] = change
}
// 當(dāng)前監(jiān)聽到的service信息
change.current = serviceToServiceMap(current)

如果一樣,直接刪除
if reflect.DeepEqual(change.previous, change.current) {
	delete(scm.items, *namespacedName)
}

proxier.syncRunner.Run() 里面就發(fā)送了一個(gè)信號(hào)

select {
case bfr.run <- struct{}{}:
default:
}

這里面處理了這個(gè)信號(hào)

s.Proxier.SyncLoop()

func (proxier *Proxier) SyncLoop() {
	// Update healthz timestamp at beginning in case Sync() never succeeds.
	if proxier.healthzServer != nil {
		proxier.healthzServer.UpdateTimestamp()
	}
	proxier.syncRunner.Loop(wait.NeverStop)
}

runner里收到信號(hào)執(zhí)行,沒收到信號(hào)會(huì)定期執(zhí)行:

func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
	glog.V(3).Infof("%s Loop running", bfr.name)
	bfr.timer.Reset(bfr.maxInterval)
	for {
		select {
		case <-stop:
			bfr.stop()
			glog.V(3).Infof("%s Loop stopping", bfr.name)
			return
		case <-bfr.timer.C():  // 定期執(zhí)行
			bfr.tryRun()
		case <-bfr.run:
			bfr.tryRun()       // 收到事件信號(hào)執(zhí)行
		}
	}
}

這個(gè)bfr runner里我們最需要主意的是一個(gè)回調(diào)函數(shù),tryRun里檢查這個(gè)回調(diào)是否滿足被調(diào)度的條件:

type BoundedFrequencyRunner struct {
	name        string        // the name of this instance
	minInterval time.Duration // the min time between runs, modulo bursts
	maxInterval time.Duration // the max time between runs

	run chan struct{} // try an async run

	mu      sync.Mutex  // guards runs of fn and all mutations
	fn      func()      // function to run, 這個(gè)回調(diào)
	lastRun time.Time   // time of last run
	timer   timer       // timer for deferred runs
	limiter rateLimiter // rate limiter for on-demand runs
}

// 傳入的proxier.syncProxyRules這個(gè)函數(shù)
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

這是個(gè)600行左右的搓逼函數(shù),也是處理主要邏輯的地方。

syncProxyRules

  1. 設(shè)置一些iptables規(guī)則,如mark與comment

  2. 確定機(jī)器上有網(wǎng)卡,ipvs需要綁定地址到上面

  3. 確定有ipset,ipset是iptables的擴(kuò)展,可以給一批地址設(shè)置iptables規(guī)則 ...(又臭又長,重復(fù)代碼多,看不下去了,細(xì)節(jié)問題自己去看吧)

  4. 我們最關(guān)注的,如何去處理VirtualServer的

serv := &utilipvs.VirtualServer{
	Address:   net.ParseIP(ingress.IP),
	Port:      uint16(svcInfo.port),
	Protocol:  string(svcInfo.protocol),
	Scheduler: proxier.ipvsScheduler,
}
if err := proxier.syncService(svcNameString, serv, false); err == nil {
	if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
	}
}

看下實(shí)現(xiàn), 如果沒有就創(chuàng)建,如果已存在就更新, 給網(wǎng)卡綁定service的cluster ip:

func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
	appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
	if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
		if appliedVirtualServer == nil {
			if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
				return err
			}
		} else {
			if err := proxier.ipvs.UpdateVirtualServer(appliedVirtualServer); err != nil {
				return err
			}
		}
	}

	// bind service address to dummy interface even if service not changed,
	// in case that service IP was removed by other processes
	if bindAddr {
		_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
		if err != nil {
			return err
		}
	}
	return nil
}

創(chuàng)建service實(shí)現(xiàn)

現(xiàn)在可以去看ipvs的AddVirtualServer的實(shí)現(xiàn)了,主要是利用socket與內(nèi)核進(jìn)程通信做到的。 pkg/util/ipvs/ipvs_linux.go 里 runner結(jié)構(gòu)體實(shí)現(xiàn)了這些方法, 這里用到了 docker/libnetwork/ipvs庫:

// runner implements Interface.
type runner struct {
	exec       utilexec.Interface
	ipvsHandle *ipvs.Handle
}

// New returns a new Interface which will call ipvs APIs.
func New(exec utilexec.Interface) Interface {
	ihandle, err := ipvs.New("") // github.com/docker/libnetwork/ipvs
	if err != nil {
		glog.Errorf("IPVS interface can't be initialized, error: %v", err)
		return nil
	}
	return &runner{
		exec:       exec,
		ipvsHandle: ihandle,
	}
}

New的時(shí)候創(chuàng)建了一個(gè)特殊的socket, 這里與我們普通的socket編程無差別,關(guān)鍵是syscall.AF_NETLINK這個(gè)參數(shù),代表與內(nèi)核進(jìn)程通信:

sock, err := nl.GetNetlinkSocketAt(n, netns.None(), syscall.NETLINK_GENERIC)

func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
	fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW|syscall.SOCK_CLOEXEC, protocol)
	if err != nil {
		return nil, err
	}
	s := &NetlinkSocket{
		fd: int32(fd),
	}
	s.lsa.Family = syscall.AF_NETLINK
	if err := syscall.Bind(fd, &s.lsa); err != nil {
		syscall.Close(fd)
		return nil, err
	}

	return s, nil
}

創(chuàng)建一個(gè)service, 轉(zhuǎn)換成docker service格式,直接調(diào)用:

// AddVirtualServer is part of Interface.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
	eSvc, err := toBackendService(vs)
	if err != nil {
		return err
	}
	return runner.ipvsHandle.NewService(eSvc)
}

然后就是把service信息打包,往socket里面寫即可:

func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
	req := newIPVSRequest(cmd)
	req.Seq = atomic.AddUint32(&i.seq, 1)

	if s == nil {
		req.Flags |= syscall.NLM_F_DUMP                    //Flag to dump all messages
		req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
	} else {
		req.AddData(fillService(s))
	} // 把service塞到請(qǐng)求中

	if d == nil {
		if cmd == ipvsCmdGetDest {
			req.Flags |= syscall.NLM_F_DUMP
		}

	} else {
		req.AddData(fillDestinaton(d))
	}

    // 給內(nèi)核進(jìn)程發(fā)送service信息
	res, err := execute(i.sock, req, 0)
	if err != nil {
		return [][]byte{}, err
	}

	return res, nil
}

構(gòu)造請(qǐng)求

func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {
	return newGenlRequest(ipvsFamily, cmd)
}

在構(gòu)造請(qǐng)求時(shí)傳入的是ipvs協(xié)議簇

然后構(gòu)造一個(gè)與內(nèi)核通信的消息頭

func NewNetlinkRequest(proto, flags int) *NetlinkRequest {
	return &NetlinkRequest{
		NlMsghdr: syscall.NlMsghdr{
			Len:   uint32(syscall.SizeofNlMsghdr),
			Type:  uint16(proto),
			Flags: syscall.NLM_F_REQUEST | uint16(flags),
			Seq:   atomic.AddUint32(&nextSeqNr, 1),
		},
	}
}

給消息加Data,這個(gè)Data是個(gè)數(shù)組,需要實(shí)現(xiàn)兩個(gè)方法:

type NetlinkRequestData interface {
	Len() int  // 長度
	Serialize() []byte // 序列化, 內(nèi)核通信也需要一定的數(shù)據(jù)格式,service信息也需要實(shí)現(xiàn)
}

比如 header是這樣序列化的, 一看愣住了,思考好久才看懂: 拆下看: ([unsafe.Sizeof(hdr)]byte) 一個(gè)[]byte類型,長度就是結(jié)構(gòu)體大小 (unsafe.Pointer(hdr))把結(jié)構(gòu)體轉(zhuǎn)成byte指針類型 加個(gè)取它的值 用[:]轉(zhuǎn)成byte返回

func (hdr *genlMsgHdr) Serialize() []byte {
	return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}

發(fā)送service信息給內(nèi)核

一個(gè)很普通的socket發(fā)送接收數(shù)據(jù)

func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
	var (
		err error
	)

	if err := s.Send(req); err != nil {
		return nil, err
	}

	pid, err := s.GetPid()
	if err != nil {
		return nil, err
	}

	var res [][]byte

done:
	for {
		msgs, err := s.Receive()
		if err != nil {
			return nil, err
		}
		for _, m := range msgs {
			if m.Header.Seq != req.Seq {
				continue
			}
			if m.Header.Pid != pid {
				return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
			}
			if m.Header.Type == syscall.NLMSG_DONE {
				break done
			}
			if m.Header.Type == syscall.NLMSG_ERROR {
				error := int32(native.Uint32(m.Data[0:4]))
				if error == 0 {
					break done
				}
				return nil, syscall.Errno(-error)
			}
			if resType != 0 && m.Header.Type != resType {
				continue
			}
			res = append(res, m.Data)
			if m.Header.Flags&syscall.NLM_F_MULTI == 0 {
				break done
			}
		}
	}
	return res, nil
}

Service 數(shù)據(jù)打包 這里比較細(xì),核心思想就是內(nèi)核只認(rèn)一定格式的標(biāo)準(zhǔn)數(shù)據(jù),我們把service信息按其標(biāo)準(zhǔn)打包發(fā)送給內(nèi)核即可。 至于怎么打包的就不詳細(xì)講了。

func fillService(s *Service) nl.NetlinkRequestData {
	cmdAttr := nl.NewRtAttr(ipvsCmdAttrService, nil)
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddressFamily, nl.Uint16Attr(s.AddressFamily))
	if s.FWMark != 0 {
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFWMark, nl.Uint32Attr(s.FWMark))
	} else {
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrProtocol, nl.Uint16Attr(s.Protocol))
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddress, rawIPData(s.Address))

		// Port needs to be in network byte order.
		portBuf := new(bytes.Buffer)
		binary.Write(portBuf, binary.BigEndian, s.Port)
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPort, portBuf.Bytes())
	}

	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrSchedName, nl.ZeroTerminated(s.SchedName))
	if s.PEName != "" {
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
	}
	f := &ipvsFlags{
		flags: s.Flags,
		mask:  0xFFFFFFFF,
	}
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFlags, f.Serialize())
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrTimeout, nl.Uint32Attr(s.Timeout))
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrNetmask, nl.Uint32Attr(s.Netmask))
	return cmdAttr
}

Service總體來講代碼比較簡(jiǎn)單,但是覺得有些地方實(shí)現(xiàn)的有點(diǎn)繞,不夠簡(jiǎn)單直接。 總體來說就是監(jiān)聽apiserver事件,然后比對(duì) 處理,定期也會(huì)去執(zhí)行同步策略。

以上就是怎樣實(shí)現(xiàn)kubeproxy源碼分析,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI