您好,登錄后才能下訂單哦!
這篇文章主要介紹RocketMQ中如何實現(xiàn)push consumer消息拉取,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
RebalanceImpl.updateProcessQueueTableInRebalance方法的末尾,對于每一個新生成的ProcessQueue都會創(chuàng)建一個PullRequest執(zhí)行首次消息拉取操作。PullRequest會通過RebalanceImpl.dispatchPullRequest方法達到DefaultMQPushConsumerImpl.executePullRequestImmediately,然后被投遞到PullMessageService的本地隊列中。
PulMessageService會啟動一個服務(wù)線程,不斷消費投遞到本地隊列中的PullRequest,最終調(diào)用到DefaultMQPushConsumerImpl.pullMessage方法。PullMessageService被MQClientInstance持有,同一個客戶端實例中所有的push consumer產(chǎn)生的PullRequest都會被投遞到同一個PullMessageService本地隊列中排隊等待執(zhí)行。
DefaultMQPushConsumerImpl.pullMessage是消息拉取的核心方法。該方法首先會執(zhí)行一系列的限流判斷,若命中限流條件則本次執(zhí)行結(jié)束,等待一個固定時間之后會再次將同一個PullRequest投遞到PullMessageService中重新觸發(fā)消息拉取。
DefaultMQPushConsumerImpl.pullMessage核心邏輯:
public void pullMessage(final PullRequest pullRequest) { // 限流判斷 .... final long beginTimestamp = System.currentTimeMillis(); // 消息拉取callback PullCallback pullCallback = new PullCallback() {...}; boolean commitOffsetEnable = false; long commitOffsetValue = 0L; if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { // 獲取當(dāng)前隊列已經(jīng)被消費到最新的offset,通過本次pull請求附帶在broker上commit該offset commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } } String subExpression = null; boolean classFilter = false; SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (sd != null) { if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { subExpression = sd.getSubString(); // 消息過濾表達式 } classFilter = sd.isClassFilterMode(); } int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, // commitOffset true, // suspend subExpression != null, // subscription classFilter // class filter ); try { // 發(fā)起pull請求,成功后異步回調(diào)pullCallback this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }
pullMessage方法中創(chuàng)建的匿名PullCallback用來處理拉取到的消息列表:
PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { // 1. 反序列化,并執(zhí)行過濾 pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); // 2. 保存到本地ProcessQueue中緩存 boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); // 3. 提交到ConsumeMessageService中,被push到message listener執(zhí)行業(yè)務(wù)處理 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); // 4. 提交下一次PullRequest if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); } break; // .... } } } // .... };
上面的第3步consumeMessageService.submitConsumeRequest中將根據(jù)并行或串行不同的方式將message提交給listener執(zhí)行業(yè)務(wù)處理動作。
消息拉取的整體流程如下:
以上是“RocketMQ中如何實現(xiàn)push consumer消息拉取”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。