溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

如何理解Github上14.1k Star的RocketMQ

發(fā)布時(shí)間:2021-10-12 14:59:28 來(lái)源:億速云 閱讀:142 作者:iii 欄目:編程語(yǔ)言

這篇文章主要講解了“如何理解Github上14.1k Star的RocketMQ”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“如何理解Github上14.1k Star的RocketMQ”吧!

宏觀概覽

RocketMQ 事務(wù)消息發(fā)送流程:

如何理解Github上14.1k Star的RocketMQ

結(jié)合源碼來(lái)看,RocketMQ 的事務(wù)消息 TransactionMQProducer 的 sendMessageInTransaction 方法,實(shí)際調(diào)用了 DefaultMQProducerImpl 的 sendMessageInTransaction 方法。我們進(jìn)入 sendMessageInTransaction 方法,整個(gè)事務(wù)消息的發(fā)送流程清晰可見(jiàn)。

首先,做發(fā)送前檢查,并填入必要參數(shù),包括設(shè) prepare 事務(wù)消息。

源碼清單-1

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener(); 
        if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

進(jìn)入發(fā)送處理流程:

源碼清單-2

    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

根據(jù) broker 返回的處理結(jié)果決策本地事務(wù)是否執(zhí)行,半消息發(fā)送成功則開(kāi)始本地事務(wù)執(zhí)行:

源碼清單-3

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) { 
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) { 
                    log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg); 
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:  // 當(dāng)備broker狀態(tài)不可用時(shí),半消息要回滾,不執(zhí)行本地事務(wù)
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

本地事務(wù)執(zhí)行結(jié)束,根據(jù)本地事務(wù)狀態(tài)進(jìn)行二階段處理:

源碼清單-4

    try {
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    // 組裝發(fā)送結(jié)果
    // ...
    return transactionSendResult;
}

接下來(lái),我們深入每個(gè)階段代碼分析。

深扒內(nèi)幕

Ⅰ階段發(fā)送

重點(diǎn)分析 send 方法。進(jìn)入 send 方法后,我們發(fā)現(xiàn),RocketMQ 的事務(wù)消息的一階段,使用了 SYNC 同步模式:

源碼清單-5

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

這一點(diǎn)很容易理解,畢竟事務(wù)消息是要根據(jù)一階段發(fā)送結(jié)果來(lái)決定要不要執(zhí)行本地事務(wù)的,所以一定要阻塞等待 broker 的 ack。

我們進(jìn)入 DefaultMQProducerImpl.java 中去看 sendDefaultImpl 方法的實(shí)現(xiàn),通過(guò)讀這個(gè)方法的代碼,來(lái)嘗試了解在事務(wù)消息的一階段發(fā)送過(guò)程中 producer 的行為。

值得注意的是,這個(gè)方法并非為事務(wù)消息定制,甚至不是為 SYNC 同步模式定制的,因此讀懂了這段代碼,基本可以對(duì) RocketMQ 的消息發(fā)送機(jī)制有了一個(gè)較為全面的認(rèn)識(shí)。

這段代碼邏輯非常通暢,不忍切片。為了節(jié)省篇幅,將代碼中較為繁雜但信息量不大的部分以注釋代替,盡可能保留流程的完整性。個(gè)人認(rèn)為較為重要或是容易被忽略的部分,以注釋標(biāo)出,后文還有部分細(xì)節(jié)的詳細(xì)解讀。

源碼清單-6

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    // 一、消息有效性校驗(yàn)。見(jiàn)后文
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;

    // 獲取當(dāng)前topic的發(fā)送路由信息,主要是要broker,如果沒(méi)找到則從namesrv獲取
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // 二、發(fā)送重試機(jī)制。見(jiàn)后文
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // 第一次發(fā)送是mq == null, 之后都是有broker信息的
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 三、rocketmq發(fā)送消息時(shí)如何選擇隊(duì)列?——broker異常規(guī)避機(jī)制 
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // 發(fā)送核心代碼
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // rocketmq 選擇 broker 時(shí)的規(guī)避機(jī)制,開(kāi)啟 sendLatencyFaultEnable == true 才生效
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

                    switch (communicationMode) {
                    // 四、RocketMQ的三種CommunicationMode。見(jiàn)后文
                        case ASYNC: // 異步模式
                            return null;
                        case ONEWAY: // 單向模式
                            return null;
                        case SYNC: // 同步模式
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // ...
                    // 自動(dòng)重試
                } catch (MQClientException e) {
                    // ...
                    // 自動(dòng)重試
                } catch (MQBrokerException e) {
                   // ...
                    // 僅返回碼==NOT_IN_CURRENT_UNIT==205 時(shí)自動(dòng)重試
                    // 其他情況不重試,拋異常
                } catch (InterruptedException e) {
                   // ...
                    // 不重試,拋異常
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        // 組裝返回的info信息,最后以MQClientException拋出
        // ... ...

        // 超時(shí)場(chǎng)景拋RemotingTooMuchRequestException
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        // 填充MQClientException異常信息
        // ...
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
一、消息有效性校驗(yàn)

源碼清單-7

 Validators.checkMessage(msg, this.defaultMQProducer);

在此方法中校驗(yàn)消息的有效性,包括對(duì) topic 和消息體的校驗(yàn)。topic 的命名必須符合規(guī)范,且避免使用內(nèi)置的系統(tǒng)消息 TOPIC。消息體長(zhǎng)度 > 0 && 消息體長(zhǎng)度 <= 102410244 = 4M 。

源碼清單-8

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    Validators.isNotAllowedSendTopic(msg.getTopic());

    // body
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }

    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }

    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}
二、發(fā)送重試機(jī)制

Producer 在消息發(fā)送不成功時(shí),會(huì)自動(dòng)重試,最多發(fā)送次數(shù) = retryTimesWhenSendFailed + 1 = 3 次 。

值得注意的是,并非所有異常情況都會(huì)重試,從以上源碼中可以提取到的信息告訴我們,在以下三種情況下,會(huì)自動(dòng)重試:

  • 發(fā)生 RemotingException,MQClientException 兩種異常之一時(shí)

  • 發(fā)生 MQBrokerException 異常,且 ResponseCode 是 NOT_IN_CURRENT_UNIT = 205 時(shí)

  • SYNC 模式下,未發(fā)生異常且發(fā)送結(jié)果狀態(tài)非 SEND_OK

在每次發(fā)送消息之前,會(huì)先檢查是否在前面這兩步就已經(jīng)耗時(shí)超長(zhǎng)(超時(shí)時(shí)長(zhǎng)默認(rèn) 3000ms),若是,則不再繼續(xù)發(fā)送并且直接返回超時(shí),不再重試。這里說(shuō)明了 2 個(gè)問(wèn)題:

  • producer 內(nèi)部自動(dòng)重試對(duì)業(yè)務(wù)應(yīng)用而言是無(wú)感知的,應(yīng)用看到的發(fā)送耗時(shí)是包含所有重試的耗時(shí)在內(nèi)的;

  • 一旦超時(shí)意味著本次消息發(fā)送已經(jīng)以失敗告終,原因是超時(shí)。這個(gè)信息最后會(huì)以 RemotingTooMuchRequestException 的形式拋出。

這里需要指出的是,在 RocketMQ 官方文檔中指出,發(fā)送超時(shí)時(shí)長(zhǎng)是 10s,即 10000ms,網(wǎng)上許多人對(duì) rocketMQ 的超時(shí)時(shí)間解讀也認(rèn)為是 10s。然而代碼中卻明明白白寫(xiě)著 3000ms,最終我 debug 之后確認(rèn),默認(rèn)超時(shí)時(shí)間確實(shí)是 3000ms。

三、broker 的異常規(guī)避機(jī)制

源碼清單-9

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

這行代碼是發(fā)送前選擇 queue 的過(guò)程。

這里涉及 RocketMQ 消息發(fā)送高可用的的一個(gè)核心機(jī)制,latencyFaultTolerance。這個(gè)機(jī)制是 Producer 負(fù)載均衡的一部分,通過(guò) sendLatencyFaultEnable 的值來(lái)控制,默認(rèn)是 false 關(guān)閉狀態(tài),不啟動(dòng) broker 故障延遲機(jī)制,值為 true 時(shí)啟用 broker 故障延遲機(jī)制,可由 Producer 主動(dòng)打開(kāi)。

選擇隊(duì)列時(shí),開(kāi)啟異常規(guī)避機(jī)制,則根據(jù) broker 的工作狀態(tài)避免選擇當(dāng)前狀態(tài)不佳的 broker 代理,不健康的 broker 會(huì)在一段時(shí)間內(nèi)被規(guī)避,不開(kāi)啟異常規(guī)避機(jī)制時(shí),則按順序選取下一個(gè)隊(duì)列,但在重試場(chǎng)景下會(huì)盡量選擇不同于上次發(fā)送 broker 的 queue。每次消息發(fā)送都會(huì)通過(guò) updateFaultItem 方法來(lái)維護(hù) broker 的狀態(tài)信息。

源碼清單-10

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // 計(jì)算延遲多久,isolation表示是否需要隔離該broker,若是,則從30s往前找第一個(gè)比30s小的延遲值,再按下標(biāo)判斷規(guī)避的周期,若30s,則是10min規(guī)避;
        // 否則,按上一次發(fā)送耗時(shí)來(lái)決定規(guī)避時(shí)長(zhǎng);
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

深入到 selectOneMessageQueue 方法內(nèi)部一探究竟:

源碼清單-11

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        // 開(kāi)啟異常規(guī)避
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // 按順序取下一個(gè)message queue作為發(fā)送的queue
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 當(dāng)前queue所在的broker可用,且與上一個(gè)queue的broker相同,
                // 或者第一次發(fā)送,則使用這個(gè)queue
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }
    // 不開(kāi)啟異常規(guī)避,則隨機(jī)自增選擇Queue
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
四、RocketMQ 的三種 CommunicationMode

源碼清單-12

 public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

以上三種模式指的都是消息從發(fā)送方到達(dá) broker 的階段,不包含 broker 將消息投遞給訂閱方的過(guò)程。三種模式的發(fā)送方式的差異:

**單向模式:**ONEWAY。消息發(fā)送方只管發(fā)送,并不關(guān)心 broker 處理的結(jié)果如何。這種模式下,由于處理流程少,發(fā)送耗時(shí)非常小,吞吐量大,但不能保證消息可靠不丟,常用于流量巨大但不重要的消息場(chǎng)景,例如心跳發(fā)送等。

**異步模式:**ASYNC。消息發(fā)送方發(fā)送消息到 broker 后,無(wú)需等待 broker 處理,拿到的是 null 的返回值,而由一個(gè)異步的線(xiàn)程來(lái)做消息處理,處理完成后以回調(diào)的形式告訴發(fā)送方發(fā)送結(jié)果。異步處理時(shí)如有異常,返回發(fā)送方失敗結(jié)果之前,會(huì)經(jīng)過(guò)內(nèi)部重試(默認(rèn) 3 次,發(fā)送方不感知)。這種模式下,發(fā)送方等待時(shí)長(zhǎng)較小,吞吐量較大,消息可靠,用于流量大但重要的消息場(chǎng)景。

**同步模式:**SYNC。消息發(fā)送方需等待 broker 處理完成并明確返回成功或失敗,在消息發(fā)送方拿到消息發(fā)送失敗的結(jié)果之前,也會(huì)經(jīng)歷過(guò)內(nèi)部重試(默認(rèn) 3 次,發(fā)送方不感知)這種模式下,發(fā)送方會(huì)阻塞等待消息處理結(jié)果,等待時(shí)長(zhǎng)較長(zhǎng),消息可靠,用于流量不大但重要的消息場(chǎng)景。需要強(qiáng)調(diào)的是,事務(wù)消息的一階段半事務(wù)消息的處理是同步模式。

在 sendKernelImpl 方法中也可以看到具體的實(shí)現(xiàn)差異。ONEWAY 模式最為簡(jiǎn)單,不做任何處理。負(fù)責(zé)發(fā)送的 sendMessage 方法參數(shù)中,相比同步模式,異步模式多了回調(diào)方法、包含 topic 發(fā)送路由元信息的 topicPublishInfo、包含發(fā)送 broker 信息的 instance、包含發(fā)送隊(duì)列信息的 producer、重試次數(shù)。另外,異步模式下,會(huì)對(duì)有壓縮的消息先做 copy。

源碼清單-13

    switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

官方文檔中有這樣一張圖,十分清晰的描述了異步通信的詳細(xì)過(guò)程:

如何理解Github上14.1k Star的RocketMQ

Ⅱ 階段發(fā)送

源碼清單-3 體現(xiàn)了本地事務(wù)的執(zhí)行,localTransactionState 將本地事務(wù)執(zhí)行結(jié)果與事務(wù)消息二階段的發(fā)送關(guān)聯(lián)起來(lái)。

值得注意的是,如果一階段的發(fā)送結(jié)果是 SLAVENOTAVAILABLE,即便 broker 不可用時(shí),也會(huì)將 localTransactionState 置為 Rollback,此時(shí)將不會(huì)執(zhí)行本地事務(wù)。之后由 endTransaction 方法負(fù)責(zé)二階段提交,見(jiàn)源碼清單-4。具體到 endTransaction 的實(shí)現(xiàn):

源碼清單-14

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // 采用oneway的方式發(fā)送二階段消息
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

在二階段發(fā)送時(shí),之所以用 oneway 的方式發(fā)送,個(gè)人理解這正是因?yàn)槭聞?wù)消息有一個(gè)特殊的可靠機(jī)制——回查。

消息回查

當(dāng) Broker 經(jīng)過(guò)了一個(gè)特定的時(shí)間,發(fā)現(xiàn)依然沒(méi)有得到事務(wù)消息的二階段是否要提交或者回滾的確切信息,Broker 不知道 Producer 發(fā)生了什么情況(可能 producer 掛了,也可能 producer 發(fā)了 commit 但網(wǎng)絡(luò)抖動(dòng)丟了,也可能……于是主動(dòng)發(fā)起回查。

事務(wù)消息的回查機(jī)制,更多的是在 broker 端的體現(xiàn)。RocketMQ 的 broker 以 Half 消息、Op 消息、真實(shí)消息三個(gè)不同的 topic 來(lái)將不同發(fā)送階段的事務(wù)消息進(jìn)行了隔離,使得 Consumer 只能看到最終確認(rèn) commit 需要投遞出去的消息。其中詳細(xì)的實(shí)現(xiàn)邏輯在本文中暫不多贅述,后續(xù)可另開(kāi)一篇專(zhuān)門(mén)來(lái)從 Broker 視角來(lái)解讀。

回到 Producer 的視角,當(dāng)收到了 Broker 的回查請(qǐng)求,Producer 將根據(jù)消息檢查本地事務(wù)狀態(tài),根據(jù)結(jié)果決定提交或回滾,這就要求 Producer 必須指定回查實(shí)現(xiàn),以備不時(shí)之需。當(dāng)然,正常情況下,并不推薦主動(dòng)發(fā)送 UNKNOW 狀態(tài),這個(gè)狀態(tài)毫無(wú)疑問(wèn)會(huì)給 broker 帶來(lái)額外回查開(kāi)銷(xiāo),只在出現(xiàn)不可預(yù)知的異常情況時(shí)才啟動(dòng)回查機(jī)制,是一種比較合理的選擇。

另外,4.7.1 版本的事務(wù)回查并非無(wú)限回查,而是最多回查 15 次:

源碼清單-15

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 15;

感謝各位的閱讀,以上就是“如何理解Github上14.1k Star的RocketMQ”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)如何理解Github上14.1k Star的RocketMQ這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI