溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么解析Kafka中的事務(wù)消息

發(fā)布時間:2021-12-15 09:12:28 來源:億速云 閱讀:619 作者:柒染 欄目:大數(shù)據(jù)

這篇文章給大家介紹怎么解析Kafka中的事務(wù)消息,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

1.冪等消息

為了解決重試導(dǎo)致的消息重復(fù)、亂序問題,kafka引入了冪等消息。冪等消息保證producer在一次會話內(nèi)寫入一個partition內(nèi)的消息具有冪等性,可以通過重試來確保消息發(fā)布的Exactly Once語義。

實現(xiàn)邏輯很簡單:

  • 區(qū)分producer會話

producer每次啟動后,首先向broker申請一個全局唯一的pid,用來標(biāo)識本次會話。

  • 消息檢測

message_v2 增加了sequence number字段,producer每發(fā)一批消息,seq就加1。

broker在內(nèi)存維護(pid,seq)映射,收到消息后檢查seq,如果,

new_seq=old_seq+1: 正常消息;new_seq<=old_seq : 重復(fù)消息;new_seq>old_seq+1: 消息丟失;
  • producer重試

producer在收到明確的的消息丟失ack,或者超時后未收到ack,要進行重試。

2.事務(wù)消息

考慮在stream處理的場景中,需要多個消息的原子寫入語義,要么全部寫入成功,要么全部失敗,這就是kafka事務(wù)消息要解決的問題。

事務(wù)消息是由producer、事務(wù)協(xié)調(diào)器、broker、組協(xié)調(diào)器、consumer共同參與實現(xiàn)的,

1)producer

為producer指定固定的TransactionalId,可以穿越producer的多次會話(producer重啟/斷線重連)中,持續(xù)標(biāo)識producer的身份。

使用epoch標(biāo)識producer的每一次"重生",防止同一producer存在多個會話。

producer遵從冪等消息的行為,并在發(fā)送的recordbatch中增加事務(wù)id和epoch。

2)事務(wù)協(xié)調(diào)器(Transaction Coordinator)

引入事務(wù)協(xié)調(diào)器,以兩階段提交的方式,實現(xiàn)消息的事務(wù)提交。

事務(wù)協(xié)調(diào)器使用一個特殊的topic:transaction,來做事務(wù)提交日志。

事務(wù)控制器通過RPC調(diào)研,協(xié)調(diào) broker 和 consumer coordinator 實現(xiàn)事務(wù)的兩階段提交。

每一個broker都會啟動一個事務(wù)協(xié)調(diào)器,使用hash(TransactionalId)確定producer對應(yīng)的事務(wù)協(xié)調(diào)器,使得整個集群的負(fù)載均衡。

3) broker

broker處理在事務(wù)協(xié)調(diào)器的commit/abort控制消息,把控制消息向正常消息一樣寫入topic(和正常消息交織在一起,用來確認(rèn)事務(wù)提交的日志偏移),并向前推進消息提交偏移hw。

4) 組協(xié)調(diào)器

如果在事務(wù)過程中,提交了消費偏移,組協(xié)調(diào)器在offset log中寫入事務(wù)消費偏移。當(dāng)事務(wù)提交時,在offset log中寫入事務(wù)offset確認(rèn)消息。

5)consumer

consumer過濾未提交消息和事務(wù)控制消息,使這些消息對用戶不可見。

有兩種實現(xiàn)方式:

  • consumer緩存方式

設(shè)置isolation.level=read_uncommitted,此時topic的所有消息對consumer都可見。

consumer緩存這些消息,直到收到事務(wù)控制消息。若事務(wù)commit,則對外發(fā)布這些消息;若事務(wù)abort,則丟棄這些消息。

  • broker過濾方式

設(shè)置isolation.level=read_committed,此時topic中未提交的消息對consumer不可見,只有在事務(wù)結(jié)束后,消息才對consumer可見。

broker給consumer的BatchRecord消息中,會包含以列表,指明哪些是"abort"事務(wù),consumer丟棄abort事務(wù)的消息即可。

事務(wù)消息處理流程如圖1所示,

怎么解析Kafka中的事務(wù)消息

圖1 事務(wù)消息業(yè)務(wù)流程

流程說明:

1. 查找事務(wù)協(xié)調(diào)器 -- FindCoordinatorRequest

事務(wù)協(xié)調(diào)器是分配pid和管理事務(wù)的核心,produer首先對任何一個broker發(fā)送FindCoordinatorRequest,發(fā)現(xiàn)自己的事務(wù)協(xié)調(diào)器。

2. 申請pid -- InitPidRequest

緊接著,producer向事務(wù)協(xié)調(diào)器發(fā)送InitPidRequest,申請生成pid。

2a.當(dāng)指定了transactional.id時,事務(wù)協(xié)調(diào)器為producer分區(qū)pid,并更新epoch,把(tid,pid)的映射關(guān)系寫入事務(wù)日志。同時清理tid任何未完成的事務(wù),丟棄未提交的消息。

3. 啟動事務(wù)

啟動事務(wù)是producer的本地操作,促使producer更新內(nèi)部狀態(tài),不會和事務(wù)協(xié)調(diào)器發(fā)生關(guān)系。

事務(wù)協(xié)調(diào)器自動啟動事務(wù),始終處在一個接一個的事務(wù)處理狀態(tài)機中。

4. consume-transform-produce 事務(wù)循環(huán)

4.1. 注冊partition -- AddPartitionsToTxnRequest

對于每一個要在事務(wù)中寫消息的topic分區(qū),producer應(yīng)當(dāng)在第一次發(fā)消息前,向事務(wù)處理器注冊分區(qū)。

4.1a.事務(wù)處理器把事務(wù)關(guān)聯(lián)的分區(qū)寫入事務(wù)日志。

在提交或終止事務(wù)時,事務(wù)協(xié)調(diào)器需要這些信息,控制事務(wù)涉及的所有分區(qū)leader完成事務(wù)提交或終止。

4.2. 寫消息 -- ProduceRequest

4.2a. producer向分區(qū)leader寫消息,消息中包含tid,pid,epoch和seq。

4.3. 提交消費偏移 -- AddOffsetCommitsToTxnRequest

4.3a. producer向事務(wù)協(xié)調(diào)器發(fā)送消費偏移,事務(wù)協(xié)調(diào)器在事務(wù)日志中記錄偏移信息,并把組協(xié)調(diào)器返回給producer。

4.4. 提交消費偏移 -- TxnOffsetCommitRequest

4.4a. producer向組協(xié)調(diào)器發(fā)送TxnOffsetCommitRequest,組協(xié)調(diào)器把偏移信息寫入偏移日志。但是,要一直等到事務(wù)提交后,這個偏移才生效,對外部可見。

5. 提交或終止事務(wù)

5.1. EndTxnRequest

收到提交或終止事務(wù)的請求時,事務(wù)處理器執(zhí)行下面的操作:

1. 在事務(wù)日志中寫入PREPARE_COMMIT或PREPARE_ABORT消息(5.1a)。

2. 通過WriteTxnMarkerRequest向事務(wù)中的所有broker發(fā)事務(wù)控制消息(5.2)。

3. 在事務(wù)之日中寫入COMMITTED或ABORTED消息(5.3)。

5.2. WriteTxnMarkerRequest

這個消息由事務(wù)處理器發(fā)給事務(wù)中所涉及分區(qū)的leader。

當(dāng)收到這個消息后,broker會在分區(qū)log中寫入一個COMMIT或ABORT控制消息。同時,也會更新該分區(qū)的事務(wù)提交偏移hw。

如果事務(wù)中有提交消費偏移, broker也會把控制消息寫入 __consumer-offsets log,并通知組協(xié)調(diào)器使事務(wù)中提交的消費偏移生效。

5.3. 寫最終的commit或abort消息

當(dāng)所有的commit或abort消息寫入數(shù)據(jù)日志,事務(wù)協(xié)調(diào)器在事務(wù)日志中寫入事務(wù)日志,標(biāo)志這事務(wù)結(jié)束。

至此,本事務(wù)的所有狀態(tài)信息都可以被刪除,可以開始一個新的事務(wù)。

在實現(xiàn)上,還有很多細(xì)節(jié),比如,事務(wù)協(xié)調(diào)器會啟動定時器,用來檢測并終止開始后長時間不活動的事務(wù),具體請參考下面列出的kafka社區(qū)技術(shù)文檔。

我們要認(rèn)識到,雖然kafka事務(wù)消息提供了多個消息原子寫的保證,但它不保證原子讀。

例如,

1)事務(wù)向topic_a和topic_b兩個分區(qū)寫入消息,在事務(wù)提交后的某個時刻,topic_a的全部副本失效。這時topic_b中的消息可以正常消費,但topic_a中的消息就丟失了。2)假如consumer只消費了topic_a,沒有消費topic_b,這樣也不能讀到完整的事務(wù)消息。3)典型的kafka stream應(yīng)用從多個topic消費,然后向一個或多個topic寫。在一次故障后,kafka stream應(yīng)用重新開始處理流數(shù)據(jù),由于從多個topic讀到的數(shù)據(jù)之間不存在穩(wěn)定的順序(即便只有一個topic,從多個分區(qū)讀到的數(shù)據(jù)之間也沒有穩(wěn)定的順序),那么兩次處理輸出的結(jié)果就可能會不一樣。

也就是說,雖然kafka log持久化了數(shù)據(jù),也可以通過指定offset多次消費數(shù)據(jù),但由于分區(qū)數(shù)據(jù)之間的無序性,導(dǎo)致每次處理輸出的結(jié)果都是不同的。這使得kafka stream不能像hadoop批處理任務(wù)一樣,可以隨時重新執(zhí)行,保證每次執(zhí)行的結(jié)果相同。除非我們只從一個topic分區(qū)讀數(shù)據(jù)。

關(guān)于怎么解析Kafka中的事務(wù)消息就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI