您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“如何使用RocketMQ的事務(wù)消息來解決一致性問題”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“如何使用RocketMQ的事務(wù)消息來解決一致性問題”這篇文章吧。
在微服務(wù)架構(gòu)中,我們常常使用異步化的手段來提升系統(tǒng)的 吞吐量 和 解耦 上下游,而構(gòu)建異步架構(gòu)最常用的手段就是使用 消息隊列(MQ)
,那異步架構(gòu)怎樣才能實現(xiàn)數(shù)據(jù)一致性呢?
可以看到在 業(yè)務(wù)處理 方面來說 RocketMQ
優(yōu)于其他對手,而且原生支持 事務(wù)消息
PS:業(yè)務(wù)系統(tǒng)用的是其他 MQ
產(chǎn)品但是又需要 事務(wù)消息 怎么辦?學(xué)習(xí)原理自己開發(fā)實現(xiàn)!
例如下圖的場景:生成訂單記錄 -> MQ -> 增加積分
我們是應(yīng)該先 創(chuàng)建訂單記錄,還是先 發(fā)送MQ消息 呢?
先發(fā)送MQ消息:這個明顯是不行的,因為如果消息發(fā)送成功,而訂單創(chuàng)建失敗的話是沒辦法把消息收回來的
先創(chuàng)建訂單記錄:如果訂單創(chuàng)建成功后MQ消息發(fā)送失敗 拋出異常,因為兩個操作都在本地事務(wù)中所以訂單數(shù)據(jù)是可以 回滾 的
上面的 方式二 看似沒問題,但是 網(wǎng)絡(luò)是不可靠的!如果 MQ
的響應(yīng)因為網(wǎng)絡(luò)原因沒有收到,所以在面對不確定的結(jié)果只好進(jìn)行回滾;但是 MQ
端又確實是收到了這條消息的,只是回給客戶端的 響應(yīng)丟失 了!
所以 事務(wù)消息
就是用來保證 本地事務(wù) 與 MQ消息發(fā)送 的原子性!
主要的邏輯分為兩個流程:
事務(wù)消息發(fā)送及提交:
發(fā)送 half消息
MQ服務(wù)端
響應(yīng)消息寫入結(jié)果
根據(jù)發(fā)送結(jié)果執(zhí)行 本地事務(wù)
(如果寫入失敗,此時half消息對業(yè)務(wù) 不可見,本地邏輯不執(zhí)行)
根據(jù)本地事務(wù)狀態(tài)執(zhí)行 Commit
或者 Rollback
(Commit操作生成消息索引,消息對消費者 可見)
回查流程:
對于長時間沒有 Commit/Rollback
的事務(wù)消息(pending
狀態(tài)的消息),從服務(wù)端發(fā)起一次 回查
Producer
收到回查消息,檢查回查消息對應(yīng)的 本地事務(wù)狀態(tài)
根據(jù)本地事務(wù)狀態(tài),重新 Commit
或者 Rollback
邏輯時序圖
從上面的原理可以發(fā)現(xiàn) 事務(wù)消息
僅僅只是保證本地事務(wù)和MQ消息發(fā)送形成整體的 原子性
,而投遞到MQ服務(wù)器后,并無法保證消費者一定能消費成功!
如果 消費端消費失敗 后的處理方式,建議是記錄異常信息然后 人工處理,并不建議回滾上游服務(wù)的數(shù)據(jù)(因為兩者是 解耦 的,而且 復(fù)雜度 太高)
我們可以利用 MQ
的兩個特性 重試
和 死信隊列
來協(xié)助消費端處理:
消費失敗后進(jìn)行一定次數(shù)的 重試
重試后也失敗的話該消息丟進(jìn) 死信隊列
里
另外起一個線程監(jiān)聽消費 死信隊列
里的消息,記錄日志并且預(yù)警!
因為有 重試
所以消費者需要實現(xiàn)冪等性
下面就用剛剛提到的場景:生成訂單記錄 -> MQ -> 增加積分;來簡單講一下 Spring Cloud
中應(yīng)該怎么做,詳細(xì)代碼請 下載demo 查看。
PS:怎樣安裝部署RocketMQ可以參考《Apache RocketMQ 消息隊列部署與可視化界面安裝》
使用 spring-cloud-stream
框架來訪問 RocketMQ
Spring Cloud Stream 是一個構(gòu)建消息驅(qū)動的框架,通過抽象的定義實現(xiàn)應(yīng)用與MQ消息隊列之間的解耦,目前支持
RabbitMQ
、kafka
和RocketMQ
消息生產(chǎn)者需要添加 transactional: true
開啟 事務(wù)消息
因為開啟了
事務(wù)消息
所以這里發(fā)送的是half消息
對于消費端是不可見
的
使用 @RocketMQTransactionListener
注解監(jiān)聽 半消息,并實現(xiàn) RocketMQLocalTransactionListener
接口,該接口有兩個方法
executeLocalTransaction:用于提交本地事務(wù)
checkLocalTransaction:用于事務(wù)回查
如果提交事務(wù)消息失敗,需等待約1分鐘左右 事務(wù)回查 方法才會被調(diào)用
注意:因為有 重試 這里如果是真實的業(yè)務(wù)需要自行實現(xiàn)
冪等性
監(jiān)聽并消費死信隊列中的消息,用于記錄錯誤日志,并且預(yù)警通知運維人員等
demo中提供了3個接口分別測試不同的場景:
事務(wù)成功
http://localhost:11002/success
流程如下:
訂單創(chuàng)建 成功
提交事務(wù)消息 成功
消費消息增加積分 成功
訂單創(chuàng)建成功但提交事務(wù)消息失敗
http://localhost:11002/produceError
流程如下:
訂單創(chuàng)建 成功
提交事務(wù)消息 失敗
事務(wù)回查(等待1分鐘左右) 成功
提交事務(wù)消息 成功
消費消息增加積分 成功
消費消息失敗
http://localhost:11002/consumeError
流程如下:
訂單創(chuàng)建 成功
提交事務(wù)消息 成功
消費消息增加積分 失敗
重試消費消息 失敗
進(jìn)入死信隊列 成功
消費死信隊列的消息 成功
記錄日志并發(fā)出預(yù)警 成功
以上是“如何使用RocketMQ的事務(wù)消息來解決一致性問題”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。