溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

RocketMQ中怎么對(duì)DLedger進(jìn)行整合

發(fā)布時(shí)間:2021-06-18 15:49:55 來(lái)源:億速云 閱讀:443 作者:Leah 欄目:大數(shù)據(jù)

RocketMQ中怎么對(duì)DLedger進(jìn)行整合,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。

1、閱讀源碼之前的思考

RocketMQ 的消息存儲(chǔ)文件主要包括 commitlog 文件、consumequeue 文件與 Index 文件。commitlog 文件存儲(chǔ)全量的消息,consumequeue、index 文件都是基于 commitlog 文件構(gòu)建的。要使用 DLedger 來(lái)實(shí)現(xiàn)消息存儲(chǔ)的一致性,應(yīng)該關(guān)鍵是要實(shí)現(xiàn) commitlog 文件的一致性,即 DLedger 要整合的對(duì)象應(yīng)該是 commitlog 文件,即只需保證 raft 協(xié)議的復(fù)制組內(nèi)各個(gè)節(jié)點(diǎn)的 commitlog 文件一致即可。

我們知道使用文件存儲(chǔ)消息都會(huì)基于一定的存儲(chǔ)格式,rocketmq 的 commitlog 一個(gè)條目就包含魔數(shù)、消息長(zhǎng)度,消息屬性、消息體等,而我們?cè)賮?lái)回顧一下 DLedger 日志的存儲(chǔ)格式: RocketMQ中怎么對(duì)DLedger進(jìn)行整合

DLedger 要整合 commitlog 文件,是不是可以把 rocketmq 消息,即一個(gè)個(gè) commitlog 條目整體當(dāng)成 DLedger 的 body 字段即可。

還等什么,跟我一起來(lái)看源碼吧?。。e急,再拋一個(gè)問(wèn)題,DLedger 整合 RocketMQ commitlog,能不能做到平滑升級(jí)?

帶著這些思考和問(wèn)題,一起來(lái)探究 DLedger 是如何整合 RocketMQ 的。

2、從 Broker 啟動(dòng)流程看 DLedger

> 溫馨提示:本文不會(huì)詳細(xì)介紹 Broker 端的啟動(dòng)流程,只會(huì)點(diǎn)出在啟動(dòng)過(guò)程中與 DLedger 相關(guān)的代碼,如想詳細(xì)了解 Broker 的啟動(dòng)流程,建議關(guān)注筆者的《RocketMQ技術(shù)內(nèi)幕》一書。

Broker 涉及到 DLedger 相關(guān)關(guān)鍵點(diǎn)如下: RocketMQ中怎么對(duì)DLedger進(jìn)行整合

2.1 構(gòu)建 DefaultMessageStore

DefaultMessageStore 構(gòu)造方法

if(messageStoreConfig.isEnableDLegerCommitLog()) {  // [@1](https://my.oschina.net/u/1198)
    this.commitLog = new DLedgerCommitLog(this);
 else {
    this.commitLog = new CommitLog(this);                    // @2
}

代碼@1:如果開啟 DLedger ,commitlog 的實(shí)現(xiàn)類為 DLedgerCommitLog,也是本文需要關(guān)注的關(guān)鍵所在。

代碼@2:如果未開啟 DLedger,則使用舊版的 Commitlog實(shí)現(xiàn)類。

2.2 增加節(jié)點(diǎn)狀態(tài)變更事件監(jiān)聽器

BrokerController#initialize

if (messageStoreConfig.isEnableDLegerCommitLog()) {
    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}

主要調(diào)用 LedgerLeaderElector 的 addRoleChanneHandler 方法增加 節(jié)點(diǎn)角色變更事件監(jiān)聽器,DLedgerRoleChangeHandler 是實(shí)現(xiàn)主從切換的另外一個(gè)關(guān)鍵點(diǎn)。

2.3 調(diào)用 DefaultMessageStore 的 load 方法

DefaultMessageStore#load

// load Commit Log
result = result && this.commitLog.load();   // [@1](https://my.oschina.net/u/1198)
// load Consume Queue
result = result && this.loadConsumeQueue();  
if (result) {
    this.storeCheckpoint =  new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
    this.indexService.load(lastExitOK);
    this.recover(lastExitOK);                         // @2
    log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}

代碼@1、@2 最終都是委托 commitlog 對(duì)象來(lái)執(zhí)行,這里的關(guān)鍵又是如果開啟了 DLedger,則最終調(diào)用的是 DLedgerCommitLog。

經(jīng)過(guò)上面的鋪墊,主角 DLedgerCommitLog “閃亮登場(chǎng)“了。

3、DLedgerCommitLog 詳解

> 溫馨提示:由于 Commitlog 的絕大部分方法都已經(jīng)在《RocketMQ技術(shù)內(nèi)幕》一書中詳細(xì)介紹了,并且 DLedgerCommitLog 的實(shí)現(xiàn)原理與 Commitlog 文件的實(shí)現(xiàn)原理類同,本文會(huì)一筆帶過(guò)關(guān)于存儲(chǔ)部分的實(shí)現(xiàn)細(xì)節(jié)。

3.1 核心類圖

RocketMQ中怎么對(duì)DLedger進(jìn)行整合

DLedgerCommitlog 繼承自 Commitlog。讓我們一一來(lái)看一下它的核心屬性。

  • DLedgerServer dLedgerServer 基于 raft 協(xié)議實(shí)現(xiàn)的集群內(nèi)的一個(gè)節(jié)點(diǎn),用 DLedgerServer 實(shí)例表示。

  • DLedgerConfig dLedgerConfig DLedger 的配置信息。

  • DLedgerMmapFileStore dLedgerFileStore DLedger 基于文件映射的存儲(chǔ)實(shí)現(xiàn)。

  • MmapFileList dLedgerFileList DLedger 所管理的存儲(chǔ)文件集合,對(duì)比 RocketMQ 中的 MappedFileQueue。

  • int id 節(jié)點(diǎn)ID,0 表示主節(jié)點(diǎn),非0表示從節(jié)點(diǎn)

  • MessageSerializer messageSerializer 消息序列器。

  • long beginTimeInDledgerLock = 0 用于記錄 消息追加的時(shí)耗(日志追加所持有鎖時(shí)間)。

  • long dividedCommitlogOffset = -1 記錄的舊 commitlog 文件中的最大偏移量,如果訪問(wèn)的偏移量大于它,則訪問(wèn) dledger 管理的文件。

  • boolean isInrecoveringOldCommitlog = false 是否正在恢復(fù)舊的 commitlog 文件。

接下來(lái)我們將詳細(xì)介紹 DLedgerCommitlog 各個(gè)核心方法及其實(shí)現(xiàn)要點(diǎn)。

3.2 構(gòu)造方法

public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
    super(defaultMessageStore);                   // @1
    dLedgerConfig =  new DLedgerConfig();
    dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
    dLedgerConfig.setStoreType(DLedgerConfig.FILE);
    dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
    dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
    dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
    dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog());
    dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
    dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1);  
    id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1;            // @2
    dLedgerServer = new DLedgerServer(dLedgerConfig);                           // @3
    dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
    DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
            assert bodyOffset == DLedgerEntry.BODY_OFFSET;
            buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
            buffer.putLong(entry.getPos() + bodyOffset);
    };
    dLedgerFileStore.addAppendHook(appendHook);   // @4
    dLedgerFileList = dLedgerFileStore.getDataFileList();
    this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());   // @5
}

代碼@1:調(diào)用父類 即 CommitLog 的構(gòu)造函數(shù),加載 ${ROCKETMQ_HOME}/store/ comitlog 下的 commitlog 文件,以便兼容升級(jí) DLedger 的消息。我們稍微看一下 CommitLog 的構(gòu)造函數(shù): RocketMQ中怎么對(duì)DLedger進(jìn)行整合

代碼@2:構(gòu)建 DLedgerConfig 相關(guān)配置屬性,其主要屬性如下:

  • enableDiskForceClean 是否強(qiáng)制刪除文件,取自 broker 配置屬性 cleanFileForciblyEnable,默認(rèn)為 true 。

  • storeType DLedger 存儲(chǔ)類型,固定為 基于文件的存儲(chǔ)模式。

  • dLegerSelfId leader 節(jié)點(diǎn)的 id 名稱,示例配置:n0,其配置要求第二個(gè)字符后必須是數(shù)字。

  • dLegerGroup DLeger group 的名稱,建議與 broker 配置屬性 brokerName 保持一致。

  • dLegerPeers DLeger Group 中所有的節(jié)點(diǎn)信息,其配置示例 n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913。多個(gè)節(jié)點(diǎn)使用分號(hào)隔開。

  • storeBaseDir 設(shè)置 DLedger 的日志文件的根目錄,取自 borker 配件文件中的 storePathRootDir ,即 RocketMQ 的數(shù)據(jù)存儲(chǔ)根路徑。

  • mappedFileSizeForEntryData 設(shè)置 DLedger 的單個(gè)日志文件的大小,取自 broker 配置文件中的 - mapedFileSizeCommitLog,即與 commitlog 文件的單個(gè)文件大小一致。

  • deleteWhen DLedger 日志文件的刪除時(shí)間,取自 broker 配置文件中的 deleteWhen,默認(rèn)為凌晨 4點(diǎn)。

  • fileReservedHours DLedger 日志文件保留時(shí)長(zhǎng),取自 broker 配置文件中的 fileReservedHours,默認(rèn)為 72h。

代碼@3:根據(jù) DLedger 配置信息創(chuàng)建 DLedgerServer,即創(chuàng)建 DLedger 集群節(jié)點(diǎn),集群內(nèi)各個(gè)節(jié)點(diǎn)啟動(dòng)后,就會(huì)觸發(fā)選主。

代碼@4:構(gòu)建 appendHook 追加鉤子函數(shù),這是兼容 Commitlog 文件很關(guān)鍵的一步,后面會(huì)詳細(xì)介紹其作用。

代碼@5:構(gòu)建消息序列化。

根據(jù)上述的流程圖,構(gòu)建好 DefaultMessageStore 實(shí)現(xiàn)后,就是調(diào)用其 load 方法,在啟用 DLedger 機(jī)制后,會(huì)依次調(diào)用 DLedgerCommitlog 的 load、recover 方法。

3.3 load

public boolean load() {
    boolean result = super.load();
    if (!result) {
        return false;
    }
    return true;
}

DLedgerCommitLog 的 laod 方法實(shí)現(xiàn)比較簡(jiǎn)單,就是調(diào)用 其父類 Commitlog 的 load 方法,即這里也是為了啟用 DLedger 時(shí)能夠兼容以前的消息。

3.4 recover

在 Broker 啟動(dòng)時(shí)會(huì)加載 commitlog、consumequeue等文件,需要恢復(fù)其相關(guān)是數(shù)據(jù)結(jié)構(gòu),特別是與寫入、刷盤、提交等指針,其具體調(diào)用 recover 方法。 DLedgerCommitLog#recover

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {  // @1
    recover(maxPhyOffsetOfConsumeQueue);
}

首先會(huì)先恢復(fù) consumequeue,得出 consumequeue 中記錄的最大有效物理偏移量,然后根據(jù)該物理偏移量進(jìn)行恢復(fù)。 接下來(lái)看一下該方法的處理流程與關(guān)鍵點(diǎn)。

DLedgerCommitLog#recover

dLedgerFileStore.load();

Step1:加載 DLedger 相關(guān)的存儲(chǔ)文件,并一一構(gòu)建對(duì)應(yīng)的 MmapFile,其初始化三個(gè)重要的指針 wrotePosition、flushedPosition、committedPosition 三個(gè)指針為文件的大小。

DLedgerCommitLog#recover

if (dLedgerFileList.getMappedFiles().size() > 0) {   
    dLedgerFileStore.recover();   // @1
    dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset();     // @2
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    if (mappedFile != null) {                                                                                                       // @3
        disableDeleteDledger();
    }
    long maxPhyOffset = dLedgerFileList.getMaxWrotePosition();
    // Clear ConsumeQueue redundant data
    if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) {      // @4
        log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset);
        this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset);
    }
    return;
}

Step2:如果已存在 DLedger 的數(shù)據(jù)文件,則只需要恢復(fù) DLedger 相關(guān)數(shù)據(jù)文建,因?yàn)樵诩虞d舊的 commitlog 文件時(shí)已經(jīng)將其重要的數(shù)據(jù)指針設(shè)置為最大值。其關(guān)鍵實(shí)現(xiàn)點(diǎn)如下:

  • 首先調(diào)用 DLedger 文件存儲(chǔ)實(shí)現(xiàn)類 DLedgerFileStore 的 recover 方法,恢復(fù)管轄的 MMapFile 對(duì)象(一個(gè)文件對(duì)應(yīng)一個(gè)MMapFile實(shí)例)的相關(guān)指針,其實(shí)現(xiàn)方法與 RocketMQ 的 DefaultMessageStore 的恢復(fù)過(guò)程類似。

  • 設(shè)置 dividedCommitlogOffset 的值為 DLedger 中所有物理文件的最小偏移量。操作消息的物理偏移量小于該值,則從 commitlog 文件中查找;物理偏移量大于等于該值的話則從 DLedger 相關(guān)的文件中查找消息。

  • 如果存在舊的 commitlog 文件,則禁止刪除 DLedger 文件,其具體做法就是禁止強(qiáng)制刪除文件,并將文件的有效存儲(chǔ)時(shí)間設(shè)置為 10 年。

  • 如果 consumequeue 中存儲(chǔ)的最大物理偏移量大于 DLedger 中最大的物理偏移量,則刪除多余的 consumequeue 文件。

>溫馨提示:為什么當(dāng)存在 commitlog 文件的情況下,不能刪除 DLedger 相關(guān)的日志文件呢?

因?yàn)樵诖朔N情況下,如果 DLedger 中的物理文件有刪除,則物理偏移量會(huì)斷層。 RocketMQ中怎么對(duì)DLedger進(jìn)行整合

正常情況下, maxCommitlogPhyOffset 與 dividedCommitlogOffset 是連續(xù)的,這樣非常方便是訪問(wèn) commitlog 還是 訪問(wèn) DLedger ,但如果DLedger 部分文件刪除后,這兩個(gè)值就變的不連續(xù),就會(huì)造成中間的文件空洞,無(wú)法被連續(xù)訪問(wèn)。

DLedgerCommitLog#recover

isInrecoveringOldCommitlog = true;
super.recoverNormally(maxPhyOffsetOfConsumeQueue);
isInrecoveringOldCommitlog = false;

Step3:如果啟用了 DLedger 并且是初次啟動(dòng)(還未生成 DLedger 相關(guān)的日志文件),則需要恢復(fù) 舊的 commitlog 文件。

DLedgerCommitLog#recover

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile == null) {           // @1
    return;
}
ByteBuffer byteBuffer =  mappedFile.sliceByteBuffer();
byteBuffer.position(mappedFile.getWrotePosition());
boolean needWriteMagicCode = true;
// 1 TOTAL SIZE
byteBuffer.getInt(); //size
int magicCode = byteBuffer.getInt();
if (magicCode == CommitLog.BLANK_MAGIC_CODE) {   // @2
    needWriteMagicCode = false;
} else {
    log.info("Recover old commitlog found a illegal magic code={}", magicCode);
}
dLedgerConfig.setEnableDiskForceClean(false);
dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();   // @3
log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset);
if (needWriteMagicCode) {  // @4
    byteBuffer.position(mappedFile.getWrotePosition());
    byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
    byteBuffer.putInt(BLANK_MAGIC_CODE);
    mappedFile.flush(0);
}
mappedFile.setWrotePosition(mappedFile.getFileSize());   // @5
mappedFile.setCommittedPosition(mappedFile.getFileSize());
mappedFile.setFlushedPosition(mappedFile.getFileSize());
dLedgerFileList.getLastMappedFile(dividedCommitlogOffset);
log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset);
}

Step4:如果存在舊的 commitlog 文件,需要將最后的文件剩余部分全部填充,即不再接受新的數(shù)據(jù)寫入,新的數(shù)據(jù)全部寫入到 DLedger 的數(shù)據(jù)文件中。其關(guān)鍵實(shí)現(xiàn)點(diǎn)如下:

  • 嘗試查找最后一個(gè) commitlog 文件,如果未找到,則結(jié)束。

  • 從最后一個(gè)文件的最后寫入點(diǎn)(原 commitlog 文件的 待寫入位點(diǎn))嘗試去查找寫入的魔數(shù),如果存在魔數(shù)并等于 CommitLog.BLANK_MAGIC_CODE,則無(wú)需再寫入魔數(shù),在升級(jí) DLedger 第一次啟動(dòng)時(shí),魔數(shù)為空,故需要寫入魔數(shù)。

  • 初始化 dividedCommitlogOffset ,等于最后一個(gè)文件的起始偏移量加上文件的大小,即該指針指向最后一個(gè)文件的結(jié)束位置。

  • 將最后一個(gè) commitlog 未寫滿的數(shù)據(jù)全部寫入,其方法為 設(shè)置消息體的 size 與 魔數(shù)即可。

  • 設(shè)置最后一個(gè)文件的 wrotePosition、flushedPosition、committedPosition 為文件的大小,同樣有意味者最后一個(gè)文件已經(jīng)寫滿,下一條消息將寫入 DLedger 中。

在啟用 DLedger 機(jī)制時(shí) Broker 的啟動(dòng)流程就介紹到這里了,相信大家已經(jīng)了解 DLedger 在整合 RocketMQ 上做的努力,接下來(lái)我們從消息追加、消息讀取兩個(gè)方面再來(lái)探討 DLedger 是如何無(wú)縫整合 RocketMQ 的,實(shí)現(xiàn)平滑升級(jí)的。

4、從消息追加看 DLedger 整合 RocketMQ 如何實(shí)現(xiàn)無(wú)縫兼容

> 溫馨提示:本節(jié)同樣也不會(huì)詳細(xì)介紹整個(gè)消息追加(存儲(chǔ)流程),只是要點(diǎn)出與 DLedger(多副本、主從切換)相關(guān)的核心關(guān)鍵點(diǎn)。如果想詳細(xì)了解消息追加的流程,可以閱讀筆者所著的《RocketMQ技術(shù)內(nèi)幕》一書。

DLedgerCommitLog#putMessage

AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.data);
dledgerFuture = (AppendFuture<appendentryresponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
}

關(guān)鍵點(diǎn)一:消息追加時(shí),則不再寫入到原先的 commitlog 文件中,而是調(diào)用 DLedgerServer 的 handleAppend 進(jìn)行消息追加,該方法會(huì)有集群內(nèi)的 Leader 節(jié)點(diǎn)負(fù)責(zé)消息追加以及在消息復(fù)制,只有超過(guò)集群內(nèi)的半數(shù)節(jié)點(diǎn)成功寫入消息后,才會(huì)返回寫入成功。如果追加成功,將會(huì)返回本次追加成功后的起始偏移量,即 pos 屬性,即類似于 rocketmq 中 commitlog 的偏移量,即物理偏移量。

DLedgerCommitLog#putMessage

long wroteOffset =  dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;
ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, eclipseTimeInLock);

關(guān)鍵點(diǎn)二:根據(jù) DLedger 的起始偏移量計(jì)算真正的消息的物理偏移量,從開頭部分得知,DLedger 自身有其存儲(chǔ)協(xié)議,其 body 字段存儲(chǔ)真實(shí)的消息,即 commitlog 條目的存儲(chǔ)結(jié)構(gòu),返回給客戶端的消息偏移量為 body 字段的開始偏移量,即通過(guò) putMessage 返回的物理偏移量與不使用Dledger 方式返回的物理偏移量的含義是一樣的,即從開偏移量開始,可以正確讀取消息,這樣 DLedger 完美的兼容了 RocketMQ Commitlog。關(guān)于 pos 以及 wroteOffset 的圖解如下: RocketMQ中怎么對(duì)DLedger進(jìn)行整合

5、從消息讀取看 DLedger 整合 RocketMQ 如何實(shí)現(xiàn)無(wú)縫兼容

DLedgerCommitLog#getMessage

public SelectMappedBufferResult getMessage(final long offset, final int size) {
    if (offset &lt; dividedCommitlogOffset) {   // @1
        return super.getMessage(offset, size);
    }
    int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();
    MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0);   // @2
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize);
        return  convertSbr(mappedFile.selectMappedBuffer(pos, size));                                       // @3
    }
    return null;
}

消息查找比較簡(jiǎn)單,因?yàn)榉祷亟o客戶端消息,轉(zhuǎn)發(fā)給 consumequeue 的消息物理偏移量并不是 DLedger 條目的偏移量,而是真實(shí)消息的起始偏移量。其實(shí)現(xiàn)關(guān)鍵點(diǎn)如下:

  • 如果查找的物理偏移量小于 dividedCommitlogOffset,則從原先的 commitlog 文件中查找。

  • 然后根據(jù)物理偏移量按照二分方找到具體的物理文件。

  • 對(duì)物理偏移量取模,得出在該物理文件中中的絕對(duì)偏移量,進(jìn)行消息查找即可,因?yàn)橹挥兄榔湮锢砥屏浚瑥脑撎幭葘⑾⒌拈L(zhǎng)度讀取出來(lái),然后即可讀出一條完整的消息。

5、總結(jié)

根據(jù)上面詳細(xì)的介紹,我想讀者朋友們應(yīng)該不難得出如下結(jié)論:

  • DLedger 在整合時(shí),使用 DLedger 條目包裹 RocketMQ 中的 commitlog 條目,即在 DLedger 條目的 body 字段來(lái)存儲(chǔ)整條 commitlog 條目。

  • 引入 dividedCommitlogOffset 變量,表示物理偏移量小于該值的消息存在于舊的 commitlog 文件中,實(shí)現(xiàn) 升級(jí) DLedger 集群后能訪問(wèn)到舊的數(shù)據(jù)。

  • 新 DLedger 集群?jiǎn)?dòng)后,會(huì)將最后一個(gè) commitlog 填充,即新的數(shù)據(jù)不會(huì)再寫入到 原先的 commitlog 文件。

  • 消息追加到 DLedger 數(shù)據(jù)日志文件中,返回的偏移量不是 DLedger 條目的起始偏移量,而是DLedger 條目中 body 字段的起始偏移量,即真實(shí)消息的起始偏移量,保證消息物理偏移量的語(yǔ)義與 RocketMQ Commitlog一樣。

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝您對(duì)億速云的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI