您好,登錄后才能下訂單哦!
小編給大家分享一下RocketMQ事務(wù)消息的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
一、大事務(wù) = 小事務(wù) + 異步
我們以一個(gè)轉(zhuǎn)帳的場(chǎng)景為例來(lái)說(shuō)明這個(gè)問(wèn)題,Bob向Smith轉(zhuǎn)賬100塊。這個(gè)列子在瓜子也有很多實(shí)際場(chǎng)景映射,如:車源狀態(tài)變化,訂單狀態(tài)變化,金融放款,物流運(yùn)輸……
在單機(jī)環(huán)境下,執(zhí)行事務(wù)的情況,大概是下面這個(gè)樣子
當(dāng)用戶增長(zhǎng)到一定程度,Bob和Smith的賬戶及余額信息已經(jīng)不在同一臺(tái)服務(wù)器上了,那么上面的流程就變成了這樣
這時(shí)候你會(huì)發(fā)現(xiàn),同樣是一個(gè)轉(zhuǎn)賬的業(yè)務(wù),在集群環(huán)境下,耗時(shí)居然成倍的增長(zhǎng),這顯然是不能夠接受的。而且跨網(wǎng)絡(luò)調(diào)用的事務(wù)需要解決網(wǎng)絡(luò)不穩(wěn)定的因素,直接放到業(yè)務(wù)代碼里控制,成本很高。那如何來(lái)規(guī)避這個(gè)問(wèn)題?
大事務(wù) = 小事務(wù) + 異步
將大事務(wù)拆分成多個(gè)小事務(wù)異步執(zhí)行。這樣基本上能夠?qū)⒖鐧C(jī)事務(wù)的執(zhí)行效率優(yōu)化到與單機(jī)一致。轉(zhuǎn)賬的事務(wù)就可以分解成如下兩個(gè)小事務(wù)
圖中執(zhí)行本地事務(wù)(Bob賬戶扣款)和發(fā)送異步消息應(yīng)該保證同時(shí)成功或者同時(shí)失敗,也就是扣款成功了,發(fā)送消息一定要成功,如果扣款失敗了,就不能再發(fā)送消息。
二、什么是事務(wù)消息(Transactional message)
RocketMQ官方是這樣定義的。可以將其視為兩階段提交消息實(shí)現(xiàn),以確保分布式系統(tǒng)中的最終一致性。事務(wù)性消息確??梢栽臃绞綀?zhí)行本地事務(wù)的執(zhí)行和消息的發(fā)送。
1、事務(wù)狀態(tài)
事務(wù)消息有3種狀態(tài)
(1)TransactionStatus.CommitTransaction,提交事務(wù),表示允許消費(fèi)者消費(fèi)(使用)這條消息
(2)TransactionStatus.RollbackTransaction,回滾事務(wù),表示消息將被刪除,不允許使用
(3)TransactionStatus.Unknown,中間狀態(tài),表示需要MQ向消息發(fā)送方進(jìn)行檢查以確定狀態(tài)
2、如何發(fā)送事務(wù)消息
RocketMQ(4.5.1版本)已經(jīng)把事務(wù)消息的發(fā)送方式封裝得非常優(yōu)雅,只需要兩個(gè)大的環(huán)節(jié)就能夠完成,創(chuàng)建事務(wù)消息生產(chǎn)者和實(shí)現(xiàn)TransactionListener接口??匆幌鹿俜降睦哟a
(1)創(chuàng)建事務(wù)消息生產(chǎn)者
使用TransactionMqProducer類創(chuàng)建消息生產(chǎn)客戶端,并指定唯一的ProducerGroup
設(shè)置自定義線程池來(lái)處理檢查請(qǐng)求
執(zhí)行本地事務(wù)之后,需要根據(jù)執(zhí)行結(jié)果回復(fù)MQ,回復(fù)上一小節(jié)中描述的狀態(tài)
(2)實(shí)現(xiàn)TransactionListener接口
“executeLocalTransaction”方法用于在發(fā)送半條消息成功時(shí)執(zhí)行本地事務(wù)。它返回上一節(jié)中提到的三個(gè)事務(wù)狀態(tài)之一。
“check local transaction”方法用于檢查本地事務(wù)狀態(tài)并響應(yīng)MQ檢查請(qǐng)求。它還返回前一節(jié)中提到的三個(gè)事務(wù)狀態(tài)之一。
3、事務(wù)消息的執(zhí)行流程
代碼寫起來(lái)非常簡(jiǎn)單,以至于光看代碼,并不能知道事務(wù)消息具體的執(zhí)行過(guò)程。
RocketMQ 事務(wù)消息的設(shè)計(jì)流程借鑒了兩階段提交理論,整體交互流程如下圖所示
事務(wù)發(fā)起方(即消息發(fā)送者)首先發(fā)送 prepare 消息到 MQ。
事務(wù)發(fā)起方(即消息發(fā)送者)在發(fā)送 prepare 消息成功后執(zhí)行本地事務(wù)。
根據(jù)本地事務(wù)執(zhí)行結(jié)果發(fā)送 commit 或者是 rollback 給 MQ。
如果消息是 rollback,MQ 將刪除該 prepare 消息不進(jìn)行下發(fā)。
如果消息是 commit,MQ 將會(huì)把這個(gè)消息發(fā)送給 consumer 端。
如果執(zhí)行本地事務(wù)過(guò)程中,執(zhí)行端掛掉,或者超時(shí),導(dǎo)致 MQ 收不到任何的消息(不知道是該 commit 還是該 rollback),RocketMQ 會(huì)定期掃描消息集群中的事務(wù)消息,這時(shí)候發(fā)現(xiàn)了某個(gè) prepare 消息還不知道該怎么處理,它會(huì)向消息發(fā)送者確認(rèn),所以消息發(fā)送者需要實(shí)現(xiàn)一個(gè) check 接口,RocketMQ 會(huì)根據(jù)消息發(fā)送者設(shè)置的策略來(lái)決定是 rollback 還是繼續(xù) commit。這樣就保證了消息發(fā)送與本地事務(wù)同時(shí)成功或同時(shí)失敗。
Consumer 端的消費(fèi)成功機(jī)制由 MQ 保證。
4、事務(wù)消息的存儲(chǔ)模型
在具體實(shí)現(xiàn)上,RocketMQ 通過(guò)使用 Half Topic 以及 Operation Topic 兩個(gè)內(nèi)部隊(duì)列來(lái)存儲(chǔ)事務(wù)消息推進(jìn)狀態(tài),如下圖所示
其中,Half Topic 對(duì)應(yīng)隊(duì)列中存放著 prepare 消息,Operation Topic 對(duì)應(yīng)的隊(duì)列則存放了 prepare message 對(duì)應(yīng)的 commit/rollback 消息,消息體中則是 prepare message 對(duì)應(yīng)的 offset,服務(wù)端通過(guò)比對(duì)兩個(gè)隊(duì)列的差值來(lái)找到尚未提交的超時(shí)事務(wù),進(jìn)行回查。
從用戶側(cè)來(lái)說(shuō),用戶需要分別實(shí)現(xiàn)本地事務(wù)執(zhí)行以及本地事務(wù)回查方法,因此只需關(guān)注本地事務(wù)的執(zhí)行狀態(tài)即可;而在 service 層,則對(duì)事務(wù)消息的兩階段提交進(jìn)行了抽象,同時(shí)針對(duì)超時(shí)事務(wù)實(shí)現(xiàn)了回查邏輯,通過(guò)不斷掃描當(dāng)前事務(wù)推進(jìn)狀態(tài),來(lái)不斷反向請(qǐng)求 Producer 端獲取超時(shí)事務(wù)的執(zhí)行狀態(tài),在避免事務(wù)掛起的同時(shí),也避免了 Producer 端的單點(diǎn)故障。
而在存儲(chǔ)層,RocketMQ 通過(guò) Bridge 封裝了與底層隊(duì)列存儲(chǔ)的相關(guān)操作,用以操作兩個(gè)對(duì)應(yīng)的內(nèi)部隊(duì)列,用戶也可以依賴其他存儲(chǔ)介質(zhì)實(shí)現(xiàn)自己的 service,RocketMQ 會(huì)通過(guò) ServiceProvider 加載進(jìn)來(lái)。
三、Notify的異曲同工
Notify和MetaQ是阿里的兩個(gè)消息中間件。MetaQ是一個(gè)高性能的存儲(chǔ)隊(duì)列;Notify是淘寶自主研發(fā)的一套消息服務(wù)引擎。貼兩個(gè)圖就什么都明白了
整體方案跟RocketMQ是完全相同的,只是兩者的Storage不同。
四、瓜子該怎么做事務(wù)一致性這塊工作
針對(duì)這個(gè)典型場(chǎng)景,有很多解決方案
1、Kafka換成RocketMQ
不行。有太多的業(yè)務(wù)跑在了Kafka上,替換消息中間件的成本基本不能接受。
2、類似去哪兒qmq的方案
這個(gè)方案研發(fā)簡(jiǎn)單,但是侵入具體業(yè)務(wù)的數(shù)據(jù)庫(kù),而且增加了部署運(yùn)維的成本。
3、有人提出binlog+TCC的方案
沒(méi)有仔細(xì)研究,但是業(yè)務(wù)會(huì)經(jīng)常調(diào)整,想想負(fù)責(zé)配置數(shù)據(jù)庫(kù)日志的同學(xué)肯定會(huì)抓狂(DBA沒(méi)有那么了解業(yè)務(wù))。
4、為Kafka配一個(gè)類似Notify的消息引擎
這個(gè)方案有一定的可行性
(1)把Kafka定位為MetaQ,研制一個(gè)Notify,為prepare message提供單獨(dú)的存儲(chǔ)
(2)現(xiàn)在各業(yè)務(wù)系統(tǒng)所采用的Kafka客戶端已經(jīng)是瓜子定制化開發(fā)的,可以模仿RocketMQ的客戶端進(jìn)行改造。已有代碼的邏輯完全不受影響;需要事務(wù)一致性的功能,只需要換個(gè)接口,實(shí)現(xiàn)check邏輯即可,而原有消費(fèi)方毫無(wú)感覺(jué)。
(3)似乎有可能結(jié)合spring的@Transactional標(biāo)簽,在完全不改業(yè)務(wù)代碼(只升級(jí)自研Kafka客戶端)的情況下,也能緩解一些不一致問(wèn)題
以上是“RocketMQ事務(wù)消息的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。