溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

RocketMQ事務(wù)消息是怎么保證數(shù)據(jù)的一致性

發(fā)布時(shí)間:2021-10-18 11:24:59 來(lái)源:億速云 閱讀:155 作者:iii 欄目:web開發(fā)

這篇文章主要介紹“RocketMQ事務(wù)消息是怎么保證數(shù)據(jù)的一致性”,在日常操作中,相信很多人在RocketMQ事務(wù)消息是怎么保證數(shù)據(jù)的一致性問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”RocketMQ事務(wù)消息是怎么保證數(shù)據(jù)的一致性”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

前言

在面過(guò)的幾家大廠中,幾乎每輪的面試官(「沒寫錯(cuò),幾乎是每輪面試官」)都問了同樣一個(gè)問題:你們的系統(tǒng)是分布式的系統(tǒng)嗎?

答:是。

面試官:那么你們分布式的系統(tǒng)是如何解決分布式事務(wù)這個(gè)問題的呢?也就是如何保證數(shù)據(jù)的一致性。

答:我們的系統(tǒng)中通過(guò) RocketMQ 的事務(wù)消息來(lái)保證數(shù)據(jù)的最終一致性。

面試官:那你說(shuō)說(shuō)它是如何來(lái)保證數(shù)據(jù)的最終一致性的?

答:分兩部分來(lái)回答,第一部分先回答事務(wù)消息的實(shí)現(xiàn)流程,第二部分解釋為什么它能保證數(shù)據(jù)的最終一致性。

事務(wù)消息的實(shí)現(xiàn)流程

RocketMQ事務(wù)消息是怎么保證數(shù)據(jù)的一致性

事務(wù)消息

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. 首先服務(wù) A 發(fā)送一個(gè)半事務(wù)消息(也稱 half 消息)至 MQ 中。為什么要先發(fā)送一個(gè) half 消息呢?這是為了保證服務(wù) A 和 MQ  之間的通信正常,如果無(wú)法正常通信,則服務(wù) A 可以直接返回一個(gè)異常,也就不用處理后面的邏輯的了。

  3. 如果 half 消息發(fā)送成功,MQ 收到這個(gè) half 消息后,會(huì)返回一個(gè) success 響應(yīng)給服務(wù) A。

  4. 服務(wù) A 接收到 MQ 返回的 success 響應(yīng)后,開始處理本地的業(yè)務(wù)邏輯,并提交本地事務(wù)。

  5. 如果服務(wù) A 本地事務(wù)提交成功,則會(huì)向 MQ 中發(fā)送 commit,表示將 half 消息提交,MQ 就會(huì)執(zhí)行第 5 步操作;如果服務(wù) A  本地事務(wù)提交失敗,則直接回滾本地事務(wù),并向 MQ 中發(fā)送 rollback,表示將之前的 half 消息進(jìn)行回滾,MQ 接收到 rollback 消息后,就會(huì)將  half 消息刪除。

  6. 如果 commit,則將 half 消息寫入到磁盤。

  7. 如果 MQ 長(zhǎng)時(shí)間沒有接收到 commit 或者 rollback 消息,例如:服務(wù) A 在處理本地業(yè)務(wù)時(shí)宕機(jī)了,或者發(fā)送的  commit、rollback 因?yàn)樵谌蹙W(wǎng)環(huán)境,數(shù)據(jù)丟失了。那么 MQ 就會(huì)在一定時(shí)間后嘗試調(diào)用服務(wù) A 提供的一個(gè)接口,通過(guò)這個(gè)接口來(lái)判斷 half  消息的狀態(tài)。所以服務(wù) A 提供的接口,需要實(shí)現(xiàn)的業(yè)務(wù)邏輯是:通過(guò)數(shù)據(jù)庫(kù)中對(duì)應(yīng)數(shù)據(jù)的狀態(tài)來(lái)判斷,之前的 half 消息對(duì)應(yīng)的業(yè)務(wù)是否執(zhí)行成功。如果 MQ  從這個(gè)接口中得知 half 消息執(zhí)行成功了,那么 MQ 就會(huì)將 half 消息持久化到本地磁盤,如果得知沒有執(zhí)行成功,那么就會(huì)將 half 消息刪除。

  8. 服務(wù) B 從 MQ 中消費(fèi)到對(duì)應(yīng)的消息。

  9. 服務(wù) B 處理本地業(yè)務(wù)邏輯,然后提交本地事務(wù)。

如何保證數(shù)據(jù)的最終一致性

實(shí)現(xiàn)流程說(shuō)完了,可能你現(xiàn)在有各種各樣的疑惑?

Q: half 消息是個(gè)啥?

A: 它和我們正常發(fā)送的普通消息是一樣的,都是存儲(chǔ)在 MQ 中,唯一不同的是 half 在 MQ 中不會(huì)立馬被消費(fèi)者消費(fèi)到,除非這個(gè) half 消息被  commit 了。(至于為什么未 commit 的 half 消息無(wú)法被消費(fèi)者讀取到,這是因?yàn)樵?MQ 內(nèi)部,對(duì)于事務(wù)消息而言,在 commit  之前,會(huì)先放在一個(gè)內(nèi)部隊(duì)列中,只有 commit 了,才會(huì)真正將消息放在消費(fèi)者能讀取到的 topic 隊(duì)列中)

Q: 為什么要先發(fā)送 half 消息?

A: 前面已經(jīng)解釋過(guò)了,主要是為了保證服務(wù) A 和 MQ 之間是否能正常通信,如果兩者之間都不能正常通信,后面還玩?zhèn)€錘子,直接返回異常就可以了。

Q: 如果 MQ 接收到了 half 消息,但是在返回 success 響應(yīng)的時(shí)候,因?yàn)榫W(wǎng)絡(luò)原因,導(dǎo)致服務(wù) A 沒有接收到 success  響應(yīng),這個(gè)時(shí)候是什么現(xiàn)象?

A: 當(dāng)服務(wù) A 發(fā)送 half 消息后,它會(huì)等待 MQ 給自己返回 success 響應(yīng),如果沒有接收到,那么服務(wù) A  也會(huì)直接結(jié)束,返回異常,不再執(zhí)行后續(xù)邏輯。不執(zhí)行后續(xù)邏輯,這樣服務(wù) A 也就不會(huì)提交 commit 消息給 MQ,MQ 長(zhǎng)時(shí)間沒接收到 commit  消息,那么它就會(huì)主動(dòng)回調(diào)服務(wù) A 的一個(gè)接口,服務(wù) A 通過(guò)接口,查詢本地?cái)?shù)據(jù)后,發(fā)現(xiàn)這條消息對(duì)應(yīng)的業(yè)務(wù)并沒有正常執(zhí)行,那么就告訴 MQ,這個(gè) half  消息不能 commit,需要 rollback,MQ 知道后,就將 half 消息進(jìn)行刪除。

Q: 如果服務(wù) A 本地事務(wù)執(zhí)行失敗了,怎么辦?

A: 服務(wù) A 本地事務(wù)執(zhí)行失敗后,先對(duì)自己本地事務(wù)進(jìn)行回滾,然后再向 MQ 發(fā)送 rollback 操作。

Q: 服務(wù) A 本地事務(wù)提交成功或失敗后,向 MQ 發(fā)送的 commit 或者 rollback 消息,因?yàn)榫W(wǎng)絡(luò)問題丟失了,又該怎么處理?

A: 和上一個(gè)問題一樣,MQ 長(zhǎng)時(shí)間沒有接收到 half 消息的 commit 或者 rollback 消息,MQ 會(huì)主動(dòng)回調(diào)服務(wù) A  的接口,通過(guò)這個(gè)接口來(lái)判斷自己該對(duì)這個(gè) half 消息如何處理。

Q: 前面說(shuō)的全是事務(wù)消息的實(shí)現(xiàn)流程,這和事務(wù)消息如何保證數(shù)據(jù)的最終一致性有什么關(guān)系呢?

A: 有關(guān)系。首先,服務(wù) A 執(zhí)行本地事務(wù)并提交和向 MQ 中發(fā)送消息這是兩個(gè)寫操作,然后通過(guò) RocketMQ  的事務(wù)消息,我們保證了這兩個(gè)寫操作要么都執(zhí)行成功,要么都執(zhí)行失敗。然后讓其他系統(tǒng),如服務(wù) B 通過(guò)消費(fèi) MQ  中的消息,然后再去執(zhí)行自己本地的事務(wù),這樣到最后,服務(wù) A 和服務(wù) B 這兩個(gè)系統(tǒng)的數(shù)據(jù)狀態(tài)是不是達(dá)到了一致?這就是最終一致性的含義。

如果要求服務(wù) A 和服務(wù) B 的數(shù)據(jù)狀態(tài),在服務(wù) A 返回給客戶端之間,這兩者就達(dá)到一致,這是強(qiáng)一致性,RocketMQ 是沒法保證強(qiáng)一致性的。

目前通過(guò)「可靠消息來(lái)保證數(shù)據(jù)的最終一致性」是很多大廠都采用的方案,基本都是通過(guò) MQ  和補(bǔ)償機(jī)制來(lái)保證數(shù)據(jù)的一致性。(所謂的可靠消息,就是消息不丟失,如何保證 MQ 的消息不丟失,下篇文章會(huì)寫,這也是面試常考題)

Q: 服務(wù) B 本地事務(wù)提交失敗了,怎么辦?

A: 如果服務(wù) B 本地事務(wù)提交失敗了,可以進(jìn)行多次重試,直到成功。如果重試多次后,還是提交失敗,例如此時(shí)服務(wù) B 對(duì)應(yīng)的 DB 宕機(jī)了,這個(gè)時(shí)候只要服務(wù)  B 不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 會(huì)在一定時(shí)間后,繼續(xù)將這條消息推送給服務(wù) B,服務(wù) B  就可以繼續(xù)執(zhí)行本地事務(wù)并提交了,直到成功。這樣,依舊是保證了服務(wù) A 和服務(wù) B 數(shù)據(jù)的最終一致性。

代碼實(shí)現(xiàn)

使用 RokcetMQ 的事務(wù)消息主要涉及到兩個(gè)部分:

如何發(fā)送半事務(wù)消息,這個(gè)可以通過(guò)「TransactionMQProducer」 類來(lái)實(shí)現(xiàn)。

TransactionMQProducer transactionMQProducer = new TransactionMQProducer("producerGroup"); TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(msg, null); // 通過(guò)result來(lái)判斷half消息是否發(fā)送成功 if(result.getSendStatus() == SendStatus.SEND_OK){     // 成功 }else{     // 失敗 }

在前面我們提到了服務(wù) A 需要提供一個(gè)接口,用來(lái)供 MQ 回調(diào)服務(wù)  A,實(shí)際上這個(gè)接口就是一個(gè)監(jiān)聽器:「TransactionListener」的方法。這是一個(gè)接口,提供了兩個(gè)方法。

public interface TransactionListener {       // 當(dāng)half消息發(fā)送成功后,我們?cè)谶@里實(shí)現(xiàn)自己的業(yè)務(wù)邏輯,然后commit或者rollback 給MQ     LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);        // 這個(gè)方法就是供MQ回調(diào)的方法,MQ通過(guò)回調(diào)該方法來(lái)判斷half消息的狀態(tài)      // 可以看到,這個(gè)方法的參數(shù)是MessageExt,也就是half消息的內(nèi)容,如果根據(jù)MessageExt,我們完全能在服務(wù)A中判斷之前的業(yè)務(wù)是否處理成功     LocalTransactionState checkLocalTransaction(final MessageExt msg); }

實(shí)際使用時(shí),我們需要實(shí)現(xiàn)該接口,例如:

public class MyTransactionListener implements TransactionListener {      @Override     public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {         try{             // 處理業(yè)務(wù)邏輯             // ....              // 業(yè)務(wù)邏輯處理成功,commit             return LocalTransactionState.COMMIT_MESSAGE;         }catch (Exception e){          }         // 業(yè)務(wù)處理失敗,rollback         return LocalTransactionState.ROLLBACK_MESSAGE;     }      @Override     public LocalTransactionState checkLocalTransaction(MessageExt msg) {         return null;     } }

另外,在創(chuàng)建 producer 時(shí),指定我們實(shí)現(xiàn)實(shí)現(xiàn)的監(jiān)聽器

TransactionMQProducer transactionMQProducer = new TransactionMQProducer("producerGroup"); transactionMQProducer.setTransactionListener(new MyTransactionListener());

到此,關(guān)于“RocketMQ事務(wù)消息是怎么保證數(shù)據(jù)的一致性”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI