您好,登錄后才能下訂單哦!
這篇文章主要講解了“如何理解Github上14.1k Star的RocketMQ”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“如何理解Github上14.1k Star的RocketMQ”吧!
RocketMQ 事務(wù)消息發(fā)送流程:
結(jié)合源碼來(lái)看,RocketMQ 的事務(wù)消息 TransactionMQProducer 的 sendMessageInTransaction 方法,實(shí)際調(diào)用了 DefaultMQProducerImpl 的 sendMessageInTransaction 方法。我們進(jìn)入 sendMessageInTransaction 方法,整個(gè)事務(wù)消息的發(fā)送流程清晰可見(jiàn)。
首先,做發(fā)送前檢查,并填入必要參數(shù),包括設(shè) prepare 事務(wù)消息。
源碼清單-1
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } // ignore DelayTimeLevel parameter if (msg.getDelayTimeLevel() != 0) { MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
進(jìn)入發(fā)送處理流程:
源碼清單-2
try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); }
根據(jù) broker 返回的處理結(jié)果決策本地事務(wù)是否執(zhí)行,半消息發(fā)送成功則開(kāi)始本地事務(wù)執(zhí)行:
源碼清單-3
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: // 當(dāng)備broker狀態(tài)不可用時(shí),半消息要回滾,不執(zhí)行本地事務(wù) localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; }
本地事務(wù)執(zhí)行結(jié)束,根據(jù)本地事務(wù)狀態(tài)進(jìn)行二階段處理:
源碼清單-4
try { this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } // 組裝發(fā)送結(jié)果 // ... return transactionSendResult; }
接下來(lái),我們深入每個(gè)階段代碼分析。
重點(diǎn)分析 send 方法。進(jìn)入 send 方法后,我們發(fā)現(xiàn),RocketMQ 的事務(wù)消息的一階段,使用了 SYNC 同步模式:
源碼清單-5
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }
這一點(diǎn)很容易理解,畢竟事務(wù)消息是要根據(jù)一階段發(fā)送結(jié)果來(lái)決定要不要執(zhí)行本地事務(wù)的,所以一定要阻塞等待 broker 的 ack。
我們進(jìn)入 DefaultMQProducerImpl.java 中去看 sendDefaultImpl 方法的實(shí)現(xiàn),通過(guò)讀這個(gè)方法的代碼,來(lái)嘗試了解在事務(wù)消息的一階段發(fā)送過(guò)程中 producer 的行為。
值得注意的是,這個(gè)方法并非為事務(wù)消息定制,甚至不是為 SYNC 同步模式定制的,因此讀懂了這段代碼,基本可以對(duì) RocketMQ 的消息發(fā)送機(jī)制有了一個(gè)較為全面的認(rèn)識(shí)。
這段代碼邏輯非常通暢,不忍切片。為了節(jié)省篇幅,將代碼中較為繁雜但信息量不大的部分以注釋代替,盡可能保留流程的完整性。個(gè)人認(rèn)為較為重要或是容易被忽略的部分,以注釋標(biāo)出,后文還有部分細(xì)節(jié)的詳細(xì)解讀。
源碼清單-6
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); // 一、消息有效性校驗(yàn)。見(jiàn)后文 Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // 獲取當(dāng)前topic的發(fā)送路由信息,主要是要broker,如果沒(méi)找到則從namesrv獲取 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; // 二、發(fā)送重試機(jī)制。見(jiàn)后文 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { // 第一次發(fā)送是mq == null, 之后都是有broker信息的 String lastBrokerName = null == mq ? null : mq.getBrokerName(); // 三、rocketmq發(fā)送消息時(shí)如何選擇隊(duì)列?——broker異常規(guī)避機(jī)制 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } // 發(fā)送核心代碼 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // rocketmq 選擇 broker 時(shí)的規(guī)避機(jī)制,開(kāi)啟 sendLatencyFaultEnable == true 才生效 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { // 四、RocketMQ的三種CommunicationMode。見(jiàn)后文 case ASYNC: // 異步模式 return null; case ONEWAY: // 單向模式 return null; case SYNC: // 同步模式 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) { // ... // 自動(dòng)重試 } catch (MQClientException e) { // ... // 自動(dòng)重試 } catch (MQBrokerException e) { // ... // 僅返回碼==NOT_IN_CURRENT_UNIT==205 時(shí)自動(dòng)重試 // 其他情況不重試,拋異常 } catch (InterruptedException e) { // ... // 不重試,拋異常 } } else { break; } } if (sendResult != null) { return sendResult; } // 組裝返回的info信息,最后以MQClientException拋出 // ... ... // 超時(shí)場(chǎng)景拋RemotingTooMuchRequestException if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } // 填充MQClientException異常信息 // ... } validateNameServerSetting(); throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }
源碼清單-7
Validators.checkMessage(msg, this.defaultMQProducer);
在此方法中校驗(yàn)消息的有效性,包括對(duì) topic 和消息體的校驗(yàn)。topic 的命名必須符合規(guī)范,且避免使用內(nèi)置的系統(tǒng)消息 TOPIC。消息體長(zhǎng)度 > 0 && 消息體長(zhǎng)度 <= 102410244 = 4M 。
源碼清單-8
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic Validators.checkTopic(msg.getTopic()); Validators.isNotAllowedSendTopic(msg.getTopic()); // body if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } }
Producer 在消息發(fā)送不成功時(shí),會(huì)自動(dòng)重試,最多發(fā)送次數(shù) = retryTimesWhenSendFailed + 1 = 3 次 。
值得注意的是,并非所有異常情況都會(huì)重試,從以上源碼中可以提取到的信息告訴我們,在以下三種情況下,會(huì)自動(dòng)重試:
發(fā)生 RemotingException,MQClientException 兩種異常之一時(shí)
發(fā)生 MQBrokerException 異常,且 ResponseCode 是 NOT_IN_CURRENT_UNIT = 205 時(shí)
SYNC 模式下,未發(fā)生異常且發(fā)送結(jié)果狀態(tài)非 SEND_OK
在每次發(fā)送消息之前,會(huì)先檢查是否在前面這兩步就已經(jīng)耗時(shí)超長(zhǎng)(超時(shí)時(shí)長(zhǎng)默認(rèn) 3000ms),若是,則不再繼續(xù)發(fā)送并且直接返回超時(shí),不再重試。這里說(shuō)明了 2 個(gè)問(wèn)題:
producer 內(nèi)部自動(dòng)重試對(duì)業(yè)務(wù)應(yīng)用而言是無(wú)感知的,應(yīng)用看到的發(fā)送耗時(shí)是包含所有重試的耗時(shí)在內(nèi)的;
一旦超時(shí)意味著本次消息發(fā)送已經(jīng)以失敗告終,原因是超時(shí)。這個(gè)信息最后會(huì)以 RemotingTooMuchRequestException 的形式拋出。
這里需要指出的是,在 RocketMQ 官方文檔中指出,發(fā)送超時(shí)時(shí)長(zhǎng)是 10s,即 10000ms,網(wǎng)上許多人對(duì) rocketMQ 的超時(shí)時(shí)間解讀也認(rèn)為是 10s。然而代碼中卻明明白白寫(xiě)著 3000ms,最終我 debug 之后確認(rèn),默認(rèn)超時(shí)時(shí)間確實(shí)是 3000ms。
源碼清單-9
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
這行代碼是發(fā)送前選擇 queue 的過(guò)程。
這里涉及 RocketMQ 消息發(fā)送高可用的的一個(gè)核心機(jī)制,latencyFaultTolerance。這個(gè)機(jī)制是 Producer 負(fù)載均衡的一部分,通過(guò) sendLatencyFaultEnable 的值來(lái)控制,默認(rèn)是 false 關(guān)閉狀態(tài),不啟動(dòng) broker 故障延遲機(jī)制,值為 true 時(shí)啟用 broker 故障延遲機(jī)制,可由 Producer 主動(dòng)打開(kāi)。
選擇隊(duì)列時(shí),開(kāi)啟異常規(guī)避機(jī)制,則根據(jù) broker 的工作狀態(tài)避免選擇當(dāng)前狀態(tài)不佳的 broker 代理,不健康的 broker 會(huì)在一段時(shí)間內(nèi)被規(guī)避,不開(kāi)啟異常規(guī)避機(jī)制時(shí),則按順序選取下一個(gè)隊(duì)列,但在重試場(chǎng)景下會(huì)盡量選擇不同于上次發(fā)送 broker 的 queue。每次消息發(fā)送都會(huì)通過(guò) updateFaultItem 方法來(lái)維護(hù) broker 的狀態(tài)信息。
源碼清單-10
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { // 計(jì)算延遲多久,isolation表示是否需要隔離該broker,若是,則從30s往前找第一個(gè)比30s小的延遲值,再按下標(biāo)判斷規(guī)避的周期,若30s,則是10min規(guī)避; // 否則,按上一次發(fā)送耗時(shí)來(lái)決定規(guī)避時(shí)長(zhǎng); long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } }
深入到 selectOneMessageQueue 方法內(nèi)部一探究竟:
源碼清單-11
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { // 開(kāi)啟異常規(guī)避 try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; // 按順序取下一個(gè)message queue作為發(fā)送的queue MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 當(dāng)前queue所在的broker可用,且與上一個(gè)queue的broker相同, // 或者第一次發(fā)送,則使用這個(gè)queue if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } // 不開(kāi)啟異常規(guī)避,則隨機(jī)自增選擇Queue return tpInfo.selectOneMessageQueue(lastBrokerName); }
源碼清單-12
public enum CommunicationMode { SYNC, ASYNC, ONEWAY, }
以上三種模式指的都是消息從發(fā)送方到達(dá) broker 的階段,不包含 broker 將消息投遞給訂閱方的過(guò)程。三種模式的發(fā)送方式的差異:
**單向模式:**ONEWAY。消息發(fā)送方只管發(fā)送,并不關(guān)心 broker 處理的結(jié)果如何。這種模式下,由于處理流程少,發(fā)送耗時(shí)非常小,吞吐量大,但不能保證消息可靠不丟,常用于流量巨大但不重要的消息場(chǎng)景,例如心跳發(fā)送等。
**異步模式:**ASYNC。消息發(fā)送方發(fā)送消息到 broker 后,無(wú)需等待 broker 處理,拿到的是 null 的返回值,而由一個(gè)異步的線(xiàn)程來(lái)做消息處理,處理完成后以回調(diào)的形式告訴發(fā)送方發(fā)送結(jié)果。異步處理時(shí)如有異常,返回發(fā)送方失敗結(jié)果之前,會(huì)經(jīng)過(guò)內(nèi)部重試(默認(rèn) 3 次,發(fā)送方不感知)。這種模式下,發(fā)送方等待時(shí)長(zhǎng)較小,吞吐量較大,消息可靠,用于流量大但重要的消息場(chǎng)景。
**同步模式:**SYNC。消息發(fā)送方需等待 broker 處理完成并明確返回成功或失敗,在消息發(fā)送方拿到消息發(fā)送失敗的結(jié)果之前,也會(huì)經(jīng)歷過(guò)內(nèi)部重試(默認(rèn) 3 次,發(fā)送方不感知)這種模式下,發(fā)送方會(huì)阻塞等待消息處理結(jié)果,等待時(shí)長(zhǎng)較長(zhǎng),消息可靠,用于流量不大但重要的消息場(chǎng)景。需要強(qiáng)調(diào)的是,事務(wù)消息的一階段半事務(wù)消息的處理是同步模式。
在 sendKernelImpl 方法中也可以看到具體的實(shí)現(xiàn)差異。ONEWAY 模式最為簡(jiǎn)單,不做任何處理。負(fù)責(zé)發(fā)送的 sendMessage 方法參數(shù)中,相比同步模式,異步模式多了回調(diào)方法、包含 topic 發(fā)送路由元信息的 topicPublishInfo、包含發(fā)送 broker 信息的 instance、包含發(fā)送隊(duì)列信息的 producer、重試次數(shù)。另外,異步模式下,會(huì)對(duì)有壓縮的消息先做 copy。
源碼清單-13
switch (communicationMode) { case ASYNC: Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; }
官方文檔中有這樣一張圖,十分清晰的描述了異步通信的詳細(xì)過(guò)程:
源碼清單-3 體現(xiàn)了本地事務(wù)的執(zhí)行,localTransactionState 將本地事務(wù)執(zhí)行結(jié)果與事務(wù)消息二階段的發(fā)送關(guān)聯(lián)起來(lái)。
值得注意的是,如果一階段的發(fā)送結(jié)果是 SLAVENOTAVAILABLE,即便 broker 不可用時(shí),也會(huì)將 localTransactionState 置為 Rollback,此時(shí)將不會(huì)執(zhí)行本地事務(wù)。之后由 endTransaction 方法負(fù)責(zé)二階段提交,見(jiàn)源碼清單-4。具體到 endTransaction 的實(shí)現(xiàn):
源碼清單-14
public void endTransaction( final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } String transactionId = sendResult.getTransactionId(); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; // 采用oneway的方式發(fā)送二階段消息 this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
在二階段發(fā)送時(shí),之所以用 oneway 的方式發(fā)送,個(gè)人理解這正是因?yàn)槭聞?wù)消息有一個(gè)特殊的可靠機(jī)制——回查。
當(dāng) Broker 經(jīng)過(guò)了一個(gè)特定的時(shí)間,發(fā)現(xiàn)依然沒(méi)有得到事務(wù)消息的二階段是否要提交或者回滾的確切信息,Broker 不知道 Producer 發(fā)生了什么情況(可能 producer 掛了,也可能 producer 發(fā)了 commit 但網(wǎng)絡(luò)抖動(dòng)丟了,也可能……于是主動(dòng)發(fā)起回查。
事務(wù)消息的回查機(jī)制,更多的是在 broker 端的體現(xiàn)。RocketMQ 的 broker 以 Half 消息、Op 消息、真實(shí)消息三個(gè)不同的 topic 來(lái)將不同發(fā)送階段的事務(wù)消息進(jìn)行了隔離,使得 Consumer 只能看到最終確認(rèn) commit 需要投遞出去的消息。其中詳細(xì)的實(shí)現(xiàn)邏輯在本文中暫不多贅述,后續(xù)可另開(kāi)一篇專(zhuān)門(mén)來(lái)從 Broker 視角來(lái)解讀。
回到 Producer 的視角,當(dāng)收到了 Broker 的回查請(qǐng)求,Producer 將根據(jù)消息檢查本地事務(wù)狀態(tài),根據(jù)結(jié)果決定提交或回滾,這就要求 Producer 必須指定回查實(shí)現(xiàn),以備不時(shí)之需。當(dāng)然,正常情況下,并不推薦主動(dòng)發(fā)送 UNKNOW 狀態(tài),這個(gè)狀態(tài)毫無(wú)疑問(wèn)會(huì)給 broker 帶來(lái)額外回查開(kāi)銷(xiāo),只在出現(xiàn)不可預(yù)知的異常情況時(shí)才啟動(dòng)回查機(jī)制,是一種比較合理的選擇。
另外,4.7.1 版本的事務(wù)回查并非無(wú)限回查,而是最多回查 15 次:
源碼清單-15
/** * The maximum number of times the message was checked, if exceed this value, this message will be discarded. */ @ImportantField private int transactionCheckMax = 15;
感謝各位的閱讀,以上就是“如何理解Github上14.1k Star的RocketMQ”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)如何理解Github上14.1k Star的RocketMQ這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。