溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

RocketMQ NameServer深入剖析

發(fā)布時(shí)間:2020-08-09 09:27:35 來(lái)源:網(wǎng)絡(luò) 閱讀:440 作者:Java_老男孩 欄目:編程語(yǔ)言

RocketMQ NameServer深入剖析

本文將深入剖析rocketmq為什么選擇自己開發(fā)NameServer,而不是選擇類似于ZK這樣的開源組件。同時(shí)對(duì)rocketmq的路由注冊(cè)、路由發(fā)現(xiàn)、路由剔除進(jìn)行剖析。并通過(guò)結(jié)合核心源碼,對(duì)筆者的觀點(diǎn)進(jìn)行驗(yàn)證。同時(shí)對(duì)不同類型消息的重試機(jī)制,以及客戶端選擇nameserver的策略進(jìn)行深入講解。

文章第一部分是name server在rocketmq整體架構(gòu)中的作用,熟悉的同學(xué)可以直接跳過(guò)。

1 NameServer的作用

Name Server 是專為 RocketMQ 設(shè)計(jì)的輕量級(jí)名稱服務(wù),具有簡(jiǎn)單、可集群橫吐擴(kuò)展、無(wú)狀態(tài),節(jié)點(diǎn)之間互不通信等特點(diǎn)。整個(gè)Rocketmq集群的工作原理如下圖所示:

RocketMQ NameServer深入剖析

可以看到,Broker集群、Producer集群、Consumer集群都需要與NameServer集群進(jìn)行通信:

Broker集群

Broker用于接收生產(chǎn)者發(fā)送消息,或者消費(fèi)者消費(fèi)消息的請(qǐng)求。一個(gè)Broker集群由多組Master/Slave組成,Master可寫可讀,Slave只可以讀,Master將寫入的數(shù)據(jù)同步給Slave。每個(gè)Broker節(jié)點(diǎn),在啟動(dòng)時(shí),都會(huì)遍歷NameServer列表,與每個(gè)NameServer建立長(zhǎng)連接,注冊(cè)自己的信息,之后定時(shí)上報(bào)。

Producer集群

消息的生產(chǎn)者,通過(guò)NameServer集群獲得Topic的路由信息,包括Topic下面有哪些Queue,這些Queue分布在哪些Broker上等。Producer只會(huì)將消息發(fā)送到Master節(jié)點(diǎn)上,因此只需要與Master節(jié)點(diǎn)建立連接。

Consumer集群

消息的消費(fèi)者,通過(guò)NameServer集群獲得Topic的路由信息,連接到對(duì)應(yīng)的Broker上消費(fèi)消息。注意,由于Master和Slave都可以讀取消息,因此Consumer會(huì)與Master和Slave都建立連接。

2 為什么選擇自己開發(fā)NameServer

目前可以作為服務(wù)發(fā)現(xiàn)組件有很多,如etcd、consul,zookeeper等:

RocketMQ NameServer深入剖析

那么為什么rocketmq選擇自己開發(fā)一個(gè)NameServer,而不是使用這些開源組件呢?特別的,Zookeeper其提供了Master選舉、分布式鎖、數(shù)據(jù)的發(fā)布和訂閱等諸多功能RocketMQ設(shè)計(jì)之初時(shí)參考的另一款消息中間件Kafka就使用了Zookeeper。

事實(shí)上,在RocketMQ的早期版本,即MetaQ 1.x和MetaQ 2.x階段,也是依賴Zookeeper的。但MetaQ 3.x(即RocketMQ)卻去掉了ZooKeeper依賴,轉(zhuǎn)而采用自己的NameServer。

而RocketMQ的架構(gòu)設(shè)計(jì)決定了只需要一個(gè)輕量級(jí)的元數(shù)據(jù)服務(wù)器就足夠了,只需要保持最終一致,而不需要Zookeeper這樣的強(qiáng)一致性解決方案,不需要再依賴另一個(gè)中間件,從而減少整體維護(hù)成本。

敏銳的同學(xué)肯定已經(jīng)意識(shí)到了,根據(jù)CAP理論,RocketMQ在名稱服務(wù)這個(gè)模塊的設(shè)計(jì)上選擇了AP,而不是CP:

RocketMQ NameServer深入剖析

  • 一致性(Consistency):Name Server 集群中的多個(gè)實(shí)例,彼此之間是不通信的,這意味著某一時(shí)刻,不同實(shí)例上維護(hù)的元數(shù)據(jù)可能是不同的,客戶端獲取到的數(shù)據(jù)也可能是不一致的。

  • 可用性(Availability):只要不是所有NameServer節(jié)點(diǎn)都掛掉,且某個(gè)節(jié)點(diǎn)可以在指定之間內(nèi)響應(yīng)客戶端即可。

  • 分區(qū)容錯(cuò)(Partiton Tolerance):對(duì)于分布式架構(gòu),網(wǎng)絡(luò)條件不可控,出現(xiàn)網(wǎng)絡(luò)分區(qū)是不可避免的,只要保證部分NameServer節(jié)點(diǎn)網(wǎng)絡(luò)可達(dá),就可以獲取到數(shù)據(jù)。具體看公司如何實(shí)施,例如:為了實(shí)現(xiàn)跨機(jī)房的容災(zāi),可以將NameServer部署的不同的機(jī)房,某個(gè)機(jī)房出現(xiàn)網(wǎng)絡(luò)故障,其他機(jī)房依然可用,當(dāng)然Broker集群/Producer集群/Consumer集群也要跨機(jī)房部署。

事實(shí)上,除了RocketMQ開發(fā)了自己的NameServer,最近 Kafka 社區(qū)也在 Wiki 空間上提交了一項(xiàng)新的改進(jìn)提案“KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum”,其目的是為了消除 Kafka 對(duì) ZooKeeper 的依賴,該提案建議用自管理的元數(shù)據(jù)仲裁機(jī)制替換原來(lái)的 ZooKeeper 組件。感興趣的讀者可以自行查閱相關(guān)資料。

3?NameServer如何保證數(shù)據(jù)的最終一致

NameServer作為一個(gè)名稱服務(wù),需要提供服務(wù)注冊(cè)、服務(wù)剔除、服務(wù)發(fā)現(xiàn)這些基本功能,但是NameServer節(jié)點(diǎn)之間并不通信,在某個(gè)時(shí)刻各個(gè)節(jié)點(diǎn)數(shù)據(jù)可能不一致的情況下,如何保證客戶端可以最終拿到正確的數(shù)據(jù)。下面分別從路由注冊(cè)、路由剔除,路由發(fā)現(xiàn)三個(gè)角度進(jìn)行介紹。

3.1?路由注冊(cè)

對(duì)于Zookeeper、Etcd這樣強(qiáng)一致性組件,數(shù)據(jù)只要寫到主節(jié)點(diǎn),內(nèi)部會(huì)通過(guò)狀態(tài)機(jī)將數(shù)據(jù)復(fù)制到其他節(jié)點(diǎn),Zookeeper使用的是Zab協(xié)議,etcd使用的是raft協(xié)議。

但是NameServer節(jié)點(diǎn)之間是互不通信的,無(wú)法進(jìn)行數(shù)據(jù)復(fù)制。RocketMQ采取的策略是,在Broker節(jié)點(diǎn)在啟動(dòng)的時(shí)候,輪訓(xùn)NameServer列表,與每個(gè)NameServer節(jié)點(diǎn)建立長(zhǎng)連接,發(fā)起注冊(cè)請(qǐng)求。NameServer內(nèi)部會(huì)維護(hù)一個(gè)Broker表,用來(lái)動(dòng)態(tài)存儲(chǔ)Broker的信息。

同時(shí),Broker節(jié)點(diǎn)為了證明自己是存活的,會(huì)將最新的信息上報(bào)給NameServer,然后每隔30秒向NameServer發(fā)送心跳包,心跳包中包含 BrokerId、Broker地址、Broker名稱、Broker所屬集群名稱等等,然后NameServer接收到心跳包后,會(huì)更新時(shí)間戳,記錄這個(gè)Broker的最新存活時(shí)間。

NameServer在處理心跳包的時(shí)候,存在多個(gè)Broker同時(shí)操作一張Broker表,為了防止并發(fā)修改Broker表導(dǎo)致不安全,路由注冊(cè)操作引入了ReadWriteLock讀寫鎖,這個(gè)設(shè)計(jì)亮點(diǎn)允許多個(gè)消息生產(chǎn)者并發(fā)讀,保證了消息發(fā)送時(shí)的高并發(fā),但是同一時(shí)刻N(yùn)ameServer只能處理一個(gè)Broker心跳包,多個(gè)心跳包串行處理。這也是讀寫鎖的經(jīng)典使用場(chǎng)景,即讀多寫少。

3.2?路由剔除

正常情況下,如果Broker關(guān)閉,則會(huì)與NameServer斷開長(zhǎng)連接,Netty的通道關(guān)閉監(jiān)聽器會(huì)監(jiān)聽到連接斷開事件,然后會(huì)將這個(gè)Broker信息剔除掉。

異常情況下,NameServer中有一個(gè)定時(shí)任務(wù),每隔10秒掃描一下Broker表,如果某個(gè)Broker的心跳包最新時(shí)間戳距離當(dāng)前時(shí)間超多120秒,也會(huì)判定Broker失效并將其移除。

特別的,對(duì)于一些日常運(yùn)維工作,例如:Broker升級(jí),RocketMQ提供了一種優(yōu)雅剔除路由信息的方式。如在升級(jí)一個(gè)節(jié)Master點(diǎn)之前,可以先通過(guò)命令行工具禁止這個(gè)Broker的寫權(quán)限,生產(chǎn)者發(fā)送到這個(gè)Broker的請(qǐng)求,都會(huì)收到一個(gè)NO_PERMISSION響應(yīng),之后會(huì)自動(dòng)重試其他的Broker。當(dāng)觀察到這個(gè)broker沒(méi)有流量后,再將這個(gè)broker移除。

3.3?路由發(fā)現(xiàn)

路由發(fā)現(xiàn)是客戶端的行為,這里的客戶端主要說(shuō)的是生產(chǎn)者和消費(fèi)者。具體來(lái)說(shuō):

  • 對(duì)于生產(chǎn)者,可以發(fā)送消息到多個(gè)Topic,因此一般是在發(fā)送第一條消息時(shí),才會(huì)根據(jù)Topic獲取從NameServer獲取路由信息。

  • 對(duì)于消費(fèi)者,訂閱的Topic一般是固定的,所在在啟動(dòng)時(shí)就會(huì)拉取。

那么生產(chǎn)者/消費(fèi)者在工作的過(guò)程中,如果路由信息發(fā)生了變化怎么處理呢?如:Broker集群新增了節(jié)點(diǎn),節(jié)點(diǎn)宕機(jī)或者Queue的數(shù)量發(fā)生了變化。細(xì)心的讀者注意到,前面講解NameServer在路由注冊(cè)或者路由剔除過(guò)程中,并不會(huì)主動(dòng)推送會(huì)客戶端的,這意味著,需要由客戶端拉取主題的最新路由信息。

事實(shí)上,RocketMQ客戶端提供了定時(shí)拉取Topic最新路由信息的機(jī)制,這里我們直接結(jié)合源碼來(lái)講解。?? ??? ?

DefaultMQProducer和DefaultMQConsumer有一個(gè)pollNameServerInterval配置項(xiàng),用于定時(shí)從NameServer并獲取最新的路由表,默認(rèn)是30秒,它們底層都依賴一個(gè)MQClientInstance類。

MQClientInstance類中有一個(gè)updateTopicRouteInfoFromNameServer方法,用于根據(jù)指定的拉取時(shí)間間隔,周期性的的從NameServer拉取路由信息。?在拉取時(shí),會(huì)把當(dāng)前啟動(dòng)的Producer和Consumer需要使用到的Topic列表放到一個(gè)集合中,逐個(gè)從NameServer進(jìn)行更新。以下源碼截圖展示了這個(gè)過(guò)程:

RocketMQ NameServer深入剖析

然而定時(shí)拉取,還不能解決所有的問(wèn)題。因?yàn)榭蛻舳四J(rèn)是每隔30秒會(huì)定時(shí)請(qǐng)求NameServer并獲取最新的路由表,意味著客戶端獲取路由信息總是會(huì)有30秒的延時(shí)。這就帶來(lái)一個(gè)嚴(yán)重的問(wèn)題,客戶端無(wú)法實(shí)時(shí)感知Broker服務(wù)器的宕機(jī)。如果生產(chǎn)者和消費(fèi)者在這30秒內(nèi),依然會(huì)向這個(gè)宕機(jī)的broker發(fā)送或消費(fèi)消息呢?

這個(gè)問(wèn)題,可以通過(guò)客戶端重試機(jī)制來(lái)解決。

4 生產(chǎn)者重試機(jī)制

在講解生產(chǎn)者重試機(jī)制之前,我們必須先對(duì)三種消息類型:普通消息、普通有序消息、嚴(yán)格有序消息進(jìn)行介紹。因?yàn)镽ocketMQ客戶端的生產(chǎn)者重試機(jī)制,只會(huì)普通消息有作用。對(duì)于普通有序消息、嚴(yán)格有序消息是沒(méi)有作用。目前網(wǎng)上絕大部分文章對(duì)此并沒(méi)有進(jìn)行區(qū)分,導(dǎo)致參考了這些文章的同學(xué)誤以為自己的消息發(fā)送失敗會(huì)自動(dòng)進(jìn)行重試,然而事實(shí)上可能根本沒(méi)有進(jìn)行重試。三種消息的類型介紹如下:

  • 普通消息:消息是無(wú)序的,任意發(fā)送發(fā)送哪一個(gè)隊(duì)列都可以。

  • 普通有序消息:同一類消息(例如某個(gè)用戶的消息)總是發(fā)送到同一個(gè)隊(duì)列,在異常情況下,也可以發(fā)送到其他隊(duì)列。

  • 嚴(yán)格有序消息:消息必須被發(fā)送到同一個(gè)隊(duì)列,即使在異常情況下,也不允許發(fā)送到其他隊(duì)列。

對(duì)于這三種類型的消息,RocketMQ對(duì)應(yīng)的提供了對(duì)應(yīng)的方法來(lái)分別消息,例如同步發(fā)送(異步/批量/oneway也是類似):

RocketMQ NameServer深入剖析

需要注意的是:這些方法重載形式,本意是為了支持以上三種不同的消息類型。但是你不按套路出牌,例如:對(duì)于一個(gè)用戶的多條消息,在調(diào)用第一種send方法形式時(shí),依然在對(duì)于同一個(gè)用戶每次發(fā)送消息時(shí),選擇了不同的隊(duì)列(MessageQueue),那么也沒(méi)有人能阻止。我只能說(shuō),你忽略了RocketMQ團(tuán)隊(duì)設(shè)計(jì)這三個(gè)方法的意圖。

4.1 普通消息的重試

對(duì)于普通消息,消息發(fā)送默認(rèn)采用round-robin機(jī)制來(lái)選擇發(fā)送到哪一個(gè)隊(duì)列,如果發(fā)送失敗,默認(rèn)重試2次。由于之前發(fā)送失敗的Queue必然位于某個(gè)Broker上,在重試過(guò)程中,這個(gè)失敗的Broker上的Queue都不會(huì)選擇,這里主要是考慮,既然發(fā)送到這個(gè)Broker上某個(gè)Queue失敗了,那么發(fā)送到這個(gè)Broker上的Queue失敗的可能性依然很大,所以選擇其他Broker。

但是一定會(huì)這樣嗎?例如Broker集群只是由一組Master/Slave組成,發(fā)送消息只會(huì)選擇Master,如果這個(gè)Master失敗了,沒(méi)有其他Master可選,此時(shí)已然會(huì)選擇這個(gè)Master上的其他Queue。

在實(shí)際生產(chǎn)環(huán)境中,通常Broker集群至少由2組Master/Slave組成,甚至更多,例如我司就是3主3從。這樣就可以很好的利用RocketMQ對(duì)于普通消息發(fā)送的重試機(jī)制,每次重試到不同的Broker上。

從源碼層面來(lái)看,對(duì)于普通消息,RocketMQ選擇隊(duì)列默認(rèn)是通過(guò)MQFaultStrategy#selectOneMessageQueue來(lái)選擇一個(gè)的隊(duì)列,在未開啟延遲容錯(cuò)的情況下,內(nèi)部會(huì)調(diào)用TopicPublishInfo#selectOneMessageQueue方法,這個(gè)方法源碼體了前面說(shuō)的重試邏輯:

RocketMQ NameServer深入剖析

事情到這里并沒(méi)有結(jié)束,這段代碼只是單次發(fā)送消息失敗重試選擇隊(duì)列的邏輯。實(shí)際情況可能是,在Broker宕機(jī)期間,可能會(huì)發(fā)送多條消息,那么每次都可能會(huì)選擇到失敗的Broker上的Queue,然后再重試,盡管重試可能會(huì)成功,但是每次發(fā)送消息的耗時(shí)會(huì)增加。因此,MQFaultStrategy實(shí)際上還提供了以下兩個(gè)功能(超出本文范疇,將會(huì)后續(xù)其他文章中講解):

  • 失敗隔離:即發(fā)送消息到某個(gè)broker失敗之后,將其進(jìn)行隔離,優(yōu)先從其他正常的broker中進(jìn)行選擇

  • 延遲隔離:優(yōu)先發(fā)送消息到延遲比較小的broker

?? ?? ?對(duì)于無(wú)序消息,通過(guò)這種異常重試機(jī)制,就可以保證消息發(fā)送的高可用了。同時(shí)由于不需要NameServer通知眾多不固定的生產(chǎn)者,也降低了NameServer實(shí)現(xiàn)的復(fù)雜性。

既然重試機(jī)制有這么明顯的好處,那么對(duì)于普通有序消息,和嚴(yán)格有序消息,rocketmq為什么默認(rèn)不進(jìn)行重試呢?答案很簡(jiǎn)單,這些消息只能發(fā)送某個(gè)特定的Broker上的某個(gè)特定的Queue中,如果發(fā)送失敗,重試失敗的可能依然很大,所以默認(rèn)不進(jìn)行重試。如果需要重試,需要業(yè)務(wù)方自己來(lái)做。

4.2 普通有序消息失敗情況下的短暫無(wú)序

首先說(shuō)明,對(duì)于普通有序消息,RocketMQ是不會(huì)進(jìn)行重試的。如果需要重試,那么業(yè)務(wù)RD同學(xué)需要自己編寫重試代碼,例如通過(guò)一個(gè)for循環(huán),最多重試幾次。這里主要說(shuō)明:對(duì)于普通有序消息,在異常情況下,如何經(jīng)歷短暫無(wú)序之后再恢復(fù)有序。從MessageQueueSelector源碼來(lái)尋找答案:

RocketMQ NameServer深入剖析

可以看到,這個(gè)接口到的select方法接收一個(gè)List<MessageQueue>類型參數(shù),也就是當(dāng)前Topic下的隊(duì)列集合。這個(gè)接口由業(yè)務(wù)RD實(shí)現(xiàn),生產(chǎn)者客戶端在發(fā)送消息之前會(huì)回調(diào)這個(gè)接口。

正常情況下的有序

業(yè)務(wù)RD在實(shí)現(xiàn)這個(gè)接口時(shí),為了保證消息的有序??梢圆扇∫恍┎呗?,例如:發(fā)送的是一個(gè)用戶的消息,先計(jì)算pos=user_id%mqs.size(),之后mqs.get(pos)獲得對(duì)應(yīng)的隊(duì)列。因此在正常情況下,一個(gè)用戶的消息總是有序的。

異常情況下的短暫無(wú)序

在異常情況下,例如一個(gè)Broker宕機(jī),路由信息刷新后,這個(gè)Broker上隊(duì)列就會(huì)從List集合中移除。此時(shí)按照相同的方式選擇隊(duì)列,就會(huì)選擇到其他隊(duì)列上,造成了無(wú)序。但是這個(gè)無(wú)序是很短暫的,因?yàn)橹笸粋€(gè)用戶的信息,都會(huì)發(fā)送到同一個(gè)新的隊(duì)列上。如果宕機(jī)的broker恢復(fù)了,那么再次經(jīng)歷一下短暫無(wú)序,之后又變得有序了。

4.3?嚴(yán)格有序消息重試的問(wèn)題

對(duì)于嚴(yán)格有序消息,由于直接指定了一個(gè)MessageQueue。如果這個(gè)MessageQueue所在的Broker宕機(jī)了,那么之后的重試必然都失敗,只有無(wú)限重試,直到成功。因此,非必要的情況下,是不建議使用嚴(yán)格有序消息的。

5?客戶端NameServer選擇策略

前面講解了客戶端在獲取路由信息時(shí),每次都會(huì)嘗試先從緩存的路由表中查找Topic路由信息,如果找不到,那么就去NameServer更新嘗試。下面介紹一下客戶端NameServer節(jié)點(diǎn)的選擇策略。

RocketMQ會(huì)將用戶設(shè)置的NameServer列表會(huì)設(shè)置到NettyRemotingClient類的namesrvAddrList字段中,NettyRemotingClient是RocketMQ對(duì)Netty進(jìn)行了封裝,如下:

private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();

具體選擇哪個(gè)NameServer,也是使用round-robin的策略。需要注意的是,盡管使用round-robin策略,但是在選擇了一個(gè)NameServer節(jié)點(diǎn)之后,后面總是會(huì)優(yōu)先選擇這個(gè)NameServer,除非與這個(gè)NameServer節(jié)點(diǎn)通信出現(xiàn)異常的情況下,才會(huì)選擇其他節(jié)點(diǎn)。

為什么客戶端不與所有NameServer節(jié)點(diǎn)建立連接呢,而是只選擇其中一個(gè)?筆者考慮,通常NameServer節(jié)點(diǎn)是固定的幾個(gè),但是客戶端的數(shù)量可能是成百上千,為了減少每個(gè)NameServer節(jié)點(diǎn)的壓力,所以每個(gè)客戶端節(jié)點(diǎn)只隨機(jī)與其中一個(gè)NameServer節(jié)點(diǎn)建立連接。

為了盡可能保證NameServer集群每個(gè)節(jié)點(diǎn)的負(fù)載均衡,在round-robin策略選擇時(shí),每個(gè)客戶端的初始隨機(jī)位置都不同,如下:

private?final?AtomicInteger?namesrvIndex?=?new?AtomicInteger(initValueIndex());

其中initValueIndex()就是計(jì)算一個(gè)隨機(jī)值,之后每次選擇NameServer時(shí),namesrvIndex+1之后,對(duì)namesrvAddrList取模,計(jì)算在數(shù)據(jù)下標(biāo)的位置,嘗試創(chuàng)建連接,一旦創(chuàng)建成功,會(huì)將當(dāng)前選擇的NameServer地址記錄到namesrvAddrChoosed字段中:

private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();

如果某個(gè)NameServer節(jié)點(diǎn)創(chuàng)建連接失敗是,會(huì)自動(dòng)重試其他節(jié)點(diǎn)。具體可參見:getAndCreateNameserverChannel最后留一個(gè)筆者沒(méi)有想明白的問(wèn)題,選擇NameServer的初始隨機(jī)位置是 ?initValueIndex()方法,這個(gè)方法的實(shí)現(xiàn)如下:

private static int initValueIndex() {
    Random r = new Random();
    return Math.abs(r.nextInt() % 999) % 999;
}
向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI