溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RocketMQ中push consumer啟動之觸發(fā)消息拉取的示例代碼

發(fā)布時間:2021-12-17 14:20:13 來源:億速云 閱讀:304 作者:小新 欄目:大數(shù)據(jù)

這篇文章主要介紹RocketMQ中push consumer啟動之觸發(fā)消息拉取的示例代碼,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

一、consumer端數(shù)據(jù)存儲結(jié)構(gòu)

push consumer啟動方法DefaultMQPushConsumerImpl.start最后一步會觸發(fā)MQClientInstance.rebalanceImmediately,該調(diào)用最終會進入到RebalanceImpl.doRebalance中,它會根據(jù)topic當(dāng)前的實際consumer數(shù)量(從nameserver獲?。┩ㄟ^負載均衡原則來決定自己所要訂閱的message queue。然后在本地創(chuàng)建對應(yīng)的消息緩存隊列(ProcessQueue),并觸發(fā)消息拉取操作。

RebalanceImpl是整個consumer的核心,它即保存本消費者訂閱的topic信息,又緩存了topic中的message數(shù)據(jù)。RebalanceImpl相關(guān)的幾個核心類如下:

RocketMQ中push consumer啟動之觸發(fā)消息拉取的示例代碼

  • MessageQueue代表的是遠端broker上一個topic下的某個message queue

  • ProcessQueue是對遠端message queue的一個本地緩存,拉取下來的消息都存在一個TreeMap中,其中key是commitlog中的offset

  • RebalanceImpl中保存了三種關(guān)系:message queue和process queue的映射關(guān)系;topic和message queue的映射關(guān)系;topic的訂閱關(guān)系

二、訂閱端的負載均衡策略

doRebalance方法會調(diào)用rebalanceByTopic來決定本消費者具體要訂閱一個topic下的哪些message queue,以達到負載均衡的效果。

    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // 廣播模式,訂閱所有message queue
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                // 集群模式,獲取該topic下所有的message queue + 該topic所有的consumer
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        // 通過負載均衡策略計算出當(dāng)前消費者所需訂閱的message queue子集
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    // 更新Process Queue緩存列表
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

三、創(chuàng)建本地緩存并啟動消息拉取

rebalanceByTopic中通過負載均衡策略計算出當(dāng)前消費者對于一個topic實際訂閱的message queue子集之后,就會在updateProcessQueueTableInBalance方法中創(chuàng)建ProcessQueue,并啟動消息拉取。

    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        boolean changed = false;

        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();

            if (mq.getTopic().equals(topic)) {
                if (!mqSet.contains(mq)) {
                    // 本地Process Queue存在,但不再訂閱,則廢棄改process queue
                    pq.setDropped(true);
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                    }
                } else if (pq.isPullExpired()) { // process queue過期,也廢棄,等待新建
                    switch (this.consumeType()) {
                        case CONSUME_ACTIVELY:
                            break;
                        case CONSUME_PASSIVELY:
                            pq.setDropped(true);
                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                it.remove();
                                changed = true;
                                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                    consumerGroup, mq);
                            }
                            break;
                        default:
                            break;
                    }
                }
            }
        }

        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                // 新訂閱,本地不存在對應(yīng)的process queue,則新建
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }

                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        // 初始化首次拉取請求
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }

        // 批量觸發(fā)首次拉取請求
        this.dispatchPullRequest(pullRequestList);

        return changed;
    }

四、整體流程

消息拉取的初始化過程如下圖:

RocketMQ中push consumer啟動之觸發(fā)消息拉取的示例代碼

以上是“RocketMQ中push consumer啟動之觸發(fā)消息拉取的示例代碼”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI