溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

發(fā)布時間:2021-12-15 11:25:11 來源:億速云 閱讀:195 作者:柒染 欄目:大數(shù)據(jù)

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

今天我們就來談一談消息隊列的推拉模式,這也是一個面試熱點,例如你在簡歷里面寫了 RocketMQ ,基本上會問你 RocketMQ 采用的是推模式還是拉模式???是拉模式?不是有 PushConsumer 嗎?

今天我們就來談談推拉模式,并且再來看看 RocketMQ 和 Kafka 是如何做的。

推拉模式

首先明確一下推拉模式到底是在討論消息隊列的哪一個步驟,一般而言我們在談論推拉模式的時候指的是 Comsumer 和 Broker 之間的交互。

默認的認為 Producer 與 Broker 之間就是推的方式,即 Producer 將消息推送給 Broker,而不是 Broker 主動去拉取消息。

想象一下,如果需要 Broker 去拉取消息,那么 Producer 就必須在本地通過日志的形式保存消息來等待 Broker 的拉取,如果有很多生產(chǎn)者的話,那么消息的可靠性不僅僅靠 Broker 自身,還需要靠成百上千的 Producer。

Broker 還能靠多副本等機制來保證消息的存儲可靠,而成百上千的 Producer 可靠性就有點難辦了,所以默認的 Producer 都是推消息給 Broker。

所以說有些情況分布式好,而有些時候還是集中管理好。

推模式

推模式指的是消息從 Broker 推向 Consumer,即 Consumer 被動的接收消息,由 Broker 來主導消息的發(fā)送。

我們來想一下推模式有什么好處?

消息實時性高, Broker 接受完消息之后可以立馬推送給 Consumer。

對于消費者使用來說更簡單,簡單啊就等著,反正有消息來了就會推過來。

推模式有什么缺點?

推送速率難以適應消費速率,推模式的目標就是以最快的速度推送消息,當生產(chǎn)者往 Broker 發(fā)送消息的速率大于消費者消費消息的速率時,隨著時間的增長消費者那邊可能就“爆倉”了,因為根本消費不過來啊。當推送速率過快就像 DDos 攻擊一樣消費者就傻了。

并且不同的消費者的消費速率還不一樣,身為 Broker 很難平衡每個消費者的推送速率,如果要實現(xiàn)自適應的推送速率那就需要在推送的時候消費者告訴 Broker ,我不行了你推慢點吧,然后 Broker 需要維護每個消費者的狀態(tài)進行推送速率的變更。

這其實就增加了 Broker 自身的復雜度。

所以說推模式難以根據(jù)消費者的狀態(tài)控制推送速率,適用于消息量不大、消費能力強要求實時性高的情況下。

拉模式

拉模式指的是 Consumer 主動向 Broker 請求拉取消息,即 Broker 被動的發(fā)送消息給 Consumer。

我們來想一下拉模式有什么好處?

拉模式主動權就在消費者身上了,消費者可以根據(jù)自身的情況來發(fā)起拉取消息的請求。假設當前消費者覺得自己消費不過來了,它可以根據(jù)一定的策略停止拉取,或者間隔拉取都行。

拉模式下 Broker 就相對輕松了,它只管存生產(chǎn)者發(fā)來的消息,至于消費的時候自然由消費者主動發(fā)起,來一個請求就給它消息唄,從哪開始拿消息,拿多少消費者都告訴它,它就是一個沒有感情的工具人,消費者要是沒來取也不關它的事。

拉模式可以更合適的進行消息的批量發(fā)送,基于推模式可以來一個消息就推送,也可以緩存一些消息之后再推送,但是推送的時候其實不知道消費者到底能不能一次性處理這么多消息。而拉模式就更加合理,它可以參考消費者請求的信息來決定緩存多少消息之后批量發(fā)送。

拉模式有什么缺點?

消息延遲,畢竟是消費者去拉取消息,但是消費者怎么知道消息到了呢?所以它只能不斷地拉取,但是又不能很頻繁地請求,太頻繁了就變成消費者在攻擊 Broker 了。因此需要降低請求的頻率,比如隔個 2 秒請求一次,你看著消息就很有可能延遲 2 秒了。

消息忙請求,忙請求就是比如消息隔了幾個小時才有,那么在幾個小時之內(nèi)消費者的請求都是無效的,在做無用功。

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

那到底是推還是拉

可以看到推模式和拉模式各有優(yōu)缺點,到底該如何選擇呢?

RocketMQ 和 Kafka 都選擇了拉模式,當然業(yè)界也有基于推模式的消息隊列如 ActiveMQ。

我個人覺得拉模式更加的合適,因為現(xiàn)在的消息隊列都有持久化消息的需求,也就是說本身它就有個存儲功能,它的使命就是接受消息,保存好消息使得消費者可以消費消息即可。

而消費者各種各樣,身為 Broker 不應該有依賴于消費者的傾向,我已經(jīng)為你保存好消息了,你要就來拿好了。

雖說一般而言 Broker 不會成為瓶頸,因為消費端有業(yè)務消耗比較慢,但是 Broker 畢竟是一個中心點,能輕量就盡量輕量。

那么竟然 RocketMQ 和 Kafka 都選擇了拉模式,它們就不怕拉模式的缺點么?怕,所以它們操作了一波,減輕了拉模式的缺點。

長輪詢

RocketMQ 和 Kafka 都是利用“長輪詢”來實現(xiàn)拉模式,我們就來看看它們是如何操作的。

為了簡單化,下面我把消息不滿足本次拉取的條數(shù)啊、總大小啊等等都統(tǒng)一描述成還沒有消息,反正都是不滿足條件。

RocketMQ 中的長輪詢

RocketMQ 中的 PushConsumer 其實是披著推模式實際上是拉模式的方法,只是看起來像推模式而已

因為 RocketMQ 在被背后偷偷的幫我們?nèi)?Broker 請求數(shù)據(jù)了。

后臺會有個 RebalanceService 線程,這個線程會根據(jù) topic 的隊列數(shù)量和當前消費組的消費者個數(shù)做負載均衡,每個隊列產(chǎn)生的 pullRequest 放入阻塞隊列 pullRequestQueue 中。然后又有個 PullMessageService 線程不斷的從阻塞隊列 pullRequestQueue 中獲取 pullRequest,然后通過網(wǎng)絡請求 broker,這樣實現(xiàn)的準實時拉取消息。

這一部分代碼我不截了,就是這么個事兒,稍后會用圖來展示。

然后 Broker 的 PullMessageProcessor 里面的 processRequest 方法是用來處理拉消息請求的,有消息就直接返回,如果沒有消息怎么辦呢?我們來看一下代碼。

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

我們再來看下 suspendPullRequest 方法做了什么。

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

而 PullRequestHoldService 這個線程會每 5 秒從 pullRequestTable 取PullRequest請求,然后看看待拉取消息請求的偏移量是否小于當前消費隊列最大偏移量,如果條件成立則說明有新消息了,則會調(diào)用 notifyMessageArriving ,最終調(diào)用 PullMessageProcessor 的 executeRequestWhenWakeup() 方法重新嘗試處理這個消息的請求,也就是再來一次,整個長輪詢的時間默認 30 秒。

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

簡單的說就是 5 秒會檢查一次消息時候到了,如果到了則調(diào)用 processRequest 再處理一次。這好像不太實時?。?秒?

別急,還有個 ReputMessageService 線程,這個線程用來不斷地從 commitLog 中解析數(shù)據(jù)并分發(fā)請求,構建出 ConsumeQueue 和 IndexFile 兩種類型的數(shù)據(jù),并且也會有喚醒請求的操作,來彌補每 5s 一次這么慢的延遲

代碼我就不截了,就是消息寫入并且會調(diào)用 pullRequestHoldService#notifyMessageArriving。

最后我再來畫個圖,描述一下整個流程。

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

Kafka 中的長輪詢

像 Kafka 在拉請求中有參數(shù),可以使得消費者請求在 “長輪詢” 中阻塞等待。

簡單的說就是消費者去 Broker 拉消息,定義了一個超時時間,也就是說消費者去請求消息,如果有的話馬上返回消息,如果沒有的話消費者等著直到超時,然后再次發(fā)起拉消息請求。

并且 Broker 也得配合,如果消費者請求過來,有消息肯定馬上返回,沒有消息那就建立一個延遲操作,等條件滿足了再返回。

我們來簡單的看一下源碼,為了突出重點,我會刪減一些代碼。

先來看消費者端的代碼。

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

上面那個 poll 接口想必大家都很熟悉,其實從注解直接就知道了確實是等待數(shù)據(jù)的到來或者超時,我們再簡單的往下看。

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

我們再來看下最終 client.poll 調(diào)用的是什么。

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

最后調(diào)用的就是 Kafka 包裝過的 selector,而最終會調(diào)用 Java nio 的 select(timeout)。

現(xiàn)在消費者端的代碼已經(jīng)清晰了,我們再來看看 Broker 如何做的。

Broker 處理所有請求的入口其實我在之前的文章介紹過,就在 KafkaApis.scala 文件的 handle 方法下,這次的主角就是 handleFetchRequest 。

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

這個方法進來,我截取最重要的部分。

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

下面的圖片就是 fetchMessages 方法內(nèi)部實現(xiàn),源碼給的注釋已經(jīng)很清晰了,大家放大圖片看下即可。

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

這個煉獄名字取得很有趣,簡單的說就是利用我之前文章提到的時間輪,來執(zhí)行定時任務,例如這里是delayedFetchPurgatory,專門用來處理延遲拉取操作。

我們先簡單想一下,這個延遲操作都需要實現(xiàn)哪些方法,首先構建的延遲操作需要有檢查機制,來查看消息是否已經(jīng)到了,然后呢還得有個消息到了之后該執(zhí)行的方法,還需要有執(zhí)行完畢之后該干啥的方法,當然還得有個超時之后得干啥的方法。

這幾個方法其實對應的就是代碼里的 DelayedFetch ,這個類繼承了 DelayedOperation 內(nèi)部有:

  • isCompleted 檢查條件是否滿足的方法

  • tryComplete 條件滿足之后執(zhí)行的方法

  • onComplete 執(zhí)行完畢之后調(diào)用的方法

  • onExpiration 過期之后需要執(zhí)行的方法

判斷是否過期就是由時間輪來推動判斷的,但是總不能等過期的時候再去看消息到了沒吧?

這里 Kafka 和 RocketMQ 的機制一樣,也會在消息寫入的時候提醒這些延遲請求消息來了,具體代碼我不貼了, 在 ReplicaManager#appendRecords 方法內(nèi)部再深入個兩方法可以看到。

不過雖說代碼不貼,圖還是要畫一下的。

RocketMQ和Kafka是如何實現(xiàn)消息隊列的推拉模式

RocketMQ  和 Kafka 都是采用“長輪詢”的機制,具體的做法都是通過消費者等待消息,當有消息的時候 Broker 會直接返回消息,如果沒有消息都會采取延遲處理的策略,并且為了保證消息的及時性,在對應隊列或者分區(qū)有新消息到來的時候都會提醒消息來了,及時返回消息。

一句話說就是消費者和 Broker 相互配合,拉取消息請求不滿足條件的時候 hold 住,避免了多次頻繁的拉取動作,當消息一到就提醒返回。

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI