溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Device Manager在什么時(shí)候創(chuàng)建

發(fā)布時(shí)間:2021-12-20 09:49:45 來源:億速云 閱讀:152 作者:iii 欄目:云計(jì)算

這篇文章主要介紹“Device Manager在什么時(shí)候創(chuàng)建”,在日常操作中,相信很多人在Device Manager在什么時(shí)候創(chuàng)建問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對(duì)大家解答”Device Manager在什么時(shí)候創(chuàng)建”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

Create Device Manager Instance

Device Manager在何時(shí)創(chuàng)建

Device Manager和Volume Manager、QoS Container Manager等一樣,都屬于kubelet管理的眾多Manager之一。Device Manager在kubelet啟動(dòng)時(shí)的NewContainerManager中創(chuàng)建。

pkg/kubelet/cm/container_manager_linux.go:197

func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
	
	...

	glog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
	if devicePluginEnabled {
		cm.deviceManager, err = devicemanager.NewManagerImpl()
	} else {
		cm.deviceManager, err = devicemanager.NewManagerStub()
	}
	if err != nil {
		return nil, err
	}
	...
}	

ManagerImpl結(jié)構(gòu)體

我們有必要先了解Device Manager的結(jié)構(gòu)體:

// ManagerImpl is the structure in charge of managing Device Plugins.
type ManagerImpl struct {
	socketname string
	socketdir  string

	endpoints map[string]endpoint // Key is ResourceName
	mutex     sync.Mutex

	server *grpc.Server

	// activePods is a method for listing active pods on the node
	// so the amount of pluginResources requested by existing pods
	// could be counted when updating allocated devices
	activePods ActivePodsFunc

	// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
	// We use it to determine when we can purge inactive pods from checkpointed state.
	sourcesReady config.SourcesReady

	// callback is used for updating devices' states in one time call.
	// e.g. a new device is advertised, two old devices are deleted and a running device fails.
	callback monitorCallback

	// healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
	healthyDevices map[string]sets.String

	// unhealthyDevices contains all of the unhealthy devices and their exported device IDs.
	unhealthyDevices map[string]sets.String

	// allocatedDevices contains allocated deviceIds, keyed by resourceName.
	allocatedDevices map[string]sets.String

	// podDevices contains pod to allocated device mapping.
	podDevices podDevices
	store      utilstore.Store
	pluginOpts map[string]*pluginapi.DevicePluginOptions
}

下面是核心field的說明:

  • socketname: 就是kubelet對(duì)外暴露的socket名,即 kubelet.sock。

  • socketdir: device plugins' socket的存放的目錄,/var/lib/kubelet/device-plugins/。

  • endpoints: map對(duì)象,key為Resource Name,value為endpoint接口(包括run,stop,allocate,preStartContainer,getDevices,callback,isStoped,StopGracePeriodExpired),每個(gè)endpoint接口對(duì)應(yīng)一個(gè)已注冊(cè)的device plugin,負(fù)責(zé)與device plugin的gRPC通信及緩存device plugin反饋的device states。

  • server: Register服務(wù)暴露的gRPC Server。

  • activePods: 用來獲取該節(jié)點(diǎn)上所有active pods,即non-Terminated狀態(tài)的Pods。在kubelet的initializeRuntimeDependentModules時(shí)會(huì)注冊(cè)activePods Func為如下函數(shù):

    	// GetActivePods returns non-terminal pods
    	func (kl *Kubelet) GetActivePods() []*v1.Pod {
    		allPods := kl.podManager.GetPods()
    		activePods := kl.filterOutTerminatedPods(allPods)
    		return activePods
    	}


  • callback: 是kubelet收到device plugin的ListAndWatch gRCP stream中有devices state變更時(shí)的回調(diào)函數(shù),包括有新設(shè)備增加、舊設(shè)備刪除、設(shè)備狀態(tài)變化,所以通過ListAndWatch接口的回調(diào)方式,可以實(shí)現(xiàn)設(shè)備的自動(dòng)發(fā)現(xiàn)和熱插拔。

    	type monitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device)


  • healthyDevices: map對(duì)象,key為Resource Name,value為對(duì)應(yīng)的健康的device IDs。

  • unhealthyDevices: map對(duì)象,key為Resource Name,value為對(duì)應(yīng)的不健康的device IDs。

  • allocatedDevices: map對(duì)象,key為Resource Name,value為已經(jīng)分配出去的device IDs。

  • podDevices: 記錄每個(gè)pod中每個(gè)容器的device分配情況。

    	// ContainerAllocateResponse為容器內(nèi)某個(gè)device對(duì)應(yīng)的分配信息,包括注入的環(huán)境變量、掛載信息、Annotations。
    	type ContainerAllocateResponse struct {
    		Envs map[string]string 
    		Mounts []*Mount 
    		Devices []*DeviceSpec 
    		Annotations map[string]string 
    	}
    
    	// deviceAllocateInfo
    	type deviceAllocateInfo struct {
    		deviceIds sets.String
    		allocResp *pluginapi.ContainerAllocateResponse
    	}
    
    	type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName.
    	type containerDevices map[string]resourceAllocateInfo   // Keyed by containerName.
    	type podDevices map[string]containerDevices             // Keyed by podUID.


  • store: 是對(duì)checkpointData的文件存儲(chǔ)(/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint),具體存儲(chǔ)了每個(gè)Pod分配的Devices信息PodDeviceEntries, 以及已經(jīng)注冊(cè)的Resource Name及對(duì)應(yīng)的Devices IDs。

    	type checkpointData struct {
    		PodDeviceEntries  []podDevicesCheckpointEntry
    		RegisteredDevices map[string][]string // key為Resource Name,value為DeviceIDs
    	}
    
    	type podDevicesCheckpointEntry struct {
    		PodUID        string
    		ContainerName string
    		ResourceName  string
    		DeviceIDs     []string
    		AllocResp     []byte
    	}


    Device Manager在什么時(shí)候創(chuàng)建

  • pluginOpts: map對(duì)象,key為Resource Name,value為DevicePluginOptions,目前只有一項(xiàng)內(nèi)容,就是PreStartRequired bool,表示是否在容器啟動(dòng)前要調(diào)用device plugin的PreStartContiner接口。在nvidia-k8s-plugin中,PreStartContainer為空實(shí)現(xiàn)。

NewManagerImpl

我們?cè)賮砜纯碊evice Manager的具體創(chuàng)建實(shí)現(xiàn)NewManagerImpl。

pkg/kubelet/cm/devicemanager/manager.go:97

// NewManagerImpl creates a new manager.
func NewManagerImpl() (*ManagerImpl, error) {

	// 通過/var/lib/kubelet/device-plugins/kubelet.sock與device plugin交互
	return newManagerImpl(pluginapi.KubeletSocket)
}

func newManagerImpl(socketPath string) (*ManagerImpl, error) {
	glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)

	if socketPath == "" || !filepath.IsAbs(socketPath) {
		return nil, fmt.Errorf(errBadSocket+" %v", socketPath)
	}

	dir, file := filepath.Split(socketPath)
	manager := &ManagerImpl{
		endpoints:        make(map[string]endpoint),
		socketname:       file,
		socketdir:        dir,
		healthyDevices:   make(map[string]sets.String),
		unhealthyDevices: make(map[string]sets.String),
		allocatedDevices: make(map[string]sets.String),
		pluginOpts:       make(map[string]*pluginapi.DevicePluginOptions),
		podDevices:       make(podDevices),
	}
	manager.callback = manager.genericDeviceUpdateCallback

	// The following structs are populated with real implementations in manager.Start()
	// Before that, initializes them to perform no-op operations.
	manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
	manager.sourcesReady = &sourcesReadyStub{}
	var err error
	
	// 在/var/lib/kubelet/device-plugins/目錄下創(chuàng)建file store類型的key-value存儲(chǔ)文件kubelet_internal_checkpoint,用來作為kubelet的device plugin的checkpoint。
	manager.store, err = utilstore.NewFileStore(dir, utilfs.DefaultFs{})
	if err != nil {
		return nil, fmt.Errorf("failed to initialize device plugin checkpointing store: %+v", err)
	}

	return manager, nil
}
  • kubelet Device Manager通過/var/lib/kubelet/device-plugins/kubelet.sock與device plugin交互。

  • 注冊(cè)callback為genericDeviceUpdateCallback,用來處理對(duì)應(yīng)devices的add,delete,update事件。

  • /var/lib/kubelet/device-plugins/目錄下創(chuàng)建file store類型的key-value存儲(chǔ)文件kubelet_internal_checkpoint,用來作為kubelet的device plugin的checkpoint。

    • 當(dāng)監(jiān)聽到devices add/delete/update事件發(fā)生時(shí),會(huì)更新到kubelet_internal_checkpoint文件中。

    • 當(dāng)device plugin的stop time超過grace period time(代碼寫死為5min,不可配置),會(huì)從checkpoint中刪除對(duì)應(yīng)的devices。在這個(gè)時(shí)間范圍內(nèi),Device Manager會(huì)繼續(xù)緩存該endpoint及對(duì)應(yīng)的devices。

    • 為Container Allocate Devices后,也會(huì)將PodDevices更新到checkpoint中。

我們來看看callback的實(shí)現(xiàn)genericDeviceUpdateCallback的實(shí)現(xiàn),了解Device Manager是如何處理devices的add/delete/update消息的。

pkg/kubelet/cm/devicemanager/manager.go:134

func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {
	kept := append(updated, added...)
	m.mutex.Lock()
	if _, ok := m.healthyDevices[resourceName]; !ok {
		m.healthyDevices[resourceName] = sets.NewString()
	}
	if _, ok := m.unhealthyDevices[resourceName]; !ok {
		m.unhealthyDevices[resourceName] = sets.NewString()
	}
	for _, dev := range kept {
		if dev.Health == pluginapi.Healthy {
			m.healthyDevices[resourceName].Insert(dev.ID)
			m.unhealthyDevices[resourceName].Delete(dev.ID)
		} else {
			m.unhealthyDevices[resourceName].Insert(dev.ID)
			m.healthyDevices[resourceName].Delete(dev.ID)
		}
	}
	for _, dev := range deleted {
		m.healthyDevices[resourceName].Delete(dev.ID)
		m.unhealthyDevices[resourceName].Delete(dev.ID)
	}
	m.mutex.Unlock()
	m.writeCheckpoint()
}
  • 將callback中收到的devices狀態(tài)是Healthy,那么將device ID插入到ManagerImpl中healthDevices中,并從unhealthyDevices中刪除。

  • 將callback中收到的devices狀態(tài)是Unhealthy,那么將device ID插入到ManagerImpl中unhealthDevices中,并從healthyDevices中刪除。

  • 將device plugin反饋的需要delete的devices從healthDevices和unhealthDevices中一并刪除。

  • 將ManagerImpl中的數(shù)據(jù)更新到checkpoint文件中。

Device Manager的啟動(dòng)

前面把Device Manager的創(chuàng)建流程分析了一下,還涉及到checkpoint和callback的分析。接下來,我們繼續(xù)對(duì)Device Manager的Start流程進(jìn)行分析。

Start Device Manager

Device Manager是在containerManagerImpl的Start時(shí)啟動(dòng)的。

pkg/kubelet/cm/container_manager_linux.go:527

func (cm *containerManagerImpl) Start(node *v1.Node,
	activePods ActivePodsFunc,
	sourcesReady config.SourcesReady,
	podStatusProvider status.PodStatusProvider,
	runtimeService internalapi.RuntimeService) error {

	...
	
	// Starts device manager.
	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
		return err
	}

	return nil
}
  • deviceManager.Start的第一個(gè)參數(shù)是獲取該節(jié)點(diǎn)的active(non-terminated)Pods的函數(shù)。

  • SourcesReady是用來跟蹤kubelet配置的Pod Sources,這些Sources包括:

    • file: 通過static file創(chuàng)建靜態(tài)Pods。

    • http: 通過http接口來獲取Pods信息。

    • api: 從Kubernetes API Server獲取Pods信息,是Kubernetes默認(rèn)的內(nèi)部機(jī)制。

    • *: 表示包含以上全部的Sources類型。

ManagerIml Start

ManagerIml.Start負(fù)責(zé)啟動(dòng)Device Manager,對(duì)外提供gRPC服務(wù)。

pkg/kubelet/cm/devicemanager/manager.go:204

// Start starts the Device Plugin Manager amd start initialization of
// podDevices and allocatedDevices information from checkpoint-ed state and
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {

	m.activePods = activePods
	m.sourcesReady = sourcesReady

	// Loads in allocatedDevices information from disk.
	err := m.readCheckpoint()
	...

	socketPath := filepath.Join(m.socketdir, m.socketname)
	os.MkdirAll(m.socketdir, 0755)

	// Removes all stale sockets in m.socketdir. Device plugins can monitor
	// this and use it as a signal to re-register with the new Kubelet.
	if err := m.removeContents(m.socketdir); err != nil {
		glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
	}

	s, err := net.Listen("unix", socketPath)
	if err != nil {
		glog.Errorf(errListenSocket+" %+v", err)
		return err
	}

	m.server = grpc.NewServer([]grpc.ServerOption{}...)

	pluginapi.RegisterRegistrationServer(m.server, m)
	go m.server.Serve(s)

	glog.V(2).Infof("Serving device plugin registration server on %q", socketPath)

	return nil
}
  • 首先讀取checkpoint file中數(shù)據(jù),恢復(fù)ManagerImpl的相關(guān)數(shù)據(jù),包括:

    • podDevices;

    • allocatedDevices;

    • healthyDevices;

    • unhealthyDevices;

    • endpoints,注意這里會(huì)將endpoint的stop time設(shè)置為當(dāng)前時(shí)間,意味著kubelet restart后,需要等待device plugin進(jìn)行re-register后,才認(rèn)為這些resource是可用的。

  • 然后將/var/lib/kubelet/device-plugins/下面的所有文件清空,當(dāng)然checkpiont文件除外,也就是清空所有的socket文件,包括自己的kubelet.sock,以及其他所有之前的device plugin的socket文件。device plugin會(huì)監(jiān)控kubelet.sock文件是否被刪除,如果刪除,則會(huì)觸發(fā)自己的向kubelet重新注冊(cè)自己。

  • 創(chuàng)建kubelet.sock并啟動(dòng)gRPC Server對(duì)外提供gRPC服務(wù),目前只注冊(cè)了Register服務(wù),用于Device plugin調(diào)用進(jìn)行插件注冊(cè)。

Register服務(wù)

我們就來看看kubelet Device Manager對(duì)外提供的唯一gRPC接口Register。

Register

pkg/kubelet/cm/devicemanager/manager.go:289

// Register registers a device plugin.
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
	glog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
	metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
	var versionCompatible bool
	for _, v := range pluginapi.SupportedVersions {
		if r.Version == v {
			versionCompatible = true
			break
		}
	}
	if !versionCompatible {
		errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
		glog.Infof("Bad registration request from device plugin with resource name %q: %v", r.ResourceName, errorString)
		return &pluginapi.Empty{}, fmt.Errorf(errorString)
	}

	if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
		errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
		glog.Infof("Bad registration request from device plugin: %v", errorString)
		return &pluginapi.Empty{}, fmt.Errorf(errorString)
	}

	// TODO: for now, always accepts newest device plugin. Later may consider to
	// add some policies here, e.g., verify whether an old device plugin with the
	// same resource name is still alive to determine whether we want to accept
	// the new registration.
	go m.addEndpoint(r)

	return &pluginapi.Empty{}, nil
}
  • 注冊(cè)請(qǐng)求是device plugin向kubelet發(fā)送的,注冊(cè)請(qǐng)求RegisterRequest為:

    	type RegisterRequest struct {
    		Version string  // Kubernetes 1.10對(duì)應(yīng)的device plugin api version為v1beta1
    		Endpoint string // device plugin對(duì)應(yīng)的socket name
    		ResourceName string 
    		Options *DevicePluginOptions 
    	}


  • 這里會(huì)檢查注冊(cè)的Resource Name是否符合Extended Resource的規(guī)則:

    • Resource Name不能屬于kubernetes.io,得有自己的domain,比如nvidia.com。

    • Resource Name中不能包含requests.前綴。

    • 對(duì)應(yīng)的Resource value只能是整數(shù)值。

  • 調(diào)用addEndpoint進(jìn)行插件注冊(cè)。

addEndpoint進(jìn)行device plugin注冊(cè)

從上面Register的方法中可見,真正插件注冊(cè)的邏輯是在addEndpoint中實(shí)現(xiàn)的。

pkg/kubelet/cm/devicemanager/manager.go:332

func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
	existingDevs := make(map[string]pluginapi.Device)
	m.mutex.Lock()
	old, ok := m.endpoints[r.ResourceName]
	if ok && old != nil {
		// Pass devices of previous endpoint into re-registered one,
		// to avoid potential orphaned devices upon re-registration
		devices := make(map[string]pluginapi.Device)
		for _, device := range old.getDevices() {
			devices[device.ID] = device
		}
		existingDevs = devices
	}
	m.mutex.Unlock()

	socketPath := filepath.Join(m.socketdir, r.Endpoint)
	e, err := newEndpointImpl(socketPath, r.ResourceName, existingDevs, m.callback)
	if err != nil {
		glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
		return
	}
	m.mutex.Lock()
	if r.Options != nil {
		m.pluginOpts[r.ResourceName] = r.Options
	}
	// Check for potential re-registration during the initialization of new endpoint,
	// and skip updating if re-registration happens.
	// TODO: simplify the part once we have a better way to handle registered devices
	ext := m.endpoints[r.ResourceName]
	if ext != old {
		glog.Warningf("Some other endpoint %v is added while endpoint %v is initialized", ext, e)
		m.mutex.Unlock()
		e.stop()
		return
	}
	// Associates the newly created endpoint with the corresponding resource name.
	// Stops existing endpoint if there is any.
	m.endpoints[r.ResourceName] = e
	glog.V(2).Infof("Registered endpoint %v", e)
	m.mutex.Unlock()

	if old != nil {
		old.stop()
	}

	go func() {
		e.run()
		e.stop()
		m.mutex.Lock()
		if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
			m.markResourceUnhealthy(r.ResourceName)
		}
		glog.V(2).Infof("Unregistered endpoint %v", e)
		m.mutex.Unlock()
	}()
}
  • 首先檢查注冊(cè)的這個(gè)device plugin是否已經(jīng)注冊(cè)過,如果注冊(cè)過,則獲取已經(jīng)緩存的devices。

  • 再檢查device plugin的socket是否能dial成功,如果dial失敗,則說明device plugin沒正常啟動(dòng)。如果dial成功,就根據(jù)已經(jīng)緩存的devices重新初始化Endpoint,EndpointImpl的定義如下:

    	type endpointImpl struct {
    		client     pluginapi.DevicePluginClient
    		clientConn *grpc.ClientConn
    
    		socketPath   string
    		resourceName string
    		stopTime     time.Time
    
    		devices map[string]pluginapi.Device
    		mutex   sync.Mutex
    
    		cb monitorCallback
    	}


  • 為了防止在EndpointImpl重新初始化的過程中device plugin進(jìn)行re-register,初始化完成后再次獲取緩存中該device plugin的Endpoint,并與初始化之前的Endpoint對(duì)象進(jìn)行比對(duì):

    • 如果不是同一個(gè)對(duì)象,則說明在初始化過程中發(fā)生了re-register,那么就invoke Endpoint的stop接口,關(guān)閉gRPC連接,并設(shè)置Endpoint的stopTime為當(dāng)前時(shí)間,Register流程以失敗結(jié)束。

    • 否則繼續(xù)后面流程。

  • 如果該device plugin之前注冊(cè)過,那么再重新調(diào)用Endpoint的run()啟動(dòng)之前,先調(diào)用Endpoint的stop關(guān)閉gRPC連接,并設(shè)置Endpoint的stopTime為當(dāng)前時(shí)間。

  • 然后啟動(dòng)golang協(xié)程執(zhí)行Endpoint的run(),在run方法中:

    • 調(diào)用device plugin的ListAndWatch gRPC接口,通過長連接持續(xù)獲取ListAndWatch gRPC stream,

    • 從stream流中獲取的devices與Endpoint中緩存的devices進(jìn)行比對(duì),得到需要add/delete/update的devices,

    • 然后調(diào)用Endpoint的callback(也就是ManagerImpl注冊(cè)的callback方法genericDeviceUpdateCallback)進(jìn)行Device Manager的緩存更新并寫到checkpoint文件中。

  • 直到與device plugin的gRPC連接發(fā)生errListAndWatch錯(cuò)誤,跳出持續(xù)獲取stream的死循環(huán),然后調(diào)用Endpoint的stop關(guān)閉gRPC連接,并設(shè)置Endpoint的stopTime為當(dāng)前時(shí)間。

  • invoke stop后,再標(biāo)記該device plugin對(duì)應(yīng)的所有devices為unhealthy,即設(shè)置healthyDevices為空, 所有原來healthy的devices都加到unhealthyDevices中,此時(shí)表示注冊(cè)失敗。

