溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

RocketMQ DLedger多副本即主從切換的實(shí)現(xiàn)原理

發(fā)布時(shí)間:2021-06-25 11:48:29 來(lái)源:億速云 閱讀:387 作者:chen 欄目:大數(shù)據(jù)

本篇內(nèi)容介紹了“RocketMQ DLedger多副本即主從切換的實(shí)現(xiàn)原理”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

DLedger 基于 raft 協(xié)議,故天然支持主從切換,即主節(jié)點(diǎn)(Leader)發(fā)生故障,會(huì)重新觸發(fā)選主,在集群內(nèi)再選舉出新的主節(jié)點(diǎn)。

RocketMQ 中主從同步,從節(jié)點(diǎn)不僅會(huì)從主節(jié)點(diǎn)同步數(shù)據(jù),也會(huì)同步元數(shù)據(jù),包含 topic 路由信息、消費(fèi)進(jìn)度、延遲隊(duì)列處理隊(duì)列、消費(fèi)組訂閱配置等信息。那主從切換后元數(shù)據(jù)如何同步呢?特別是主從切換過(guò)程中,對(duì)消息消費(fèi)有多大的影響,會(huì)丟失消息嗎?

1、BrokerController 中與主從相關(guān)的方法詳解

本節(jié)先對(duì) BrokerController 中與主從切換相關(guān)的方法。

1.1 startProcessorByHa

BrokerController#startProcessorByHa

private void startProcessorByHa(BrokerRole role) {
    if (BrokerRole.SLAVE != role) {
        if (this.transactionalMessageCheckService != null) {
            this.transactionalMessageCheckService.start();
        }
    }
}

感覺(jué)該方法的取名較為隨意,該方法的作用是開(kāi)啟事務(wù)狀態(tài)回查處理器,即當(dāng)節(jié)點(diǎn)為主節(jié)點(diǎn)時(shí),開(kāi)啟對(duì)應(yīng)的事務(wù)狀態(tài)回查處理器,對(duì)PREPARE狀態(tài)的消息發(fā)起事務(wù)狀態(tài)回查請(qǐng)求。

1.2 shutdownProcessorByHa

BrokerController#shutdownProcessorByHa

private void shutdownProcessorByHa() {
    if (this.transactionalMessageCheckService != null) {
        this.transactionalMessageCheckService.shutdown(true);
    }
}

關(guān)閉事務(wù)狀態(tài)回查處理器,當(dāng)節(jié)點(diǎn)從主節(jié)點(diǎn)變更為從節(jié)點(diǎn)后,該方法被調(diào)用。

1.3 handleSlaveSynchronize

BrokerController#handleSlaveSynchronize

private void handleSlaveSynchronize(BrokerRole role) {
    if (role == BrokerRole.SLAVE) {   // [@1](https://my.oschina.net/u/1198)
        if (null != slaveSyncFuture) {   
            slaveSyncFuture.cancel(false);
        }
        this.slaveSynchronize.setMasterAddr(null);   // 
        slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            [@Override](https://my.oschina.net/u/1162528)
            public void run() {
                try {
                    BrokerController.this.slaveSynchronize.syncAll();
                } catch (Throwable e) {
                    log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                }
            }
        }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
    } else {  // @2
        //handle the slave synchronise
        if (null != slaveSyncFuture) {
            slaveSyncFuture.cancel(false);
        }
        this.slaveSynchronize.setMasterAddr(null);
    }
}

該方法的主要作用是處理從節(jié)點(diǎn)的元數(shù)據(jù)同步,即從節(jié)點(diǎn)向主節(jié)點(diǎn)主動(dòng)同步 topic 的路由信息、消費(fèi)進(jìn)度、延遲隊(duì)列處理隊(duì)列、消費(fèi)組訂閱配置等信息。

代碼@1:如果當(dāng)前節(jié)點(diǎn)的角色為從節(jié)點(diǎn):

  • 如果上次同步的 future 不為空,則首先先取消。

  • 然后設(shè)置 slaveSynchronize 的 master 地址為空。不知大家是否與筆者一樣,有一個(gè)疑問(wèn),從節(jié)點(diǎn)的時(shí)候,如果將 master 地址設(shè)置為空,那如何同步元數(shù)據(jù),那這個(gè)值會(huì)在什么時(shí)候設(shè)置呢?

  • 開(kāi)啟定時(shí)同步任務(wù),每 10s 從主節(jié)點(diǎn)同步一次元數(shù)據(jù)。

代碼@2:如果當(dāng)前節(jié)點(diǎn)的角色為主節(jié)點(diǎn),則取消定時(shí)同步任務(wù)并設(shè)置 master 的地址為空。

1.4 changeToSlave

BrokerController#changeToSlave

public void changeToSlave(int brokerId) {
    log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
    //change the role
    brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check       // @1
    messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);                            // @2
    //handle the scheduled service
    try {
        this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);    //  @3
    } catch (Throwable t) {
        log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
    }
    //handle the transactional service
    try {
        this.shutdownProcessorByHa();                                                                    //  @4
    } catch (Throwable t) {
        log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
    }
    //handle the slave synchronise
    handleSlaveSynchronize(BrokerRole.SLAVE);                                               // @5
    try {
        this.registerBrokerAll(true, true, brokerConfig.isForceRegister());              // @6
    } catch (Throwable ignored) {
    }
    log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}

Broker 狀態(tài)變更為從節(jié)點(diǎn)。其關(guān)鍵實(shí)現(xiàn)如下:

  • 設(shè)置 brokerId,如果broker的id為0,則設(shè)置為1,這里在使用的時(shí)候,注意規(guī)劃好集群內(nèi)節(jié)點(diǎn)的 brokerId。

  • 設(shè)置 broker 的狀態(tài)為 BrokerRole.SLAVE。

  • 如果是從節(jié)點(diǎn),則關(guān)閉定時(shí)調(diào)度線程(處理 RocketMQ 延遲隊(duì)列),如果是主節(jié)點(diǎn),則啟動(dòng)該線程。

  • 關(guān)閉事務(wù)狀態(tài)回查處理器。

  • 從節(jié)點(diǎn)需要啟動(dòng)元數(shù)據(jù)同步處理器,即啟動(dòng) SlaveSynchronize 定時(shí)從主服務(wù)器同步元數(shù)據(jù)。

  • 立即向集群內(nèi)所有的 nameserver 告知 broker 信息狀態(tài)的變更。

1.5 changeToMaster

BrokerController#changeToMaster

public void changeToMaster(BrokerRole role) {
    if (role == BrokerRole.SLAVE) {
        return;
    }
    log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());
    //handle the slave synchronise
    handleSlaveSynchronize(role);   // @1
    //handle the scheduled service
    try {
        this.messageStore.handleScheduleMessageService(role);      // @2
    } catch (Throwable t) {
        log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
    }
    //handle the transactional service
    try {
        this.startProcessorByHa(BrokerRole.SYNC_MASTER);         // @3
    } catch (Throwable t) {
        log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
    }
    //if the operations above are totally successful, we change to master
    brokerConfig.setBrokerId(0); //TO DO check                              // @4
    messageStoreConfig.setBrokerRole(role);                               
    try {
        this.registerBrokerAll(true, true, brokerConfig.isForceRegister()); // @5
    } catch (Throwable ignored) {
    }
    log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
}

該方法是 Broker 角色從從節(jié)點(diǎn)變更為主節(jié)點(diǎn)的處理邏輯,其實(shí)現(xiàn)要點(diǎn)如下:

  • 關(guān)閉元數(shù)據(jù)同步器,因?yàn)橹鞴?jié)點(diǎn)無(wú)需同步。

  • 開(kāi)啟定時(shí)任務(wù)處理線程。

  • 開(kāi)啟事務(wù)狀態(tài)回查處理線程。

  • 設(shè)置 brokerId 為 0。

  • 向 nameserver 立即發(fā)送心跳包以便告知 broker 服務(wù)器當(dāng)前最新的狀態(tài)。

主從節(jié)點(diǎn)狀態(tài)變更的核心方法就介紹到這里了,接下來(lái)看看如何觸發(fā)主從切換。

2、如何觸發(fā)主從切換

從前面的文章我們可以得知,RocketMQ DLedger 是基于 raft 協(xié)議實(shí)現(xiàn)的,在該協(xié)議中就實(shí)現(xiàn)了主節(jié)點(diǎn)的選舉與主節(jié)點(diǎn)失效后集群會(huì)自動(dòng)進(jìn)行重新選舉,經(jīng)過(guò)協(xié)商投票產(chǎn)生新的主節(jié)點(diǎn),從而實(shí)現(xiàn)高可用。

BrokerController#initialize

if (messageStoreConfig.isEnableDLegerCommitLog()) {
    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}

上述代碼片段截取自 BrokerController 的 initialize 方法,我們可以得知在 Broker 啟動(dòng)時(shí),如果開(kāi)啟了 多副本機(jī)制,即 enableDLedgerCommitLog 參數(shù)設(shè)置為 true,會(huì)為 集群節(jié)點(diǎn)選主器添加 roleChangeHandler 事件處理器,即節(jié)點(diǎn)發(fā)送變更后的事件處理器。

接下來(lái)我們將重點(diǎn)探討 DLedgerRoleChangeHandler 。

2.1 類圖

RocketMQ DLedger多副本即主從切換的實(shí)現(xiàn)原理

DLedgerRoleChangeHandler 繼承自 RoleChangeHandler,即節(jié)點(diǎn)狀態(tài)發(fā)生變更后的事件處理器。上述的屬性都很簡(jiǎn)單,在這里就重點(diǎn)介紹一下 ExecutorService executorService,事件處理線程池,但只會(huì)開(kāi)啟一個(gè)線程,故事件將一個(gè)一個(gè)按順序執(zhí)行。

接下來(lái)我們來(lái)重點(diǎn)看一下 handle 方法的執(zhí)行。

2.2 handle 主從狀態(tài)切換處理邏輯

DLedgerRoleChangeHandler#handle

public void handle(long term, MemberState.Role role) {
    Runnable runnable = new Runnable() {
        public void run() {
            long start = System.currentTimeMillis();
            try {
                boolean succ = true;
                log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
                switch (role) {
                    case CANDIDATE:    // @1
                        if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
                            brokerController.changeToSlave(dLedgerCommitLog.getId());
                        }
                        break;
                    case FOLLOWER:         // @2
                        brokerController.changeToSlave(dLedgerCommitLog.getId());
                        break;
                    case LEADER:           // @3
                        while (true) {
                            if (!dLegerServer.getMemberState().isLeader()) {
                                succ = false;
                                break;
                            }
                            if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
                                break;
                            }
                            if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
                                && messageStore.dispatchBehindBytes() == 0) {
                                break;
                            }
                            Thread.sleep(100);
                        }
                        if (succ) {
                            messageStore.recoverTopicQueueTable();
                            brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
                        }
                        break;
                    default:
                        break;
                }
                log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
            } catch (Throwable t) {
                log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
            }
        }
    };
    executorService.submit(runnable);
}

代碼@1:如果當(dāng)前節(jié)點(diǎn)狀態(tài)機(jī)狀態(tài)為 CANDIDATE,表示正在發(fā)起 Leader 節(jié)點(diǎn),如果該服務(wù)器的角色不是 SLAVE 的話,需要將狀態(tài)切換為 SLAVE。

代碼@2:如果當(dāng)前節(jié)點(diǎn)狀態(tài)機(jī)狀態(tài)為 FOLLOWER,broker 節(jié)點(diǎn)將轉(zhuǎn)換為 從節(jié)點(diǎn)。

代碼@3:如果當(dāng)前節(jié)點(diǎn)狀態(tài)機(jī)狀態(tài)為 Leader,說(shuō)明該節(jié)點(diǎn)被選舉為 Leader,在切換到 Master 節(jié)點(diǎn)之前,首先需要等待當(dāng)前節(jié)點(diǎn)追加的數(shù)據(jù)都已經(jīng)被提交后才可以將狀態(tài)變更為 Master,其關(guān)鍵實(shí)現(xiàn)如下:

  • 如果 ledgerEndIndex 為 -1,表示當(dāng)前節(jié)點(diǎn)還未又?jǐn)?shù)據(jù)轉(zhuǎn)發(fā),直接跳出循環(huán),無(wú)需等待。

  • 如果 ledgerEndIndex 不為 -1 ,則必須等待數(shù)據(jù)都已提交,即 ledgerEndIndex 與 committedIndex 相等。

  • 并且需要等待 commitlog 日志全部已轉(zhuǎn)發(fā)到 consumequeue中,即 ReputMessageService 中的 reputFromOffset 與 commitlog 的 maxOffset 相等。

等待上述條件滿足后,即可以進(jìn)行狀態(tài)的變更,需要恢復(fù) ConsumeQueue,維護(hù)每一個(gè) queue 對(duì)應(yīng)的 maxOffset,然后將 broker 角色轉(zhuǎn)變?yōu)?master。

經(jīng)過(guò)上面的步驟,就能實(shí)時(shí)完成 broker 主節(jié)點(diǎn)的自動(dòng)切換。由于單從代碼的角度來(lái)看主從切換不夠直觀,下面我將給出主從切換的流程圖。

2.3 主從切換流程圖

由于從源碼的角度或許不夠直觀,故本節(jié)給出其流程圖。

> 溫馨提示:該流程圖的前半部分在 源碼分析 RocketMQ 整合 DLedger(多副本)實(shí)現(xiàn)平滑升級(jí)的設(shè)計(jì)技巧 該文中有所闡述。

RocketMQ DLedger多副本即主從切換的實(shí)現(xiàn)原理

3、主從切換若干問(wèn)題思考

我相信經(jīng)過(guò)上面的講解,大家應(yīng)該對(duì)主從切換的實(shí)現(xiàn)原理有了一個(gè)比較清晰的理解,我更相信讀者朋友們會(huì)拋出一個(gè)疑問(wèn),主從切換會(huì)不會(huì)丟失消息,消息消費(fèi)進(jìn)度是否會(huì)丟失而導(dǎo)致重復(fù)消費(fèi)呢?

3.1 消息消費(fèi)進(jìn)度是否存在丟失風(fēng)險(xiǎn)

首先,由于 RocketMQ 元數(shù)據(jù),當(dāng)然也包含消息消費(fèi)進(jìn)度的同步是采用的從服務(wù)器定時(shí)向主服務(wù)器拉取進(jìn)行更新,存在時(shí)延,引入 DLedger 機(jī)制,也并不保證其一致性,DLedger 只保證 commitlog 文件的一致性。

當(dāng)主節(jié)點(diǎn)宕機(jī)后,各個(gè)從節(jié)點(diǎn)并不會(huì)完成同步了消息消費(fèi)進(jìn)度,于此同時(shí),消息消費(fèi)繼續(xù),此時(shí)消費(fèi)者會(huì)繼續(xù)從從節(jié)點(diǎn)拉取消息進(jìn)行消費(fèi),但匯報(bào)的從節(jié)點(diǎn)并不一定會(huì)成為新的主節(jié)點(diǎn),故消費(fèi)進(jìn)度在 broker 端存在丟失的可能性。當(dāng)然并不是一定會(huì)丟失,因?yàn)橄⑾M(fèi)端只要不重啟,消息消費(fèi)進(jìn)度會(huì)存儲(chǔ)在內(nèi)存中。

綜合所述,消息消費(fèi)進(jìn)度在 broker 端會(huì)有丟失的可能性,存在重復(fù)消費(fèi)的可能性,不過(guò)問(wèn)題不大,因?yàn)?RocketMQ 本身也不承若不會(huì)重復(fù)消費(fèi)。

3.2 消息是否存在丟失風(fēng)險(xiǎn)

消息會(huì)不會(huì)丟失的關(guān)鍵在于,日志復(fù)制進(jìn)度較慢的從節(jié)點(diǎn)是否可以被選舉為主節(jié)點(diǎn),如果在一個(gè)集群中,從節(jié)點(diǎn)的復(fù)制進(jìn)度落后與從主節(jié)點(diǎn),但當(dāng)主節(jié)點(diǎn)宕機(jī)后,如果該從節(jié)點(diǎn)被選舉成為新的主節(jié)點(diǎn),那這將是一個(gè)災(zāi)難,將會(huì)丟失數(shù)據(jù)。關(guān)于一個(gè)節(jié)點(diǎn)是否給另外一個(gè)節(jié)點(diǎn)投贊成票的邏輯在 源碼分析 RocketMQ DLedger 多副本之 Leader 選主 的 2.4.2 handleVote 方法中已詳細(xì)介紹,在這里我以截圖的方式再展示其核心點(diǎn): RocketMQ DLedger多副本即主從切換的實(shí)現(xiàn)原理 RocketMQ DLedger多副本即主從切換的實(shí)現(xiàn)原理

從上面可以得知,如果發(fā)起投票節(jié)點(diǎn)的復(fù)制進(jìn)度比自己小的話,會(huì)投拒絕票。

RocketMQ DLedger多副本即主從切換的實(shí)現(xiàn)原理

RocketMQ DLedger多副本即主從切換的實(shí)現(xiàn)原理

必須得到集群內(nèi)超過(guò)半數(shù)節(jié)點(diǎn)認(rèn)可,即最終選舉出來(lái)的主節(jié)點(diǎn)的當(dāng)前復(fù)制進(jìn)度一定是比絕大多數(shù)的從節(jié)點(diǎn)要大,并且也會(huì)等于承偌給客戶端的已提交偏移量。故得出的結(jié)論是不會(huì)丟消息。

本文的介紹就到此為止了,最后拋出一個(gè)思考題與大家相互交流學(xué)習(xí),也算是對(duì) DLedger 多副本即主從切換一個(gè)總結(jié)回顧。答案我會(huì)以留言的方式或在下一篇文章中給出。

4、思考題

例如一個(gè)集群內(nèi)有5個(gè)節(jié)點(diǎn)的 DLedgr 集群。 Leader Node: n0-broker-a folloer Node: n1-broker-a,n2-broker-a,n3-broker-a,n4-broker-a

從節(jié)點(diǎn)的復(fù)制進(jìn)度可能不一致,例如: n1-broker-a復(fù)制進(jìn)度為 100 n2-broker-a復(fù)制進(jìn)度為 120 n3-broker-a復(fù)制進(jìn)度為 90 n4-broker-a負(fù)載進(jìn)度為 90

如果此時(shí) n0-broker-a 節(jié)點(diǎn)宕機(jī),觸發(fā)選主,如果 n1率先發(fā)起投票,由于 n1,的復(fù)制進(jìn)度大于 n3,n4,再加上自己一票,是有可能成為leader的,此時(shí)消息會(huì)丟失嗎?為什么?

“RocketMQ DLedger多副本即主從切換的實(shí)現(xiàn)原理”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI