您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關(guān)怎么實(shí)現(xiàn)Kafka事務(wù)特性分析,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
特性背景
消息事務(wù)是指一系列的生產(chǎn)、消費(fèi)操作可以要么都完成,要么都失敗,類似數(shù)據(jù)庫的事務(wù)。這個(gè)特性在0.10.2的版本是不支持的,從0.11版本開始才支持。華為云DMS率先提供Kafka 1.1.0的專享版服務(wù),支持消息事務(wù)特性。
支持事務(wù)消息有什么作用?消息事務(wù)是實(shí)現(xiàn)分布式事務(wù)的一種方案,可以確保分布式場景下的數(shù)據(jù)最終一致性。例如最常用的轉(zhuǎn)賬場景,小王 轉(zhuǎn)賬到小明,實(shí)際操作是小王賬戶減去相應(yīng)金額,小明的賬戶增加相應(yīng)金額,在分庫分表的前提下,2個(gè)賬戶存儲(chǔ)在不同的數(shù)據(jù)庫中,這時(shí)需要分布式事務(wù)才能保證數(shù)據(jù)庫一致性,單個(gè)數(shù)據(jù)庫的事務(wù)無法保證跨庫之間的原子性。如果小王賬戶先扣錢,再去發(fā)送消息到小明所在的數(shù)據(jù)庫去通知增加錢,在沒有事務(wù)消息的情況下,無論是先扣錢或者先發(fā)送通知增加錢,都會(huì)有數(shù)據(jù)不一致的問題,因?yàn)闊o法保證兩者的原子性。而有了事務(wù)消息,可以保證發(fā)送通知與本地事務(wù)(扣錢)是一個(gè)原子操作,本地事務(wù)與發(fā)送通知可以同時(shí)成功或者同時(shí)失敗,確保數(shù)據(jù)一致。
除了數(shù)據(jù)最終一致性外,還實(shí)現(xiàn)了消息Exactly once語義。所謂Exactly once語義是消息傳遞語義中最難實(shí)現(xiàn)的一種,包括At most once:最多一次(不會(huì)重復(fù),但是可能丟失數(shù)據(jù)); At least once:至少投遞一次(不會(huì)丟失,但是會(huì)導(dǎo)致重復(fù))和Exactly once: 剛好一次(不丟不重),也即冪等性。Kafka的冪等性可以保證生產(chǎn)只對一個(gè)分區(qū)實(shí)現(xiàn)Exactl once語義,需要多個(gè)分區(qū)也實(shí)現(xiàn)這個(gè)語義,還需要引入消息事務(wù)確保原子性。
分布式事務(wù)介紹
當(dāng)前系統(tǒng)架構(gòu)主流是分布式架構(gòu)與微服務(wù)架構(gòu),在這種架構(gòu)下數(shù)據(jù)源不是單一的數(shù)據(jù)庫,業(yè)務(wù)邏輯往往需要在多個(gè)數(shù)據(jù)庫中實(shí)現(xiàn)原子操作,單個(gè)數(shù)據(jù)庫中的強(qiáng)大的本地事務(wù)無法保證多節(jié)點(diǎn)原子操作。 此時(shí)需要分布式事務(wù)來確保數(shù)據(jù)的一致性。目前使用較多的分布式事務(wù)解決方案有幾種:
1、XA事務(wù):兩階段/三階段提交
XA是由X/Open組織提出的分布式事務(wù)的規(guī)范。XA規(guī)范主要定義了(全局)事務(wù)管理器(Transaction Manager)和(局部)資源管理器(Resource Manager)之間的接口。XA接口是雙向的系統(tǒng)接口,在事務(wù)管理器(Transaction Manager)以及一個(gè)或多個(gè)資源管理器(Resource Manager)之間形成通信橋梁。實(shí)現(xiàn)XA事務(wù)的關(guān)鍵是兩階段和三階段提交協(xié)議。
兩階段提交協(xié)議(Two-phase Commit,2PC)經(jīng)常被用來實(shí)現(xiàn)分布式事務(wù)。一般分為協(xié)調(diào)器C和若干事務(wù)參與者Si兩種角色,這里的事務(wù)參與者就是具體的數(shù)據(jù)庫,協(xié)調(diào)器可以和事務(wù)參與者在一臺(tái)機(jī)器上。
二階段提交協(xié)議主要包括由2個(gè)階段:第一個(gè)階段為準(zhǔn)備階段(prepare),第二階段為提交階段。準(zhǔn)備階段由事務(wù)協(xié)調(diào)者向事務(wù)參與者發(fā)送prepare消息,各個(gè)參與者處理本地事務(wù)但不提交,然后向事務(wù)協(xié)調(diào)者返回事務(wù)狀態(tài)。 提交階段根據(jù)準(zhǔn)備階段各參與者的執(zhí)行請求,協(xié)調(diào)者確定事務(wù)是提交或者回滾,向各個(gè)參與者發(fā)送命令。
二階段提交協(xié)議主要的問題是在提交執(zhí)行過程中,所有的參與者都需要聽從協(xié)調(diào)者的統(tǒng)一調(diào)度,期間處于阻塞狀態(tài)而不能從事其他操作,這樣效率及其低下。特別是當(dāng)協(xié)調(diào)者發(fā)出提交通知到部分參與者后宕機(jī),其他參與者就會(huì)阻塞。
針對二階段提交存在的問題,三階段提交協(xié)議在prepare與commit階段之間增加一個(gè)pre-commit階段。Prepare階段只詢問參與者而不做事務(wù),而在pre-commit階段各個(gè)參與者才會(huì)執(zhí)行本地事務(wù)但不提交。Commit階段就是直接提交。這樣做可以避免二階段當(dāng)協(xié)調(diào)者遲遲沒有發(fā)出commit或者rollback通知,參與者在超時(shí)后可以自行提交或者回滾,避免阻塞事務(wù)(這是因?yàn)榻?jīng)過了prepare階段已經(jīng)確認(rèn)了各個(gè)參與者是可以執(zhí)行的,最后第三階段直接執(zhí)行即可)。 三階段提交也存在很多問題,也不能完全保證數(shù)據(jù)一致,完全一致需要用到Paxos算法。
2、TCC補(bǔ)償性事務(wù)解決方案
TCC分別對應(yīng)Try、Confirm和Cancel三種操作,含義如下:
- Try:預(yù)留業(yè)務(wù)資源
- Confirm:確認(rèn)執(zhí)行業(yè)務(wù)操作,執(zhí)行事務(wù)
- Cancel:取消執(zhí)行業(yè)務(wù)操作
TCC解決了跨應(yīng)用業(yè)務(wù)操作的原子性問題,在諸如組合支付、賬務(wù)拆分場景非常實(shí)用。TCC實(shí)際上把數(shù)據(jù)庫層的二階段提交上提到了應(yīng)用層來實(shí)現(xiàn),對于數(shù)據(jù)庫來說是一階段提交,規(guī)避了數(shù)據(jù)庫層的2PC性能低下問題。TCC需要業(yè)務(wù)提供使用,開發(fā)復(fù)雜和成本高。
3、事務(wù)消息
基于消息中間件的事務(wù)消息來完成分布式事務(wù)。事務(wù)消息可以確保本地執(zhí)行事務(wù)與消息發(fā)送是原子的:先發(fā)送一條消息到消息中間件,然后執(zhí)行本地事務(wù),當(dāng)本地事務(wù)成功后再發(fā)送提交確認(rèn)到消息中間件,然后這條消息才能被其他業(yè)務(wù)消費(fèi)者所能感知,從而確保原子性。
Kafka消息事務(wù)
01基本概念
為了支持事務(wù),Kafka 0.11.0版本引入以下概念:
1.事務(wù)協(xié)調(diào)者:類似于消費(fèi)組負(fù)載均衡的協(xié)調(diào)者,每一個(gè)實(shí)現(xiàn)事務(wù)的生產(chǎn)端都被分配到一個(gè)事務(wù)協(xié)調(diào)者(Transaction Coordinator)。
2.引入一個(gè)內(nèi)部Kafka Topic作為事務(wù)Log:類似于消費(fèi)管理Offset的Topic,事務(wù)Topic本身也是持久化的,日志信息記錄事務(wù)狀態(tài)信息,由事務(wù)協(xié)調(diào)者寫入。
3.引入控制消息(Control Messages):這些消息是客戶端產(chǎn)生的并寫入到主題的特殊消息,但對于使用者來說不可見。它們是用來讓broker告知消費(fèi)者之前拉取的消息是否被原子性提交。
4.引入TransactionId:不同生產(chǎn)實(shí)例使用同一個(gè)TransactionId表示是同一個(gè)事務(wù),可以跨Session的數(shù)據(jù)冪等發(fā)送。當(dāng)具有相同Transaction ID的新的Producer實(shí)例被創(chuàng)建且工作時(shí),舊的且擁有相同Transaction ID的Producer將不再工作,避免事務(wù)僵死。
5.Producer ID:每個(gè)新的Producer在初始化的時(shí)候會(huì)被分配一個(gè)唯一的PID,這個(gè)PID對用戶是不可見的。主要是為提供冪等性時(shí)引入的。
6.Sequence Numbler。(對于每個(gè)PID,該P(yáng)roducer發(fā)送數(shù)據(jù)的每個(gè)<Topic, Partition>都對應(yīng)一個(gè)從0開始單調(diào)遞增的Sequence Number。
7.每個(gè)生產(chǎn)者增加一個(gè)epoch:用于標(biāo)識(shí)同一個(gè)事務(wù)Id在一次事務(wù)中的epoch,每次初始化事務(wù)時(shí)會(huì)遞增,從而讓服務(wù)端可以知道生產(chǎn)者請求是否舊的請求。
8.冪等性:保證發(fā)送單個(gè)分區(qū)的消息只會(huì)發(fā)送一次,不會(huì)出現(xiàn)重復(fù)消息。增加一個(gè)冪等性的開關(guān)enable.idempotence,可以獨(dú)立與事務(wù)使用,即可以只開啟冪等但不開啟事務(wù)。
02事務(wù)流程
1、查找事務(wù)協(xié)調(diào)者
生產(chǎn)者會(huì)首先發(fā)起一個(gè)查找事務(wù)協(xié)調(diào)者的請求(FindCoordinatorRequest)。協(xié)調(diào)者會(huì)負(fù)責(zé)分配一個(gè)PID給生產(chǎn)者。類似于消費(fèi)組的協(xié)調(diào)者。
2、獲取produce ID
在知道事務(wù)協(xié)調(diào)者后,生產(chǎn)者需要往協(xié)調(diào)者發(fā)送初始化pid請求(initPidRequest)。這個(gè)請求分兩種情況:
●不帶transactionID
這種情況下直接生成一個(gè)新的produce ID即可,返回給客戶端
●帶transactionID
這種情況下,kafka根據(jù)transactionalId獲取對應(yīng)的PID,這個(gè)對應(yīng)關(guān)系是保存在事務(wù)日志中(上圖2a)。這樣可以確保相同的TransactionId返回相同的PID,用于恢復(fù)或者終止之前未完成的事務(wù)。
3、啟動(dòng)事務(wù)
生產(chǎn)者通過調(diào)用beginTransaction接口啟動(dòng)事務(wù),此時(shí)只是內(nèi)部的狀態(tài)記錄為事務(wù)開始,但是事務(wù)協(xié)調(diào)者認(rèn)為事務(wù)開始只有當(dāng)生產(chǎn)者開始發(fā)送第一條消息才開始。
4、消費(fèi)和生產(chǎn)配合過程
這一步是消費(fèi)和生成互相配合完成事務(wù)的過程,其中涉及多個(gè)請求:
●增加分區(qū)到事務(wù)請求
當(dāng)生產(chǎn)者有新分區(qū)要寫入數(shù)據(jù),則會(huì)發(fā)送AddPartitionToTxnRequest到事務(wù)協(xié)調(diào)者。協(xié)調(diào)者會(huì)處理請求,主要做的事情是更新事務(wù)元數(shù)據(jù)信息,并把信息寫入到事務(wù)日志中(事務(wù)Topic)。
●生產(chǎn)請求
生產(chǎn)者通過調(diào)用send接口發(fā)送數(shù)據(jù)到分區(qū),這些請求新增pid,epoch和sequence number字段。
●增加消費(fèi)offset到事務(wù)
生產(chǎn)者通過新增的snedOffsets ToTransaction接口,會(huì)發(fā)送某個(gè)分區(qū)的Offset信息到事務(wù)協(xié)調(diào)者。協(xié)調(diào)者會(huì)把分區(qū)信息增加到事務(wù)中。
●事務(wù)提交offset請求
當(dāng)生產(chǎn)者調(diào)用事務(wù)提交offset接口后,會(huì)發(fā)送一個(gè)TxnOffsetCommitRequest請求到消費(fèi)組協(xié)調(diào)者,消費(fèi)組協(xié)調(diào)者會(huì)把offset存儲(chǔ)在__consumer-offsets Topic中。協(xié)調(diào)者會(huì)根據(jù)請求的PID和epoch驗(yàn)證生產(chǎn)者是否允許發(fā)起這個(gè)請求。 消費(fèi)offset只有當(dāng)事務(wù)提交后才對外可見。
5、提交或回滾事務(wù)
用戶通過調(diào)用commitTransaction或abortTranssaction方法提交或回滾事務(wù)。
●EndTxnRequest
當(dāng)生產(chǎn)者完成事務(wù)后,客戶端需要顯式調(diào)用結(jié)束事務(wù)或者回滾事務(wù)。前者會(huì)使得消息對消費(fèi)者可見,后者會(huì)對生產(chǎn)數(shù)據(jù)標(biāo)記為Abort狀態(tài),使得消息對消費(fèi)者不可見。無論是提交或者回滾,都是發(fā)送一個(gè)EndTnxRequest請求到事務(wù)協(xié)調(diào)者,寫入PREPARE_COMMIT或者PREPARE_ABORT信息到事務(wù)記錄日志中(5.1a)。
●WriteTxnMarkerRequest
這個(gè)請求是事務(wù)協(xié)調(diào)者向事務(wù)中每個(gè)TopicPartition的Leader發(fā)送的。每個(gè)Broker收到請求后會(huì)寫入COMMIT(PID)或者ABORT(PID)控制信息到數(shù)據(jù)日志中(5.2a)。
這個(gè)信息用于告知消費(fèi)者當(dāng)前消息是哪個(gè)事務(wù),消息是否應(yīng)該接受或者丟棄。而對于未提交消息,消費(fèi)者會(huì)緩存該事務(wù)的消息直到提交或者回滾。
這里要注意,如果事務(wù)也涉及到__consumer_offsets,即該事務(wù)中有消費(fèi)數(shù)據(jù)的操作且將該消費(fèi)的Offset存于__consumer_offsets中,Transaction Coordinator也需要向該內(nèi)部Topic的各Partition的Leader發(fā)送WriteTxnMarkerRequest從而寫入COMMIT(PID)或COMMIT(PID)控制信息(5.2a 左邊)。
●寫入最終提交或回滾信息
當(dāng)提交和回滾信息寫入數(shù)據(jù)日子后,事務(wù)協(xié)調(diào)者會(huì)往事務(wù)日志中寫入最終的提交或者終止信息以表示事務(wù)已經(jīng)完成(圖5.3),此時(shí)大部分于事務(wù)有關(guān)系的消息都可以被刪除(通過標(biāo)記后面在日志壓縮時(shí)會(huì)被移除),我們只需要保留事務(wù)ID以及其時(shí)間戳即可。
以上就是怎么實(shí)現(xiàn)Kafka事務(wù)特性分析,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。