溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

RocketMQ事務(wù)消息如何實(shí)現(xiàn)

發(fā)布時(shí)間:2021-12-17 17:39:45 來(lái)源:億速云 閱讀:182 作者:小新 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要介紹了RocketMQ事務(wù)消息如何實(shí)現(xiàn),具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

根據(jù)上文的描述,發(fā)送事務(wù)消息的入口為:

TransactionMQProducer#sendMessageInTransaction:public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {        if (null == this.transactionListener) {    // @1
            throw new MQClientException("TransactionListener is null", null);
        }        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg);  // @2
    }

代碼@1:如果transactionListener為空,則直接拋出異常。
代碼@2:調(diào)用defaultMQProducerImpl的sendMessageInTransaction方法。

接下來(lái)重點(diǎn)分享sendMessageInTransaction方法

DefaultMQProducerImpl#sendMessageInTransactionpublic TransactionSendResult sendMessageInTransaction(final Message msg,           final TransactionListener tranExecuter, final Object arg)  throws MQClientException {

Step1:首先先闡述一下參數(shù)含義。

  • final Message msg:消息

  • TransactionListener tranExecuter:事務(wù)監(jiān)聽(tīng)器

  • Object arg:其他附加參數(shù)

DefaultMQProducerImpl#sendMessageInTransaction
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {
       sendResult = this.send(msg);
} catch (Exception e) {       throw new MQClientException("send message Exception", e);
}

Step2:在消息屬性中增加兩個(gè)屬性:TRAN_MSG,其值為true,表示為事務(wù)消息;PGROUP:消息所屬發(fā)送者組,然后以同步方式發(fā)送消息。在消息發(fā)送之前,會(huì)先檢查消息的屬性TRAN_MSG,如果存在并且值為true,則通過(guò)設(shè)置消息系統(tǒng)標(biāo)記的方式,設(shè)置消息為MessageSysFlag.TRANSACTION_PREPARED_TYPE。

DefaultMQProducerImpl#sendKernelImplfinal String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
       sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
SendMessageProcessor#sendMessage
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (traFlag != null && Boolean.parseBoolean(traFlag)) {        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark(                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");             return response;
       }
      putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
      putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

Step3:Broker端收到客戶(hù)端發(fā)送消息請(qǐng)求后,判斷消息類(lèi)型。如果是事務(wù)消息,則調(diào)用TransactionalMessageService#prepareMessage方法,否則走原先的邏輯,調(diào)用MessageStore#putMessage方法將消息存入Broker服務(wù)端。
本節(jié)重點(diǎn)闡述事務(wù)消息的實(shí)現(xiàn)原理,故接下來(lái)將重點(diǎn)關(guān)注prepareMessage方法,如想了解RocketMQ消息存儲(chǔ)相關(guān),可以關(guān)注作者 源碼分析RocketMQ系列 。

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessagepublic PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {        return transactionalMessageBridge.putHalfMessage(messageInner);
 }

step4:事務(wù)消息,將調(diào)用TransactionalMessageServiceImpl#prepareMessage方法,繼而調(diào)用TransactionalMessageBridge#prepareMessage方法。

TransactionalMessageBridge#parseHalfMessageInnerpublic PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {        return store.putMessage(parseHalfMessageInner(messageInner));
    }    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));        return msgInner;
    }

Step5:備份消息的原主題名稱(chēng)與原隊(duì)列ID,然后取消消息的事務(wù)消息標(biāo)簽,重新設(shè)置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊(duì)列ID固定為0。然后調(diào)用MessageStore#putMessage方法將消息持久化,這里TransactionalMessageBridge橋接類(lèi),就是封裝事務(wù)消息的相關(guān)流程,最終調(diào)用MessageStore完成消息的持久化。消息入庫(kù)后,會(huì)繼續(xù)回到DefaultMQProducerImpl#sendMessageInTransaction,上文的Step2后面,也就是通過(guò)同步將消息發(fā)送到消息服務(wù)端。

注:這是事務(wù)消息Prepare狀態(tài)的處理邏輯,消息是存儲(chǔ)在消息服務(wù)器了,但存儲(chǔ)的并不是原主題,而是RMQ_SYS_TRANS_HALF_TOPIC,故此時(shí)消費(fèi)端是無(wú)法消費(fèi)shen
生產(chǎn)者發(fā)送的消息的??吹竭@里,如果對(duì)RocketMQ比較熟悉的話,肯定會(huì)有一個(gè)“定時(shí)任務(wù)”去取這個(gè)主題下的消息,然后則“合適”的時(shí)機(jī)將消息的主題恢復(fù)。

DefaultMQProducerImpl#sendMessageInTransactionswitch (sendResult.getSendStatus()) {            case SEND_OK: {                try {                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }            break;            case FLUSH_DISK_TIMEOUT:            case FLUSH_SLAVE_TIMEOUT:            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;                break;            default:                break;
        }

Step6:如果消息發(fā)送成功,會(huì)回調(diào)TransactionListener#executeLocalTransaction方法,執(zhí)行本地事務(wù),并且返回本地事務(wù)狀態(tài)為L(zhǎng)ocalTransactionState,枚舉值如下:

  • COMMIT_MESSAGE,

  • ROLLBACK_MESSAGE,

  • UNKNOW

注意:TransactionListener#executeLocalTransaction是在發(fā)送者成功發(fā)送PREPARED消息后,會(huì)執(zhí)行本地事務(wù)方法,然后返回本地事務(wù)狀態(tài);如果PREPARED消息發(fā)送失敗,則不會(huì)調(diào)用TransactionListener#executeLocalTransaction,并且本地事務(wù)消息,設(shè)置為L(zhǎng)ocalTransactionState.ROLLBACK_MESSAGE,表示消息需要被回滾。

DefaultMQProducerImpl#sendMessageInTransactiontry {this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

step7:調(diào)用endTransaction方法結(jié)束事務(wù)(提交或回滾)。

DefaultMQProducerImpl#endTransaction
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());switch (localTransactionState) {    case COMMIT_MESSAGE:
         requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);         break;    case ROLLBACK_MESSAGE:
         requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);         break;     case UNKNOW:
         requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);         break;     default:         break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());

step8:組裝結(jié)束事務(wù)請(qǐng)求,主要參數(shù)為:事務(wù)ID、事務(wù)操作(commitOrRollback)、消費(fèi)組、消息隊(duì)列偏移量、消息ID,fromTransactionCheck,從這里發(fā)出的請(qǐng)求,默認(rèn)為false。Broker端的請(qǐng)求處理器為:EndTransactionProcessor。

step9:EndTransactionProcessor根據(jù)事務(wù)提交類(lèi)型:TRANSACTION_COMMIT_TYPE(提交事務(wù))、TRANSACTION_ROLLBACK_TYPE(回滾事務(wù))、TRANSACTION_NOT_TYPE(忽略該請(qǐng)求)。

到目前為止,已詳細(xì)梳理了RocketMQ事務(wù)消息的發(fā)送流程,更加準(zhǔn)確的說(shuō)是Prepare狀態(tài)的消息發(fā)送流程。具體流程如圖所示:
RocketMQ事務(wù)消息如何實(shí)現(xiàn)cdn.com/b0bfa9bc730e64d5d03312968f1a2b07132364f1.png">

本文到這里,初步展示了事務(wù)消息的發(fā)送流程,總的說(shuō)來(lái),RocketMQ的事務(wù)消息發(fā)送使用二階段提交思路,首先,在消息發(fā)送時(shí),先發(fā)送消息類(lèi)型為Prepread類(lèi)型的消息,然后在將該消息成功存入到消息服務(wù)器后,會(huì)回調(diào)TransactionListener#executeLocalTransaction,執(zhí)行本地事務(wù)狀態(tài)回調(diào)函數(shù),然后根據(jù)該方法的返回值,結(jié)束事務(wù):
1、COMMIT_MESSAGE :提交事務(wù)。
2、ROLLBACK_MESSAGE:回滾事務(wù)。
3、UNKNOW:未知事務(wù)狀態(tài),此時(shí)消息服務(wù)器(Broker)收到EndTransaction命令時(shí),將不對(duì)這種消息做處理,消息還處于Prepared類(lèi)型,存儲(chǔ)在主題為:RMQ_SYS_TRANS_HALF_TOPIC的隊(duì)列中,然后消息發(fā)送流程將結(jié)束,那這些消息如何提交或回滾呢?

為了實(shí)現(xiàn)避免客戶(hù)端需要再次發(fā)送提交、回滾命令,RocketMQ會(huì)采取定時(shí)任務(wù)將RMQ_SYS_TRANS_HALF_TOPIC中的消息取出,然后回到客戶(hù)端,判斷該消息是否需要提交或回滾,來(lái)完成事務(wù)消息的聲明周期。

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“RocketMQ事務(wù)消息如何實(shí)現(xiàn)”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI