溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RocketMQ broker啟動流程是什么

發(fā)布時間:2023-03-23 14:28:34 來源:億速云 閱讀:77 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹“RocketMQ broker啟動流程是什么”的相關(guān)知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“RocketMQ broker啟動流程是什么”文章能幫助大家解決問題。

    1. 啟動入口

    本系列RocketMQ4.8注釋github地址,希望對大家有所幫助,要是覺得可以的話麻煩給點一下Star哈

    前面我們已經(jīng)分析完了NameServerproducer,從本文開始,我們將分析Broker。

    broker的啟動類為org.apache.rocketmq.broker.BrokerStartup,代碼如下:

    public class BrokerStartup {
        ...
        public static void main(String[] args) {
            start(createBrokerController(args));
        }
        ...
    }

    main()方法中,僅有一行代碼,這行代碼包含了兩個操作:

    • createBrokerController(...):創(chuàng)建BrokerController

    • start(...):啟動Broker

    接下來我們就來分析這兩個操作。

    2. 創(chuàng)建BrokerController

    創(chuàng)建BrokerController的方法為BrokerStartup#createBrokerController,代碼如下:

    /**
     * 創(chuàng)建 broker 的配置參數(shù)
     */
    public static BrokerController createBrokerController(String[] args) {
        ...
        try {
            //解析命令行參數(shù)
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }
            // 處理配置
            final BrokerConfig brokerConfig = new BrokerConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            final NettyClientConfig nettyClientConfig = new NettyClientConfig();
            // tls安全相關(guān)
            nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
            // 配置端口
            nettyServerConfig.setListenPort(10911);
            // 消息存儲的配置
            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
            ...
            // 將命令行中的配置設(shè)置到brokerConfig對象中
            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
            // 檢查環(huán)境變量:ROCKETMQ_HOME
            if (null == brokerConfig.getRocketmqHome()) {
                System.out.printf("Please set the %s variable in your environment to match 
                    the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
                System.exit(-2);
            }
            //省略一些配置
            ...
            // 創(chuàng)建 brokerController
            final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            controller.getConfiguration().registerConfig(properties);
            // 初始化
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }
            // 關(guān)閉鉤子,在關(guān)閉前處理一些操作
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);
                @Override
                public void run() {
                    synchronized (this) {
                        if (!this.hasShutdown) {
                            ...
                            // 這里會發(fā)一條注銷消息給nameServer
                            controller.shutdown();
                            ...
                        }
                    }
                }
            }, "ShutdownHook"));
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }

    這個方法的代碼有點長,但功能并不多,總的來說就三個功能:

    • 處理配置:主要是處理nettyServerConfignettyClientConfig的配置,這塊就是一些配置解析的操作,處理方式與NameServer很類似,這里就不多說了。

    • 創(chuàng)建及初始化controller:調(diào)用方法controller.initialize(),這塊內(nèi)容我們后面分析。

    • 注冊關(guān)閉鉤子:調(diào)用Runtime.getRuntime().addShutdownHook(...),可以在jvm進程關(guān)閉前進行一些操作。

    2.1 controller實例化

    BrokerController的創(chuàng)建及初始化是在BrokerStartup#createBrokerController方法中進行,我們先來看看它的構(gòu)造方法:

    public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig
    ) {
        // 4個核心配置信息
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        // 管理consumer消費消息的offset
        this.consumerOffsetManager = new ConsumerOffsetManager(this);
        // 管理topic配置
        this.topicConfigManager = new TopicConfigManager(this);
        // 處理 consumer 拉消息請求的
        this.pullMessageProcessor = new PullMessageProcessor(this);
        this.pullRequestHoldService = new PullRequestHoldService(this);
        // 消息送達的監(jiān)聽器
        this.messageArrivingListener 
            = new NotifyMessageArrivingListener(this.pullRequestHoldService);
        ...
        // 往外發(fā)消息的組件
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        ...
    }

    BrokerController的構(gòu)造方法很長,基本都是一些賦值操作,代碼中已列出關(guān)鍵項,這些包括:

    • 核心配置賦值:主要是brokerConfig/nettyServerConfig/nettyClientConfig/messageStoreConfig四個配置

    • ConsumerOffsetManager:管理consumer消費消息位置的偏移量,偏移量表示消費者組消費該topic消息的位置,后面再消費時,就從該位置后消費,避免重復(fù)消費消息,也避免了漏消費消息。

    • topicConfigManagertopic配置管理器,就是用來管理topic配置的,如topic名稱,topic隊列數(shù)量

    • pullMessageProcessor:消息處理器,用來處理消費者拉消息

    • messageArrivingListener:消息送達的監(jiān)聽器,當(dāng)生產(chǎn)者的消息送達時,由該監(jiān)聽器監(jiān)聽

    • brokerOuterAPI:往外發(fā)消息的組件,如向NameServer發(fā)送注冊/注銷消息

    以上這些組件的用處,這里先混個臉熟,我們后面再分析。

    2.2 初始化controller

    我們再來看看初始化操作,方法為BrokerController#initialize

    public boolean initialize() throws CloneNotSupportedException {
        // 加載配置文件中的配置
        boolean result = this.topicConfigManager.load();
        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();
        if (result) {
            try {
                // 消息存儲管理組件,管理磁盤上的消息
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
                        this.messageArrivingListener, this.brokerConfig);
                // 啟用了DLeger,就創(chuàng)建DLeger相關(guān)組件
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    ...
                }
                // broker統(tǒng)計組件
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //load plugin
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, 
                    brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                this.messageStore.getDispatcherList().addFirst(
                    new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }
        // 加載磁盤上的記錄,如commitLog寫入的位置、消費者主題/隊列的信息
        result = result && this.messageStore.load();
        if (result) {
            // 處理 nettyServer
            this.remotingServer = new NettyRemotingServer(
                this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(
                fastConfig, this.clientHousekeepingService);
            // 創(chuàng)建線程池start... 這里會創(chuàng)建多種類型的線程池
            ...
            // 處理consumer pull操作的線程池
            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));
            ...
            // 創(chuàng)建線程池end...
            // 注冊處理器
            this.registerProcessor();
            // 啟動定時任務(wù)start... 這里會啟動好多的定時任務(wù)
            ...
            // 定時將consumer消費到的offset進行持久化操作,即將數(shù)據(jù)保存到磁盤上
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
            ...
            // 啟動定時任務(wù)end...
            ...
            // 開啟 DLeger 的一些操作
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                ...
            }
            // 處理tls配置
            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                ...
            }
            // 初始化一些操作
            initialTransaction();
            initialAcl();
            initialRpcHooks();
        }
        return result;
    }

    這個還是很長,關(guān)鍵部分都做了注釋,該方法所做的工作如下:

    • 加載配置文件中的配置

    • 賦值與初始化操作

    • 創(chuàng)建線程池

    • 注冊處理器

    • 啟動定時任務(wù)

    這里我們來看下注冊處理器的操作this.registerProcessor():

    2.2.1 注冊處理器:BrokerController#registerProcessor

    this.registerProcessor()實際調(diào)用的方法是BrokerController#registerProcessor,代碼如下:

    public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, 
            this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,  
            this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, 
            this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, 
            this.sendMessageExecutor);
        ...
        /**
         * PullMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, 
            this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
        /**
            * ReplyMessageProcessor
            */
        ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
        replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
        ...
    }

    這個方法里注冊了許許多多的處理器,這里僅列出了與消息相關(guān)的內(nèi)容,如發(fā)送消息、回復(fù)消息、拉取消息等,后面在處理producer/consumer的消息時,就會用到這些處理器,這里先不展開分析。

    2.2.2 remotingServer注冊處理器:NettyRemotingServer#registerProcessor

    我們來看下remotingServer注冊處理器的操作,方法為NettyRemotingServer#registerProcessor

    public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
        ...
        @Override
        public void registerProcessor(int requestCode, NettyRequestProcessor processor, 
                ExecutorService executor) {
            ExecutorService executorThis = executor;
            if (null == executor) {
                executorThis = this.publicExecutor;
            }
            Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, 
                    ExecutorService>(processor, executorThis);
            // 注冊到processorTable 中
            this.processorTable.put(requestCode, pair);
        }
        ...
    }

    最終,這些處理器注冊到了processorTable中,它是NettyRemotingAbstract的成員變量,定義如下:

    HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>

    這是一個hashMap的結(jié)構(gòu),keycodevaluePair,該類中有兩個成員變量:NettyRequestProcessor、ExecutorService,codeNettyRequestProcessor的映射關(guān)系就是在hashMap里存儲的。

    2.3 注冊關(guān)閉鉤子:Runtime.getRuntime().addShutdownHook(...)

    接著我們來看看注冊關(guān)閉鉤子的操作:

    // 關(guān)閉鉤子,在關(guān)閉前處理一些操作
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        private volatile boolean hasShutdown = false;
        private AtomicInteger shutdownTimes = new AtomicInteger(0);
        @Override
        public void run() {
            synchronized (this) {
                if (!this.hasShutdown) {
                    ...
                    // 這里會發(fā)一條注銷消息給nameServer
                    controller.shutdown();
                    ...
                }
            }
        }
    }, "ShutdownHook"));

    跟進BrokerController#shutdown方法:

    public void shutdown() {
        // 調(diào)用各組件的shutdown方法
        ...
        // 發(fā)送注銷消息到NameServer
        this.unregisterBrokerAll();
        ...
        // 持久化consumer的消費偏移量
        this.consumerOffsetManager.persist();
        // 又是調(diào)用各組件的shutdown方法
        ...

    這個方法里會調(diào)用各組件的shutdown()方法、發(fā)送注銷消息給NameServer、持久化consumer的消費偏移量,這里我們主要看發(fā)送注銷消息的方法BrokerController#unregisterBrokerAll:

    private void unregisterBrokerAll() {
        // 發(fā)送一條注銷消息給nameServer
        this.brokerOuterAPI.unregisterBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId());
    }

    繼續(xù)進入BrokerOuterAPI#unregisterBrokerAll

    public void unregisterBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId
    ) {
        // 獲取所有的 nameServer,遍歷發(fā)送注銷消息
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null) {
            for (String namesrvAddr : nameServerAddressList) {
                try {
                    this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
                    log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
                } catch (Exception e) {
                    log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
                }
            }
        }
    }

    這個方法里,會獲取到所有的nameServer,然后逐個發(fā)送注銷消息,繼續(xù)進入BrokerOuterAPI#unregisterBroker方法:

    public void unregisterBroker(
        final String namesrvAddr,
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId
    ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, 
            InterruptedException, MQBrokerException {
        UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        // 發(fā)送的注銷消息:RequestCode.UNREGISTER_BROKER
        RemotingCommand request = RemotingCommand.createRequestCommand(
                c, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }
        throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
    }

    最終調(diào)用的是RemotingClient#invokeSync進行消息發(fā)送,請求codeRequestCode.UNREGISTER_BROKER,這就與NameServer接收broker的注銷消息對應(yīng)上了。

    3. 啟動Broker:start(...)

    我們再來看看Broker的啟動流程,處理方法為BrokerController#start

    public void start() throws Exception {
        // 啟動各組件
        // 啟動消息存儲相關(guān)組件
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        // 啟動 remotingServer,其實就是啟動一個netty服務(wù),用來接收producer傳來的消息
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
        ...
        // broker對外發(fā)放消息的組件,向nameServer上報存活消息時使用了它,也是一個netty服務(wù)
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }
        ...
        // broker 核心的心跳注冊任務(wù)
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, 
                        brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
            // brokerConfig.getRegisterNameServerPeriod() 值為 1000 * 30,最終計算得到默認30秒執(zhí)行一次
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), 
                TimeUnit.MILLISECONDS);
        ...
    }

    這個方法主要就是啟動各組件了,這里列出了幾大重要組件的啟動:

    • messageStore:消息存儲組件,在這個組件里,會啟動消息存儲相關(guān)的線程,如消息的投遞操作、commitLog文件的flush操作、comsumeQueue文件的flush操作等

    • remotingServernetty服務(wù),用來接收請求消息,如producer發(fā)送過來的消息

    • brokerOuterAPI:也是一個netty服務(wù),用來對外發(fā)送消息,如向nameServer上報心跳消息

    • 啟動定時任務(wù):brokernameServer發(fā)送注冊消息

    這里我們重點來看定時任務(wù)是如何發(fā)送心跳發(fā)送的。

    處理注冊消息發(fā)送的時間間隔如下:

    Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)

    這行代碼看著長,但意思就一句話:時間間隔可以自行配置,但不能小于10s,不能大于60s,默認是30s.

    處理消息注冊的方法為BrokerController#registerBrokerAll(...),代碼如下:

    public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
            boolean oneway, boolean forceRegister) {
        TopicConfigSerializeWrapper topicConfigWrapper 
                = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
        // 處理topic相關(guān)配置
        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            ...
        }
        // 這里會判斷是否需要進行注冊
        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            // 進行注冊操作    
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }

    這個方法就是用來處理注冊操作的,不過注冊前會先驗證下是否需要注冊,驗證是否需要注冊的方法為BrokerController#needRegister, 代碼如下:

    private boolean needRegister(final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final int timeoutMills) {
        TopicConfigSerializeWrapper topicConfigWrapper 
            = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
        // 判斷是否需要進行注冊
        List&lt;Boolean&gt; changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, 
            brokerId, topicConfigWrapper, timeoutMills);
        // 有一個發(fā)生了變化,就表示需要注冊了    
        boolean needRegister = false;
        for (Boolean changed : changeList) {
            if (changed) {
                needRegister = true;
                break;
            }
        }
        return needRegister;
    }

    這個方法調(diào)用了brokerOuterAPI.needRegister(...)來判斷broker是否發(fā)生了變化,只要一個NameServer上發(fā)生了變化,就說明需要進行注冊操作。

    brokerOuterAPI.needRegister(...)是如何判斷broker是否發(fā)生了變化的呢?繼續(xù)跟進BrokerOuterAPI#needRegister

    public List<Boolean> needRegister(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final int timeoutMills) {
        final List<Boolean> changedList = new CopyOnWriteArrayList<>();
        // 獲取所有的 nameServer
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            // 遍歷所有的nameServer,逐一發(fā)送請求
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            QueryDataVersionRequestHeader requestHeader 
                                = new QueryDataVersionRequestHeader();
                            ...
                            // 向nameServer發(fā)送消息,命令是 RequestCode.QUERY_DATA_VERSION
                            RemotingCommand request = RemotingCommand
                                .createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
                            // 把當(dāng)前的 DataVersion 發(fā)到 nameServer     
                            request.setBody(topicConfigWrapper.getDataVersion().encode());
                            // 發(fā)請求到nameServer
                            RemotingCommand response = remotingClient
                                .invokeSync(namesrvAddr, request, timeoutMills);
                            DataVersion nameServerDataVersion = null;
                            Boolean changed = false;
                            switch (response.getCode()) {
                                case ResponseCode.SUCCESS: {
                                    QueryDataVersionResponseHeader queryDataVersionResponseHeader =
                                      (QueryDataVersionResponseHeader) response
                                      .decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                    changed = queryDataVersionResponseHeader.getChanged();
                                    byte[] body = response.getBody();
                                    if (body != null) {
                                        // 拿到 DataVersion
                                        nameServerDataVersion = DataVersion.decode(body, D
                                            ataVersion.class);
                                        // 這里是判斷的關(guān)鍵
                                        if (!topicConfigWrapper.getDataVersion()
                                            .equals(nameServerDataVersion)) {
                                            changed = true;
                                        }
                                    }
                                    if (changed == null || changed) {
                                        changedList.add(Boolean.TRUE);
                                    }
                                }
                                default:
                                    break;
                            }
                            ...
                        } catch (Exception e) {
                            ...
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }
            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                log.error("query dataversion from nameserver countDownLatch await Exception", e);
            }
        }
        return changedList;
    }

    這個方法里,先是遍歷所有的nameServer,向每個nameServer都發(fā)送一條codeRequestCode.QUERY_DATA_VERSION的參數(shù),參數(shù)為當(dāng)前brokerDataVersion,當(dāng)nameServer收到消息后,就返回nameServer中保存的、與當(dāng)前broker對應(yīng)的DataVersion,當(dāng)兩者版本不相等時,就表明當(dāng)前broker發(fā)生了變化,需要重新注冊。

    DataVersion是個啥呢?它的部分代碼如下:

    public class DataVersion extends RemotingSerializable {
        // 時間戳
        private long timestamp = System.currentTimeMillis();
        // 計數(shù)器,可以理解為最近的版本號
        private AtomicLong counter = new AtomicLong(0);
        public void nextVersion() {
            this.timestamp = System.currentTimeMillis();
            this.counter.incrementAndGet();
        }
        /**
         * equals 方法,當(dāng) timestamp 與 counter 都相等時,則兩者相等
         */
        @Override
        public boolean equals(final Object o) {
            if (this == o)
                return true;
            if (o == null || getClass() != o.getClass())
                return false;
            final DataVersion that = (DataVersion) o;
            if (timestamp != that.timestamp) {
                return false;
            }
            if (counter != null && that.counter != null) {
                return counter.longValue() == that.counter.longValue();
            }
            return (null == counter) && (null == that.counter);
        }
        ...
    }

    DataVersionequals()方法來看,只有當(dāng)timestampcounter都相等時,兩個DataVersion對象才相等。那這兩個值會在哪里被修改呢?從DataVersion#nextVersion方法的調(diào)用情況來看,引起這兩個值的變化主要有兩種:

    • broker 上新創(chuàng)建了一個 topic

    • topic的發(fā)了的變化

    在這兩種情況下,DataVersion#nextVersion方法被調(diào)用,從而引起DataVersion的改變。DataVersion改變了,就表明當(dāng)前broker需要向nameServer注冊了。

    讓我們再回到BrokerController#registerBrokerAll(...)方法:

    public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
            boolean oneway, boolean forceRegister) {
        ...
        // 這里會判斷是否需要進行注冊
        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            // 進行注冊操作    
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }

    處理注冊的方法為BrokerController#doRegisterBrokerAll,稍微看下它的流程:

    private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
            TopicConfigSerializeWrapper topicConfigWrapper) {
        // 注冊
        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            // 這個對象里就包含了當(dāng)前broker的版本信息
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());
        ...
    }

    繼續(xù)跟下去,最終調(diào)用的是BrokerOuterAPI#registerBroker方法:

    private RegisterBrokerResult registerBroker(
        final String namesrvAddr,
        final boolean oneway,
        final int timeoutMills,
        final RegisterBrokerRequestHeader requestHeader,
        final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, 
        RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
        // 構(gòu)建請求
        RemotingCommand request = RemotingCommand
            .createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);
        // 處理發(fā)送操作:sendOneWay
        if (oneway) {
            try {
                // 注冊操作
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
            ...
        }
        ....
    }

    所以,所謂的注冊操作,就是當(dāng)nameServer發(fā)送一條codeRequestCode.REGISTER_BROKER的消息,消息里會帶上當(dāng)前brokertopic信息、版本號等。

    關(guān)于“RocketMQ broker啟動流程是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識,可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會為大家更新不同的知識點。

    向AI問一下細節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI