溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

kubernetes中etcd增刪改查的具體實(shí)現(xiàn)

發(fā)布時(shí)間:2021-07-28 19:03:53 來(lái)源:億速云 閱讀:321 作者:chen 欄目:云計(jì)算

本篇內(nèi)容主要講解“kubernetes中etcd增刪改查的具體實(shí)現(xiàn)”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“kubernetes中etcd增刪改查的具體實(shí)現(xiàn)”吧!

kubernetes中基于etcd實(shí)現(xiàn)集中的數(shù)據(jù)存儲(chǔ),今天來(lái)學(xué)習(xí)下基于etcd如何實(shí)現(xiàn)數(shù)據(jù)讀取一致性、更新一致性、事務(wù)的具體實(shí)現(xiàn)

1. 數(shù)據(jù)的存儲(chǔ)與版本

1.1 數(shù)據(jù)存儲(chǔ)的轉(zhuǎn)換

kubernetes中etcd增刪改查的具體實(shí)現(xiàn) 在k8s中有部分?jǐn)?shù)據(jù)的存儲(chǔ)是需要經(jīng)過(guò)處理之后才能存儲(chǔ)的,比如secret這種加密的數(shù)據(jù),既然要存儲(chǔ)就至少包含兩個(gè)操作,加密存儲(chǔ),解密讀取,transformer就是為了完成該操作而實(shí)現(xiàn)的,其在進(jìn)行etcd數(shù)據(jù)存儲(chǔ)的時(shí)候回對(duì)數(shù)據(jù)進(jìn)行加密,而在讀取的時(shí)候,則會(huì)進(jìn)行解密

1.2 資源版本revision

kubernetes中etcd增刪改查的具體實(shí)現(xiàn) 在etcd中進(jìn)行修改(增刪改)操作的時(shí)候,都會(huì)遞增revision,而在k8s中也通過(guò)該值來(lái)作為k8s資源的ResourceVersion,該機(jī)制也是實(shí)現(xiàn)watch的關(guān)鍵機(jī)制,在操作etcd解碼從etcd獲取的數(shù)據(jù)的時(shí)候,會(huì)通過(guò)versioner組件來(lái)為資源動(dòng)態(tài)的修改該值

1.3 數(shù)據(jù)模型的映射

kubernetes中etcd增刪改查的具體實(shí)現(xiàn) 將數(shù)據(jù)從etcd中讀取后,數(shù)據(jù)本身就是一個(gè)字節(jié)數(shù)組,如何將對(duì)應(yīng)的數(shù)據(jù)轉(zhuǎn)換成我們真正的運(yùn)行時(shí)對(duì)象呢?還記得我們之前的scheme與codec么,在這里我們知道對(duì)應(yīng)的數(shù)據(jù)編碼格式,也知道資源對(duì)象的類型,則通過(guò)codec、字節(jié)數(shù)組、目標(biāo)類型,我們就可以完成對(duì)應(yīng)數(shù)據(jù)的反射

2. 查詢接口一致性

kubernetes中etcd增刪改查的具體實(shí)現(xiàn) etcd中的數(shù)據(jù)寫入是基于leader單點(diǎn)寫入和集群quorum機(jī)制實(shí)現(xiàn)的,并不是一個(gè)強(qiáng)一致性的數(shù)據(jù)寫入,則如果如果我們?cè)L問(wèn)的節(jié)點(diǎn)不存在quorum的半數(shù)節(jié)點(diǎn)內(nèi),則可能造成短暫的數(shù)據(jù)不一致,針對(duì)一些強(qiáng)一致的場(chǎng)景,我們可以通過(guò)其revision機(jī)制來(lái)進(jìn)行數(shù)據(jù)的讀取, 保證我們讀取到更新之后的數(shù)據(jù)

// 省略非核心代碼
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
	// 獲取key
	getResp, err := s.client.KV.Get(ctx, key, s.getOps...)

    // 檢測(cè)當(dāng)前版本,是否達(dá)到最小版本的
	if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
		return err
	}

	// 執(zhí)行數(shù)據(jù)轉(zhuǎn)換
	data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
	if err != nil {
		return storage.NewInternalError(err.Error())
	}
	// 解碼數(shù)據(jù)
	return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}

3. 創(chuàng)建接口實(shí)現(xiàn)kubernetes中etcd增刪改查的具體實(shí)現(xiàn)

創(chuàng)建一個(gè)接口數(shù)據(jù)則會(huì)首先進(jìn)行資源對(duì)象的檢查,避免重復(fù)創(chuàng)建對(duì)象,此時(shí)會(huì)先通過(guò)資源對(duì)象的version字段來(lái)進(jìn)行初步檢查,然后在利用etcd的事務(wù)機(jī)制來(lái)保證資源創(chuàng)建的原子性操作

// 省略非核心代碼
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
		return errors.New("resourceVersion should not be set on objects to be created")
	}
	if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
		return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
	}
	// 將數(shù)據(jù)編碼
	data, err := runtime.Encode(s.codec, obj)
	if err != nil {
		return err
	}
	
	// 轉(zhuǎn)換數(shù)據(jù)
	newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
	if err != nil {
		return storage.NewInternalError(err.Error())
	}

	startTime := time.Now()
    // 事務(wù)操作
	txnResp, err := s.client.KV.Txn(ctx).If(
		notFound(key), // 如果之前不存在 這里是利用的etcd的ModRevision即修改版本為0, 寓意著對(duì)應(yīng)的key不存在
	).Then(
		clientv3.OpPut(key, string(newData), opts...), // put修改數(shù)據(jù)
	).Commit()
	metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
	if err != nil {
		return err
	}
	if !txnResp.Succeeded {
		return storage.NewKeyExistsError(key, 0)
	}

	if out != nil {
        // 獲取對(duì)應(yīng)的Revision
		putResp := txnResp.Responses[0].GetResponsePut()
		return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
	}
	return nil
}

func notFound(key string) clientv3.Cmp {
	return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}

4. 刪除接口的實(shí)現(xiàn)

kubernetes中etcd增刪改查的具體實(shí)現(xiàn) 刪除接口主要是通過(guò)CAS和事務(wù)機(jī)制來(lái)共同實(shí)現(xiàn),確保在etcd不發(fā)生異常的情況,即使并發(fā)對(duì)同個(gè)資源來(lái)進(jìn)行刪除操作也能保證至少有一個(gè)節(jié)點(diǎn)成功

// 省略非核心代碼
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
	startTime := time.Now()
	// 獲取當(dāng)前的key的數(shù)據(jù)
	getResp, err := s.client.KV.Get(ctx, key)
	for {
		// 獲取當(dāng)前的狀態(tài)
		origState, err := s.getState(getResp, key, v, false)
		if err != nil {
			return err
		}
		txnResp, err := s.client.KV.Txn(ctx).If(
			clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), // 如果修改版本等于當(dāng)前狀態(tài),就嘗試刪除
		).Then(
			clientv3.OpDelete(key), // 刪除
		).Else(
			clientv3.OpGet(key),	// 獲取
		).Commit()
		if !txnResp.Succeeded {
			// 獲取最新的數(shù)據(jù)重試事務(wù)操作
			getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
			klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
			continue
		}
		// 將最后一個(gè)版本的數(shù)據(jù)解碼到out里面,然后返回
		return decode(s.codec, s.versioner, origState.data, out, origState.rev)
	}
}

5. 更新接口的實(shí)現(xiàn)

kubernetes中etcd增刪改查的具體實(shí)現(xiàn) 更新接口實(shí)現(xiàn)上與刪除接口并無(wú)本質(zhì)上的差別,但是如果多個(gè)節(jié)點(diǎn)同時(shí)進(jìn)行更新,CAS并發(fā)操作必然會(huì)有一個(gè)節(jié)點(diǎn)成功,當(dāng)發(fā)現(xiàn)已經(jīng)有節(jié)點(diǎn)操作成功,則當(dāng)前節(jié)點(diǎn)其實(shí)并不需要再做過(guò)多的操作,直接返回即可

// 省略非核心代碼
func (s *store) GuaranteedUpdate(
	ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
	preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
	// 獲取當(dāng)前key的最新數(shù)據(jù)
	getCurrentState := func() (*objState, error) {
		startTime := time.Now()
		getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
		metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
		if err != nil {
			return nil, err
		}
		return s.getState(getResp, key, v, ignoreNotFound)
	}

	// 獲取當(dāng)前數(shù)據(jù)
	var origState *objState
	var mustCheckData bool
	if len(suggestion) == 1 && suggestion[0] != nil {
		// 如果提供了建議的數(shù)據(jù),則會(huì)使用,
		origState, err = s.getStateFromObject(suggestion[0])
		if err != nil {
			return err
		}
		//但是需要檢測(cè)數(shù)據(jù)
		mustCheckData = true
	} else {
		// 嘗試重新獲取數(shù)據(jù)
		origState, err = getCurrentState()
		if err != nil {
			return err
		}
	}

	transformContext := authenticatedDataString(key)
	for {
		// 檢查對(duì)象是否已經(jīng)更新, 主要是通過(guò)檢測(cè)uuid/revision來(lái)實(shí)現(xiàn)
		if err := preconditions.Check(key, origState.obj); err != nil {
			// If our data is already up to date, return the error
			if !mustCheckData {
				return err
			}
			// 如果檢查數(shù)據(jù)一致性錯(cuò)誤,則需要重新獲取
			origState, err = getCurrentState()
			if err != nil {
				return err
			}
			mustCheckData = false
			// Retry
			continue
		}

		// 刪除當(dāng)前的版本數(shù)據(jù)revision
		ret, ttl, err := s.updateState(origState, tryUpdate)
		if err != nil {
			// If our data is already up to date, return the error
			if !mustCheckData {
				return err
			}

			// It's possible we were working with stale data
			// Actually fetch
			origState, err = getCurrentState()
			if err != nil {
				return err
			}
			mustCheckData = false
			// Retry
			continue
		}

		// 編碼數(shù)據(jù)
		data, err := runtime.Encode(s.codec, ret)
		if err != nil {
			return err
		}
		if !origState.stale && bytes.Equal(data, origState.data) {
			// 如果我們發(fā)現(xiàn)我們當(dāng)前的數(shù)據(jù)與獲取到的數(shù)據(jù)一致,則會(huì)直接跳過(guò)
			if mustCheckData {
				origState, err = getCurrentState()
				if err != nil {
					return err
				}
				mustCheckData = false
				if !bytes.Equal(data, origState.data) {
					// original data changed, restart loop
					continue
				}
			}
			if !origState.stale {
                // 直接返回?cái)?shù)據(jù)
				return decode(s.codec, s.versioner, origState.data, out, origState.rev)
			}
		}

		// 磚漢數(shù)據(jù)
		newData, err := s.transformer.TransformToStorage(data, transformContext)
		if err != nil {
			return storage.NewInternalError(err.Error())
		}

		opts, err := s.ttlOpts(ctx, int64(ttl))
		if err != nil {
			return err
		}
		trace.Step("Transaction prepared")

		startTime := time.Now()
		// 事務(wù)更新數(shù)據(jù)
		txnResp, err := s.client.KV.Txn(ctx).If(
			clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
		).Then(
			clientv3.OpPut(key, string(newData), opts...),
		).Else(
			clientv3.OpGet(key),
		).Commit()
		metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
		if err != nil {
			return err
		}
		trace.Step("Transaction committed")
		if !txnResp.Succeeded {
			// 重新獲取數(shù)據(jù)
			getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
			klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
			origState, err = s.getState(getResp, key, v, ignoreNotFound)
			if err != nil {
				return err
			}
			trace.Step("Retry value restored")
			mustCheckData = false
			continue
		}
		// 獲取put響應(yīng)
		putResp := txnResp.Responses[0].GetResponsePut()

		return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
	}
}

到此,相信大家對(duì)“kubernetes中etcd增刪改查的具體實(shí)現(xiàn)”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI