您好,登錄后才能下訂單哦!
RocketMQ 是一款開(kāi)源的消息中間件,采用Java實(shí)現(xiàn),設(shè)計(jì)思想來(lái)自于Kafka(Scala實(shí)現(xiàn)),在具體設(shè)計(jì)時(shí)體現(xiàn)了自己的選擇和需求,具體差別可以看RocketMQ與Kafka對(duì)比。接下來(lái)是自己閱讀源碼的一些探索。
RocketMQ的整體架構(gòu)如下,可以看到各個(gè)組件充當(dāng)?shù)慕巧?,Name Server 負(fù)責(zé)維護(hù)一些全局的路由信息:當(dāng)前有哪些broker,每個(gè)Topic在哪個(gè)broker上; Broker具體處理消息的存儲(chǔ)和服務(wù);生產(chǎn)者和消費(fèi)者是消息的源頭和歸宿。
一、Producer 發(fā)送消息
Producer發(fā)送消息是如何得知發(fā)到哪個(gè)broker的 ? 每個(gè)應(yīng)用在收發(fā)消息之前,一般會(huì)調(diào)用一次producer.start()/consumer.start()做一些初始化工作,其中包括:創(chuàng)建需要的實(shí)例對(duì)象,如MQClientInstance;設(shè)置定時(shí)任務(wù),如從Nameserver中定時(shí)更新本地的Topic route info,發(fā)送心跳信息到所有的 broker,動(dòng)態(tài)調(diào)整線程池的大小,把當(dāng)前producer加入到指定的組中等等。客戶端會(huì)緩存路由信息TopicPublishInfo, 同時(shí)定期從NameServer取Topic路由信息,每個(gè)Broker與NameServer集群中的所有節(jié)點(diǎn)建立長(zhǎng)連接,定時(shí)注冊(cè)Topic信息到所有的NameServer。Producer在發(fā)送消息的時(shí)候會(huì)去查詢本地的topicPublishInfoTable(一個(gè)ConcurrentHashMap),如果沒(méi)有命中的話就會(huì)詢問(wèn)NameServer得到路由信息 (RequestCode=GET_ROUTEINTO_BY_TOPIC) 如果nameserver中也沒(méi)有查詢到(表示該主題的消息第一次發(fā)送),那么將會(huì)發(fā)送一個(gè)default的topic進(jìn)行路由查詢。
具體過(guò)程如下圖所示:
Producer 在得到了具體的通信地址后,發(fā)送過(guò)程就顯而易見(jiàn)了。通過(guò)代碼可以看到在選擇消息隊(duì)列進(jìn)行發(fā)送時(shí)采用隨機(jī)方式,同時(shí)和上一次發(fā)送的broker保持不同,防止熱點(diǎn)。
二、Broker處理來(lái)自Producer的消息
每個(gè)producer在發(fā)送消息的時(shí)候都和對(duì)應(yīng)的Broker建立了長(zhǎng)連接,此時(shí)broker已經(jīng)準(zhǔn)備好接收Message,Broker的SendMessageProcessor.sendMessage處理消息的存儲(chǔ),具體過(guò)程如下。接收到消息后,會(huì)先寫入Commit Log文件(順序?qū)?,寫滿了會(huì)新建一個(gè)新的文件),然后更新Consume queue文件(存儲(chǔ)如何由topic定位到具體的消息)。
三、RocketMQ 存儲(chǔ)特點(diǎn)
RocketMQ的消息采用順序?qū)懙絚ommitlog文件,然后利用consume queue文件作為索引,如圖。RocketMQ采用零拷貝mmap+write的方式來(lái)回應(yīng)Consumer的請(qǐng)求,RocketMQ宣稱大部分請(qǐng)求都會(huì)在Page Cache層得到滿足,所以消息過(guò)多不會(huì)因?yàn)榇疟P讀使得性能下降,這里自己的理解是,在64bit機(jī)器下,虛存地址空間(vm_area_struct)不是問(wèn)題,所以相關(guān)的文件都會(huì)被映射到內(nèi)存中(有定期刪除文件的操作),即使此刻不在內(nèi)存,操作系統(tǒng)也會(huì)因?yàn)槿表?yè)異常進(jìn)行換入,雖然地址空間不是問(wèn)題,但是一個(gè)進(jìn)程映射文件的個(gè)數(shù)(/proc/sys/vm/max_map_count)是有限的,所以可能在這里發(fā)生OOM。
通過(guò)Broker中的存儲(chǔ)目錄(默認(rèn)路徑是 $HOME/store)也能看到存儲(chǔ)的邏輯視圖:
四、順序消息是如何保證的?
需要業(yè)務(wù)層自己決定哪些消息應(yīng)該順序到達(dá),然后發(fā)送的時(shí)候通過(guò)規(guī)則(hash)映射到同一個(gè)隊(duì)列,因?yàn)闆](méi)有誰(shuí)比業(yè)務(wù)自己更加知道關(guān)于消息順序的特點(diǎn)。這樣的順序是相對(duì)順序,局部順序,因?yàn)榘l(fā)送方只保證把這些消息順序的發(fā)送到broker上的同一隊(duì)列,但是不保證其他Producer也會(huì)發(fā)送消息到那個(gè)隊(duì)列,所以需要Consumer在拉到消息后做一些過(guò)濾。
五、RocketMQ 刷盤實(shí)現(xiàn)
Broker 在消息的存取時(shí)直接操作的是內(nèi)存(內(nèi)存映射文件),這可以提供系統(tǒng)的吞吐量,但是無(wú)法避免機(jī)器掉電時(shí)數(shù)據(jù)丟失,所以需要持久化到磁盤中。刷盤的最終實(shí)現(xiàn)都是使用NIO中的 MappedByteBuffer.force() 將映射區(qū)的數(shù)據(jù)寫入到磁盤,如果是同步刷盤的話,在Broker把消息寫到CommitLog映射區(qū)后,就會(huì)等待寫入完成。異步而言,只是喚醒對(duì)應(yīng)的線程,不保證執(zhí)行的時(shí)機(jī),流程如圖所示,更多細(xì)節(jié)可以參考。
六、消息過(guò)濾
類似于重復(fù)數(shù)據(jù)刪除技術(shù)(Data Deduplication),可以在源端做,也可以在目的端實(shí)現(xiàn),就是網(wǎng)絡(luò)和存儲(chǔ)的權(quán)衡,如果在Broker端做消息過(guò)濾就需要逐一比對(duì)consume queue 的 tagsCode 字段(hashcode),如果符合則傳輸給消費(fèi)者,因?yàn)槭?hashcode,所以存在誤判,需要在 Consumer 接收到消息后進(jìn)行字符串級(jí)別的過(guò)濾,確保準(zhǔn)確性。
小結(jié)
這次代碼閱讀主要著眼于消息的發(fā)送過(guò)程和Broker上的存儲(chǔ),其他方面的細(xì)節(jié)有待深入。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。