溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RocketMQ?producer同步發(fā)送和單向發(fā)送源碼分析

發(fā)布時(shí)間:2023-03-17 13:47:16 來源:億速云 閱讀:258 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹“RocketMQ producer同步發(fā)送和單向發(fā)送源碼分析”的相關(guān)知識,小編通過實(shí)際案例向大家展示操作過程,操作方法簡單快捷,實(shí)用性強(qiáng),希望這篇“RocketMQ producer同步發(fā)送和單向發(fā)送源碼分析”文章能幫助大家解決問題。

RocketMQ生產(chǎn)者發(fā)送消息分為三種模式

RocketMQ生產(chǎn)者發(fā)送消息分為三種模式,分別是同步發(fā)送,異步發(fā)送和單向發(fā)送。

  • 單向發(fā)送,這個(gè)就是發(fā)送之后不用接收結(jié)果的,就是你發(fā)出去一個(gè)消息,然后就返回了,就算有結(jié)果返回也不會接收了,這是站在消息生產(chǎn)者的角度;

  • 同步發(fā)送的話,就是發(fā)出去一個(gè)消息,這個(gè)線程要等著它返回消息發(fā)送結(jié)果,然后你這個(gè)線程再根據(jù)這個(gè)消息發(fā)送結(jié)果再做一些業(yè)務(wù)操作等等;

  • 異步發(fā)送,這個(gè)就是在你發(fā)送消息之前要給一個(gè)callback,發(fā)送的時(shí)候,你這個(gè)線程就不用等著,該干什么就干什么,然后發(fā)送結(jié)果回來的時(shí)候,是由其他線程調(diào)用你這個(gè)callback來處理的,你可以把這個(gè)callback看作是一個(gè)回調(diào)函數(shù),回調(diào)方法,這個(gè)方法里面的業(yè)務(wù)邏輯就是你對這個(gè)消息發(fā)送結(jié)果的處理。注意,本文介紹的消息發(fā)送只是普通的消息發(fā)送,那種事務(wù)類型的消息,我們以后會有介紹。

1. 同步發(fā)送

producer同步發(fā)送消息的示例在org.apache.rocketmq.example.simple.Producer類中,代碼如下:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 1. 創(chuàng)建 DefaultMQProducer 對象
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * Launch the instance.
         */
        // todo 2. 啟動(dòng) producer
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 3. 發(fā)送消息
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } 
            ...
        }
        producer.shutdown();
    }
}

我們可以看到這個(gè)代碼,你是同步消息你是需要在你自己的業(yè)務(wù)線程里面接收這個(gè)sendResult的,然后在做一些業(yè)務(wù)處理,比如我這里就是打印了一下這個(gè)sendResult。

接下來我們看下它是怎樣發(fā)送的,這里是調(diào)用了這個(gè)producer的send方法。

@Override
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // topic 和消息長度 校驗(yàn)
    Validators.checkMessage(msg, this);
    msg.setTopic(withNamespace(msg.getTopic()));
    // todo
    return this.defaultMQProducerImpl.send(msg);
}

我們可以看到,這個(gè) DefaultMQProducer 將這個(gè)消息給了defaultMQProducerImpl 這個(gè)實(shí)現(xiàn)的send方法來處理了。

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // todo 默認(rèn)超時(shí)時(shí)間3s
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

defaultMQProducerImpl的send方法,加了個(gè)超時(shí)時(shí)間 ,然后有調(diào)用它的重載方法send(msg,timeout)

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // todo 同步模式
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

這個(gè)send(msg,timeout)又調(diào)用了sendDefaultImpl 方法,然后他這里加了個(gè)通信模式是同步,CommunicationMode.SYNC。

1.1 DefaultMQProducerImpl#sendDefaultImpl

sendDefaultImpl 方法就比較長了了我們分成幾部分來介紹:

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 判斷狀態(tài)是否是running
    this.makeSureStateOK();
    // 檢查消息合法性
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 隨機(jī)的invokeID
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // todo 獲取topic信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    ...
}

這一小段代碼其實(shí)就是做了一些準(zhǔn)備檢查工作,注意第二行的個(gè)檢查消息合法性,它要檢查你topic,消息長度的,你不能發(fā)空消息,消息長度也不能太長,默認(rèn)是不超過4m,接下來這些就是記錄一下時(shí)間了,再看最后一行,就是根據(jù)你這個(gè)消息發(fā)送的topic,然后獲取topic 發(fā)送消息的這么一個(gè)信息,這里面就有這topic 有幾個(gè)MessageQueue,然后每個(gè)MessageQueue對應(yīng)在哪個(gè)broker上面,broker 的地址又是啥的,它這個(gè)方法會先從本地的一個(gè)緩存中獲取下,沒有的話就從nameserv更新下這個(gè)本地緩存,再找找,要是再找不到,它就認(rèn)為你沒有這個(gè)topic了,然后就去nameserv上面拉取一個(gè)默認(rèn)topic的一些配置信息給你用(這個(gè)其實(shí)就是在新建一個(gè)topic)。 接著這個(gè)方法往下看,接著就是判斷 這個(gè)TopicPublishInfo 是否存在了,如果不存在的話就拋出異常了,沒有后續(xù)了就,如果存在的話:

...
if (topicPublishInfo != null && topicPublishInfo.ok()) {
    boolean callTimeout = false;
    MessageQueue mq = null;
    Exception exception = null;
    SendResult sendResult = null;
    // 重試次數(shù) 區(qū)分同步、其他
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    // 存放發(fā)送過的broker name
    String[] brokersSent = new String[timesTotal];
    // 重試發(fā)送
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        // todo 選擇message queue
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
            mq = mqSelected;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                if (times > 0) {
                    //Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                if (timeout < costTime) {
                    callTimeout = true;
                    break;
                }
                // todo 進(jìn)行發(fā)送
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                endTimestamp = System.currentTimeMillis();
                // todo isolation 參數(shù)為false(看一下異常情況)
                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                switch (communicationMode) {
                    case ASYNC:
                        return null;
                    case ONEWAY:
                        return null;
                    case SYNC:
                        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                            if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                continue;
                            }
                        }
                        return sendResult;
                    default:
                        break;
                }
            } catch (RemotingException e) {
                endTimestamp = System.currentTimeMillis();
                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                log.warn(msg.toString());
            ...

其實(shí)下面還有許多處理異常的操作沒有放上,不過不影響我們的主流程,先是判斷你這個(gè)通信模式,如果是同步的話,默認(rèn)重試次數(shù)就是2 ,然后加上本身這次請求,也就是最查請求3次。這個(gè)for循環(huán)就是失敗重試的代碼,再看下代碼selectOneMessageQueue這個(gè)就是選擇一個(gè)MesssageQueue的方法了,這個(gè)是比較重要的,這里我們先不說,你可以把它理解為 我們的負(fù)載均衡。接著往下走,就是判斷一下時(shí)間了,計(jì)算一下剩下的時(shí)間, 如果這一堆前面的內(nèi)容耗時(shí)很長,然后已經(jīng)超了之前設(shè)置的默認(rèn)超時(shí)時(shí)間,這個(gè)時(shí)候就會超時(shí)了,然后將這個(gè)calltimeout設(shè)置成true了。

1.2 DefaultMQProducerImpl#sendKernelImpl

接著就是進(jìn)行發(fā)送了調(diào)用sendKernelImpl 方法:

private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // 根據(jù)MessageQueue獲取Broker的網(wǎng)絡(luò)地址
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    SendMessageContext context = null;
    ...

這個(gè)sendKernelImpl 也是有點(diǎn)長,然后我們一部分一部分的看下,這就是根據(jù)MessageQueue里面的broker name 獲取一下broker addr,他這個(gè)broker addr 選的是master的,比如說我們 broker使用的是 master/slave 高可用架構(gòu),這個(gè)時(shí)候只會選擇那個(gè)master,畢竟是往里面寫消息,然后只能用master,等到介紹消息消費(fèi)者的時(shí)候,消息消費(fèi)者是可以向slave node 獲取消息消費(fèi)的,前提是 master 負(fù)載比較大,然后消息消費(fèi)者下次獲取消費(fèi)的消息已經(jīng)在slave里面了,然后消息消費(fèi)者獲取到消息之后,它里面有個(gè)字段是告訴你下次可以去xxx 地址的broker 拉取消息,這個(gè)我們介紹到消息消費(fèi)者的時(shí)候再說。

接著回來,如果沒有獲取到這個(gè)broker 地址的話,就是去nameserv上更新下本地緩存,然后再獲取下。接著再往下就是再次判斷一下這個(gè)broker addr 了,如果還沒有就拋出異常,如果有的話 就執(zhí)行下面的代碼了:

...
SendMessageContext context = null;
if (brokerAddr != null) {
    brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
    byte[] prevBody = msg.getBody();
    try {
        //for MessageBatch,ID has been set in the generating process
        // 給消息設(shè)置全局唯一id, 對于MessageBatch在生成過程中已設(shè)置了id
        if (!(msg instanceof MessageBatch)) {
            MessageClientIDSetter.setUniqID(msg);
        }
        boolean topicWithNamespace = false;
        if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
            msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
            topicWithNamespace = true;
        }
        int sysFlag = 0;
        // 消息體是否壓縮
        boolean msgBodyCompressed = false;
        // 壓縮消息 內(nèi)容部分超了4k就會壓縮
        if (this.tryToCompressMessage(msg)) {
            sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
            msgBodyCompressed = true;
        }
        final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
            sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
        }
        // 判斷有沒有hook
        if (hasCheckForbiddenHook()) {
            CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
            checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
            checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
            checkForbiddenContext.setCommunicationMode(communicationMode);
            checkForbiddenContext.setBrokerAddr(brokerAddr);
            checkForbiddenContext.setMessage(msg);
            checkForbiddenContext.setMq(mq);
            checkForbiddenContext.setUnitMode(this.isUnitMode());
            // 執(zhí)行Forbidden 鉤子
            this.executeCheckForbiddenHook(checkForbiddenContext);
        }
        ...

第一句,這個(gè)其實(shí)就是進(jìn)行一個(gè)vip通道地址的轉(zhuǎn)換,這個(gè)比較有意思,如果你這個(gè)支持vip channel的話,它會把broker addr 里面的端口改變一下,這個(gè)所謂的vip channel ,其實(shí)就是與它的另一個(gè)端口建立連接,這個(gè)端口就是當(dāng)前端口-2 ;

接著,如果這個(gè)消息不是批量消息的話,我們就給這個(gè)消息設(shè)置一個(gè)唯一的消息id,再往下就是 sysflag的處理了,這個(gè)sysflag里面記錄了好幾個(gè)屬性值,使用二進(jìn)制來處理的,比如說消息是否壓縮了(這個(gè)壓縮,就是你消息內(nèi)容超過了默認(rèn)的4k之后,就會進(jìn)行壓縮,這個(gè)壓縮的閾值你是可以配置的),是否是個(gè)事務(wù)消息等等。 接下來就是執(zhí)行hook了,這個(gè)hook就是forbidenHook ,其實(shí)就是對消息進(jìn)行過濾。

...
if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }
    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
}
// 封裝消息頭
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// 設(shè)置group
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// topic
requestHeader.setTopic(msg.getTopic());
// 設(shè)置默認(rèn)topic
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// 設(shè)置默認(rèn)topic的隊(duì)列數(shù)量 默認(rèn)4個(gè)
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// 隊(duì)列id
requestHeader.setQueueId(mq.getQueueId());
// 消息系統(tǒng)標(biāo)記
requestHeader.setSysFlag(sysFlag);
// 消息發(fā)送時(shí)間
requestHeader.setBornTimestamp(System.currentTimeMillis());
// 消息標(biāo)記(RocketMQ對消息標(biāo)記不做任何處理,供應(yīng)用程序使用)
requestHeader.setFlag(msg.getFlag());
// 設(shè)置擴(kuò)展屬性
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
// 是否批量
requestHeader.setBatch(msg instanceof MessageBatch);
// 判斷消息是否是 %RETRY% 開頭
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }
    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}
...

在往下就是執(zhí)行一下發(fā)送消息之前的hook,再往下就是封裝發(fā)送消息請求頭,然后這個(gè)請求頭里面就涵蓋了很多的參數(shù),比如說topic,MessageQueue 隊(duì)列Id, 出生日期,flag等等。再往下就是消息發(fā)送了

...
SendResult sendResult = null;
// 同步 異步  單向
switch (communicationMode) {
    // 異步
    case ASYNC:
        Message tmpMessage = msg;
        boolean messageCloned = false;
        if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using commpressed message body and recover origin massage.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
        }
        if (topicWithNamespace) {
            if (!messageCloned) {
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
        // 判斷超時(shí)時(shí)間
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        // todo
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
        // 單向
    case ONEWAY:
        // 同步
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        // 判判是否超時(shí)
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        // todo 交給 mq api去發(fā)送消息
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}
// 是否注冊了消息發(fā)送鉤子函數(shù)
if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context);
}
...

因?yàn)楸拘」?jié)主要是介紹下這個(gè)同步發(fā)送消息,然后我們就主要介紹下這個(gè)sync的代碼邏輯: 首先是判斷超時(shí),然后交給 MQClientAPI層去處理,然后返回sendResult。

1.3 MQClientAPIImpl#sendMessage

我們這里接著看下MQClientAPIImpl里面的sendMessage 實(shí)現(xiàn):

public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;
    String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
    boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
    if (isReply) {
        if (sendSmartMsg) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
        }
    } else {
        // sendSmartMsg默認(rèn)開啟,也算一種優(yōu)化吧 批量消息
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            // 普通消息
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
    }
    // 設(shè)置消息體
    request.setBody(msg.getBody());
    switch (communicationMode) {
        case ONEWAY:
            // todo
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            // 判斷超時(shí)時(shí)間
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            // todo
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            // 判斷超時(shí)時(shí)間
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            // todo 同步發(fā)送
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }
    return null;
}

這里先生成一個(gè)RemotingCommand 這么個(gè)實(shí)體對象,然后RequestCode就是SEND_MESSAGE,其實(shí)這里判斷了一下sendSmartMsg 這個(gè)參數(shù),把requestHeader優(yōu)化了一下,然后換成了requestHeaderV2,其實(shí)這個(gè)requestHeaderV2 內(nèi)容跟requestHeader一樣,但是變量名是單個(gè)字母的,然后序列化,反序列化,傳輸內(nèi)容都有所優(yōu)化,其實(shí)他這個(gè)序列化使用是json形式的,然后想想就知道有些哪些好處了, 唯一的缺點(diǎn)就是可讀性差點(diǎn),但是這個(gè)玩意是對用戶透明的,用戶不需要關(guān)心。

接著就是判斷通信類型,然后發(fā)送消息了,這里是同步發(fā)送,先是判斷一下超時(shí)時(shí)間,接著就是調(diào)用sendMessageSync 進(jìn)行同步發(fā)送了,我們接著來看下這個(gè)sendMessageSync 方法實(shí)現(xiàn)。

1.4 MQClientAPIImpl#sendMessageSync

private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    // todo 同步調(diào)用
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    // 處理響應(yīng)
    return this.processSendResponse(brokerName, msg, response,addr);
}

這里就調(diào)用到了client 模塊(這個(gè)client其實(shí)就是直接操作netty了)來處理了,然后返回響應(yīng),調(diào)用processSendResponse 方法來處理響應(yīng)。

1.5 NettyRemotingClient#invokeSync

我們再來看下client的 invokeSync 方法:

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    // 開始時(shí)間
    long beginStartTime = System.currentTimeMillis();
    // todo 輪詢獲取namesrv地址Channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            // 執(zhí)行開始之前的rpchook
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            // 判斷超時(shí) 之前有獲取鏈接的操作,可能會出現(xiàn)超時(shí)的情況
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            // todo 進(jìn)行同步執(zhí)行,獲取響應(yīng)
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            // 執(zhí)行之后的rpchook
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
            // 遠(yuǎn)程發(fā)送請求異常
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            // 關(guān)閉channel
            this.closeChannel(addr, channel);
            throw e;
            // 超時(shí)異常
        } catch (RemotingTimeoutException e) {
            // 如果超時(shí) 就關(guān)閉cahnnel話,就關(guān)閉channel 默認(rèn)是不關(guān)閉的
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

這里有兩個(gè)點(diǎn)需要關(guān)注下,首先是根據(jù)broker addr 這個(gè)地址獲取一下對應(yīng)的channel ,如果不存在的話就創(chuàng)建一下這個(gè)連接, 稍微看下這塊的代碼:

private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
    // 如果地址不存在,就返回namesrv 的channel
    if (null == addr) {
        return getAndCreateNameserverChannel();
    }
    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }
    // 創(chuàng)建channel
    return this.createChannel(addr);
}

如果你這個(gè)addr是空的話,這個(gè)就是默認(rèn)找nameserv的addr ,然后找對應(yīng)channel就可以了,如果不是null ,然后它會去這個(gè)channelTable 這個(gè)map中去找,如果沒有的話就創(chuàng)建一個(gè)對應(yīng)的channel

接著回到這個(gè)invokeSync 方法中,獲得channel之后,就是執(zhí)行一下rpcHook了,這東西就是你在創(chuàng)建MQProducer的時(shí)候設(shè)置的,在調(diào)用前執(zhí)行一次,調(diào)用后執(zhí)行一次,其實(shí)你就可以通過這個(gè)hook來實(shí)現(xiàn)很多功能,監(jiān)控的功能比較多些。接著就是調(diào)用了invokeSyncImpl 這個(gè)實(shí)現(xiàn)方法來發(fā)送消息了,這個(gè)方法是它的一個(gè)父類里面的:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    // 獲取 請求id
    final int opaque = request.getOpaque();
    try {
        // 創(chuàng)建ResponseFuture
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        // 放入responseTable 表中
        this.responseTable.put(opaque, responseFuture);
        // 獲取遠(yuǎn)程地址
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                // 成功
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                // 失敗
                } else {
                    responseFuture.setSendRequestOK(false);
                }
                // 移除response中的緩存
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            // 成功了還是null  還是超時(shí)
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                // 沒發(fā)出去,就排除異常
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }
        // 返回響應(yīng)結(jié)果
        return responseCommand;
    } finally {
        // 移除
        this.responseTable.remove(opaque);
    }
}

這個(gè)方法其實(shí)就是最終往 channel里面寫內(nèi)容的方法了,我們來看下,先是為這次request創(chuàng)建一個(gè)id 吧,這個(gè)id主要用來返回響應(yīng)的時(shí)候用的。

接著創(chuàng)建一個(gè)ResposeFuture ,這個(gè)東西異步,同步都可以用,這個(gè)一會介紹一下它的原理,接著就是將這個(gè)id 與這個(gè) ResposeFuture 關(guān)聯(lián)起來放到這個(gè) responseTable 里面的, 接著就是往channel里面發(fā)送消息了,這里它添加一個(gè)listener ,這listener的執(zhí)行時(shí)機(jī)就是發(fā)送出去的時(shí)候,最后就是等待這個(gè)響應(yīng)了。

我們來解釋下這個(gè)ResposeFuture 原理, 當(dāng)執(zhí)行了responseFuture.waitResponse(timeoutMillis); 這行代碼,當(dāng)前線程就會wait ,然后被阻塞,然后等著響應(yīng)回來的時(shí)候,netty處理響應(yīng)的線程會從響應(yīng)里面獲取一下這個(gè)opaque這個(gè)id,就是請求之前在request生成的,broker 在響應(yīng)的時(shí)候會會把這個(gè)id 放回到response 中, 然后會根據(jù)這個(gè)opaque 從responseTable中找到這個(gè) ResposeFuture ,然后把響應(yīng)設(shè)置到這個(gè)里面,最后喚醒一下wait在這個(gè)對象里面的線程就可以了,這樣你這個(gè)業(yè)務(wù)線程就得到了這個(gè)RemotingResponse 了。 好了,到這我們就解釋清楚了,然后我們看下他這個(gè)代碼是怎樣實(shí)現(xiàn)的:

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    // 獲取對應(yīng)id 的responseFuture
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        // 設(shè)置
        responseFuture.setResponseCommand(cmd);
        // 從響應(yīng)表中移除
        responseTable.remove(opaque);
        if (responseFuture.getInvokeCallback() != null) {
            // todo 執(zhí)行回調(diào)
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}

不過它這個(gè)ResposeFuture 是使用CountDownLatch 來實(shí)現(xiàn)這個(gè)wait與喚醒的。我們來具體看下這個(gè) waitResponse方法與這個(gè)putResponse方法:

public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
    this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    return this.responseCommand;
}
public void putResponse(final RemotingCommand responseCommand) {
    this.responseCommand = responseCommand;
    this.countDownLatch.countDown();
}

2. 單向發(fā)送

單向發(fā)送其實(shí)這塊跟同步發(fā)送的流程差不多,我們來看下它的生產(chǎn)者代碼是怎樣寫的: org.apache.rocketmq.example.openmessaging.SimpleProducer:

public static void main(String[] args) {
    final MessagingAccessPoint messagingAccessPoint =
        OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
    final Producer producer = messagingAccessPoint.createProducer();
    messagingAccessPoint.startup();
    System.out.printf("MessagingAccessPoint startup OK%n");
    producer.startup();
    System.out.printf("Producer startup OK%n");
    ...
    {
        producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
        System.out.printf("Send oneway message OK%n");
    }
   ...
}

可以看到我們最后發(fā)送的時(shí)候調(diào)用的是sendOneway方法,這個(gè)方法是沒有返回值的。

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    this.defaultMQProducerImpl.sendOneway(msg);
}

2.1 DefaultMQProducerImpl#sendOneway

這里就是調(diào)用了defaultMQProducerImpl的 sendOneway方法

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    this.defaultMQProducerImpl.sendOneway(msg);
}

這里需要注意的是它也是調(diào)用了sendDefaultImpl 方法,然后通信方式是oneway 。這里我們就不細(xì)說了,可以看下同步方法解析這個(gè)方法的說明,這里唯一要提一點(diǎn)是單向發(fā)送是沒有這個(gè)重試的,然后就發(fā)送一次。下面的流程都是一樣的,然后就到了這個(gè)MQClientAPIImpl 的 sendMessage 方法

...
switch (communicationMode) {
    case ONEWAY:
        // todo
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
        return null;
    ...

然后他這個(gè)是又調(diào)用了NettyRemotingClient 的 invokeOneway 方法:

public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
    RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null &amp;&amp; channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

這里也是根據(jù)broker addr 獲取channel, 如果沒有的話,也是創(chuàng)建一個(gè),接著就是執(zhí)行這個(gè)rpc調(diào)用前的hook ,注意這里沒有調(diào)用后的一個(gè)hook,因?yàn)槲覀儾⒉恢浪鞘裁辞闆r。 接著又調(diào)用了invokeOnewayImpl 方法:

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // 請求體, 標(biāo)記是一個(gè)單向調(diào)用
    request.markOnewayRPC();
    // 獲取憑證
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // 釋放信號量
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            // 釋放信號量
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

這里使用了semaphore進(jìn)行限流,然后默認(rèn)的話是同時(shí)支持65535 個(gè)請求發(fā)送的,這個(gè)semaphore 限流只有單向發(fā)送與這個(gè)異步發(fā)送會有,接著就會將這個(gè)request寫入channel中,然后add了一個(gè)listener ,這個(gè)listener執(zhí)行時(shí)機(jī)就是消息發(fā)送出去了,這個(gè)時(shí)候就會釋放 信號量。到這我們這個(gè)單向發(fā)送就解析完成了。

關(guān)于“RocketMQ producer同步發(fā)送和單向發(fā)送源碼分析”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識,可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會為大家更新不同的知識點(diǎn)。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI