溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

發(fā)布時間:2021-06-26 14:21:27 來源:億速云 閱讀:357 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要講解了“RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案”吧!


1、現(xiàn)象

最近收到很多RocketMQ使用者,反饋生產環(huán)境中在消息發(fā)送過程中偶爾會出現(xiàn)如下4個錯誤信息之一:

  • [REJECTREQUEST]system busy, start flow control for a while

  • too many requests and system thread pool busy, RejectedExecutionException

  • [PC_SYNCHRONIZED]broker busy, start flow control for a while

  • [PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d

2、原理解讀

在進行消息中間件的選型時,如果待選中間件在功能上、性能上都能滿足業(yè)務的情況下,建議把中間件的實現(xiàn)語言這個因素也考慮進去,畢竟選擇一門用自己擅長的語言實現(xiàn)的中間件會更具掌控性。在出現(xiàn)異常的情況下,我們可以根據(jù)自己的經驗提取錯誤信息關鍵字system busy,在RocketMQ源碼中直接搜索,得到拋出上述錯誤信息的代碼如下: RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

其代碼入口為:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract 的 processRequestCommand 方法。從圖中可以看出,拋出上述錯誤的關鍵原因是:pair.getObject1() 的 rejectRequest 方法和拋出RejectedExecutionException 異常。

備注:本文偏實戰(zhàn),源碼只是作為分析的重點證據(jù),故本文只會點出關鍵源碼,并不會詳細跟蹤其整個實現(xiàn)流程,如果想詳細了解其實現(xiàn),可以查閱筆者編著的《RocketMQ技術內幕》。

2.1 RocketMQ 網絡處理機制概述

RocketMQ的網絡設計非常值得我們學習與借鑒,首先在客戶端端將不同的請求定義不同的請求命令CODE,服務端會將客戶端請求進行分類,每個命令或每類請求命令定義一個處理器(NettyRequestProcessor),然后每一個NettyRequestProcessor綁定到一個單獨的線程池,進行命令處理,不同類型的請求將使用不同的線程池進行處理,實現(xiàn)線程隔離。

為了方便下文的描述,我們先簡單的認識一下NettyRequestProcessor、Pair、RequestCode。其核心關鍵點如下: RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

  1. NettyRequestProcessor RocketMQ 服務端請求處理器,例如SendMessageProcessor是消息發(fā)送處理器、PullMessageProcessor是消息拉取命令處理器。

  2. RequestCode 請求CODE,用來區(qū)分請求的類型,例如SEND_MESSAGE:表示該請求為消息發(fā)送,PULL_MESSAGE:消息拉取請求。

  3. Pair 用來封裝NettyRequestProcessor與ExecuteService的綁定關系。在RocketMQ的網絡處理模型中,會為每一個NettyRequestProcessor與特定的線程池綁定,所有該NettyRequestProcessor的處理邏輯都在該線程池中運行。

2.2 pair.getObject1().rejectRequest()

由于讀者朋友提出的問題,都是發(fā)生在消息發(fā)送過程中,故本文重點關注SendMessageProcessor#rejectRequest方法。 SendMessageProcessor#rejectRequest

public boolean rejectRequest() {
    return this.brokerController.getMessageStore().isOSPageCacheBusy() ||               // [@1](https://my.oschina.net/u/1198)
        this.brokerController.getMessageStore().isTransientStorePoolDeficient();        // @2
}

拒絕請求的條件有兩個,只要其中任意一個滿足,則返回true。

代碼@1:Os PageCache busy,判斷操作系統(tǒng)PageCache是否繁忙,如果忙,則返回true。想必看到這里大家肯定與我一樣好奇,RocketMQ是如何判斷pageCache是否繁忙呢?下面會重點分析。

代碼@2:transientStorePool是否不足。

2.2.1 isOSPageCacheBusy()

DefaultMessageStore#isOSPageCacheBusy()

public boolean isOSPageCacheBusy() {
    long begin = this.getCommitLog().getBeginTimeInLock();  // [@1](https://my.oschina.net/u/1198) start
    long diff = this.systemClock.now() - begin;                         // @1  end

    return diff < 10000000
                && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();     // @2
}

代碼@1:先重點解釋begin、diff兩個局部變量的含義:

  • begin 通俗的一點講,就是將消息寫入Commitlog文件所持有鎖的時間,精確說是將消息體追加到內存映射文件(DirectByteBuffer)或pageCache(FileChannel#map)該過程中開始持有鎖的時間戳,具體的代碼請參考:CommitLog#putMessage。

  • diff 一次消息追加過程中持有鎖的總時長,即往內存映射文件或pageCache追加一條消息所耗時間。

代碼@2:如果一次消息追加過程的時間超過了Broker配置文件osPageCacheBusyTimeOutMills,則認為pageCache繁忙,osPageCacheBusyTimeOutMills默認值為1000,表示1s。

2.2.2 isTransientStorePoolDeficient()

DefaultMessageStore#isTransientStorePoolDeficient

public boolean isTransientStorePoolDeficient() {
    return remainTransientStoreBufferNumbs() == 0;
}
public int remainTransientStoreBufferNumbs() {
    return this.transientStorePool.remainBufferNumbs();
}

最終調用TransientStorePool#remainBufferNumbs方法。

public int remainBufferNumbs() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
}

如果啟用transientStorePoolEnable機制,返回當前可用的ByteBuffer個數(shù),即整個isTransientStorePoolDeficient方法的用意是是否還存在可用的ByteBuffer,如果不存在,即表示pageCache繁忙。那什么是transientStorePoolEnable機制呢?

2.3 漫談transientStorePoolEnable機制

Java NIO的內存映射機制,提供了將文件系統(tǒng)中的文件映射到內存機制,實現(xiàn)對文件的操作轉換對內存地址的操作,極大的提高了IO特性,但這部分內存并不是常駐內存,可以被置換到交換內存(虛擬內存),RocketMQ為了提高消息發(fā)送的性能,引入了內存鎖定機制,即將最近需要操作的commitlog文件映射到內存,并提供內存鎖定功能,確保這些文件始終存在內存中,該機制的控制參數(shù)就是transientStorePoolEnable。

2.3.1 MappedFile

重點關注MappedFile的ByteBuffer writeBuffer、MappedByteBuffer mappedByteBuffer這兩個屬性的初始化,因為這兩個方法是寫消息與查消息操作的直接數(shù)據(jù)結構。 RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

兩個關鍵點如下:

  • ByteBuffer writeBuffer 如果開啟了transientStorePoolEnable,則使用ByteBuffer.allocateDirect(fileSize),創(chuàng)建(java.nio的內存映射機制)。如果未開啟,則為空。

  • MappedByteBuffer mappedByteBuffer 使用FileChannel#map方法創(chuàng)建,即真正意義上的PageCache。

消息寫入時: MappedFile#appendMessagesInner RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

從中可見,在消息寫入時,如果writerBuffer不為空,說明開啟了transientStorePoolEnable機制,則消息首先寫入writerBuffer中,如果其為空,則寫入mappedByteBuffer中。

消息拉取(讀消息): MappedFile#selectMappedBuffer RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

消息讀取時,是從mappedByteBuffer中讀(pageCache)。

大家是不是發(fā)現(xiàn)了一個有趣的點,如果開啟transientStorePoolEnable機制,是不是有了讀寫分離的效果,先寫入writerBuffer中,讀卻是從mappedByteBuffer中讀取。

為了對transientStorePoolEnable引入意圖闡述的更加明白,這里我引入Rocketmq社區(qū)貢獻者胡宗棠關于此問題的見解。

通常有如下兩種方式進行讀寫:

  1. 第一種,Mmap+PageCache的方式,讀寫消息都走的是pageCache,這樣子讀寫都在pagecache里面不可避免會有鎖的問題,在并發(fā)的讀寫操作情況下,會出現(xiàn)缺頁中斷降低,內存加鎖,污染頁的回寫。

  2. 第二種,DirectByteBuffer(堆外內存)+PageCache的兩層架構方式,這樣子可以實現(xiàn)讀寫消息分離,寫入消息時候寫到的是DirectByteBuffer——堆外內存中,讀消息走的是PageCache(對于,DirectByteBuffer是兩步刷盤,一步是刷到PageCache,還有一步是刷到磁盤文件中),帶來的好處就是,避免了內存操作的很多容易堵的地方,降低了時延,比如說缺頁中斷降低,內存加鎖,污染頁的回寫。

溫馨提示:如果想與胡宗棠大神進一步溝通交流,可以關注他的github賬號:https://github.com/zongtanghu

不知道大家會不會有另外一個擔憂,如果開啟了transientStorePoolEnable,內存鎖定機制,那是不是隨著commitlog文件的不斷增加,最終導致內存溢出?

2.3.2 TransientStorePool初始化

RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

從這里可以看出,TransientStorePool默認會初始化5個DirectByteBuffer(對外內存),并提供內存鎖定功能,即這部分內存不會被置換,可以通過transientStorePoolSize參數(shù)控制。

在消息寫入消息時,首先從池子中獲取一個DirectByteBuffer進行消息的追加。當5個DirectByteBuffer全部寫滿消息后,該如何處理呢?從RocketMQ的設計中來看,同一時間,只會對一個commitlog文件進行順序寫,寫完一個后,繼續(xù)創(chuàng)建一個新的commitlog文件。故TransientStorePool的設計思想是循環(huán)利用這5個DirectByteBuffer,只需要寫入到DirectByteBuffer的內容被提交到PageCache后,即可重復利用。對應的代碼如下: TransientStorePool#returnBuffer

public void returnBuffer(ByteBuffer byteBuffer) {
    byteBuffer.position(0);
    byteBuffer.limit(fileSize);
    this.availableBuffers.offerFirst(byteBuffer);
}

其調用棧如下: RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

從上面的分析看來,并不會隨著消息的不斷寫入而導致內存溢出。

3、現(xiàn)象解答

3.1 [REJECTREQUEST]system busy

RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

其拋出的源碼入口點:NettyRemotingAbstract#processRequestCommand,上面的原理分析部分已經詳細介紹其實現(xiàn)原理,總結如下。

在不開啟transientStorePoolEnable機制時,如果Broker PageCache繁忙時則拋出上述錯誤,判斷PageCache繁忙的依據(jù)就是向PageCache追加消息時,如果持有鎖的時間超過1s,則會拋出該錯誤;在開啟transientStorePoolEnable機制時,其判斷依據(jù)是如果TransientStorePool中不存在可用的堆外內存時拋出該錯誤。

3.2 too many requests and system thread pool busy, RejectedExecutionException

RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

其拋出的源碼入口點:NettyRemotingAbstract#processRequestCommand,其調用地方緊跟3.1,是在向線程池執(zhí)行任務時,被線程池拒絕執(zhí)行時拋出的,我們可以順便看看Broker消息處理發(fā)送的線程信息: BrokerController#registerProcessor RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

該線程池的隊列長度默認為10000,我們可以通過sendThreadPoolQueueCapacity來改變默認值。

3.3 [PC_SYNCHRONIZED]broker busy

RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

其拋出的源碼入口點:DefaultMessageStore#putMessage,在進行消息追加時,再一次判斷PageCache是否繁忙,如果繁忙,則拋出上述錯誤。

3.4 broker busy, period in queue: %sms, size of queue: %d

RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案

其拋出源碼的入口點:BrokerFastFailure#cleanExpiredRequest。該方法的調用頻率為每隔10s中執(zhí)行一次,不過有一個執(zhí)行前提條件就是Broker端要開啟快速失敗,默認為開啟,可以通過參數(shù)brokerFastFailureEnable來設置。該方法的實現(xiàn)要點是每隔10s,檢測一次,如果檢測到PageCache繁忙,并且發(fā)送隊列中還有排隊的任務,則直接不再等待,直接拋出系統(tǒng)繁忙錯誤,使正在排隊的線程快速失敗,結束等待。

4、實踐建議

經過上面的原理講解與現(xiàn)象分析,消息發(fā)送時拋出system busy、broker busy的原因都是PageCache繁忙,那是不是可以通過調整上述提到的某些參數(shù)來避免拋出錯誤呢?.例如如下參數(shù):

  • osPageCacheBusyTimeOutMills 設置PageCache系統(tǒng)超時的時間,默認為1000,表示1s,那是不是可以把增加這個值,例如設置為2000或3000。作者觀點:非常不可取。

  • sendThreadPoolQueueCapacity Broker服務器處理的排隊隊列,默認為10000,如果隊列中積壓了10000個請求,則會拋出RejectExecutionException。作者觀點:不可取。

  • brokerFastFailureEnable 是否啟用快速失敗,默認為true,表示當如果發(fā)現(xiàn)Broker服務器的PageCache繁忙,如果發(fā)現(xiàn)sendThreadPoolQueue隊列中不為空,表示還有排隊的發(fā)送請求在排隊等待執(zhí)行,則直接結束等待,返回broker busy。那如果不開啟快速失敗,則同樣可以避免拋出這個錯誤。作者觀點:非常不可取。

修改上述參數(shù),都不可取,原因是出現(xiàn)system busy、broker busy這個錯誤,其本質是系統(tǒng)的PageCache繁忙,通俗一點講就是向PageCache追加消息時,單個消息發(fā)送占用的時間超過1s了,如果繼續(xù)往該Broker服務器發(fā)送消息并等待,其TPS根本無法滿足,哪還是高性能的消息中間了呀。故才會采用快速失敗機制,直接給消息發(fā)送者返回錯誤,消息發(fā)送者默認情況會重試2次,將消息發(fā)往其他Broker,保證其高可用。

下面根據(jù)個人的見解,提出如下解決辦法:

4.1 開啟transientStorePoolEnable

在broker.config中將transientStorePoolEnable=true。

  • 方案依據(jù): 啟用“讀寫”分離,消息發(fā)送時消息先追加到DirectByteBuffer(堆外內存)中,然后在異步刷盤機制下,會將DirectByteBuffer中的內容提交到PageCache,然后刷寫到磁盤。消息拉取時,直接從PageCache中拉取,實現(xiàn)了讀寫分離,減輕了PageCaceh的壓力,能從根本上解決該問題。

  • 方案缺點: 會增加數(shù)據(jù)丟失的可能性,如果Broker JVM進程異常退出,提交到PageCache中的消息是不會丟失的,但存在堆外內存(DirectByteBuffer)中但還未提交到PageCache中的這部分消息,將會丟失。但通常情況下,RocketMQ進程退出的可能性不大。

4.2 擴容Broker服務器

方案依據(jù):

當Broker服務器自身比較忙的時候,快速失敗,并且在接下來的一段時間內會規(guī)避該Broker,這樣該Broker恢復提供了時間保證,Broker本身的架構是支持分布式水平擴容的,增加Topic的隊列數(shù),降低單臺Broker服務器的負載,從而避免出現(xiàn)PageCache。 > 溫馨提示:在Broker擴容時候,可以復制集群中任意一臺Broker服務下${ROCKETMQ_HOME}/store/config/topics.json到新Broker服務器指定目錄,避免在新Broker服務器上為Broker創(chuàng)建隊列,然后消息發(fā)送者、消息消費者都能動態(tài)獲取Topic的路由信息。

與之擴容對應的,也可以通過對原有Broker進行升配,例如增加內存、把機械盤換成SSD,但這種情況,通常需要重啟Broekr服務器,沒有擴容來的方便。

感謝各位的閱讀,以上就是“RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案”的內容了,經過本文的學習后,相信大家對RocketMQ消息發(fā)送出現(xiàn)system busy、broker busy的原因與解決方案這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI