您好,登錄后才能下訂單哦!
這篇文章主要介紹“怎么正確使用RabbitMQ異步編程”,在日常操作中,相信很多人在怎么正確使用RabbitMQ異步編程問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么正確使用RabbitMQ異步編程”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
1 適用場景
1.1 服務(wù)于主流程的分支流程
在注冊流程中,數(shù)據(jù)寫DB是主流程,但注冊后給用戶發(fā)優(yōu)惠券或歡迎短信是分支流程,時效性也不強。
1.2 用戶無需實時看到結(jié)果
比如外賣下單后的配貨、送貨流程完全可異步處理,每個階段處理完成后,再給用戶發(fā)推送或短信讓用戶知曉即可。
1.3 MQ
任務(wù)的緩沖的分發(fā),流量削峰、服務(wù)解耦和消息廣播。
當然了異步處理不僅僅是通過 MQ 來實現(xiàn),還有其他方式
比如開新線程執(zhí)行,返回 Future
還有各種異步框架,比如 Vertx,它是通過 callback 的方式實現(xiàn)
2 異步處理之坑
2.1 異步處理需做消息補償以閉環(huán)
RabbitMQ雖可將消息落地磁盤,即使MQ異常消息數(shù)據(jù)也不會丟失,但異步流程在消息發(fā)送、傳輸、處理等環(huán)節(jié),都可能發(fā)生消息丟失。所有MQ都無法確保百分百可用,業(yè)務(wù)設(shè)計都需考慮不可用時異步流程將如何繼續(xù)。
因此,對于異步處理流程,必須考慮補償或建立主備雙活流程。
2.1.1 案例
用戶注冊后異步發(fā)送歡迎消息。
用戶注冊落DB為同步流程
會員服務(wù)收到消息后發(fā)送歡迎消息為異步流程
藍線
MQ異步處理(主線),消息可能丟失(虛線代表異步調(diào)用)
綠線
補償Job定期消息補償(備線),以補償主線丟失的消息
考慮極端的MQ中間件失效場景
要求備線的處理吞吐能力達到主線性能
代碼示例
UserController 注冊+發(fā)送異步消息。注冊方法,一次性注冊10個用戶,用戶注冊消息不能發(fā)送出去的概率為50%。
MemberService 會員服務(wù)監(jiān)聽用戶注冊成功的消息,并發(fā)送歡迎短信。使用ConcurrentHashMap存放那些發(fā)過短信的用戶ID實現(xiàn)冪等,避免相同的用戶補償時重復發(fā)短信
對于MQ消費程序,處理邏輯須考慮去重(支持冪等):
MQ消息可能會因中間件本身配置錯誤、穩(wěn)定性等原因出現(xiàn)重復
自動補償重復
比如本例,同一消息可能既走MQ也走補償,肯定會出現(xiàn)重復,而且考慮到高內(nèi)聚,補償Job本身不會做去重
人工補償重復
出現(xiàn)消息堆積時,異步處理流程必然延遲。若提供補償功能,則在處理遇到延遲時,很可能會先人工補償,過段時間后處理程序又收到消息了,重復處理。
有次MQ故障,MQ中堆積了幾十萬條發(fā)放資金消息,導致業(yè)務(wù)無法及時處理,運營以為程序出錯,就先通過后臺進行人工處理,結(jié)果MQ系統(tǒng)恢復后消息又被重復處理一次,造成大量資金重復發(fā)放。
異步處理須考慮消息重復可能性,因此處理邏輯須實現(xiàn)冪等,防止重復處理。
接著定義補償Job即備線操作。
定時任務(wù),5秒做一次補償,因Job并不知道哪些用戶注冊的消息可能丟失,所以是全量補償。
補償邏輯
每5秒補償一次,按順序一次補償5個用戶,下一次補償操作從上一次補償?shù)淖詈笠粋€用戶ID開始
補償任務(wù)提交到線程池以“異步”處理,提高處理能力
為實現(xiàn)高內(nèi)聚,主線和備線處理消息,最好使用同一方法。本案例的MemberService監(jiān)聽到MQ消息和CompensationJob補償,調(diào)用的都是welcome。
這里的補償邏輯簡單僅為 demo,實際生產(chǎn)代碼須:
考慮配置補償?shù)念l次、每次處理數(shù)量,以及補償線程池大小等參數(shù)為合適值,以滿足補償?shù)耐掏铝?/p>
考慮備線補償數(shù)據(jù)進行適當延遲
比如,對注冊時間在30s前的用戶再進行補償,以方便和主線MQ實時流程錯開,避免沖突
諸如當前補償?shù)侥膫€用戶的offset數(shù)據(jù),需要落地DB
補償Job本身須高可用,可使用類似xxl-job或ElasticJob等任務(wù)系統(tǒng)。
運行程序,執(zhí)行注冊方法注冊10個用戶,查看日志
可見
共10個用戶,MQ發(fā)送成功的用戶有四個:1、5、7、8
補償任務(wù)第一次運行,補償了用戶2、3、4,第二次運行補償了用戶6、9,第三次運行補充了用戶10
消息補償閉環(huán)的最高標準
能夠達到補償全量數(shù)據(jù)的吞吐量。即若補償備線足夠完善,即使直接停機MQ,雖會稍微影響處理及時性,但至少確保流程都能正常執(zhí)行。
小結(jié)
實際開發(fā)要考慮異步流程丟消息或處理中斷場景。
異步流程需有備線以補償,比如這里的全量補償方式,即便異步流程徹底失效,通過補償也能讓業(yè)務(wù)繼續(xù)進行。
2.2 RabbitMQ廣播、工作隊列模式坑
消息模式是廣播 Or 工作隊列
消息廣播
同一消息,不同消費者都能分別消費
隊列模式
不同消費者共享消費同一個隊列的數(shù)據(jù),相同消息只能被某一個消費者消費一次。
比如同一用戶的注冊消息
會員服務(wù)需監(jiān)聽以發(fā)送歡迎短信
營銷服務(wù)需監(jiān)聽以發(fā)送新用戶小禮物
但會員、營銷服務(wù)都可能有多實例,業(yè)務(wù)需求同一用戶的消息,可同時廣播給不同的服務(wù)(廣播模式),但對同一服務(wù)的不同實例(比如會員服務(wù)1和會員服務(wù)2),不管哪個實例來處理,處理一次即可(工作隊列模式):
實現(xiàn)代碼時務(wù)必確認MQ系統(tǒng)的機制,確保消息的路由按期望。
RocketMQ實現(xiàn)類似功能比較簡單直白:若消費者屬于一個組,那么消息只會由同組的一個消費者消費;若消費者屬不同組,每個組都能消費一遍消息。
而RabbitMQ的消息路由模式采用隊列+交換器,隊列是消息載體,交換器決定消息路由到隊列的方式。
step1:會員服務(wù)-監(jiān)聽用戶服務(wù)發(fā)出的新用戶注冊消息
若啟動倆會員服務(wù),那么同一用戶的注冊消息應(yīng)只能被其中一個實例消費。
分別實現(xiàn)RabbitMQ隊列、交換器、綁定三件套。
隊列使用匿名隊列
交換器使用DirectExchange,交換器綁定到匿名隊列的路由Key是空字符串
收到消息之后,打印所在實例使用的端口。
消息發(fā)布者、消費者、以及MQ的配置
使用12345和45678兩個端口啟動倆程序?qū)嵗?,發(fā)條消息,輸出的日志,顯示同一會員服務(wù)兩個實例都收到了消息:
所以問題在于不明
RabbitMQ直接交換器和隊列的綁定關(guān)系
RabbitMQ的直接交換器根據(jù)routingKey路由消息。而程序每次啟動都會創(chuàng)建匿名(隨機命名)隊列,所以每個會員服務(wù)實例都對應(yīng)獨立的隊列,以空routingKey綁定到直接交換器。
用戶服務(wù)發(fā)消息時也設(shè)置了空routingKey,所以直接交換器收到消息后,發(fā)現(xiàn)匹配倆隊列,于是都轉(zhuǎn)發(fā)消息
修復
對會員服務(wù)不要使用匿名隊列,而使用同一隊列。
將上面代碼中的匿名隊列換做普通隊列:
private static final String QUEUE = "newuserQueue";@Beanpublic Queue queue() { return new Queue(QUEUE);}
這樣對同一消息,倆實例中只有一個實例可收到,不同消息被輪詢發(fā)給不同實例。
現(xiàn)在的交換器和隊列關(guān)系
step2:用戶服務(wù)-廣播消息給會員、營銷服務(wù)
期望會員、營銷服務(wù)都能收到廣播消息,但會員/營銷服務(wù)中的每個實例只需收到一次消息。
聲明一個隊列和一個FanoutExchange,然后模擬倆用戶服務(wù)和倆營銷服務(wù):
注冊四個用戶。日志發(fā)現(xiàn)一條用戶注冊的消息,要么被會員服務(wù)收到,要么被營銷服務(wù)收到,這不是廣播。可使用的明明是FanoutExchange,為什么沒起效呢?
因為廣播交換器會忽略routingKey,廣播消息到所有綁定的隊列。該案例的倆會員服務(wù)和兩個營銷服務(wù)都綁定了同一隊列,所以四服務(wù)只能收到一次消息:
修復
拆分隊列,會員和營銷兩組服務(wù)分別使用一條獨立隊列綁定到廣播交換器
現(xiàn)在的交換器和隊列結(jié)構(gòu)
從日志輸出可以驗證,對每條MQ消息,會員服務(wù)和營銷服務(wù)分別都會收到一次,一條消息廣播到兩個服務(wù)同時,在每一個服務(wù)的兩個實例中通過輪詢接收:
異步的消息路由模式一旦配置出錯,輕則可能導致消息重復處理,重則可能導致重要的服務(wù)無法接收到消息,最終造成業(yè)務(wù)邏輯錯誤。
小結(jié)
微服務(wù)場景下不同服務(wù)多個實例監(jiān)聽消息的情況,一般不同服務(wù)需要同時收到相同的消息,而相同服務(wù)的多個實例只需要輪詢接收消息。我們需要確認MQ的消息路由配置是否滿足需求,以避免消息重復或漏發(fā)問題。
2.3 死信堵塞MQ之坑
始終無法處理的死信消息,可能會引發(fā)堵塞MQ。
若線程池的任務(wù)隊列無上限,最終可能導致OOM,類似的MQ也要注意任務(wù)堆積問題。對于突發(fā)流量引起的MQ堆積,問題并不大,適當調(diào)整消費者的消費能力應(yīng)該就可以解決。但在很多時候,消息隊列的堆積堵塞,是因為有大量始終無法處理的消息。
2.3.1 案例
用戶服務(wù)在用戶注冊后發(fā)出一條消息,會員服務(wù)監(jiān)聽到消息后給用戶派發(fā)優(yōu)惠券,但因用戶并沒有保存成功,會員服務(wù)處理消息始終失敗,消息重新進入隊列,然后還是處理失敗。這種在MQ中回蕩的同一條消息,就是死信。
隨著MQ被越來越多的死信填滿,消費者需花費大量時間反復處理死信,導致正常消息的消費受阻,最終MQ可能因數(shù)據(jù)量過大而崩潰。
定義一個隊列、一個直接交換器,然后把隊列綁定到交換器
sendMessage發(fā)送消息到MQ,訪問一次提交一條消息,使用自增標識作為消息內(nèi)容
收到消息后,直接NPE,模擬處理出錯
調(diào)用sendMessage接口發(fā)送兩條消息,然后來到RabbitMQ管理臺,可以看到這兩條消息始終在隊列,不斷被重新投遞,導致重新投遞QPS達到1063。
在日志中也可看到大量異常信息。
修復方案
解決死信無限重復進入隊列最簡單方案
程序處理出錯時,直接拋AmqpRejectAndDontRequeueException,避免消息重新進入隊列
throw new AmqpRejectAndDontRequeueException("error");
但更希望對同一消息,能夠先進行幾次重試,解決因為網(wǎng)絡(luò)問題導致的偶發(fā)消息處理失敗,若依舊失敗,再把消息投遞到專門設(shè)置的DLX。對于來自DLX的數(shù)據(jù),可能只是記錄日志發(fā)送報警,即使出現(xiàn)異常也不會再重復投遞。
邏輯如下
針對該問題,我們來看
Spring AMQP的簡便解決方案
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
定義死信交換器、死信隊列。其實都是普通交換器和隊列,只不過專門用于處理死信消息
通過RetryInterceptorBuilder構(gòu)建一個RetryOperationsInterceptor以處理失敗時候的重試。策略是最多嘗試5次(重試4次);并且采取指數(shù)退避重試,首次重試延遲1秒,第二次2秒,以此類推,最大延遲是10秒;如果第4次重試還是失敗,則使用RepublishMessageRecoverer把消息重新投入一個DLX
定義死信隊列的處理程序。本案例只記錄日志
代碼
執(zhí)行程序,發(fā)送兩條消息,查看日志:
msg2的4次重試間隔分別是1秒、2秒、4秒、8秒,再加上首次的失敗,所以最大嘗試次數(shù)是5
4次重試后,RepublishMessageRecoverer把消息發(fā)往DLX
死信處理程序輸出了got dead message msg2。
雖然幾乎同時發(fā)倆消息,但msg2在msg1四次重試全部結(jié)束后才開始處理,因為默認SimpleMessageListenerContainer只有一個消費線程??赏ㄟ^增加消費線程避免性能問題:
直接設(shè)置concurrentConsumers參數(shù)為10,來增加到10個工作線程
也可設(shè)置maxConcurrentConsumers參數(shù),讓SimpleMessageListenerContainer動態(tài)調(diào)整消費者線程數(shù)。
到此,關(guān)于“怎么正確使用RabbitMQ異步編程”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。