溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

java開(kāi)發(fā)RocketMQ生產(chǎn)者高可用示例分析

發(fā)布時(shí)間:2022-08-08 15:58:10 來(lái)源:億速云 閱讀:240 作者:iii 欄目:開(kāi)發(fā)技術(shù)

本篇內(nèi)容主要講解“java開(kāi)發(fā)RocketMQ生產(chǎn)者高可用示例分析”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“java開(kāi)發(fā)RocketMQ生產(chǎn)者高可用示例分析”吧!

    1 消息

    public class Message implements Serializable {
        private static final long serialVersionUID = 8445773977080406428L;
        //主題名字
        private String topic;
        //消息擴(kuò)展信息,Tag,keys,延遲級(jí)別都存在這里
        private Map<String, String> properties;
        //消息體,字節(jié)數(shù)組
        private byte[] body;
        //設(shè)置消息的key,
        public void setKeys(String keys) {}
        //設(shè)置topic
        public void setTopic(String topic) {}
        //延遲級(jí)別
        public int setDelayTimeLevel(int level) {}
        //消息過(guò)濾的標(biāo)記
        public void setTags(String tags) {}
        //擴(kuò)展信息存放在此
        public void putUserProperty(final String name, final String value) {}
    }

    消息就是孩子們,這些孩子們呢,有各自的特點(diǎn),也有共性。同一個(gè)家長(zhǎng)送來(lái)的兩個(gè)孩子可以是去同一個(gè)地方的,也可以是去不同的地方的。

    1.1 topic

    首先呢,每個(gè)孩子消息都有一個(gè)屬性topic,這個(gè)我們上文說(shuō)到了,是一個(gè)候船大廳。孩子們進(jìn)來(lái)之后,走到自己指定的候船大廳的指定區(qū)域(平時(shí)出門坐火車高鐵不也是指定的站臺(tái)乘車么),坐到message queue座位上等,等著出行。

    Broker有一個(gè)或者多個(gè)topic,消息會(huì)存放到topic內(nèi)的message queue內(nèi),等待被消費(fèi)。

    1.2 Body

    孩子消息,也有一個(gè)Body屬性,這就是他的能力,他會(huì)畫畫,他會(huì)唱歌,他會(huì)干啥干啥,就記錄在這個(gè)Body屬性里。等走出去了,體現(xiàn)價(jià)值的地方也是這個(gè)Body屬性。

    Body就是消息體,消費(fèi)者會(huì)根據(jù)消息體執(zhí)行對(duì)應(yīng)的操作。

    1.3 tag

    這個(gè)tag我們上節(jié)說(shuō)了,就是一個(gè)標(biāo)記,有的孩子背著畫板,相機(jī),有的游船就特意找到這些孩子拉走,完成他們的任務(wù)。

    可以給消息設(shè)置tag屬性,消費(fèi)者可以選擇含有特定tag屬性的消息進(jìn)行消費(fèi)。

    1.4 key

    key就是每個(gè)孩子消息的名字了。要找哪個(gè)孩子,喊他名就行。

    對(duì)發(fā)送的消息設(shè)置好 Key,以后可以根據(jù)這個(gè)Key 來(lái)查找消息。比如消息異常,消息丟失,進(jìn)行查找會(huì)很方便。

    1.5 延遲級(jí)別

    當(dāng)然,還有的孩子來(lái)就不急著走,來(lái)之前就想好了,要恰個(gè)飯,得30分鐘,所以自己來(lái)了會(huì)等30分鐘后被接走。

    設(shè)置延遲級(jí)別可以規(guī)定多久后消息可以被消費(fèi)。

    2 生產(chǎn)者高可用

    每個(gè)送孩子來(lái)的家長(zhǎng)都希望能送到候船大廳里,更不希望孩子被搞丟了,這個(gè)時(shí)候這個(gè)候船大廳就需要一些保證機(jī)制了。

    2.1 客戶端保證生產(chǎn)者高可用

    2.1.1 重試機(jī)制

    就是說(shuō)家長(zhǎng)送來(lái)了,孩子進(jìn)到候船大廳之后,沒(méi)能成功坐到message queue座位上,這個(gè)時(shí)候工作人員會(huì)安排重試,再去看是否有座位坐。重試次數(shù)默認(rèn)是2次,也就是說(shuō),消息孩子共有3次找座位坐的機(jī)會(huì)。

    看源碼,我特意加了注解,大致可以看懂一些了。

    //這里取到了重試的次數(shù)
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    String[] brokersSent = new String[timesTotal];
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        //獲取消息隊(duì)列
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
            mq = mqSelected;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                if (times > 0) {
                    //Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                if (timeout < costTime) {
                    callTimeout = true;
                    break;
                }
                //發(fā)送消息
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                ...
            } catch (RemotingException e) {
                ...
                continue;
            } catch (MQClientException e) {
                ...
                continue;
            } catch (MQBrokerException e) {
                ...
                continue;
            } catch (InterruptedException e) {
                //可以看到只有InterruptedException拋出了異常,其他的exception都會(huì)繼續(xù)重試
                throw e;
            }
        } else {
            break;
        }
    }

    重試代碼如上,這個(gè)sendDefaultImpl方法中,會(huì)嘗試發(fā)送三次消息,若是都失敗,才會(huì)拋出對(duì)應(yīng)的錯(cuò)誤。

    2.1.2 客戶端容錯(cuò)

    若是有多個(gè)Broker候車大廳的時(shí)候,服務(wù)人員會(huì)安排消息孩子選擇一個(gè)相對(duì)不擁擠,比較容易進(jìn)入的來(lái)進(jìn)入。當(dāng)然那些已經(jīng)關(guān)閉的停電的,沒(méi)有服務(wù)能力的,我們是不會(huì)進(jìn)的。

    MQ Client會(huì)維護(hù)一個(gè)Broker的發(fā)送延遲信息,根據(jù)這個(gè)信息會(huì)選擇一個(gè)相對(duì)延遲較低的Broker來(lái)發(fā)送消息。會(huì)主動(dòng)剔除哪些已經(jīng)宕機(jī),不可用或發(fā)送延遲級(jí)別較高的Broker.

    選擇Broker就是在選擇message queue,對(duì)應(yīng)的代碼如下:

    這里會(huì)先判斷延遲容錯(cuò)開(kāi)關(guān)是否開(kāi)啟,這個(gè)開(kāi)關(guān)默認(rèn)是關(guān)閉的,若是開(kāi)啟的話,會(huì)優(yōu)先選擇延遲較低的Broker。

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //判斷發(fā)送延遲容錯(cuò)開(kāi)關(guān)是否開(kāi)啟
        if (this.sendLatencyFaultEnable) {
            try {
                //選擇一個(gè)延遲上可以接受,并且和上次發(fā)送相同的Broker
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //若是Broker的延遲時(shí)間可以接受,則返回這個(gè)Broker
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                //若是第一步?jīng)]能選中一個(gè)Broker,就選擇一個(gè)延遲較低的Broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            //若是前邊都沒(méi)選中一個(gè)Broker,就隨機(jī)選一個(gè)Broker
            return tpInfo.selectOneMessageQueue();
        }
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    但是當(dāng)延遲容錯(cuò)開(kāi)關(guān)關(guān)閉狀態(tài)的時(shí)候,執(zhí)行的代碼如下:

    為了均勻分散Broker的壓力,會(huì)選擇與之前不同的Broker。

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        //若是沒(méi)有上次的Brokername做參考,就隨機(jī)選一個(gè)
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            //如果有,那么就選一個(gè)其他的Broker
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                //這里判斷遇上一個(gè)使用的Broker不是同一個(gè)
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            //若是上邊的都沒(méi)選中,那么就隨機(jī)選一個(gè)
            return selectOneMessageQueue();
        }
    }

    2.2 Broker端保證生產(chǎn)者高可用

    Broker候船大廳為了能確切的接收到消息孩子,至少會(huì)有兩個(gè)廳,一個(gè)主廳一個(gè)副廳,一般來(lái)說(shuō)孩子都會(huì)進(jìn)入到主廳,然后一頓操作,卡該忙信那機(jī)資(影分身之術(shù)),然后讓分身進(jìn)入到副廳,這樣當(dāng)主廳停電了,不工作了,副廳的分身只要去完成了任務(wù)就ok的。一般來(lái)說(shuō)都是主廳的消息孩子去坐船完成任務(wù)。

    到此,相信大家對(duì)“java開(kāi)發(fā)RocketMQ生產(chǎn)者高可用示例分析”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

    向AI問(wèn)一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI