溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RocketMQ消息消費與重平衡問題的示例分析

發(fā)布時間:2021-12-18 11:25:21 來源:億速云 閱讀:200 作者:小新 欄目:大數(shù)據(jù)

小編給大家分享一下RocketMQ消息消費與重平衡問題的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

關(guān)于 push 模式下的消息循環(huán)拉取問題

之前發(fā)表說了Kafka 重平衡機制,有說到 RocketMQ 重平衡機制是每隔 20s 從任意一個 Broker 節(jié)點獲取消費組的消費 ID 以及訂閱信息,再根據(jù)這些訂閱信息進行分配,然后將分配到的信息封裝成 pullRequest 對象 pull 到 pullRequestQueue 隊列中,拉取線程喚醒后執(zhí)行拉取任務(wù),流程圖如下:

RocketMQ消息消費與重平衡問題的示例分析

但是其中有一些是沒有詳細說的,比如每次拉消息都要等 20s 嗎?真的有個網(wǎng)友問了我如下問題:

RocketMQ消息消費與重平衡問題的示例分析

很顯然他的項目是用了 push 模式進行消息拉取,要回答這個問題,就要從 RockeMQ 的消息拉取說起:

RocketMQ 的 push 模式的實現(xiàn)是基于 pull 模式,只不過在 pull 模式上套了一層,所以RocketMQ push 模式并不是真正意義上的 ”推模式“,因此,在 push 模式下,消費者拉取完消息后,立馬就有開始下一個拉取任務(wù),并不會真的等 20s 重平衡后才拉取,至于 push 模式是怎么實現(xiàn)的,那就從源碼去找答案。

之前有寫過一篇文章:「RocketMQ為什么要保證訂閱關(guān)系的一致性?」,里面有說過 消息拉取是從 PullRequestQueue 阻塞隊列中取出 PullRequest 拉取任務(wù)進行消息拉取的,但 PullRequest 是怎么放進 PullRequestQueue 阻塞隊列中的呢?

RocketMQ 一共提供了以下方法:

org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately:

public void executePullRequestImmediately(final PullRequest pullRequest) {
  try {
    this.pullRequestQueue.put(pullRequest);
  } catch (InterruptedException e) {
    log.error("executePullRequestImmediately pullRequestQueue.put", e);
  }
}

從調(diào)用鏈發(fā)現(xiàn),除了重平衡會調(diào)用該方法之外,在 push 模式下,PullCallback 回調(diào)對象中的 onSuccess 方法在消息消費時,也調(diào)用了該方法:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess:

case FOUND:

// 如果本次拉取消息為空,則繼續(xù)將pullRequest放入阻塞隊列中
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
  // 將消息放入消費者消費線程去執(zhí)行
  DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
    pullResult.getMsgFoundList(), //
    processQueue, //
    pullRequest.getMessageQueue(), //
    dispathToConsume);
  // 將pullRequest放入阻塞隊列中
  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);  
}

當(dāng)從 broker 拉取到消息后,如果消息被過濾掉,則繼續(xù)將pullRequest放入阻塞隊列中繼續(xù)循環(huán)執(zhí)行消息拉取任務(wù),否則將消息放入消費者消費線程去執(zhí)行,在pullRequest放入阻塞隊列中。

case NO_NEW_MESSAGE:

case NO_MATCHED_MSG:

pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

如果從 broker 端沒有可拉取的新消息或者沒有匹配到消息,則將pullRequest放入阻塞隊列中繼續(xù)循環(huán)執(zhí)行消息拉取任務(wù)。

從以上消息消費邏輯可以看出,當(dāng)消息處理完后,立即將 pullRequest 重新放入阻塞隊列中,因此這就很好解釋為什么 push 模式可以持續(xù)拉取消息了:

在 push 模式下消息消費完后,還會調(diào)用該方法重新將 PullRequest 對象放進 PullRequestQueue 阻塞隊列中,不斷地從 broker 中拉取消息,實現(xiàn) push 效果。

重平衡后隊列被其它消費者分配后如何處理?

繼續(xù)再想一個問題,如果重平衡后,發(fā)現(xiàn)某個隊列被新的消費者分配了,怎么辦,總不能繼續(xù)從該隊列中拉取消息吧?

RocketMQ 重平衡后會檢查 pullRequest 是否還在新分配的列表中,如果不在,則丟棄,調(diào)用 isDrop() 可查出該pullRequest是否已丟棄:

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage:

final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
  log.info("the pull request[{}] is dropped.", pullRequest.toString());
  return;
}

在消息拉取之前,首先判斷該隊列是否被丟棄,如果已丟棄,則直接放棄本次拉取任務(wù)。

那什么時候隊列被丟棄呢?

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
  Entry<MessageQueue, ProcessQueue> next = it.next();
  MessageQueue mq = next.getKey();
  ProcessQueue pq = next.getValue();

  if (mq.getTopic().equals(topic)) {
    // 判斷當(dāng)前緩存 MessageQueue 是否包含在最新的 mqSet 中,如果不存在則將隊列丟棄
    if (!mqSet.contains(mq)) {
      pq.setDropped(true);
      if (this.removeUnnecessaryMessageQueue(mq, pq)) {
        it.remove();
        changed = true;
        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
      }
    } else if (pq.isPullExpired()) {
      // 如果隊列拉取過期則丟棄
      switch (this.consumeType()) {
        case CONSUME_ACTIVELY:
          break;
        case CONSUME_PASSIVELY:
          pq.setDropped(true);
          if (this.removeUnnecessaryMessageQueue(mq, pq)) {
            it.remove();
            changed = true;
            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                      consumerGroup, mq);
          }
          break;
        default:
          break;
      }
    }
  }
}

updateProcessQueueTableInRebalance 方法在重平衡時執(zhí)行,用于更新 processQueueTable,它是當(dāng)前消費者的隊列緩存列表,以上方法邏輯判斷當(dāng)前緩存 MessageQueue 是否包含在最新的 mqSet 中,如果不包含其中,則說明經(jīng)過這次重平衡后,該隊列被分配給其它消費者了,或者拉取時間間隔太大過期了,則調(diào)用 setDropped(true) 方法將隊列置為丟棄狀態(tài)。

可能你會問,processQueueTable 跟 pullRequest 里面 processQueue 有什么關(guān)聯(lián),往下看:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

// 新建 ProcessQueue 
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
  // 將ProcessQueue放入processQueueTable中
  ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
  if (pre != null) {
    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
  } else {
    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
    PullRequest pullRequest = new PullRequest();
    pullRequest.setConsumerGroup(consumerGroup);
    pullRequest.setNextOffset(nextOffset);
    pullRequest.setMessageQueue(mq);
    // 將ProcessQueue放入pullRequest拉取任務(wù)對象中
    pullRequest.setProcessQueue(pq);
    pullRequestList.add(pullRequest);
    changed = true;
  }
}

可以看出,重平衡時會創(chuàng)建 ProcessQueue 對象,將其放入 processQueueTable 緩存隊列表中,再將其放入 pullRequest 拉取任務(wù)對象中,也就是 processQueueTable 中的 ProcessQueue 與 pullRequest 的中 ProcessQueue 是同一個對象。

重平衡后會導(dǎo)致消息重復(fù)消費嗎?

之前在群里有個網(wǎng)友提了這個問題:

RocketMQ消息消費與重平衡問題的示例分析

我當(dāng)時回答他 RocketMQ 正常也是沒有重復(fù)消費,但后來發(fā)現(xiàn)其實 RocketMQ 在某些情況下,也是會出現(xiàn)消息重復(fù)消費的現(xiàn)象。

前面講到,RocketMQ 消息消費時,會將消息放進消費線程中去執(zhí)行,代碼如下:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess:

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
  pullResult.getMsgFoundList(), //
  processQueue, //
  pullRequest.getMessageQueue(), //
  dispathToConsume);

ConsumeMessageService 類實現(xiàn)消息消費的邏輯,它有兩個實現(xiàn)類:

// 并發(fā)消息消費邏輯實現(xiàn)類
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 順序消息消費邏輯實現(xiàn)類
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;

先看并發(fā)消息消費相關(guān)處理邏輯:

ConsumeMessageConcurrentlyService:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run:

if (this.processQueue.isDropped()) {
  log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
  return;
}

// 消息消費邏輯
// ...

// 如果隊列被設(shè)置為丟棄狀態(tài),則不提交消息消費進度
if (!processQueue.isDropped()) {
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}

ConsumeRequest 是一個繼承了 Runnable 的類,它是消息消費核心邏輯的實現(xiàn)類,submitConsumeRequest 方法將 ConsumeRequest 放入 消費線程池中執(zhí)行消息消費,從它的 run 方法中可看出,如果在執(zhí)行消息消費邏輯中有節(jié)點加入,重平衡后該隊列被分配給其它節(jié)點進行消費了,此時的隊列被丟棄,則不提交消息消費進度,因為之前已經(jīng)消費了,此時就會造成消息重復(fù)消費的情況。

再來看看順序消費相關(guān)處理邏輯:

ConsumeMessageOrderlyService:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run:

public void run() {
  // 判斷隊列是否被丟棄
  if (this.processQueue.isDropped()) {
    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
    return;
  }

  final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
  synchronized (objLock) {
    // 如果不是廣播模式,且隊列已加鎖且鎖沒有過期
    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
      final long beginTime = System.currentTimeMillis();
      for (boolean continueConsume = true; continueConsume; ) {
        // 再次判斷隊列是否被丟棄
        if (this.processQueue.isDropped()) {
          log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
          break;
        }
        
        // 消息消費處理邏輯
        // ...
        
          continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
        } else {
          continueConsume = false;
        }
      }
    } else {
      if (this.processQueue.isDropped()) {
        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
      }
      ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
    }
  }
}

RocketMQ 順序消息消費會將隊列鎖定,當(dāng)隊列獲取鎖之后才能進行消費,所以,即使消息在消費過程中有節(jié)點加入,重平衡后該隊列被分配給其它節(jié)點進行消費了,此時的隊列被丟棄,依然不會造成重復(fù)消費。

以上是“RocketMQ消息消費與重平衡問題的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI