您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關(guān)RocketMQ源碼中如何實現(xiàn)注冊服務(wù)器的內(nèi)容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
該類用于啟動注冊服務(wù)器。其main
方法委托了main0
方法,該方法的執(zhí)行邏輯如下:
調(diào)用方法NamesrvStartup#createNamesrvController
創(chuàng)建一個NamesrvController
實例,聲明為controller
。
調(diào)用方法NamesrvStartup#start
將這個controller啟動。
那么下面就分別來看下兩個方法的具體內(nèi)容。
這個方法最重要的就是用構(gòu)造方法創(chuàng)建了NamesrvController
對象。而在調(diào)用構(gòu)造方法之前有較多的代碼是用于解析命令行對象,以及可能的情況下讀取文件中的配置信息、打印當(dāng)前的整體配置信息。
這些額外配置不存在的時候,默認(rèn)配置下,注冊服務(wù)器是監(jiān)聽于9876端口。
該方法的作用是啟動入?yún)⒌?code>NamesrvController實例。具體來說,流程如下:
執(zhí)行方法NamesrvController#initialize
進行初始化。
為運行時添加一個hook,在JVM關(guān)閉的時候,執(zhí)行方法NamesrvController#shutdown
對注冊服務(wù)器執(zhí)行優(yōu)雅關(guān)閉。
執(zhí)行方法NamesrvController#start
啟動注冊服務(wù)器。
這個類用于控制注冊服務(wù)器。
構(gòu)造方法中主要是為了幾個重要屬性進行賦值操作。比如初始化kvConfigManager
和routeInfoManager
這兩個重要的屬性。
該方法用于初始化注冊服務(wù)器,執(zhí)行邏輯如下:
執(zhí)行方法kvconfig.KVConfigManager#load
加載配置信息。默認(rèn)情況下,加載 ${user.home}/namesrv/kvConfig.json 文件的內(nèi)容到屬性kvconfig.KVConfigManager#configTable
中。
新建一個NettyRemotingServer對象,為屬性NamesrvController#remotingServer
賦值。這個新建的對象,使用了BrokerHousekeepingService
作為入?yún)?。?code>BrokerHousekeepingService的作用就是在發(fā)生通道關(guān)閉、異常、空閑等情況時,將該通道從路由信息里刪除。
創(chuàng)建一個線程池,賦值給屬性NamesrvController#remotingExecutor
,用于注冊服務(wù)器在Netty中的業(yè)務(wù)執(zhí)行。
調(diào)用方法NamesrvController#registerProcessor
將業(yè)務(wù)處理器注冊到RemotingServer
中。使用的線程池就是步驟3創(chuàng)建的線程池。
創(chuàng)建一個間隔時間為10秒的周期性任務(wù),任務(wù)內(nèi)容是調(diào)用方法RouteInfoManager#scanNotActiveBroker
掃描非激活模式的Broker
。
該方法沒有更多內(nèi)容,只是簡單了啟動了RemotingServer
。在這個方法之后,就可以開始監(jiān)聽Broker
上送的注冊請求。
該類是注冊服務(wù)器的配置存儲類。會將配置信息存儲在文件 ${user.home}/namesrv/kvConfig.json 。內(nèi)部用來存儲配置信息的是一個HashMap<String, HashMap<String, String>>
結(jié)構(gòu),也就是兩級結(jié)構(gòu)。
第一級是命名空間,第二集是KV對,都是字符串形式。
該類的load
方法可以從文件中加載數(shù)據(jù)到內(nèi)存里,persist
方法可以將內(nèi)存中的數(shù)據(jù)再寫入到文件中。
這個類是 rocketmq-namesrv 這個包下面,代碼量最多的類了。因為業(yè)務(wù)處理都實現(xiàn)在了這個類上面。
按照NettyRequestProcessor
接口的實現(xiàn)套路,業(yè)務(wù)請求的分流都是在processRequest
方法中,這里也是,接下來就一個個看這個類支持的命令。
該命令沒有請求體,請求頭中有namespace
、key
、value
字段,調(diào)用方法kvconfig.KVConfigManager#putKVConfig
將配置項放入到配置管理器中即可。
該命令沒有請求體,請求頭中有namespace
、key
字段,調(diào)用方法kvconfig.KVConfigManager#getKVConfig
獲取對應(yīng)配置項。
如果配置項存在,返回成功響應(yīng)。如果配置信息不存在,返回失敗響應(yīng),響應(yīng)碼為QUERY_NOT_FOUND。
該命令沒有請求體,請求頭中有namespace
、key
字段,調(diào)用方法kvconfig.KVConfigManager#deleteKVConfig
刪除對應(yīng)配置項。
該命令用于查詢注冊服務(wù)器上Broker
的數(shù)據(jù)版本號。具體執(zhí)行邏輯如下:
從命令的內(nèi)容體解析出DataVersion
對象,從請求頭中解析出BrokerAddr
數(shù)據(jù)。使用這兩個作為入?yún)?,調(diào)用方法RouteInfoManager#isBrokerTopicConfigChanged
判斷與服務(wù)器上該BrokerAddr
的版本號是否一致,將結(jié)果聲明為changed
。
如果changed
為false
,表明版本號沒有變化,那么服務(wù)器上的數(shù)據(jù)在當(dāng)前時間還是有效的,調(diào)用方法RouteInfoManager#updateBrokerInfoUpdateTimestamp
更新這個數(shù)據(jù)的有效時間。
調(diào)用方法RouteInfoManager#queryBrokerTopicConfig
查詢服務(wù)器上BrokerAddr
對應(yīng)的版本號,聲明為nameSeverDataVersion
。
構(gòu)建命令響應(yīng)對象,如果nameSeverDataVersion
不為null,則編碼后設(shè)置到內(nèi)容體。在響應(yīng)頭中設(shè)置changed
屬性,值為步驟1產(chǎn)生的聲明對象。
該命令用于Broker
信息的注冊。首先獲取請求頭中MQ的版本號,如果版本號大于等于3.0.11,則調(diào)用方法processor.DefaultRequestProcessor#registerBrokerWithFilterServer
進行信息注冊;否則調(diào)用方法processor.DefaultRequestProcessor#registerBroker
進行信息注冊。
方法的執(zhí)行邏輯如下:
對請求命令進行解碼工作,創(chuàng)建出RegisterBrokerRequestHeader
對象。使用該對象對象和請求中的body字段執(zhí)行crc校驗,如果校驗失敗,返回系統(tǒng)錯誤響應(yīng)。否則,繼續(xù)后續(xù)流程。
如果命令請求對象中包含內(nèi)容體,則解碼出RegisterBrokerBody
對象,聲明為registerBrokerBody
。如果命令請求對象不包含內(nèi)容體,則手動創(chuàng)建RegisterBrokerBody
對象,并且將其DataVersion
的版本號設(shè)置為0,時間戳設(shè)置為0.
調(diào)用方法RouteInfoManager#registerBroker
注冊路由信息,將結(jié)果聲明為result
。
創(chuàng)建類型為RegisterBrokerResponseHeader
的響應(yīng)頭對象,聲明為responseHeader
。將result
的masterAddr
和HaServerAddr
屬性設(shè)置到響應(yīng)頭對象中。
從配置管理器中以ORDER_TOPIC_CONFIG作為命名空間,取出該命名空間下面的配置數(shù)據(jù)對象,編碼后將二進制設(shè)置為響應(yīng)的內(nèi)容體。
返回響應(yīng)對象。
與registerBrokerWithFilterServer
方法的流程基本一致,只不過在調(diào)用方法RouteInfoManager#registerBroker
的時候,入?yún)⒌?code>filterServerList為null。
該命令用于注銷 Broker 的注冊。調(diào)用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#unregisterBroker
完成,而該方法內(nèi)部則是委托給了方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker
。
該命名用于查詢主題的路由信息,調(diào)用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic
。
該方法用于在路由管理器中根據(jù)主題名稱獲取全量的路由信息,具體流程如下:
使用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData
根據(jù)請求的主題名稱得到類型為TopicRouteData
的結(jié)果,聲明為topicRouteData
。
如果topicRouteData
不為null,則執(zhí)行如下子流程。
如果配置org.apache.rocketmq.common.namesrv.NamesrvConfig#orderMessageEnable
開啟,則從命名空間ORDER_TOPIC_CONFIG下面,獲取入?yún)⒅黝}名稱的配置信息,聲明為orderTopicConf
。將orderTopicConf
設(shè)置到屬性org.apache.rocketmq.common.protocol.route.TopicRouteData#orderTopicConf
。
將topicRouteData
進行編碼,設(shè)置為響應(yīng)的內(nèi)容體,返回響應(yīng)對象。
如果topicRouteData
為null,則返回TOPIC_NOT_EXIST響應(yīng)。
該命令調(diào)用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getBrokerClusterInfo
。該方法的邏輯就是調(diào)用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllClusterInfo
得到一個編碼后的內(nèi)容體,將這個內(nèi)容體設(shè)置為響應(yīng)的內(nèi)容體,返回響應(yīng)對象即可。
編碼的內(nèi)容體數(shù)據(jù)結(jié)構(gòu)類是ClusterInfo
,其屬性如下
HashMap<String/* brokerName */, BrokerData> brokerAddrTable; HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
該命令用于擦除Broker的寫權(quán)限,也就說所有在該Broker
上的主題都沒有寫入權(quán)限了。調(diào)用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#wipeWritePermOfBroker
實現(xiàn),該方法的邏輯如下:
調(diào)用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#wipeWritePermOfBrokerByLock
擦除入?yún)?code>Broker的寫權(quán)限,方法的返回值為擦除的隊列信息個數(shù)。將結(jié)果聲明為wipeTopicCnt
。
將wipeTopicCnt
設(shè)置到響應(yīng)頭的對應(yīng)屬性,返回響應(yīng)。
該命令用于獲取注冊服務(wù)器上全量的主題信息,調(diào)用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getAllTopicListFromNameserver
實現(xiàn)。
該方法內(nèi)部調(diào)用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllTopicList
獲取所有的主題名稱形成的列表,并且編碼為二進制數(shù)組,設(shè)置為響應(yīng)的內(nèi)容體,將響應(yīng)返回。
該命令用于刪除服務(wù)器上的主題信息,通過方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#deleteTopicInNamesrv
實現(xiàn)。方法實現(xiàn)也簡單,直接從topicQueueTable
中刪除對應(yīng)的主題名稱即可。
該命令用于獲取服務(wù)器上特定命名空間下的配置信息。通過方法org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#getKVListByNamespace
獲取到對應(yīng)的配置信息,并且編碼為二進制數(shù)組。
如果數(shù)組存在,則設(shè)置到響應(yīng)的內(nèi)容體中,返回成功響應(yīng)。
如果數(shù)組不存在,則返回QUERY_NOT_FOUND響應(yīng)。
該命令用于獲取集群下所有的主題名稱,調(diào)用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getTopicsByCluster
完成。該方法內(nèi)部調(diào)用org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getTopicsByCluster
獲取集群下的所有主題名稱的編碼結(jié)果,將編碼結(jié)果的二進制數(shù)組設(shè)置到響應(yīng)的內(nèi)容體中,返回成功響應(yīng)。
這個命令有點奇怪,看命令名稱是獲取系統(tǒng)主題列表。但是從方法實現(xiàn)上,內(nèi)部的內(nèi)容整體是混亂的。這個命令暫且放下,等看到相關(guān)聯(lián)的請求查詢的時候在處理。
該命令用于獲取集群下,有unit標(biāo)識的主題名稱集合。通過方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getUnitTopicList
實現(xiàn),該方法內(nèi)部調(diào)用了方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getUnitTopics
來返回具備unit標(biāo)識的主題名稱集合的編碼后二進制數(shù)組。將這個數(shù)組設(shè)置為響應(yīng)的內(nèi)容體,并且返回。
該命令用于獲取集群下,有unit_sub標(biāo)識的主題名稱集合。做法上與GET_UNIT_TOPIC_LIST命令是相同的,只不過用的標(biāo)識不同。
該命令用于獲取集群下,同時有unit和unit_sub標(biāo)識的主題名稱集合。做法上與上述的一致,只不過用的標(biāo)識不同。
這個命令是用于管理端直接發(fā)送配置的文本到注冊服務(wù),用于更新注冊服務(wù)自身的配置,而后將配置信息持久化到磁盤文件。
這個命令用于獲取注冊服務(wù)的配置信息,將配置信息設(shè)置到響應(yīng)的內(nèi)容體中。
該類是路由信息的管理器,其中使用了多個類來抽象各種路由信息。下面先看下這些定義類。
QueueData
該類保存了Broker中的隊列信息。有如下屬性:
brokerName,Broker的名稱,默認(rèn)情況下是Broker所在機器的域名,可以由配置定義。
readQueueNums,用于讀取的隊列數(shù)量。
writeQueueNums,用于寫入的隊列數(shù)量。
perm,該Broker的權(quán)限信息,權(quán)限指的是是否可讀、是否可寫。
topicSynFlag,主題同步標(biāo)識。
BrokerData
該類保存了Broker集群的地址信息,有如下屬性:
cluster,集群標(biāo)識。
brokerName,Broker名稱。
brokerAddrs,brokerId和BrokerAddr的映射表。該屬性存儲了同一個Broker名稱下id和地址的映射關(guān)系。
BrokerLiveInfo
該類保存了具體某個Broker的存活信息,有如下屬性
lastUpdateTimestamp,最近一次數(shù)據(jù)更新時間。
dataVersion,該Broker的主題配置信息的版本號。
channel,Netty的Channel對象,該對象即是Broker與服務(wù)器之間的鏈接對象。
haServerAddr,高可用主節(jié)點地址。格式為${ip}:${port} 。
RouteInfoManager內(nèi)部管理著5個Map結(jié)構(gòu),用于存儲路由相關(guān)信息,這些信息用代碼來看會更清晰一些,如下:
HashMap<String/* topic */, List<QueueData>> topicQueueTable; HashMap<String/* brokerName */, BrokerData> brokerAddrTable; HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
該方法用于實現(xiàn)Broker信息注冊到路由管理器上,具體方法流程如下:
從clusterAddrTable
中以入?yún)⒌?code>clusterName獲取集群下所有Broker
的名稱,聲明為brokerNames
。
如果brokerNames
為null,則為其賦值一個空的HashSet<String>
。并且在clusterAddrTable
放入這個clusterName
和brokerNames
兩個值。
在brokerNames
中添加本次注冊上來的Broker的名稱。
從brokerAddrTable
以brokerName
獲取BrokerData
對象,如果不存在則新建一個并且放入到brokerAddrTable
中。
取出步驟4中brokerData
中的brokerAddrs
映射,遍歷其中的元素,如果值與入?yún)⒌?code>brokerAddr相等,鍵與入?yún)⒌?code>brokerId不等,則刪除這個這一鍵值對。這種情況說明此時該IP對應(yīng)的Broker信息已經(jīng)發(fā)生了變化。
將入?yún)⒌?code>brokerId和brokerAddr
放入到brokerAddrs
中。
如果brokerId為0也就是主節(jié)點,并且入?yún)⒌?code>topicConfigWrapper不為null,也就是說Broker發(fā)送的注冊命令是包含了請求體,那么執(zhí)行子流程。否則繼續(xù)后續(xù)流程。
從brokerLiveTable
查詢該broker
的版本號,與topicConfigWrapper
的版本號對比,確認(rèn)是否有變化。如果有變化,或者該Broker
是新注冊的(brokerName
第一次注冊或者brokerId
第一次注冊),那么就很有可能本次攜帶了新的主題配置信息。則需要更更新注冊服務(wù)器上主題配置信息。也就執(zhí)行后續(xù)流程。否則結(jié)束子流程,繼續(xù)執(zhí)行步驟8.
遍歷屬性TopicConfigSerializeWrapper#topicConfigTable
,對集合中每一個元素調(diào)用方法RouteInfoManager#createAndUpdateQueueData
,更新主題對應(yīng)的隊列信息。
構(gòu)建BrokerLiveInfo
對象,放入brokerLiveTable
中。
如果入?yún)⒌?code>filterServerList不為null,則放入filterServerTable
。
如果brokerId不為0,也就是當(dāng)前是從節(jié)點在注冊自己,則從brokerAddrs
獲取主節(jié)點的地址。如果主節(jié)點地址存在,則進一步獲取其 HaServer 地址。將這兩個數(shù)據(jù)設(shè)置到返回的結(jié)果對象result中。
返回結(jié)果對象result。從代碼可以看出,如果當(dāng)前注冊不是從節(jié)點,或者對應(yīng)的主節(jié)點不存在,則result是一個空對象。
該方法是用于創(chuàng)建或更新 topicQueueTable 中的QueueData
對象的。具體流程如下:
構(gòu)建一個QueuData對象,里面的屬性來自brokerName、topicConfig 對象。
從 topicQueueTable 中獲取topicConfig 主題對應(yīng)的 queueDataList 對象。
如果 queueDataList
不存在,意味著該主題是第一次出現(xiàn)在注冊服務(wù)器中。構(gòu)建一個新的linkedList
對象,添加queueData
對象到其中,并且將queueDataList
放入到topicQueueTable
中。流程結(jié)束。
如果queueDataList
存在,則對其元素遍歷,執(zhí)行如下子操作。
元素的brokerName
屬性與入?yún)⒌?code>brokerName值相同,則繼續(xù)執(zhí)行后續(xù)流程,否則進入下一次循環(huán)迭代。
判斷元素與步驟1構(gòu)建的對象是否相同,如果相同,不做操作;如果不同,意味著數(shù)據(jù)有變化,將元素從集合中刪除。
如果步驟4中有元素被刪除,則將步驟1的對象,添加到queueDataList
中。
該方法用于在刪除路由管理器中某一個Broker
的信息。具體流程如下:
在brokerLiveTable
中刪除該Broker
信息。
在filterServerTable
刪除該Broker
的信息。
聲明一個局部變量removeBrokerName
。從brokerAddrTable
獲取該BrokerName
對應(yīng)的brokerData
。如果其不為空,則執(zhí)行子流程。
從brokerData
的brokerAddrs
刪除該brokerId
對應(yīng)的映射。
如果brokerAddrs
集合為空,則從brokerAddrTable
刪除該brokerName
對應(yīng)的映射。為removeBrokerName
賦值true
。
如果removeBrokerName
為真,則執(zhí)行子流程,否則流程結(jié)束。
從clusterAddrTable
獲取該clusterName
對應(yīng)的brokerName
的集合,聲明為nameSet
。
nameSet
不為null的情況下,從nameSet
刪除本次的brokerName
。如果刪除后nameSet
為空,則從clusterAddrTable
刪除該brokerName
的映射。
調(diào)用方法removeTopicByBrokerName
刪除brokerName
對應(yīng)的主題信息。
該方法用于刪除brokerName
對應(yīng)的主題配置信息,具體執(zhí)行邏輯如下:
遍歷topicQueueTable
,為每一個元素執(zhí)行后續(xù)邏輯。
針對每一個元素,取出其QueueData
列表,遍歷該對象。執(zhí)行子流程。
遍歷QueueData
列表,如果元素QueueData
的brokerName
與入?yún)?code>brokerName相同,則從列表中刪除該元素。
遍歷完畢后,如果列表為空,則從topicQueueTable
中刪除該映射。
首先來看下數(shù)據(jù)結(jié)構(gòu)對象TopicRouteData
的定義,其屬性如下
String orderTopicConf; List<QueueData> queueDatas; List<BrokerData> brokerDatas; HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable
從數(shù)據(jù)結(jié)構(gòu)對象也可以簡單的推測出pickupTopicRouteData
方法的實現(xiàn)邏輯。大致來說分為幾個步驟:
從topicQueueTable
按照主題名稱查詢queueDatas
。
根據(jù)queueDatas
中每一個元素QueueData
的brokerName
屬性從brokerAddrTable
取得brokerData
對象,組成成一個List,也就是brokerDatas
。
根據(jù)步驟2的brokerDatas
從filterServerTable
查詢到對應(yīng)的filterServer
列表,組裝為映射。
將步驟1到3的值組裝為TopicRouteData
對象返回給調(diào)用者。
該方法會以可中斷的方式獲取寫鎖,獲取成功后調(diào)用方法wipeWritePermOfBroker
。如果獲取失敗則返回0,獲取成功則執(zhí)行方法wipeWritePermOfBroker
執(zhí)行擦除工作。
wipeWritePermOfBroker
方法的內(nèi)容也很簡單,遍歷topicQueueTable
,針對每一個元素,在遍歷其QueueData
,如果brokerName
與入?yún)⒌?code>brokerName相同就意味著找到對應(yīng)的QueueData
。將這個里面的perm
屬性重新設(shè)置值,去掉代表寫權(quán)限的標(biāo)志位即可。
該方法用于獲取具備unit標(biāo)識的主題名稱集合。具體流程如下:
以可中斷的方式獲取讀鎖。遍歷topicQueueTable
元素。
如果鍵值對中的QueueData
列表的首個元素的topicSynFlag
屬性值包含了unit標(biāo)識,將這個鍵值對的key
,也即是主題名稱加入到臨時集合中。
遍歷完后后,返回臨時集合編碼的二進制數(shù)組。
當(dāng)一個Broker的通道關(guān)閉的時候,會觸發(fā)到這個方法。這個方法的代碼雖然比較多,但是方法思路很簡單,首先通過Channel在brokerLiveTable
中找到對應(yīng)的BrokerLiveInfo對象。并且依靠這個對象的信息,在路由管理器中刪除所有相關(guān)的信息接口。
感謝各位的閱讀!關(guān)于“RocketMQ源碼中如何實現(xiàn)注冊服務(wù)器”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學(xué)到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。