溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

RocketMq進(jìn)階源碼學(xué)習(xí)之如何實(shí)現(xiàn)生產(chǎn)者發(fā)送消息

發(fā)布時(shí)間:2021-10-15 13:43:18 來(lái)源:億速云 閱讀:134 作者:iii 欄目:編程語(yǔ)言

本篇內(nèi)容介紹了“RocketMq進(jìn)階源碼學(xué)習(xí)之如何實(shí)現(xiàn)生產(chǎn)者發(fā)送消息”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

RocketMq進(jìn)階源碼學(xué)習(xí)之生產(chǎn)者發(fā)送消息篇

在RocketMq的生產(chǎn)端這塊,最重要的自然是發(fā)送消息了,生產(chǎn)者發(fā)送消息有同步/異步/單向3種模式,每種模式的處理方式也都也都各不相同,最重要的是同步/異步的處理方式,單向應(yīng)用場(chǎng)景較少(一般適用于對(duì)消息可靠性要求不高的場(chǎng)景,如發(fā)送日志),本文將主要分析同步/異步.

老規(guī)矩,從實(shí)例開始,一個(gè)最簡(jiǎn)單的發(fā)送消息代碼

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
Message msg=new Message();
//同步發(fā)送
SendResult sendResult = producer.send(msg);
// 異步發(fā)送消息, 發(fā)送結(jié)果通過(guò)callback返回給客戶端。
producer.sendAsync(msg, new SendCallback() {
    @Override
    public void onSuccess(final SendResult sendResult) {
        // 消息發(fā)送成功。
        System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
    }

    @Override
    public void onException(OnExceptionContext context) {
        // 消息發(fā)送失敗,需要進(jìn)行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補(bǔ)償處理。
        System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
    }
});

先看同步的時(shí)候,進(jìn)入send方法,經(jīng)過(guò)N個(gè)send的鏈?zhǔn)秸{(diào)用,進(jìn)入真正的邏輯處理方法.再回頭看異步發(fā)送的時(shí)候,其實(shí)除了一開始傳入一個(gè)SendCallback,在前幾個(gè)鏈?zhǔn)秸{(diào)用中明確CommunicationMode是Async之外,最終都是進(jìn)入接下來(lái)的這個(gè)方法,無(wú)論是Async/Sync/Oneway都統(tǒng)一在這里處理

這里的處理是 1.先檢查producer是否已經(jīng)啟動(dòng) 2.再校驗(yàn)message的內(nèi)容是否合規(guī)(比如Topic是否為空,Topic字符是否合法等等,這個(gè)字符校驗(yàn)上我就犯過(guò)一次錯(cuò),Topic的內(nèi)容不能含有空格,切記檢查清楚配置文件的brokerClusterName,在回應(yīng)request消息的時(shí)候會(huì)默認(rèn)設(shè)置Topic的值上包含clusterName) 3.根據(jù)Topic的值去查找Topic的路由信息 4.檢查發(fā)送模式,如果是同步則只發(fā)送一次,異步則會(huì)在發(fā)送失敗時(shí)進(jìn)行重試,最多嘗試配置的重試次數(shù)+1 5.從Topic的多個(gè)MessageQueue中選中一個(gè)發(fā)送消息

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback/**sendCallback從頭貫徹到尾,是為了接收真正發(fā)送消息的結(jié)果 */,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        /** 檢查producer是否已經(jīng)啟動(dòng)*/
        this.makeSureStateOK();
        /** 校驗(yàn)一下message的內(nèi)容*/
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        /** 查找topic的信息*/
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            /**異步就最多可以重試1+配置的重試次數(shù),同步則最多只嘗試一次*/
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            /**記錄嘗試發(fā)送過(guò)哪些broker*/
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                /**所以發(fā)送消息的負(fù)載均衡實(shí)在客戶端實(shí)現(xiàn)的,默認(rèn)是從Topic的所有的messageQueue中指定一個(gè)
                 * 這里使用了ThreadLocal,將messageQueue的index存在threadLocal中,這樣之后如果重試消息,
                 * 還能拿到之前的index,然后基于之前的選擇的messageQueue重新選擇,默認(rèn)是每次index+1*/
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                      msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                .........刪掉了很多對(duì)于exception的處理,對(duì)于此處的源碼理解無(wú)意義
            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

        validateNameServerSetting();

        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

這里繼續(xù)跟蹤sendKernelImpl方法,從方法名看,這是發(fā)送消息的內(nèi)核實(shí)現(xiàn)

1.根據(jù)上面?zhèn)飨聛?lái)的MessageQueue查找broker的ip,如果找不到這個(gè)broker的ip就重新另選一個(gè)MessageQueue(其實(shí)broker還有一個(gè)vip端口,挺有意思的)ps:果然哪里都有VIP啊:) 2.處理消息壓縮(異步發(fā)送的對(duì)情況下會(huì)對(duì)壓縮做特殊的處理,看下面注釋) 3.判斷是否是事務(wù)消息 4.執(zhí)行鉤子函數(shù) 5.組裝請(qǐng)求頭 6.根據(jù)消息發(fā)送模式選擇不同的發(fā)送策略

    private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        /**根據(jù)brokerName找到broker的ip地址,如果這個(gè)messageQueue的broker找不到就
         * 重新?lián)Q一個(gè)messageQueue,再找broker地址*/
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            /**broker在啟動(dòng)的時(shí)候會(huì)啟動(dòng)兩個(gè)端口監(jiān)聽,一個(gè)是默認(rèn)port10911,一個(gè)是port-2
             * 也就是vip端口,這里在這里檢測(cè)是否要發(fā)往vip端口,將
             * vip端口只接收生產(chǎn)者發(fā)送請(qǐng)求,不接收消費(fèi)者的拉取
             * broker監(jiān)聽兩個(gè)端口的目的是默認(rèn)端口承載所有網(wǎng)絡(luò)請(qǐng)求,如果有時(shí)候請(qǐng)求非常繁忙,broker端所有I/O線程都在忙
             * 導(dǎo)致后續(xù)網(wǎng)絡(luò)請(qǐng)求進(jìn)入隊(duì)列,從而導(dǎo)致消息請(qǐng)求執(zhí)行緩慢,這種情況下就可以選擇將消息發(fā)送到vip端口*/
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }

                int sysFlag = 0;
                /**壓縮消息
                 * 除非是batchMsg或者壓縮的時(shí)候報(bào)錯(cuò)了,不然百分百會(huì)壓縮成功*/
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;

                }
                /**判斷是否是事務(wù)消息*/
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
                /**擴(kuò)展鉤子接口,用戶可以自定義擴(kuò)展接口*/
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                /**擴(kuò)展鉤子接口,在發(fā)送消息之前執(zhí)行*/
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                /**組裝發(fā)送消息的請(qǐng)求頭*/
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                /**如果是重試topic,設(shè)置消息重試次數(shù)相關(guān)屬性*/
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                /**
                 * 如果是異步發(fā)送就會(huì)對(duì)壓縮消息做點(diǎn)特殊處理,同步不會(huì)
                 */
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        /** 如果消息被壓縮過(guò),那就將msgBody設(shè)置為壓縮之前的body,
                         * 使用clone的消息進(jìn)行發(fā)送,防止消息重試消費(fèi)時(shí)被反復(fù)壓縮*/
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            /** 真正發(fā)送的還是壓縮過(guò)后的消息,只是克隆出來(lái)的壓縮過(guò)后的消息
                             * 克隆之后在把原始消息的body設(shè)置為未壓縮的狀態(tài),那重試的時(shí)候再執(zhí)行壓縮
                             * 就不會(huì)出現(xiàn)反復(fù)壓縮的問(wèn)題了*/
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }
                        /**計(jì)算是否超時(shí)*/
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }
                /**執(zhí)行消息發(fā)送后鉤子*/
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
                //老規(guī)矩,刪除異常處理
            } catch (RemotingException e) {
            } catch (InterruptedException e) {
            } finally {
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
        }

還沒(méi)到,咱還得繼續(xù)深入,繼續(xù)追蹤sendMessage方法,這里的處理就相對(duì)之前比較簡(jiǎn)單了

1.看能否能拿到消息的"類型"屬性,看是否為"REPLY"類型,或者又是否為BatchMessage 2.不同的"類型"構(gòu)造不同的通信請(qǐng)求 3.根據(jù)消息發(fā)送模式選擇不同的調(diào)用

public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;
    String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
    boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
    if (isReply) {
        if (sendSmartMsg) {
            //該類的字段全為a,b,c...,目的是為了加速fastjson序列化
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
        }
    } else {
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
    }
    request.setBody(msg.getBody());

    switch (communicationMode) {
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }

    return null;
}

我們應(yīng)該發(fā)現(xiàn)走了這么久,還一直都沒(méi)到真正的發(fā)送階段,一直在做發(fā)送前的準(zhǔn)備工作,那接下來(lái)應(yīng)該是了吧,這里我們先選擇同步發(fā)送的方法點(diǎn)進(jìn)去看下

private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    /**通過(guò)netty發(fā)送到broker*/
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    /**將返回結(jié)果包裝到sendResult*/
    return this.processSendResponse(brokerName, msg, response);
}

看來(lái)終于要到真正的發(fā)送了,這里調(diào)用的包裝的Netty的通信類的同步數(shù)據(jù)傳輸接口,再拿到發(fā)送的結(jié)果,處理好之后返回給上層.

這下終于要揭開面紗了,進(jìn)入了網(wǎng)絡(luò)傳輸層,原來(lái)Netty就是在這里應(yīng)用的!繼續(xù)看remotingClient的invokeSync,這個(gè)接口是Netty客戶端的應(yīng)用的定義,實(shí)現(xiàn)在NettyRemotingClient.

這里是建立通信的channel,然后執(zhí)行一些鉤子函數(shù)

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    /**通過(guò)ip地址建立通向地址的channel,如果緩存中有channel就從緩存中取*/
    final Channel channel = this.getAndCreateChannel(addr);
    /**確認(rèn)channel可用*/
    if (channel != null && channel.isActive()) {
        try {
            /**執(zhí)行rpc請(qǐng)求前的鉤子方法*/
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

進(jìn)入invokeSyncImpl

這里能看到Netty的數(shù)據(jù)發(fā)送了,因此整個(gè)消息的發(fā)送就到這里結(jié)束了(當(dāng)然還有將結(jié)果一步步回傳).可以看到Rocket的底層通信依賴的就是Netty,用Netty實(shí)現(xiàn)網(wǎng)絡(luò)通信也是非常的簡(jiǎn)單,并且Netty也相當(dāng)?shù)母咝?跑個(gè)題:Dubbo的通信也是用的Netty,有時(shí)間的話一定要精研一下Netty.

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        /**原生netty開始傳輸數(shù)據(jù)到網(wǎng)絡(luò),并建立一個(gè)listener監(jiān)聽傳輸結(jié)果*/
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}

回到三個(gè)代碼片段之前,我們沒(méi)講異步發(fā)送,在這里繼續(xù)看一下異步的.前面講過(guò)同步的了,這里咱們稍微看下注釋即可,在這里還有對(duì)于rocket的一個(gè)容錯(cuò)機(jī)制的處理

/**
 * 異步發(fā)送與同步發(fā)送流程差別不大
 * 主要區(qū)別在于異步發(fā)送不用返回結(jié)果給調(diào)用方了,異步發(fā)送在方法內(nèi)處理消息發(fā)送結(jié)果
 */
private void sendMessageAsync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final AtomicInteger times,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
    final long beginStartTime = System.currentTimeMillis();
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            long cost = System.currentTimeMillis() - beginStartTime;
            RemotingCommand response = responseFuture.getResponseCommand();
            /**
             * 如果sendCallback不為空的話就不處理sendCallBack了
             */
            if (null == sendCallback && response != null) {

                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable e) {
                }
                /**
                 * mq如果開啟了容錯(cuò)策略,rocketmq會(huì)通過(guò)預(yù)測(cè)機(jī)制來(lái)預(yù)測(cè)一個(gè)broker是否可用
                 * 更新當(dāng)前broker處理一條消息所需要的時(shí)間.根據(jù)這個(gè)時(shí)間去預(yù)測(cè)broker的可用時(shí)間
                 *通過(guò)currentLatency的大小區(qū)間,來(lái)預(yù)測(cè)
                 */
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }

            /**
             * 如果sendCallBack為空,那么將結(jié)果處理到sendCallBack
             */
            if (response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    assert sendResult != null;
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }

                    try {
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable e) {
                    }

                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, e, context, false, producer);
                }
            } else {
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                if (!responseFuture.isSendRequestOK()) {
                    MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else if (responseFuture.isTimeout()) {
                    MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                        responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else {
                    MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                }
            }
        }
    });
}

繼續(xù)深入走到了RemotingClient的invokeAsync,基本與invokeSync是一樣的

public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
    throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
    RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTooMuchRequestException("invokeAsync call timeout");
            }
            this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

再看invokeAsyncImpl,這里與sync的區(qū)別主要是對(duì)發(fā)送失敗做了一些額外處理,因?yàn)楫惒降氖强梢耘渲弥卦嚥呗缘?其余發(fā)送數(shù)據(jù)就基本一樣了.

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    //重試次數(shù)
    final int opaque = request.getOpaque();
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            once.release();
            throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
        }

        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    //如果響應(yīng)成功,直接返回
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    //失敗了則重試次數(shù)+1
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreAsync.getQueueLength(),
                    this.semaphoreAsync.availablePermits()
                );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

總結(jié):Rocket真正發(fā)送消息的過(guò)程主要分為2步,一是發(fā)送前的準(zhǔn)備,包括各種鉤子函數(shù),消息校驗(yàn),按不同的發(fā)送策略對(duì)消息進(jìn)行不同的處理,二就是網(wǎng)絡(luò)發(fā)送了,網(wǎng)絡(luò)通信是依賴Netty完成的,實(shí)現(xiàn)非常的簡(jiǎn)單.

值得重點(diǎn)關(guān)注的細(xì)節(jié)點(diǎn)有很多,比如異步消息情況下對(duì)消息壓縮的特殊處理,以及重試消息的策略處理.ps:異步消息情況下的消息壓縮是因?yàn)槌鲞^(guò)bug,才有了如今的額外處理,之前是沒(méi)有的.

“RocketMq進(jìn)階源碼學(xué)習(xí)之如何實(shí)現(xiàn)生產(chǎn)者發(fā)送消息”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI