溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RocketMQ客戶端PUSH消費如何實現(xiàn)負載均衡

發(fā)布時間:2021-11-18 09:35:47 來源:億速云 閱讀:174 作者:小新 欄目:大數(shù)據(jù)

這篇文章將為大家詳細講解有關(guān)RocketMQ客戶端PUSH消費如何實現(xiàn)負載均衡,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

一、問題思考

1.主題隊列是如何分配的?
2.什么時候會進行負載均衡?
3.負載均衡后是否會導(dǎo)致消息重復(fù)消費?

二、調(diào)用鏈條

1.初始化鏈條

@1 DefaultMQPushConsumerImpl#start
this.mQClientFactory
= MQClientManager.getInstance().getAndCreateMQClientInstance
@2 MQClientManager#getAndCreateMQClientInstance
instance = new MQClientInstance
@3 MQClientInstance#MQClientInstance
this.rebalanceService = new RebalanceService

2.啟動鏈條

@1 DefaultMQPushConsumerImpl#start
mQClientFactory.start()
@2 MQClientInstance#start
this.rebalanceService.start

小結(jié):從初始化鏈和調(diào)用鏈可以看出RebalanceService為線程類,隨著消費啟動時而啟動,消費不退出則一直運行著。

三、負載均衡流程


1.負載均衡鏈條

@1 RebalanceService#run
mqClientFactory.doRebalance()
@2 MQClientInstance#doRebalance
impl.doRebalance()
@3 DefaultMQPushConsumerImpl#doRebalance
this.rebalanceImpl.doRebalance
@4 RebalanceImpl#doRebalance
rebalanceByTopic

2.負載均衡流程

RocketMQ客戶端PUSH消費如何實現(xiàn)負載均衡

小結(jié):在負載均衡時,會循環(huán)該消費組訂閱的所有Topic都會執(zhí)行負載均衡。


3.更新緩存processQueue流程

RocketMQ客戶端PUSH消費如何實現(xiàn)負載均衡

小結(jié):
1. 更新緩存時如果消費組訂閱的隊列不在新分配的隊列集合中或者隊列拉取時間超時失效,則將快照ProcessQueue設(shè)置為丟棄。
2. 消費拉取時判斷ProcessQueue為丟棄,則不再對該隊列拉取。
3. 順序消費時如果獲取消費鎖成功,表明此隊列空閑沒有被消費,此時向Broker發(fā)起解鎖請求,解鎖成功后將該隊列從緩存(processQueueTable)移除。
4. 順序消費時獲取鎖失敗,表明正在消費則不從processQueueTable移除,由于ProcessQueue設(shè)置為丟棄,在順序消費下次拉取時會退出該隊列的拉取請求。

4.向Broker發(fā)送心跳流程

RocketMQ客戶端PUSH消費如何實現(xiàn)負載均衡

5.隊列分配算法

負載均衡流程圖中對clientId和分區(qū)隊列的分配提交給分區(qū)算法執(zhí)行,那該算法是如何運作的呢?接口AllocateMessageQueueStrategy隊列分配策略提供五種分配算法實現(xiàn):

1.平均分配策略
   AllocateMessageQueueAveragely

2.環(huán)形分配策略
  AllocateMessageQueueAveragelyByCircle

3.機房分配策略
   AllocateMessageQueueByMachineRoom

4.一致性Hash分配策略

   AllocateMessageQueueConsistentHash

5.配置文件分配策略
  AllocateMessageQueueByConfig
除此之外可以自定義分配算法,實現(xiàn)接口接口即可,默認使用平均分配算法,也是最常用的,下面以該算法看看如何工作的。

public List<MessageQueue> allocate
(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize = mqAll.size() <= cidAll.size() ?
1 : (mod > 0 && index < mod ?
mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}

代碼不是很好閱讀,看下面驗證結(jié)果即可。


6.平均分配算法驗證

  • 只有一個clientId時分配情況
    會把1個Broker的16個分區(qū)全部分配給該客戶端,每隔20秒觸發(fā)一次負載均衡。
    currentCID=2.0.1.138@consumer01分到的隊列為0~15

----------2019-08-04 22:10:15-----------
currentCID=2.0.1.138@consumer01
index=0
mod=0
averageSize=16
startIndex=0
range=16
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=0], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=1], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=2], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=3], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=4], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=5], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=6], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=7], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=8], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=9], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=10], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=11], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=12], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=13], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=14], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=15]]
----------2019-08-04 22:10:35-----------
currentCID=2.0.1.138@consumer01
index=0
mod=0
averageSize=16
startIndex=0
range=16
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=0], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=1], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=2], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=3], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=4], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=5], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=6], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=7], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=8], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=9], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=10], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=11], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=12], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=13], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=14], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=15]]
  • 新加入第二個client時
    此時有兩個clinetId分別為2.0.1.138@consumer01和2.0.1.138@consumer02,1個 Broker16個分區(qū)的分配情況。
    currentCID=2.0.1.138@consumer01分到的分區(qū)為0~7
    currentCID=2.0.1.138@consumer02分到的分區(qū)為8~16

----------2019-08-04 22:12:25-----------
currentCID=2.0.1.138@consumer01
index=0
mod=0
averageSize=8
startIndex=0
range=8
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=0], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=1], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=2], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=3], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=4], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=5], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=6], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=7]]
----------2019-08-04 22:12:45-----------
currentCID=2.0.1.138@consumer02
index=1
mod=0
averageSize=8
startIndex=8
range=8
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=8], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=9], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=10], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=11], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=12], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=13], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=14], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=15]]
  • 新加入第三個client時
    此時有三個客戶端2.0.1.138@consumer01、2.0.1.138@consumer02、2.0.1.138@consumer03,1個Broker的16個隊列的分配情況。
    currentCID=2.0.1.138@consumer01分到的隊列0~5
    currentCID=2.0.1.138@consumer02分到的隊列6~10
    currentCID=2.0.1.138@consumer03分到的隊列11~15

----------2019-08-04 22:13:58-----------
currentCID=2.0.1.138@consumer01
index=0
mod=1
averageSize=6
startIndex=0
range=6
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=0], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=1], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=2], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=3], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=4], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=5]]
----------2019-08-04 22:14:18-----------
currentCID=2.0.1.138@consumer02
index=1
mod=1
averageSize=5
startIndex=6
range=5
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=6], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=7], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=8], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=9], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=10]]
----------2019-08-04 22:14:39-----------
currentCID=2.0.1.138@consumer03
index=2
mod=1
averageSize=5
startIndex=11
range=5
result=[MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=11], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=12], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=13], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=14], MessageQueue [topic=topic_test, brokerName=liangyongdeMacBook-Pro.local, queueId=15]]
四、總結(jié)

1.主題隊列是如何分配的?

備注:見隊列分配算法,通常使用平均分配算法。

2.什么時候會進行負載均衡?

備注:負載均衡線程每隔20秒執(zhí)行一次,當(dāng)有新客戶端退出或者加入或者新的Broker加入或掉線都會觸發(fā)重新負載均衡。


3.負載均衡后是否會導(dǎo)致消息重復(fù)消費?

備注:

情況1: 并發(fā)消費可能導(dǎo)致消息被重復(fù)消費,看以下代碼。

//并發(fā)消費對結(jié)果的處理

//ConsumeMessageConcurrentlyService#ConsumeRequest

if (!processQueue.isDropped()) {
 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);

} else { //被丟棄,消費進度不會更新

   log.warn("processQueue is dropped without process consume result.
   messageQueue=  {}, msgs={}", messageQueue, msgs);

如果負載均衡前已分配的隊列不在負載均衡后的新隊列集合中,會丟棄該隊列即:processQueue.isDropped()。而該隊列可能已經(jīng)被消費完了,在處理結(jié)果時被丟棄了,消費進度沒有更新。別的消費客戶端重新拉取該隊列時造成重復(fù)消費。

情況2: 順序消費不會導(dǎo)致消息被重復(fù)消費

關(guān)于“RocketMQ客戶端PUSH消費如何實現(xiàn)負載均衡”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI