溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RocketMQ怎么保證消息的可靠性投遞

發(fā)布時間:2021-07-05 17:47:46 來源:億速云 閱讀:691 作者:chen 欄目:開發(fā)技術(shù)

這篇文章主要介紹“RocketMQ怎么保證消息的可靠性投遞”,在日常操作中,相信很多人在RocketMQ怎么保證消息的可靠性投遞問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RocketMQ怎么保證消息的可靠性投遞”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

介紹

要想保證消息的可靠型投遞,無非保證如下3個階段的正常執(zhí)行即可。

  1. 生產(chǎn)者將消息成功投遞到broker

  2. broker將投遞過程的消息持久化下來

  3. 消費(fèi)者能從broker消費(fèi)到消息

RocketMQ怎么保證消息的可靠性投遞

發(fā)送端消息重試

producer向broker發(fā)送消息后,沒有收到broker的ack時,rocketmq會自動重試。重試的次數(shù)可以設(shè)置,默認(rèn)為2次

DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME); // 同步發(fā)送設(shè)置重試次數(shù)為5次 producer.setRetryTimesWhenSendFailed(5); // 異步發(fā)送設(shè)置重試次數(shù)為5次 producer.setRetryTimesWhenSendAsyncFailed(5);

消息持久化

RocketMQ怎么保證消息的可靠性投遞

我們先來了解一下消息的存儲流程,這個知識對后面分析消費(fèi)端消息重試非常重要。

和消息相關(guān)的文件有如下幾種

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. CommitLog:存儲消息的元數(shù)據(jù)

  3. ConsumerQueue:存儲消息在CommitLog的索引

  4. IndexFile:可以通過Message Key,時間區(qū)間快速查找到消息

RocketMQ怎么保證消息的可靠性投遞

整個消息的存儲流程如下

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. Producer將消息順序?qū)懙紺ommitLog中

  3. 有一個線程根據(jù)消息的隊(duì)列信息,寫入到相關(guān)的ConsumerQueue中(minOffset為寫入的初始位置,consumerOffset為當(dāng)前消費(fèi)到的位置,maxOffset為ConsumerQueue最新寫入的位置)和IndexFile

  4. Consumer從ConsumerQueue的consumerOffset讀取到當(dāng)前應(yīng)該消費(fèi)的消息在CommitLog中的偏移量,到CommitLog中找到對應(yīng)的消息,消費(fèi)成功后移動consumerOffset

刷盤機(jī)制

RocketMQ怎么保證消息的可靠性投遞

「異步刷盤」:消息被寫入內(nèi)存的PAGECACHE,返回寫成功狀態(tài),當(dāng)內(nèi)存里的消息量積累到一定程度時,統(tǒng)一觸發(fā)寫磁盤操作,快速寫入  。吞吐量高,當(dāng)磁盤損壞時,會丟失消息

「同步刷盤」:消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,給應(yīng)用返回消息寫成功的狀態(tài)。吞吐量低,但不會造成消息丟失

主從復(fù)制

如果一個broker有master和slave時,就需要將master上的消息復(fù)制到slave上,復(fù)制的方式有兩種

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. 「同步復(fù)制」:master和slave均寫成功,才返回客戶端成功。maste掛了以后可以保證數(shù)據(jù)不丟失,但是同步復(fù)制會增加數(shù)據(jù)寫入延遲,降低吞吐量

  3. 「異步復(fù)制」:master寫成功,返回客戶端成功。擁有較低的延遲和較高的吞吐量,但是當(dāng)master出現(xiàn)故障后,有可能造成數(shù)據(jù)丟失

消費(fèi)端消息重試

順序消息的重試

對于順序消息,當(dāng)消費(fèi)者消費(fèi)消息失敗后,消息隊(duì)列RocketMQ版會自動不斷地進(jìn)行消息重試(每次間隔時間為1秒),這時,應(yīng)用會出現(xiàn)消息消費(fèi)被阻塞的情況。所以一定要做好監(jiān)控,避免阻塞現(xiàn)象的發(fā)生

「順序消息消費(fèi)失敗后不會消費(fèi)下一條消息而是不斷重試這條消息,應(yīng)該是考慮到如果跨過這條消息消費(fèi)后面的消息會對業(yè)務(wù)邏輯產(chǎn)生影響」

「順序消息暫時僅支持集群消費(fèi)模式,不支持廣播消費(fèi)模式」

無序消息的重試

對于無序消息(普通、定時、延時、事務(wù)消息),當(dāng)消費(fèi)者消費(fèi)消息失敗時,您可以通過設(shè)置返回狀態(tài)達(dá)到消息重試的結(jié)果。

「無序消息的重試只針對集群消費(fèi)方式生效;廣播方式不提供失敗重試特性,即消費(fèi)失敗后,失敗消息不再重試,繼續(xù)消費(fèi)新的消息」

「消費(fèi)時候后,重試的配置方式有如下三種」

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. 返回Action.ReconsumeLater(推薦)

  3. 返回Null

  4. 拋出異常

public class MessageListenerImpl implements MessageListener {      @Override     public Action consume(Message message, ConsumeContext context) {         //消息處理邏輯拋出異常,消息將重試。         doConsumeMessage(message);         //方式1:返回Action.ReconsumeLater,消息將重試。         return Action.ReconsumeLater;         //方式2:返回null,消息將重試。         return null;         //方式3:直接拋出異常,消息將重試。         throw new RuntimeException("Consumer Message exception");     } }

「消費(fèi)失敗后,無需重試的配置方式」

集群消費(fèi)方式下,消息失敗后期望消息不重試,需要捕獲消費(fèi)邏輯中可能拋出的異常,最終返回Action.CommitMessage,此后這條消息將不會再重試。

public class MessageListenerImpl implements MessageListener {      @Override     public Action consume(Message message, ConsumeContext context) {         try {             doConsumeMessage(message);         } catch (Throwable e) {             //捕獲消費(fèi)邏輯中的所有異常,并返回Action.CommitMessage;             return Action.CommitMessage;         }         //消息處理正常,直接返回Action.CommitMessage;         return Action.CommitMessage;     } }

「消息重試次數(shù)」

「RocketMQ默認(rèn)允許每條消息最多重試16次,每次消費(fèi)失敗發(fā)送一條延時消息到重試隊(duì)列,同一條消息失敗一次將延時等級提高一次,然后再放到重試隊(duì)列。重試16次后如果還沒有消費(fèi)成功,則將消息放到死信隊(duì)列中。」

「注意:重試隊(duì)列和死信隊(duì)列都是按照Consumer Group劃分的」

重試隊(duì)列topic名字:%RETRY% + consumerGroup

死信隊(duì)列topic名字:%DLQ% + consumerGroup

「為什么重試隊(duì)列和死信隊(duì)列要按照Consumer Group來進(jìn)行劃分?」

「因?yàn)樵赗ocketMQ的時候使用一定要保持訂閱關(guān)系一致。即一個Consumer  Group訂閱的topic和tag要完全一致,不然可能會導(dǎo)致消費(fèi)邏輯混亂,消息丟失」

如下任意一種情況都表現(xiàn)為訂閱關(guān)系不一致

  • 相同ConsumerGroup下的Consumer實(shí)例訂閱了不同的Topic。

  • 相同ConsumerGroup下的Consumer實(shí)例訂閱了相同的Topic,但訂閱的Tag不一致。

我們可以通過控制臺查看各種類型的主題

RocketMQ怎么保證消息的可靠性投遞

RocketMQ怎么保證消息的可靠性投遞

消息每次重試的間隔時間如下

第幾次重試 與上次重試的間隔時間 第幾次重試 與上次重試的間隔時間

第幾次重試與上次重試的間隔時間第幾次重試與上次重試的間隔時間
110 秒97 分鐘
230 秒108 分鐘
31 分鐘119 分鐘
42 分鐘1210 分鐘
53 分鐘1320 分鐘
64 分鐘1430 分鐘
75 分鐘151 小時
86 分鐘162 小時

「前面說到RocketMQ的消息重試是通過往重試隊(duì)列發(fā)送定時消息來實(shí)現(xiàn)的?!? RocketMQ支持18個級別的定時延時,每個級別定時消息的延時時間如下。

// MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

消息重試只是把定時消息的前2個級別去掉,每次發(fā)送下一個級別的定時消息

我們可以設(shè)置消費(fèi)端消息重試次數(shù)

  1. 最大重試次數(shù)小于等于16次,則重試時間間隔同上表描述。

  2. 最大重試次數(shù)大于16次,超過16次的重試時間間隔均為每次2小時。

Properties properties = new Properties(); // 配置對應(yīng)Group ID的最大消息重試次數(shù)為20次,最大重試次數(shù)為字符串類型。 properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); Consumer consumer =ONSFactory.createConsumer(properties);

「那么重試隊(duì)列中的消息是如何被消費(fèi)的?」

消息消費(fèi)者在啟動的時候,會訂閱正常的topic和重試隊(duì)列的topic

RocketMQ怎么保證消息的可靠性投遞

定時消息的實(shí)現(xiàn)邏輯也比較簡單,可以歸納為如下幾步

1.發(fā)送延時消息

1.1  替換topic為SCHEDULE_TOPIC_XXXX,queueId為消息延遲等級(如果不替換topic直接發(fā)到對應(yīng)的consumeQueue中,則消息會被立馬消費(fèi))

1.2 將消息原來的topic,queueId放到消息擴(kuò)展屬性中

1.3 將消息應(yīng)該執(zhí)行的時間放到tagsCode中

將消息順序?qū)懙紺ommitLog中

將消息對應(yīng)的信息分發(fā)到對應(yīng)的ConsumerQueue中(topic為SCHEDULE_TOPIC_XXXX總共有18個queue,對應(yīng)18個延遲級別)

定時任務(wù)不斷判斷消息是否到達(dá)投遞時間,沒有到達(dá)則后續(xù)執(zhí)行投遞

如果到達(dá)投遞時間,則從commitLog中拉取消息的內(nèi)容,重新設(shè)置消息topic,queueId為原來的(原來的topic,queueId在消息擴(kuò)展屬性中),然后將消息投遞到commitLog中,此時消息就會被分發(fā)到對應(yīng)的隊(duì)列中,然后被消費(fèi)。

到此,關(guān)于“RocketMQ怎么保證消息的可靠性投遞”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI