您好,登錄后才能下訂單哦!
這篇文章主要介紹了RocketMQ事務(wù)消息如何實(shí)現(xiàn),具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
根據(jù)上文的描述,發(fā)送事務(wù)消息的入口為:
TransactionMQProducer#sendMessageInTransaction:public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { if (null == this.transactionListener) { // @1 throw new MQClientException("TransactionListener is null", null); } return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg); // @2 }
代碼@1:如果transactionListener為空,則直接拋出異常。
代碼@2:調(diào)用defaultMQProducerImpl的sendMessageInTransaction方法。
接下來(lái)重點(diǎn)分享sendMessageInTransaction方法
DefaultMQProducerImpl#sendMessageInTransactionpublic TransactionSendResult sendMessageInTransaction(final Message msg, final TransactionListener tranExecuter, final Object arg) throws MQClientException {
Step1:首先先闡述一下參數(shù)含義。
final Message msg:消息
TransactionListener tranExecuter:事務(wù)監(jiān)聽(tīng)器
Object arg:其他附加參數(shù)
DefaultMQProducerImpl#sendMessageInTransaction SendResult sendResult = null; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); }
Step2:在消息屬性中增加兩個(gè)屬性:TRAN_MSG,其值為true,表示為事務(wù)消息;PGROUP:消息所屬發(fā)送者組,然后以同步方式發(fā)送消息。在消息發(fā)送之前,會(huì)先檢查消息的屬性TRAN_MSG,如果存在并且值為true,則通過(guò)設(shè)置消息系統(tǒng)標(biāo)記的方式,設(shè)置消息為MessageSysFlag.TRANSACTION_PREPARED_TYPE。
DefaultMQProducerImpl#sendKernelImplfinal String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } SendMessageProcessor#sendMessage String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (traFlag != null && Boolean.parseBoolean(traFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); }
Step3:Broker端收到客戶(hù)端發(fā)送消息請(qǐng)求后,判斷消息類(lèi)型。如果是事務(wù)消息,則調(diào)用TransactionalMessageService#prepareMessage方法,否則走原先的邏輯,調(diào)用MessageStore#putMessage方法將消息存入Broker服務(wù)端。
本節(jié)重點(diǎn)闡述事務(wù)消息的實(shí)現(xiàn)原理,故接下來(lái)將重點(diǎn)關(guān)注prepareMessage方法,如想了解RocketMQ消息存儲(chǔ)相關(guān),可以關(guān)注作者
源碼分析RocketMQ系列
。
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessagepublic PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) { return transactionalMessageBridge.putHalfMessage(messageInner); }
step4:事務(wù)消息,將調(diào)用TransactionalMessageServiceImpl#prepareMessage方法,繼而調(diào)用TransactionalMessageBridge#prepareMessage方法。
TransactionalMessageBridge#parseHalfMessageInnerpublic PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) { return store.putMessage(parseHalfMessageInner(messageInner)); } private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
Step5:備份消息的原主題名稱(chēng)與原隊(duì)列ID,然后取消消息的事務(wù)消息標(biāo)簽,重新設(shè)置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊(duì)列ID固定為0。然后調(diào)用MessageStore#putMessage方法將消息持久化,這里TransactionalMessageBridge橋接類(lèi),就是封裝事務(wù)消息的相關(guān)流程,最終調(diào)用MessageStore完成消息的持久化。消息入庫(kù)后,會(huì)繼續(xù)回到DefaultMQProducerImpl#sendMessageInTransaction,上文的Step2后面,也就是通過(guò)同步將消息發(fā)送到消息服務(wù)端。
注:這是事務(wù)消息Prepare狀態(tài)的處理邏輯,消息是存儲(chǔ)在消息服務(wù)器了,但存儲(chǔ)的并不是原主題,而是RMQ_SYS_TRANS_HALF_TOPIC,故此時(shí)消費(fèi)端是無(wú)法消費(fèi)shen
生產(chǎn)者發(fā)送的消息的??吹竭@里,如果對(duì)RocketMQ比較熟悉的話,肯定會(huì)有一個(gè)“定時(shí)任務(wù)”去取這個(gè)主題下的消息,然后則“合適”的時(shí)機(jī)將消息的主題恢復(fù)。
DefaultMQProducerImpl#sendMessageInTransactionswitch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } localTransactionState = tranExecuter.executeLocalTransaction(msg, arg); if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; }
Step6:如果消息發(fā)送成功,會(huì)回調(diào)TransactionListener#executeLocalTransaction方法,執(zhí)行本地事務(wù),并且返回本地事務(wù)狀態(tài)為L(zhǎng)ocalTransactionState,枚舉值如下:
COMMIT_MESSAGE,
ROLLBACK_MESSAGE,
UNKNOW
注意:TransactionListener#executeLocalTransaction是在發(fā)送者成功發(fā)送PREPARED消息后,會(huì)執(zhí)行本地事務(wù)方法,然后返回本地事務(wù)狀態(tài);如果PREPARED消息發(fā)送失敗,則不會(huì)調(diào)用TransactionListener#executeLocalTransaction,并且本地事務(wù)消息,設(shè)置為L(zhǎng)ocalTransactionState.ROLLBACK_MESSAGE,表示消息需要被回滾。
DefaultMQProducerImpl#sendMessageInTransactiontry {this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); }
step7:調(diào)用endTransaction方法結(jié)束事務(wù)(提交或回滾)。
DefaultMQProducerImpl#endTransaction EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset());switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId());
step8:組裝結(jié)束事務(wù)請(qǐng)求,主要參數(shù)為:事務(wù)ID、事務(wù)操作(commitOrRollback)、消費(fèi)組、消息隊(duì)列偏移量、消息ID,fromTransactionCheck,從這里發(fā)出的請(qǐng)求,默認(rèn)為false。Broker端的請(qǐng)求處理器為:EndTransactionProcessor。
step9:EndTransactionProcessor根據(jù)事務(wù)提交類(lèi)型:TRANSACTION_COMMIT_TYPE(提交事務(wù))、TRANSACTION_ROLLBACK_TYPE(回滾事務(wù))、TRANSACTION_NOT_TYPE(忽略該請(qǐng)求)。
到目前為止,已詳細(xì)梳理了RocketMQ事務(wù)消息的發(fā)送流程,更加準(zhǔn)確的說(shuō)是Prepare狀態(tài)的消息發(fā)送流程。具體流程如圖所示:
cdn.com/b0bfa9bc730e64d5d03312968f1a2b07132364f1.png">
本文到這里,初步展示了事務(wù)消息的發(fā)送流程,總的說(shuō)來(lái),RocketMQ的事務(wù)消息發(fā)送使用二階段提交思路,首先,在消息發(fā)送時(shí),先發(fā)送消息類(lèi)型為Prepread類(lèi)型的消息,然后在將該消息成功存入到消息服務(wù)器后,會(huì)回調(diào)TransactionListener#executeLocalTransaction,執(zhí)行本地事務(wù)狀態(tài)回調(diào)函數(shù),然后根據(jù)該方法的返回值,結(jié)束事務(wù):
1、COMMIT_MESSAGE :提交事務(wù)。
2、ROLLBACK_MESSAGE:回滾事務(wù)。
3、UNKNOW:未知事務(wù)狀態(tài),此時(shí)消息服務(wù)器(Broker)收到EndTransaction命令時(shí),將不對(duì)這種消息做處理,消息還處于Prepared類(lèi)型,存儲(chǔ)在主題為:RMQ_SYS_TRANS_HALF_TOPIC的隊(duì)列中,然后消息發(fā)送流程將結(jié)束,那這些消息如何提交或回滾呢?
為了實(shí)現(xiàn)避免客戶(hù)端需要再次發(fā)送提交、回滾命令,RocketMQ會(huì)采取定時(shí)任務(wù)將RMQ_SYS_TRANS_HALF_TOPIC中的消息取出,然后回到客戶(hù)端,判斷該消息是否需要提交或回滾,來(lái)完成事務(wù)消息的聲明周期。
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“RocketMQ事務(wù)消息如何實(shí)現(xiàn)”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。