調(diào)用Device Plugin的Allocate接口

注冊(cè)UpdatePluginResources為Pod Admit Handler

kubelet在NewMainKubelet中會(huì)注冊(cè)一系列的Pod Admit Handler,當(dāng)有Pod需要?jiǎng)?chuàng)建的時(shí),都會(huì)先調(diào)用這些Pod Admit Handler進(jìn)行處理,其中klet.containerManager.UpdatePluginResources就是kubelet Device Manager為Pod分配devices的。

pkg/kubelet/kubelet.go:893

func NewMainKubelet( ... ) (*Kubelet, error) {
	...
	
	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
	
	...
}
	
pkg/kubelet/cm/container_manager_linux.go:618

func (cm *containerManagerImpl) UpdatePluginResources(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	return cm.deviceManager.Allocate(node, attrs)
}

Allocate

kubelet在創(chuàng)建Pod前,會(huì)invoke Device Manager的Allocate方法,為Pod中的每個(gè)Container請(qǐng)求分配對(duì)應(yīng)的devices,kubelet會(huì)將請(qǐng)求轉(zhuǎn)發(fā)到對(duì)應(yīng)的Endpoint的Allocate方法, 然后請(qǐng)求會(huì)到對(duì)應(yīng)的device plugin進(jìn)行處理。

pkg/kubelet/cm/devicemanager/manager.go:259

func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	pod := attrs.Pod
	devicesToReuse := make(map[string]sets.String)
	// TODO: Reuse devices between init containers and regular containers.
	for _, container := range pod.Spec.InitContainers {
		if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
			return err
		}
		m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
	}
	for _, container := range pod.Spec.Containers {
		if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
			return err
		}
		m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
	}

	m.mutex.Lock()
	defer m.mutex.Unlock()

	// quick return if no pluginResources requested
	if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
		return nil
	}

	m.sanitizeNodeAllocatable(node)
	return nil
}
  • 調(diào)用allocateContainerResources為Pod中的init container分配devices,并更新ManagerImpl中PodDevices緩存;

  • 調(diào)用allocateContainerResources為Pod中的regular container分配devices,并更新ManagerImpl中PodDevices緩存;

  • 調(diào)用sanitizeNodeAllocatable更新scheduler cache中Node對(duì)應(yīng)Resource Name的Allocatable Resource;

allocateContainerResources

pkg/kubelet/cm/devicemanager/manager.go:608

func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
	podUID := string(pod.UID)
	contName := container.Name
	allocatedDevicesUpdated := false
	// Extended resources are not allowed to be overcommitted.
	// Since device plugin advertises extended resources,
	// therefore Requests must be equal to Limits and iterating
	// over the Limits should be sufficient.
	for k, v := range container.Resources.Limits {
		resource := string(k)
		needed := int(v.Value())
		glog.V(3).Infof("needs %d %s", needed, resource)
		if !m.isDevicePluginResource(resource) {
			continue
		}
		// Updates allocatedDevices to garbage collect any stranded resources
		// before doing the device plugin allocation.
		if !allocatedDevicesUpdated {
			m.updateAllocatedDevices(m.activePods())
			allocatedDevicesUpdated = true
		}
		allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
		if err != nil {
			return err
		}
		if allocDevices == nil || len(allocDevices) <= 0 {
			continue
		}

		startRPCTime := time.Now()
		
		m.mutex.Lock()
		e, ok := m.endpoints[resource]
		m.mutex.Unlock()
		if !ok {
			m.mutex.Lock()
			m.allocatedDevices = m.podDevices.devices()
			m.mutex.Unlock()
			return fmt.Errorf("Unknown Device Plugin %s", resource)
		}

		devs := allocDevices.UnsortedList()
		
		glog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)
		resp, err := e.allocate(devs)
		metrics.DevicePluginAllocationLatency.WithLabelValues(resource).Observe(metrics.SinceInMicroseconds(startRPCTime))
		if err != nil {
			m.mutex.Lock()
			m.allocatedDevices = m.podDevices.devices()
			m.mutex.Unlock()
			return err
		}

		// Update internal cached podDevices state.
		m.mutex.Lock()
		m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
		m.mutex.Unlock()
	}

	// Checkpoints device to container allocation information.
	return m.writeCheckpoint()
}
  • device plugin提供的Resource屬于Kubernetes Extended Resources,所以其Resource QoS只能是Guaranted。

  • 每次在為Pod分配devices之前,都去檢查一下此時(shí)的active pods,并與podDevices緩存中的pods進(jìn)行比對(duì),將已經(jīng)terminated的Pods的devices從podDevices中刪除,即進(jìn)行了devices的GC操作。

  • 從healthyDevices中隨機(jī)分配對(duì)應(yīng)數(shù)量的devices給該P(yáng)od,并注意更新allocatedDevices,否則會(huì)導(dǎo)致一個(gè)device被分配給多個(gè)Pod。

  • 拿到devices后,就調(diào)用Endpoint的Allocate方法(進(jìn)而調(diào)用對(duì)應(yīng)device plugin的Allocate gRPC Service),device plugin返回ContainerAllocateResponse(包括注入的環(huán)境變量、掛載信息、Annotations)。

  • 更新podDevices緩存信息,并將ManagerImpl中緩存數(shù)據(jù)更新到checkpoint文件中。

思考:當(dāng)init container結(jié)束后,對(duì)應(yīng)分配的devices會(huì)被釋放嗎? 目前還不會(huì)釋放devices,在Allocate前只會(huì)回收Terminated Pods的devices,并沒有回收init container的devices。要優(yōu)化這個(gè)也是比較簡單的,只要修改上面代碼中updateAllocatedDevices方法內(nèi)的邏輯就行了,增加init container的devices回收邏輯。
所以當(dāng)前版本最好不會(huì)要在init container中使用devices,雖然這種場(chǎng)景幾乎不存在。

維護(hù)NodeStatus中Device Plugin管理的Resource Capacity

當(dāng)kubelet更新node status時(shí)會(huì)調(diào)用GetCapacity更新device plugins對(duì)應(yīng)的Resource信息。

pkg/kubelet/kubelet_node_status.go:599

func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
	...
	devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = kl.containerManager.GetDevicePluginResourceCapacity()
	...
}	


pkg/kubelet/cm/container_manager_linux.go:881

func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
	return cm.deviceManager.GetCapacity()
}

下面是GetCapacity的具體代碼實(shí)現(xiàn),邏輯很簡單:

  • 檢測(cè)healthyDevices對(duì)應(yīng)的device plugin是否已經(jīng)從緩存中刪除或者已經(jīng)停止超過5min,如果滿足以上條件之一,則從endpoints和healthyDevices緩存中刪除這些devices。

  • 檢測(cè)unhealthyDevices對(duì)應(yīng)的device plugin是否已經(jīng)從緩存中刪除或者已經(jīng)停止超過5min,如果滿足以上條件之一,則從endpoints和unhealthyDevices緩存中刪除這些devices。

  • 如果緩存發(fā)生變化,則更新到checkpoint文件中。

pkg/kubelet/cm/devicemanager/manager.go:414

func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
	needsUpdateCheckpoint := false
	var capacity = v1.ResourceList{}
	var allocatable = v1.ResourceList{}
	deletedResources := sets.NewString()
	m.mutex.Lock()
	for resourceName, devices := range m.healthyDevices {
		e, ok := m.endpoints[resourceName]
		if (ok && e.stopGracePeriodExpired()) || !ok {
		
			if !ok {
				glog.Errorf("unexpected: healthyDevices and endpoints are out of sync")
			}
			delete(m.endpoints, resourceName)
			delete(m.healthyDevices, resourceName)
			deletedResources.Insert(resourceName)
			needsUpdateCheckpoint = true
		} else {
			capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
			allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
		}
	}
	for resourceName, devices := range m.unhealthyDevices {
		e, ok := m.endpoints[resourceName]
		if (ok && e.stopGracePeriodExpired()) || !ok {
			if !ok {
				glog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
			}
			delete(m.endpoints, resourceName)
			delete(m.unhealthyDevices, resourceName)
			deletedResources.Insert(resourceName)
			needsUpdateCheckpoint = true
		} else {
			capacityCount := capacity[v1.ResourceName(resourceName)]
			unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
			capacityCount.Add(unhealthyCount)
			capacity[v1.ResourceName(resourceName)] = capacityCount
		}
	}
	m.mutex.Unlock()
	if needsUpdateCheckpoint {
		m.writeCheckpoint()
	}
	return capacity, allocatable, deletedResources.UnsortedList()
}

GetCapacity更新NodeStatus如下數(shù)據(jù):

  • registered device plugin resource Capacity

  • registered device plugin resource Allocatable

  • previously registered resources that are no longer active

調(diào)用Device Plugin的PreStartContainer接口

在kubelet的GetResource中,會(huì)調(diào)用DeviceManager的GetDeviceRunContainerOptions,并將這些options添加到kubecontainer.RunContainerOptions中。RunContainerOptions包括Envs、Mounts、Devices、PortMappings、Annotations等信息。

pkg/kubelet/cm/container_manager_linux.go:601

// TODO: move the GetResources logic to PodContainerManager.
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
	opts := &kubecontainer.RunContainerOptions{}
	// Allocate should already be called during predicateAdmitHandler.Admit(),
	// just try to fetch device runtime information from cached state here
	devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
	if err != nil {
		return nil, err
	} else if devOpts == nil {
		return opts, nil
	}
	opts.Devices = append(opts.Devices, devOpts.Devices...)
	opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
	opts.Envs = append(opts.Envs, devOpts.Envs...)
	opts.Annotations = append(opts.Annotations, devOpts.Annotations...)
	return opts, nil
}
  • Device Manager的GetDeviceRunContainerOptions會(huì)根據(jù)pluginOpts的PreStartRequired是否為true,決定是否調(diào)用device plugin的PreStartContainer gRPC Service。

注意:如果某個(gè)device plugin的PreStartRequired為true,那么需要注冊(cè)kubelet Device Manager調(diào)用device plugin的PreStartContainer接口的超時(shí)時(shí)間是30s,即30s內(nèi)必須完成PreStartContainer的邏輯并返回。

pkg/kubelet/cm/devicemanager/manager.go:688

// GetDeviceRunContainerOptions checks whether we have cached containerDevices
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
// for the found one. An empty struct is returned in case no cached state is found.
func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
	podUID := string(pod.UID)
	contName := container.Name
	for k := range container.Resources.Limits {
		resource := string(k)
		if !m.isDevicePluginResource(resource) {
			continue
		}
		err := m.callPreStartContainerIfNeeded(podUID, contName, resource)
		if err != nil {
			return nil, err
		}
	}
	m.mutex.Lock()
	defer m.mutex.Unlock()
	return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
}
  • 然后deviceRunContainerOptions負(fù)責(zé)封裝Container的Envs、Mount points、Device files、Annotations。

到此,關(guān)于“Device Manager在什么時(shí)候創(chuàng)建”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI