溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

你需要知道的RoketMQ

發(fā)布時(shí)間:2020-07-21 12:29:26 來源:網(wǎng)絡(luò) 閱讀:479 作者:Java_老男孩 欄目:編程語言

1.概述

本篇文章會(huì)盡力全面的介紹RocketMQ和Kafka各個(gè)關(guān)鍵點(diǎn)的比較,希望大家讀完能有所收獲。

RocketMQ前身叫做MetaQ, 在MeataQ發(fā)布3.0版本的時(shí)候改名為RocketMQ,其本質(zhì)上的設(shè)計(jì)思路和Kafka類似,但是和Kafka不同的是其使用Java進(jìn)行開發(fā),由于在國內(nèi)的Java受眾群體遠(yuǎn)遠(yuǎn)多于Scala,所以RocketMQ是很多以Java語言為主的公司的首選。同樣的RocketMQ和Kafka都是Apache基金會(huì)中的頂級(jí)項(xiàng)目,他們社區(qū)的活躍度都非常高,項(xiàng)目更新迭代也非常快。

2.入門實(shí)例

2.1 生產(chǎn)者

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();

        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}

直接定義好一個(gè)producer,創(chuàng)建好Message,調(diào)用send方法即可。

2.2 消費(fèi)者

public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

3.RocketMQ架構(gòu)原理

對(duì)于RocketMQ先拋出幾個(gè)問題:

  • RocketMQ的topic和隊(duì)列是什么樣的,和Kafka的分區(qū)有什么不同?

  • RocketMQ網(wǎng)絡(luò)模型是什么樣的,和Kafka對(duì)比如何?

  • RocketMQ消息存儲(chǔ)模型是什么樣的,如何保證高可靠的存儲(chǔ),和Kafka對(duì)比如何?

3.1 RocketMQ架構(gòu)圖

你需要知道的RoketMQ

對(duì)于RocketMQ的架構(gòu)圖,在大體上來看和Kafka并沒有太多的差別,但是在很多細(xì)節(jié)上是有很多差別的,接下來會(huì)一一進(jìn)行講述。

3.2 RocketMQ名詞解釋

在3.1的架構(gòu)中我們有多個(gè)Producer,多個(gè)主Broker,多個(gè)從Broker,每個(gè)Producer可以對(duì)應(yīng)多個(gè)Topic,每個(gè)Consumer也可以消費(fèi)多個(gè)Topic。

Broker信息會(huì)上報(bào)至NameServer,Consumer會(huì)從NameServer中拉取Broker和Topic的信息。

  • Producer:消息生產(chǎn)者,向Broker發(fā)送消息的客戶端

  • Consumer:消息消費(fèi)者,從Broker讀取消息的客戶端

  • Broker:消息中間的處理節(jié)點(diǎn),這里和kafka不同,kafka的Broker沒有主從的概念,都可以寫入請求以及備份其他節(jié)點(diǎn)數(shù)據(jù),RocketMQ只有主Broker節(jié)點(diǎn)才能寫,一般也通過主節(jié)點(diǎn)讀,當(dāng)主節(jié)點(diǎn)有故障或者一些其他特殊情況才會(huì)使用從節(jié)點(diǎn)讀,有點(diǎn)類似- 于mysql的主從架構(gòu)。

  • Topic:消息主題,一級(jí)消息類型,生產(chǎn)者向其發(fā)送消息, 消費(fèi)者讀取其消息。

  • Group:分為ProducerGroup,ConsumerGroup,代表某一類的生產(chǎn)者和消費(fèi)者,一般來說同一個(gè)服務(wù)可以作為Group,同一個(gè)Group一般來說發(fā)送和消費(fèi)的消息都是一樣的。

  • Tag:Kafka中沒有這個(gè)概念,Tag是屬于二級(jí)消息類型,一般來說業(yè)務(wù)有關(guān)聯(lián)的可以使用同一個(gè)Tag,比如訂單消息隊(duì)列,使用Topic_Order,Tag可以分為Tag_食品訂單,Tag_服裝訂單等等。

  • Queue: 在kafka中叫Partition,每個(gè)Queue內(nèi)部是有序的,在RocketMQ中分為讀和寫兩種隊(duì)列,一般來說讀寫隊(duì)列數(shù)量一致,如果不一致就會(huì)出現(xiàn)很多問題。

  • NameServer:Kafka中使用的是ZooKeeper保存Broker的地址信息,以及Broker的Leader的選舉,在RocketMQ中并沒有采用選舉Broker的策略,所以采用了無狀態(tài)的NameServer太存儲(chǔ),由于NameServer是無狀態(tài)的,集群節(jié)點(diǎn)之間并不會(huì)通信,所以上傳數(shù)據(jù)的時(shí)候都需要向所有節(jié)點(diǎn)進(jìn)行發(fā)送。

很多朋友都在問什么是無狀態(tài)呢?狀態(tài)的有無實(shí)際上就是數(shù)據(jù)是否會(huì)做存儲(chǔ),有狀態(tài)的話數(shù)據(jù)會(huì)被持久化,無狀態(tài)的服務(wù)可以理解就是一個(gè)內(nèi)存服務(wù),NameServer本身也是一個(gè)內(nèi)存服務(wù),所有數(shù)據(jù)都存儲(chǔ)在內(nèi)存中,重啟之后都會(huì)丟失。

3.3 Topic和Queue

在RocketMQ中的每一條消息,都有一個(gè)Topic,用來區(qū)分不同的消息。一個(gè)主題一般會(huì)有多個(gè)消息的訂閱者,當(dāng)生產(chǎn)者發(fā)布消息到某個(gè)主題時(shí),訂閱了這個(gè)主題的消費(fèi)者都可以接收到生產(chǎn)者寫入的新消息。

在Topic中有分為了多個(gè)Queue,這其實(shí)是我們發(fā)送/讀取消息通道的最小單位,我們發(fā)送消息都需要指定某個(gè)寫入某個(gè)Queue,拉取消息的時(shí)候也需要指定拉取某個(gè)Queue,所以我們的順序消息可以基于我們的Queue維度保持隊(duì)列有序,如果想做到全局有序那么需要將Queue大小設(shè)置為1,這樣所有的數(shù)據(jù)都會(huì)在Queue中有序。

你需要知道的RoketMQ

在上圖中我們的Producer會(huì)通過一些策略進(jìn)行Queue的選擇:

  • 非順序消息:非順序消息一般直接采用輪訓(xùn)發(fā)送的方式進(jìn)行發(fā)送。

  • 順序消息:根據(jù)某個(gè)Key比如我們常見的訂單Id,用戶Id,進(jìn)行Hash,將同一類數(shù)據(jù)放在同一個(gè)隊(duì)列中,保證我們的順序性。

我們同一組Consumer也會(huì)根據(jù)一些策略來選Queue,常見的比如平均分配或者一致性Hash分配。

要注意的是當(dāng)Consumer出現(xiàn)下線或者上線的時(shí)候,這里需要做重平衡,也就是Rebalance,RocketMQ的重平衡機(jī)制如下:

  • 定時(shí)拉取broker,topic的最新信息

  • 每隔20s做重平衡

  • 隨機(jī)選取當(dāng)前Topic的一個(gè)主Broker,這里要注意的是不是每次重平衡所有主Broker都會(huì)被選中,因?yàn)闀?huì)存在一個(gè)Broker再多個(gè)Broker的情況。

  • 獲取當(dāng)前Broker,當(dāng)前ConsumerGroup的所有機(jī)器ID。

  • 然后進(jìn)行策略分配。

由于重平衡是定時(shí)做的,所以這里有可能會(huì)出現(xiàn)某個(gè)Queue同時(shí)被兩個(gè)Consumer消費(fèi),所以會(huì)出現(xiàn)消息重復(fù)投遞。

Kafka的重平衡機(jī)制和RocketMQ不同,Kafka的重平衡是通過Consumer和Coordinator聯(lián)系來完成的,當(dāng)Coordinator感知到消費(fèi)組的變化,會(huì)在心跳過程中發(fā)送重平衡的信號(hào),然后由一個(gè)ConsumerLeader進(jìn)行重平衡選擇,然后再由Coordinator將結(jié)果通知給所有的消費(fèi)者。

3.3.1 Queue讀寫數(shù)量不一致

在RocketMQ中Queue被分為讀和寫兩種,在最開始接觸RocketMQ的時(shí)候一直以為讀寫隊(duì)列數(shù)量配置不一致不會(huì)出現(xiàn)什么問題的,比如當(dāng)消費(fèi)者機(jī)器很多的時(shí)候我們配置很多讀的隊(duì)列,但是實(shí)際過程中發(fā)現(xiàn)會(huì)出現(xiàn)消息無法消費(fèi)和根本沒有消息消費(fèi)的情況。

  • 當(dāng)寫的隊(duì)列數(shù)量大于讀的隊(duì)列的數(shù)量,當(dāng)大于讀隊(duì)列這部分ID的寫隊(duì)列的數(shù)據(jù)會(huì)無法消費(fèi),因?yàn)椴粫?huì)將其分配給消費(fèi)者。

  • 當(dāng)讀的隊(duì)列數(shù)量大于寫的隊(duì)列數(shù)量,那么多的隊(duì)列數(shù)量就不會(huì)有消息被投遞進(jìn)來。

這個(gè)功能在RocketMQ在我看來明顯沒什么用,因?yàn)榛旧隙紩?huì)設(shè)置為讀寫隊(duì)列大小一樣,這個(gè)為啥不直接將其進(jìn)行統(tǒng)一,反而容易讓用戶配置不一樣出現(xiàn)錯(cuò)誤。

這個(gè)問題在RocketMQ的Issue里也沒有收到好的答案。

3.4 消費(fèi)模型

一般來說消息隊(duì)列的消費(fèi)模型分為兩種,基于推送的消息(push)模型和基于拉取(poll)的消息模型。

基于推送模型的消息系統(tǒng),由消息代理記錄消費(fèi)狀態(tài)。消息代理將消息推送到消費(fèi)者后,標(biāo)記這條消息為已經(jīng)被消費(fèi),但是這種方式無法很好地保證消費(fèi)的處理語義。比如當(dāng)我們把已經(jīng)把消息發(fā)送給消費(fèi)者之后,由于消費(fèi)進(jìn)程掛掉或者由于網(wǎng)絡(luò)原因沒有收到這條消息,如果我們在消費(fèi)代理將其標(biāo)記為已消費(fèi),這個(gè)消息就永久丟失了。如果我們利用生產(chǎn)者收到消息后回復(fù)這種方法,消息代理需要記錄消費(fèi)狀態(tài),這種不可取。

用過RocketMQ的同學(xué)肯定不禁會(huì)想到,在RocketMQ中不是提供了兩種消費(fèi)者嗎?

MQPullConsumer?和?MQPushConsumer?,其中?MQPushConsumer?不就是我們的推模型嗎?其實(shí)這兩種模型都是客戶端主動(dòng)去拉消息,其中的實(shí)現(xiàn)區(qū)別如下:

  • MQPullConsumer:每次拉取消息需要傳入拉取消息的offset和每次拉取多少消息量,具體拉取哪里的消息,拉取多少是由客戶端控制。

  • MQPushConsumer:同樣也是客戶端主動(dòng)拉取消息,但是消息進(jìn)度是由服務(wù)端保存,Consumer會(huì)定時(shí)上報(bào)自己消費(fèi)到哪里,所以Consumer下次消費(fèi)的時(shí)候是可以找到上次消費(fèi)的點(diǎn),一般來說使用PushConsumer我們不需要關(guān)心offset和拉取多少數(shù)據(jù),直接使用即可。

3.4.1 集群消費(fèi)和廣播消費(fèi)

消費(fèi)模式我們分為兩種,集群消費(fèi),廣播消費(fèi):

  • 集群消費(fèi): 同一個(gè)GroupId都屬于一個(gè)集群,一般來說一條消息只會(huì)被任意一個(gè)消費(fèi)者處理。

  • 廣播消費(fèi):廣播消費(fèi)的消息會(huì)被集群中所有消費(fèi)者進(jìn)行消息,但是要注意一下因?yàn)閺V播消費(fèi)的offset在服務(wù)端保存成本太高,所以客戶端每一次重啟都會(huì)從最新消息消費(fèi),而不是上次保存的offset。

3.5 網(wǎng)絡(luò)模型

在Kafka中使用的原生的socket實(shí)現(xiàn)網(wǎng)絡(luò)通信,而RocketMQ使用的是Netty網(wǎng)絡(luò)框架,現(xiàn)在越來越多的中間件都不會(huì)直接選擇原生的socket,而是使用的Netty框架,主要得益于下面幾個(gè)原因:

  • API使用簡單,不需要關(guān)心過多的網(wǎng)絡(luò)細(xì)節(jié),更專注于中間件邏輯。

  • 性能高。

  • 成熟穩(wěn)定,jdk nio的bug都被修復(fù)了。

選擇框架是一方面,而想要保證網(wǎng)絡(luò)通信的高效,網(wǎng)絡(luò)線程模型也是一方面,我們常見的有1+N(1個(gè)Acceptor線程,N個(gè)IO線程),1+N+M(1個(gè)acceptor線程,N個(gè)IO線程,M個(gè)worker線程)等模型,RocketMQ使用的是1+N1+N2+M的模型,如下圖所示:

你需要知道的RoketMQ

1個(gè)acceptor線程,N1個(gè)IO線程,N2個(gè)線程用來做Shake-hand,SSL驗(yàn)證,編解碼;M個(gè)線程用來做業(yè)務(wù)處理。這樣的好處將編解碼,和SSL驗(yàn)證等一些可能耗時(shí)的操作放在了一個(gè)單獨(dú)的線程池,不會(huì)占據(jù)我們業(yè)務(wù)線程和IO線程。

3.6 高可靠的分布式存儲(chǔ)模型

做為一個(gè)好的消息系統(tǒng),高性能的存儲(chǔ),高可用都不可少。

3.6.1 高性能日志存儲(chǔ)

RocketMQ和Kafka的存儲(chǔ)核心設(shè)計(jì)有很大的不同,所以其在寫入性能方面也有很大的差別,這是16年阿里中間件團(tuán)隊(duì)對(duì)RocketMQ和Kafka不同Topic下做的性能測試:

你需要知道的RoketMQ

從圖上可以看出:

  • Kafka在Topic數(shù)量由64增長到256時(shí),吞吐量下降了98.37%。

  • RocketMQ在Topic數(shù)量由64增長到256時(shí),吞吐量只下降了16%。

    這是為什么呢?kafka一個(gè)topic下面的所有消息都是以partition的方式分布式的存儲(chǔ)在多個(gè)節(jié)點(diǎn)上。同時(shí)在kafka的機(jī)器上,每個(gè)Partition其實(shí)都會(huì)對(duì)應(yīng)一個(gè)日志目錄,在目錄下面會(huì)對(duì)應(yīng)多個(gè)日志分段。所以如果Topic很多的時(shí)候Kafka雖然寫文件是順序?qū)?,但?shí)際上文件過多,會(huì)造成磁盤IO競爭非常激烈。

那RocketMQ為什么在多Topic的情況下,依然還能很好的保持較多的吞吐量呢?我們首先來看一下RocketMQ中比較關(guān)鍵的文件:

你需要知道的RoketMQ

這里有四個(gè)目錄(這里的解釋就直接用RocketMQ官方的了):

  • commitLog:消息主體以及元數(shù)據(jù)的存儲(chǔ)主體,存儲(chǔ)Producer端寫入的消息主體內(nèi)容,消息內(nèi)容不是定長的。單個(gè)文件大小默認(rèn)1G ,文件名長度為20位,左邊補(bǔ)零,剩余為起始偏移量,比如00000000000000000000代表了第一個(gè)文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)?shù)谝粋€(gè)文件寫滿了,第二個(gè)文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序?qū)懭肴罩疚募?,?dāng)文件滿了,寫入下一個(gè)文件;

  • config:保存一些配置信息,包括一些Group,Topic以及Consumer消費(fèi)offset等信息。

  • consumeQueue:消息消費(fèi)隊(duì)列,引入的目的主要是提高消息消費(fèi)的性能,由于RocketMQ是基于主題topic的訂閱模式,消息消費(fèi)是針對(duì)主題進(jìn)行的,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的。Consumer即可根據(jù)ConsumeQueue來查找待消費(fèi)的消息。其中,ConsumeQueue(邏輯消費(fèi)隊(duì)列)作為消費(fèi)消息的索引,保存了指定Topic下的隊(duì)列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu),具體存儲(chǔ)路徑為:HOME \store\index\${fileName},文件名fileName是以創(chuàng)建時(shí)的時(shí)間戳命名的,固定的單個(gè)IndexFile文件大小約為400M,一個(gè)IndexFile可以保存 2000W個(gè)索引,IndexFile的底層存儲(chǔ)設(shè)計(jì)為在文件系統(tǒng)中實(shí)現(xiàn)HashMap結(jié)構(gòu),故rocketmq的索引文件其底層實(shí)現(xiàn)為hash索引。

我們發(fā)現(xiàn)我們的消息主體數(shù)據(jù)并沒有像Kafka一樣寫入多個(gè)文件,而是寫入一個(gè)文件,這樣我們的寫入IO競爭就非常小,可以在很多Topic的時(shí)候依然保持很高的吞吐量。有同學(xué)說這里的ConsumeQueue寫是在不停的寫入呢,并且ConsumeQueue是以Queue維度來創(chuàng)建文件,那么文件數(shù)量依然很多,在這里ConsumeQueue的寫入的數(shù)據(jù)量很小,每條消息只有20個(gè)字節(jié),30W條數(shù)據(jù)也才6M左右,所以其實(shí)對(duì)我們的影響相對(duì)Kafka的Topic之間影響是要小很多的。我們整個(gè)的邏輯可以如下:

你需要知道的RoketMQ

Producer不斷的再往CommitLog添加新的消息,有一個(gè)定時(shí)任務(wù)ReputService會(huì)不斷的掃描新添加進(jìn)來的CommitLog,然后不斷的去構(gòu)建ConsumerQueue和Index。

注意:這里指的都是普通的硬盤,在SSD上面多個(gè)文件并發(fā)寫入和單個(gè)文件寫入影響不大。

讀取消息

Kafka中每個(gè)Partition都會(huì)是一個(gè)單獨(dú)的文件,所以當(dāng)消費(fèi)某個(gè)消息的時(shí)候,會(huì)很好的出現(xiàn)順序讀,我們知道OS從物理磁盤上訪問讀取文件的同時(shí),會(huì)順序?qū)ζ渌噜弶K的數(shù)據(jù)文件進(jìn)行預(yù)讀取,將數(shù)據(jù)放入PageCache,所以Kafka的讀取消息性能比較好。

RocketMQ讀取流程如下:

  • 先讀取ConsumerQueue中的offset對(duì)應(yīng)CommitLog物理的offset

  • 根據(jù)offset讀取CommitLog

ConsumerQueue也是每個(gè)Queue一個(gè)單獨(dú)的文件,并且其文件體積小,所以很容易利用PageCache提高性能。而CommitLog,由于同一個(gè)Queue的連續(xù)消息在CommitLog其實(shí)是不連續(xù)的,所以會(huì)造成隨機(jī)讀,RocketMQ對(duì)此做了幾個(gè)優(yōu)化:

  • Mmap映射讀取,Mmap的方式減少了傳統(tǒng)IO將磁盤文件數(shù)據(jù)在操作系統(tǒng)內(nèi)核地址空間的緩沖區(qū)和用戶應(yīng)用程序地址空間的緩沖區(qū)之間來回進(jìn)行拷貝的性能開銷

  • 使用DeadLine調(diào)度算法+SSD存儲(chǔ)盤

  • 由于Mmap映射受到內(nèi)存限制,當(dāng)不在Mmmap映射這部分?jǐn)?shù)據(jù)的時(shí)候(也就是消息堆積過多),默認(rèn)是內(nèi)存的40%,會(huì)將請求發(fā)送到SLAVE,減緩Master的壓力

3.6.2 可用性

3.6.2.1 集群模式

我們首先需要選擇一種集群模式,來適應(yīng)我們可忍耐的可用程度,一般來說分為三種:

  • 單Master:這種模式,可用性最低,但是成本也是最低,一旦宕機(jī),所有都不可用。這種一般只適用于本地測試。

  • 單Master多SLAVE:這種模式,可用性一般,如果主宕機(jī),那么所有寫入都不可用,讀取依然可用,如果master磁盤損壞,可以依賴slave的數(shù)據(jù)。

  • 多Master:這種模式,可用性一般,如果出現(xiàn)部分master宕機(jī),那么這部分master上的消息都不可消費(fèi),也不可寫數(shù)據(jù),如果一個(gè)Topic的隊(duì)列在多個(gè)Master上都有,那么可以保證沒有宕機(jī)的那部分可以正常消費(fèi),寫入。如果master的磁盤損壞會(huì)導(dǎo)致消息丟失。

  • 多Master多Slave:這種模式,可用性最高,但是維護(hù)成本也最高,當(dāng)master宕機(jī)了之后,只會(huì)出現(xiàn)在這部分master上的隊(duì)列不可寫入,但是讀取依然是可以的,并且如果master磁盤損壞,可以依賴slave的數(shù)據(jù)。

一般來說投入生產(chǎn)環(huán)境的話都會(huì)選擇第四種,來保證最高的可用性。

3.6.2.2 消息的可用性

當(dāng)我們選擇好了集群模式之后,那么我們需要關(guān)心的就是怎么去存儲(chǔ)和復(fù)制這個(gè)數(shù)據(jù),rocketMQ對(duì)消息的刷盤提供了同步和異步的策略來滿足我們的,當(dāng)我們選擇同步刷盤之后,如果刷盤超時(shí)會(huì)給返回FLUSH_DISK_TIMEOUT,如果是異步刷盤不會(huì)返回刷盤相關(guān)信息,選擇同步刷盤可以盡最大程度滿足我們的消息不會(huì)丟失。

除了存儲(chǔ)有選擇之后,我們的主從同步提供了同步和異步兩種模式來進(jìn)行復(fù)制,當(dāng)然選擇同步可以提升可用性,但是消息的發(fā)送RT時(shí)間會(huì)下降10%左右。

3.6.3 Dleger

我們上面對(duì)于master-slave部署模式已經(jīng)做了很多分析,我們發(fā)現(xiàn),當(dāng)master出現(xiàn)問題的時(shí)候,我們的寫入怎么都會(huì)不可用,除非恢復(fù)master,或者手動(dòng)將我們的slave切換成master,導(dǎo)致了我們的Slave在多數(shù)情況下只有讀取的作用。RocketMQ在最近的幾個(gè)版本中推出了Dleger-RocketMQ,使用Raft協(xié)議復(fù)制CommitLog,并且自動(dòng)進(jìn)行選主,這樣master宕機(jī)的時(shí)候,寫入依然保持可用。

3.7 定時(shí)/延時(shí)消息

定時(shí)消息和延時(shí)消息在實(shí)際業(yè)務(wù)場景中使用的比較多,比如下面的一些場景:

  • 訂單超時(shí)未支付自動(dòng)關(guān)閉,因?yàn)樵诤芏鄨鼍爸邢聠沃髱齑婢捅绘i定了,這里需要將其進(jìn)行超時(shí)關(guān)閉。

  • 需要一些延時(shí)的操作,比如一些兜底的邏輯,當(dāng)做完某個(gè)邏輯之后,可以發(fā)送延時(shí)消息比如延時(shí)半個(gè)小時(shí),進(jìn)行兜底檢查補(bǔ)償。

  • 在某個(gè)時(shí)間給用戶發(fā)送消息,同樣也可以使用延時(shí)消息。

在開源版本的RocketMQ中延時(shí)消息并不支持任意時(shí)間的延時(shí),需要設(shè)置幾個(gè)固定的延時(shí)等級(jí),目前默認(rèn)設(shè)置為:?1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h?,從1s到2h分別對(duì)應(yīng)著等級(jí)1到18,而阿里云中的版本(要付錢)是可以支持40天內(nèi)的任何時(shí)刻(毫秒級(jí)別)。我們先看下在RocketMQ中定時(shí)任務(wù)原理圖:

你需要知道的RoketMQ

  • Step1:Producer在自己發(fā)送的消息上設(shè)置好需要延時(shí)的級(jí)別。

  • Step2: Broker發(fā)現(xiàn)此消息是延時(shí)消息,將Topic進(jìn)行替換成延時(shí)Topic,每個(gè)延時(shí)級(jí)別都會(huì)作為一個(gè)單獨(dú)的queue,將自己的Topic作為額外信息存儲(chǔ)。

  • Step3: 構(gòu)建ConsumerQueue

  • Step4: 定時(shí)任務(wù)定時(shí)掃描每個(gè)延時(shí)級(jí)別的ConsumerQueue。

  • Step5: 拿到ConsumerQueue中的CommitLog的Offset,獲取消息,判斷是否已經(jīng)達(dá)到執(zhí)行時(shí)間

  • Step6: 如果達(dá)到,那么將消息的Topic恢復(fù),進(jìn)行重新投遞。如果沒有達(dá)到則延遲沒有達(dá)到的這段時(shí)間執(zhí)行任務(wù)。

可以看見延時(shí)消息是利用新建單獨(dú)的Topic和Queue來實(shí)現(xiàn)的,如果我們要實(shí)現(xiàn)40天之內(nèi)的任意時(shí)間度,基于這種方案,那么需要40?24?60?60?1000個(gè)queue,這樣的成本是非常之高的,那阿里云上面的支持任意時(shí)間是怎么實(shí)現(xiàn)的呢?這里猜測是持久化二級(jí)TimeWheel時(shí)間輪,二級(jí)時(shí)間輪用于替代我們的ConsumeQueue,保存Commitlog-Offset,然后通過時(shí)間輪不斷的取出當(dāng)前已經(jīng)到了的時(shí)間,然后再次投遞消息。具體的實(shí)現(xiàn)邏輯需要后續(xù)會(huì)單獨(dú)寫一篇文章。

3.8 事務(wù)消息

事務(wù)消息同樣的也是RocketMQ中的一大特色,其可以幫助我們完成分布式事務(wù)的最終一致性,有關(guān)分布式事務(wù)相關(guān)的可以看我以前的很多文章都有很多詳細(xì)的介紹。

你需要知道的RoketMQ

具體使用事務(wù)消息步驟如下:

  • Step1:調(diào)用sendMessageInTransaction發(fā)送事務(wù)消息

  • Step2: ?如果發(fā)送成功,則執(zhí)行本地事務(wù)。

  • Step3: ?如果執(zhí)行本地事務(wù)成功則發(fā)送commit,如果失敗則發(fā)送rollback。

  • Step4: ?如果其中某個(gè)階段比如commit發(fā)送失敗,rocketMQ會(huì)進(jìn)行定時(shí)從Broker回查,本地事務(wù)的狀態(tài)。

事務(wù)消息的使用整個(gè)流程相對(duì)之前幾種消息使用比較復(fù)雜,下面是事務(wù)消息實(shí)現(xiàn)的原理圖:

你需要知道的RoketMQ

  • Step1: 發(fā)送事務(wù)消息,這里也叫做halfMessage,會(huì)將Topic替換為HalfMessage的Topic。

  • Step2: 發(fā)送commit或者rollback,如果是commit這里會(huì)查詢出之前的消息,然后將消息復(fù)原成原Topic,并且發(fā)送一個(gè)OpMessage用于記錄當(dāng)前消息可以刪除。如果是rollback這里會(huì)直接發(fā)送一個(gè)OpMessage刪除。

  • Step3: 在Broker有個(gè)處理事務(wù)消息的定時(shí)任務(wù),定時(shí)對(duì)比halfMessage和OpMessage,如果有OpMessage且狀態(tài)為刪除,那么該條消息必定commit或者rollback,所以就可以刪除這條消息。

  • Step4: 如果事務(wù)超時(shí)(默認(rèn)是6s),還沒有opMessage,那么很有可能commit信息丟了,這里會(huì)去反查我們的Producer本地事務(wù)狀態(tài)。

  • Step5: 根據(jù)查詢出來的信息做Step2。

我們發(fā)現(xiàn)RocketMQ實(shí)現(xiàn)事務(wù)消息也是通過修改原Topic信息,和延遲消息一樣,然后模擬成消費(fèi)者進(jìn)行消費(fèi),做一些特殊的業(yè)務(wù)邏輯。當(dāng)然我們還可以利用這種方式去做RocketMQ更多的擴(kuò)展。

4.總結(jié)

這里讓我們在回到文章中提到的幾個(gè)問題:

  • RocketMQ的topic和隊(duì)列是什么樣的,和Kafka的分區(qū)有什么不同?

  • RocketMQ網(wǎng)絡(luò)模型是什么樣的,和Kafka對(duì)比如何?

  • RocketMQ消息存儲(chǔ)模型是什么樣的,如何保證高可靠的存儲(chǔ),和Kafka對(duì)比如何?

想必讀完這篇文章,你心中已經(jīng)有答案

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI