溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

進階必看的RocketMQ知識點總結(jié)

發(fā)布時間:2021-10-25 17:36:37 來源:億速云 閱讀:219 作者:iii 欄目:編程語言

這篇文章主要介紹“進階必看的RocketMQ知識點總結(jié)”,在日常操作中,相信很多人在進階必看的RocketMQ知識點總結(jié)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”進階必看的RocketMQ知識點總結(jié)”的疑惑有所幫助!接下來,請跟著小編一起來學習吧! 

RocketMQ 整體架構(gòu)設計

整體的架構(gòu)設計主要分為四大部分,分別是:Producer、Consumer、Broker、NameServer。

進階必看的RocketMQ知識點總結(jié)  

為了更貼合實際,我畫的都是集群部署,像 Broker 我還畫了主從。

  • Producer:就是消息生產(chǎn)者,可以集群部署。它會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要發(fā)送的 Topic 存在哪臺 Broker Master上,然后再與其建立長連接,支持多種負載平衡模式發(fā)送消息。

  • Consumer:消息消費者,也可以集群部署。它也會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要消息的 Topic 存在哪臺 Broker Master、Slave上,然后它們建立長連接,支持集群消費和廣播消費消息。

  • Broker:主要負責消息的存儲、查詢消費,支持主從部署,一個 Master 可以對應多個 Slave,Master 支持讀寫,Slave 只支持讀。Broker 會向集群中的每一臺 NameServer 注冊自己的路由信息。

  • NameServer:是一個很簡單的 Topic 路由注冊中心,支持 Broker 的動態(tài)注冊和發(fā)現(xiàn),保存 Topic 和 Borker 之間的關系。通常也是集群部署,但是各 NameServer 之間不會互相通信, 各 NameServer 都有完整的路由信息,即無狀態(tài)。

我再用一段話來概括它們之間的交互:

進階必看的RocketMQ知識點總結(jié)  

先啟動 NameServer 集群,各 NameServer 之間無任何數(shù)據(jù)交互,Broker 啟動之后會向所有 NameServer 定期(每 30s)發(fā)送心跳包,包括:IP、Port、TopicInfo,NameServer 會定期掃描 Broker 存活列表,如果超過 120s 沒有心跳則移除此 Broker 相關信息,代表下線。

這樣每個 NameServer 就知道集群所有 Broker 的相關信息,此時 Producer 上線從 NameServer 就可以得知它要發(fā)送的某 Topic 消息在哪個 Broker 上,和對應的 Broker (Master 角色的)建立長連接,發(fā)送消息。

Consumer 上線也可以從 NameServer  得知它所要接收的 Topic 是哪個 Broker ,和對應的 Master、Slave 建立連接,接收消息。

簡單的工作流程如上所述,相信大家對整體數(shù)據(jù)流轉(zhuǎn)已經(jīng)有點印象了,我們再來看看每個部分的詳細情況。 

NameServer

它的特點就是輕量級,無狀態(tài)。角色類似于 Zookeeper 的情況,從上面描述知道其主要的兩個功能就是:Broker 管理、路由信息管理。

總體而言比較簡單,我再貼一些字段,讓大家有更直觀的印象知道它存儲了些什么。

進階必看的RocketMQ知識點總結(jié)    

Producer

Producer 無非就是消息生產(chǎn)者,那首先它得知道消息要發(fā)往哪個 Broker ,于是每 30s 會從某臺 NameServer 獲取 Topic 和 Broker 的映射關系存在本地內(nèi)存中,如果發(fā)現(xiàn)新的 Broker 就會和其建立長連接,每 30s 會發(fā)送心跳至 Broker 維護連接。

并且會輪詢當前可以發(fā)送的 Broker 來發(fā)送消息,達到負載均衡的目的,在同步發(fā)送情況下如果發(fā)送失敗會默認重投兩次(retryTimesWhenSendFailed = 2),并且不會選擇上次失敗的 broker,會向其他 broker 投遞。

在異步發(fā)送失敗的情況下也會重試,默認也是兩次 (retryTimesWhenSendAsyncFailed = 2),但是僅在同一個 Broker 上重試。 

Producer 啟動流程

然后我們再來看看 Producer 的啟動流程看看都干了些啥。

進階必看的RocketMQ知識點總結(jié)  

大致啟動流程圖中已經(jīng)表明的很清晰的,但是有些細節(jié)可能還不清楚,比如重平衡啊,TBW102 啥玩意啊,有哪些定時任務啊,別急都會提到的。

有人可能會問這生產(chǎn)者為什么要啟拉取服務、重平衡?

因為 Producer 和 Consumer 都需要用 MQClientInstance,而同一個 clientId 是共用一個 MQClientInstance 的, clientId 是通過本機 IP 和 instanceName(默認值 default)拼起來的,所以多個 Producer 、Consumer 實際用的是一個MQClientInstance。

至于有哪些定時任務,請看下圖:

進階必看的RocketMQ知識點總結(jié)     

Producer 發(fā)消息流程

我們再來看看發(fā)消息的流程,大致也不是很復雜,無非就是找到要發(fā)送消息的 Topic 在哪個 Broker 上,然后發(fā)送消息。

進階必看的RocketMQ知識點總結(jié)  

現(xiàn)在就知道 TBW102 是啥用的,就是接受自動創(chuàng)建主題的 Broker 啟動會把這個默認主題登記到 NameServer,這樣當 Producer 發(fā)送新 Topic 的消息時候就得知哪個 Broker 可以自動創(chuàng)建主題,然后發(fā)往那個 Broker。

而 Broker 接受到這個消息的時候發(fā)現(xiàn)沒找到對應的主題,但是它接受創(chuàng)建新主題,這樣就會創(chuàng)建對應的 Topic 路由信息。 

自動創(chuàng)建主題的弊端

自動創(chuàng)建主題那么有可能該主題的消息都只會發(fā)往一臺 Broker,起不到負載均衡的作用。

因為創(chuàng)建新 Topic 的請求到達 Broker 之后,Broker 創(chuàng)建對應的路由信息,但是心跳是每 30s 發(fā)送一次,所以說 NameServer 最長需要 30s 才能得知這個新 Topic 的路由信息。

假設此時發(fā)送方還在連續(xù)快速的發(fā)送消息,那 NameServer 上其實還沒有關于這個 Topic 的路由信息,所以有機會讓別的允許自動創(chuàng)建的 Broker 也創(chuàng)建對應的 Topic 路由信息,這樣集群里的 Broker 就能接受這個 Topic 的信息,達到負載均衡的目的,但也有個別 Broker 可能,沒收到。

如果發(fā)送方這一次發(fā)了之后 30s 內(nèi)一個都不發(fā),之前的那個 Broker 隨著心跳把這個路由信息更新到 NameServer 了,那么之后發(fā)送該 Topic 消息的 Producer 從 NameServer 只能得知該 Topic 消息只能發(fā)往之前的那臺 Broker ,這就不均衡了,如果這個新主題消息很多,那臺 Broker 負載就很高了。

所以不建議線上開啟允許自動創(chuàng)建主題,即 autoCreateTopicEnable 參數(shù)。 

發(fā)送消息故障延遲機制

有一個參數(shù)是 sendLatencyFaultEnable,默認不開啟。這個參數(shù)的作用是對于之前發(fā)送超時的 Broker 進行一段時間的退避。

發(fā)送消息會記錄此時發(fā)送消息的時間,如果超過一定時間,那么此 Broker 就在一段時間內(nèi)不允許發(fā)送。

進階必看的RocketMQ知識點總結(jié)  

比如發(fā)送時間超過 15000ms 則在 600000 ms 內(nèi)無法向該 Broker 發(fā)送消息。

這個機制其實很關鍵,發(fā)送超時大概率表明此 Broker 負載高,所以先避讓一會兒,讓它緩一緩,這也是實現(xiàn)消息發(fā)送高可用的關鍵。

小結(jié)一下

Producer 每 30s 會向 NameSrv 拉取路由信息更新本地路由表,有新的 Broker 就和其建立長連接,每隔 30s 發(fā)送心跳給 Broker 。

不要在生產(chǎn)環(huán)境開啟 autoCreateTopicEnable。

Producer 會通過重試和延遲機制提升消息發(fā)送的高可用。 

Broker

Broker 就比較復雜一些了,但是非常重要。大致分為以下五大模塊,我們來看一下官網(wǎng)的圖。

進階必看的RocketMQ知識點總結(jié) 
  • Remoting 遠程模塊,處理客戶請求。
  • Client Manager 管理客戶端,維護訂閱的主題。
  • Store Service 提供消息存儲查詢服務。
  • HA Serivce,主從同步高可用。
  • Index Serivce,通過指定key 建立索引,便于查詢。

有幾個模塊沒啥可說的就不分析了,先看看存儲的。 

Broker 的存儲

RocketMQ 存儲用的是本地文件存儲系統(tǒng),效率高也可靠。

主要涉及到三種類型的文件,分別是 CommitLog、ConsumeQueue、IndexFile。 

CommitLog

RocketMQ 的所有主題的消息都存在 CommitLog 中,單個 CommitLog 默認 1G,并且文件名以起始偏移量命名,固定 20 位,不足則前面補 0,比如 00000000000000000000 代表了第一個文件,第二個文件名就是 00000000001073741824,表明起始偏移量為 1073741824,以這樣的方式命名用偏移量就能找到對應的文件。

所有消息都是順序?qū)懭氲?,超過文件大小則開啟下一個文件。 

ConsumeQueue

ConsumeQueue 消息消費隊列,可以認為是 CommitLog 中消息的索引,因為 CommitLog 是糅合了所有主題的消息,所以通過索引才能更加高效的查找消息。

ConsumeQueue 存儲的條目是固定大小,只會存儲 8 字節(jié)的 commitlog 物理偏移量,4 字節(jié)的消息長度和 8 字節(jié) Tag 的哈希值,固定 20 字節(jié)。

在實際存儲中,ConsumeQueue 對應的是一個Topic 下的某個 Queue,每個文件約 5.72M,由 30w 條數(shù)據(jù)組成。

消費者是先從 ConsumeQueue 來得到消息真實的物理地址,然后再去 CommitLog 獲取消息。 

IndexFile

IndexFile 就是索引文件,是額外提供查找消息的手段,不影響主流程。

通過 Key 或者時間區(qū)間來查詢對應的消息,文件名以創(chuàng)建時間戳命名,固定的單個 IndexFile 文件大小約為400M,一個 IndexFile 存儲 2000W個索引。

我們再來看看以上三種文件的內(nèi)容是如何生成的:

進階必看的RocketMQ知識點總結(jié)  

消息到了先存儲到 Commitlog,然后會有一個 ReputMessageService 線程接近實時地將消息轉(zhuǎn)發(fā)給消息消費隊列文件與索引文件,也就是說是異步生成的。

消息刷盤機制

RocketMQ 提供消息同步刷盤和異步刷盤兩個選擇,關于刷盤我們都知道效率比較低,單純存入內(nèi)存中的話效率是最高的,但是可靠性不高,影響消息可靠性的情況大致有以下幾種:

  1. Broker 被暴力關閉,比如 kill -9
  2. Broker 掛了
  3. 操作系統(tǒng)掛了
  4. 機器斷電
  5. 機器壞了,開不了機
  6. 磁盤壞了

如果都是 1-4 的情況,同步刷盤肯定沒問題,異步的話就有可能丟失部分消息,5 和 6就得依靠副本機制了,如果同步雙寫肯定是穩(wěn)的,但是性能太差,如果異步則有可能丟失部分消息。

所以需要看場景來使用同步、異步刷盤和副本雙寫機制。 

頁緩存與內(nèi)存映射

Commitlog 是混合存儲的,所以所有消息的寫入就是順序?qū)懭耄瑢ξ募捻樞驅(qū)懭牒蛢?nèi)存的寫入速度基本上沒什么差別。

并且 RocketMQ 的文件都利用了內(nèi)存映射即 Mmap,將程序虛擬頁面直接映射到頁緩存上,無需有內(nèi)核態(tài)再往用戶態(tài)的拷貝,來看一下我之前文章畫的圖。

進階必看的RocketMQ知識點總結(jié)  

頁緩存其實就是操作系統(tǒng)對文件的緩存,用來加速文件的讀寫,也就是說對文件的寫入先寫到頁緩存中,操作系統(tǒng)會不定期刷盤(時間不可控),對文件的讀會先加載到頁緩存中,并且根據(jù)局部性原理還會預讀臨近塊的內(nèi)容。

其實也是因為使用內(nèi)存映射機制,所以 RocketMQ 的文件存儲都使用定長結(jié)構(gòu)來存儲,方便一次將整個文件映射至內(nèi)存中。 

文件預分配和文件預熱

而內(nèi)存映射也只是做了映射,只有當真正讀取頁面的時候產(chǎn)生缺頁中斷,才會將數(shù)據(jù)真正加載到內(nèi)存中,所以 RocketMQ 做了一些優(yōu)化,防止運行時的性能抖動。 

文件預分配

CommitLog 的大小默認是1G,當超過大小限制的時候需要準備新的文件,而 RocketMQ 就起了一個后臺線程 AllocateMappedFileService,不斷的處理 AllocateRequest,AllocateRequest 其實就是預分配的請求,會提前準備好下一個文件的分配,防止在消息寫入的過程中分配文件,產(chǎn)生抖動。 

文件預熱

有一個 warmMappedFile 方法,它會把當前映射的文件,每一頁遍歷多去,寫入一個0字節(jié),然后再調(diào)用mlock 和 madvise(MADV_WILLNEED)。

mlock:可以將進程使用的部分或者全部的地址空間鎖定在物理內(nèi)存中,防止其被交換到 swap 空間。

madvise:給操作系統(tǒng)建議,說這文件在不久的將來要訪問的,因此,提前讀幾頁可能是個好主意。 

小結(jié)一下

CommitLog 采用混合型存儲,也就是所有 Topic 都存在一起,順序追加寫入,文件名用起始偏移量命名。

消息先寫入 CommitLog 再通過后臺線程分發(fā)到 ConsumerQueue 和 IndexFile 中。

消費者先讀取 ConsumerQueue 得到真正消息的物理地址,然后訪問 CommitLog 得到真正的消息。

利用了 mmap 機制減少一次拷貝,利用文件預分配和文件預熱提高性能。

提供同步和異步刷盤,根據(jù)場景選擇合適的機制。 

Broker 的 HA

從 Broker 會和主 Broker 建立長連接,然后獲取主 Broker commitlog 最大偏移量,開始向主 Broker 拉取消息,主 Broker 會返回一定數(shù)量的消息,循環(huán)進行,達到主從數(shù)據(jù)同步。

消費者消費消息會先請求主 Broker ,如果主 Broker 覺得現(xiàn)在壓力有點大,則會返回從 Broker 拉取消息的建議,然后消費者就去從服務器拉取消息。 

Consumer

消費有兩種模式,分別是廣播模式和集群模式。

廣播模式:一個分組下的每個消費者都會消費完整的Topic 消息。

集群模式:一個分組下的消費者瓜分消費Topic 消息。

一般我們用的都是集群模式。

而消費者消費消息又分為推和拉模式,詳細看我這篇文章消息隊列推拉模式,分別從源碼級別分析了 RokcetMQ 和 Kafka 的消息推拉,以及推拉模式的優(yōu)缺點。 

Consumer 端的負載均衡機制

Consumer 會定期的獲取 Topic 下的隊列數(shù),然后再去查找訂閱了該 Topic 的同一消費組的所有消費者信息,默認的分配策略是類似分頁排序分配。

將隊列排好序,然后消費者排好序,比如隊列有 9 個,消費者有 3 個,那消費者-1 消費隊列 0、1、2 的消息,消費者-2 消費隊列 3、4、5,以此類推。

所以如果負載太大,那么就加隊列,加消費者,通過負載均衡機制就可以感知到重平衡,均勻負載。 

Consumer 消息消費的重試

難免會遇到消息消費失敗的情況,所以需要提供消費失敗的重試,而一般的消費失敗要么就是消息結(jié)構(gòu)有誤,要么就是一些暫時無法處理的狀態(tài),所以立即重試不太合適。

RocketMQ 會給每個消費組都設置一個重試隊列,Topic 是 %RETRY%+consumerGroup,并且設定了很多重試級別來延遲重試的時間。

為了利用 RocketMQ 的延時隊列功能,重試的消息會先保存在 Topic 名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊列,在消息的擴展字段里面會存儲原來所屬的 Topic 信息。

delay 一段時間后再恢復到重試隊列中,然后 Consumer 就會消費這個重試隊列主題,得到之前的消息。

如果超過一定的重試次數(shù)都消費失敗,則會移入到死信隊列,即 Topic %DLQ%" + ConsumerGroup 中,存儲死信隊列即認為消費成功,因為實在沒轍了,暫時放過。

然后我們可以通過人工來處理死信隊列的這些消息。 

消息的全局順序和局部順序

全局順序就是消除一切并發(fā),一個 Topic 一個隊列,Producer 和 Consuemr 的并發(fā)都為一。

局部順序其實就是指某個隊列順序,多隊列之間還是能并行的。

可以通過 MessageQueueSelector 指定 Producer 某個業(yè)務只發(fā)這一個隊列,然后 Comsuer 通過MessageListenerOrderly 接受消息,其實就是加鎖消費。

在 Broker 會有一個 mqLockTable ,順序消息在創(chuàng)建拉取消息任務的時候需要在 Broker 鎖定該消息隊列,之后加鎖成功的才能消費。

而嚴格的順序消息其實很難,假設現(xiàn)在都好好的,如果有個 Broker 宕機了,然后發(fā)生了重平衡,隊列對應的消費者實例就變了,就會有可能會出現(xiàn)亂序的情況,如果要保持嚴格順序,那此時就只能讓整個集群不可用了。 

一些注意點

1、訂閱消息是以 ConsumerGroup 為單位存儲的,所以ConsumerGroup 中的每個 Consumer 需要有相同的訂閱。

因為訂閱消息是隨著心跳上傳的,如果一個 ConsumerGroup 中 Consumer 訂閱信息不一樣,那么就會出現(xiàn)互相覆蓋的情況。

比如消費者 A 訂閱 Topic a,消費者 B 訂閱 Topic b,此時消費者 A 去 Broker 拿消息,然后 B 的心跳包發(fā)出了,Broker 更新了,然后接到 A 的請求,一臉懵逼,沒這訂閱關系啊。

2、RocketMQ 主從讀寫分離

從只能讀,不能寫,并且只有當前客戶端讀的 offset 和 當前 Broker 已接受的最大 offset 超過限制的物理內(nèi)存大小時候才會去從讀,所以正常情況下從分擔不了流量

3、單單加機器提升不了消費速度,隊列的數(shù)量也需要跟上。

4、之前提到的,不要允許自動創(chuàng)建主題 

RocketMQ 的最佳實踐

這些最佳實踐部分參考自官網(wǎng)。 

Tags的使用

建議一個應用一個 Topic,利用 tages 來標記不同業(yè)務,因為 tages 設置比較靈活,且一個應用一個 Topic 很清晰,能直觀的辨別。 

Keys的使用

如果有消息業(yè)務上的唯一標識,請?zhí)顚懙?keys 字段中,方便日后的定位查找。 

提高 Consumer 的消費能力

1、提高消費并行度:增加隊列數(shù)和消費者數(shù)量,提高單個消費者的并行消費線程,參數(shù) consumeThreadMax。

2、批處理消費,設置 consumeMessageBatchMaxSize 參數(shù),這樣一次能拿到多條消息,然后比如一個 update語句之前要執(zhí)行十次,現(xiàn)在一次就執(zhí)行完。

3、跳過非核心的消息,當負載很重的時候,為了保住那些核心的消息,設置那些非核心的消息,例如此時消息堆積 1W 條了之后,就直接返回消費成功,跳過非核心消息。 

NameServer 的尋址

請使用 HTTP 靜態(tài)服務器尋址(默認),這樣 NameServer 就能動態(tài)發(fā)現(xiàn)。 

JVM選項

以下抄自官網(wǎng):

如果不關心 RocketMQ Broker的啟動時間,通過“預觸摸” Java 堆以確保在 JVM 初始化期間每個頁面都將被分配。

那些不關心啟動時間的人可以啟用它:-XX:+AlwaysPreTouch 禁用偏置鎖定可能會減少JVM暫停, -XX:-UseBiasedLocking 至于垃圾回收,建議使用帶JDK 1.8的G1收集器。

-XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30

另外不要把-XX:MaxGCPauseMillis的值設置太小,否則JVM將使用一個小的年輕代來實現(xiàn)這個目標,這將導致非常頻繁的minor GC,所以建議使用rolling GC日志文件:

-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m 

Linux內(nèi)核參數(shù)

以下抄自官網(wǎng):

  • vm.extra_free_kbytes,告訴VM在后臺回收(kswapd)啟動的閾值與直接回收(通過分配進程)的閾值之間保留額外的可用內(nèi)存。RocketMQ使用此參數(shù)來避免內(nèi)存分配中的長延遲。(與具體內(nèi)核版本相關)
  • vm.min_free_kbytes,如果將其設置為低于1024KB,將會巧妙的將系統(tǒng)破壞,并且系統(tǒng)在高負載下容易出現(xiàn)死鎖。
  • vm.max_map_count,限制一個進程可能具有的最大內(nèi)存映射區(qū)域數(shù)。RocketMQ將使用mmap加載CommitLog和ConsumeQueue,因此建議將為此參數(shù)設置較大的值。(agressiveness --> aggressiveness)
  • vm.swappiness,定義內(nèi)核交換內(nèi)存頁面的積極程度。較高的值會增加攻擊性,較低的值會減少交換量。建議將值設置為10來避免交換延遲。
  • File descriptor limits,RocketMQ需要為文件(CommitLog和ConsumeQueue)和網(wǎng)絡連接打開文件描述符。我們建議設置文件描述符的值為655350。
  • Disk scheduler,RocketMQ建議使用I/O截止時間調(diào)度器,它試圖為請求提供有保證的延遲。

到此,關于“進階必看的RocketMQ知識點總結(jié)”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI