溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RocketMQ存儲中如何實現(xiàn)同步刷盤和異步刷盤

發(fā)布時間:2021-11-18 09:36:31 來源:億速云 閱讀:419 作者:小新 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)RocketMQ存儲中如何實現(xiàn)同步刷盤和異步刷盤,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

一、問題思考

1.同步刷盤是怎么工作的?
2.異步刷盤是怎么工作的?
3.上篇文章的疑問,寫入堆外內(nèi)存的消息如何落盤的?

二、Broker啟動刷盤有關(guān)調(diào)用鏈
1.調(diào)用鏈

//初始化鏈條
@1 BrokerStartup#main
start(createBrokerController(args));
@2 BrokerStartup#createBrokerController
final BrokerController controller = new BrokerController(...)
boolean initResult = controller.initialize();
@3 BrokerController#initialize
this.messageStore = new DefaultMessageStore(...);
@4 DefaultMessageStore#DefaultMessageStore()
this.commitLog = new CommitLog(this);
@5 CommitLog#CommitLog()
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig()
.getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
this.commitLogService = new CommitRealTimeService();
//啟動鏈條
@6 BrokerStartup#start
controller.start();
@7 BrokerController#start()
this.messageStore.start();
@8 DefaultMessageStore#start()
this.commitLog.start();
@9 CommitLog#start()
this.flushCommitLogService.start();
if (defaultMessageStore.getMessageStoreConfig()
.isTransientStorePoolEnable()) {
this.commitLogService.start();
}

小結(jié):由調(diào)用鏈可以看出,初始化并啟動了以下線程類

1.同步刷盤 GroupCommitService

2.異步刷盤 FlushRealTimeService

3.如果開啟堆外內(nèi)存并且為異步刷盤 CommitRealTimeService


2.線程類關(guān)系圖

RocketMQ存儲中如何實現(xiàn)同步刷盤和異步刷盤

三、線程類工作流程

既然線程類在Broker啟動時就啟動了,他們在做啥呢?

1.堆外內(nèi)存線程類CommitRealTimeService工作流程

RocketMQ存儲中如何實現(xiàn)同步刷盤和異步刷盤


小結(jié):
1.CommitRealTimeService主要工作是將寫入堆外內(nèi)存(writeBuffer)的消息,寫入到fileChannel中,fileChannel為commitLog文件通道

2.committedPosition用于記錄將writeBuffer數(shù)據(jù)寫入到fileChannel中的內(nèi)存位點(相對偏移量offset)
3.committedWhere用于記錄寫入fileChannel中的物理偏移量(文件名稱+相對偏移量offset)

2.同步刷盤線程類GroupCommitService工作流程

RocketMQ存儲中如何實現(xiàn)同步刷盤和異步刷盤

注1:

1.執(zhí)行onWaitEnd時交換讀寫容器,該線程類提供兩個容器來裝GroupCommitRequest

2.requestsWrite和requestsRead,每次執(zhí)行提交(刷盤)前都會進(jìn)行容器交換

3.好處:讀寫請求容器分離,避免潛在的鎖競爭

private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}

注2:

1.flushedPosition 標(biāo)記已經(jīng)刷盤內(nèi)存的位點。即刷盤相對偏移量,刷盤到什么位置了,下次從此處刷盤即可

2.flushedWhere 標(biāo)記已經(jīng)刷盤的物理偏移量,根據(jù)此位置可精確查找到文件中消息的存儲位置。flushedWhere = 當(dāng)前刷盤文件名稱(該日志文件的起始物理偏移量) + flushedPosition

注3:流程圖中標(biāo)記紅色部分,將刷盤結(jié)果通知給等待線程

小結(jié):同步刷盤線程類GroupCommitService主要工作
將請求從讀容器中取出并通過mappedByteBuffer.force()將數(shù)據(jù)落盤。

3.異步刷盤線程類FlushRealTimeService工作流程

RocketMQ存儲中如何實現(xiàn)同步刷盤和異步刷盤

小結(jié):FlushRealTimeService主要工作
1.不開啟堆外外內(nèi)存刷盤方式為mappedByteBuffer.force()
2.開啟堆外內(nèi)存刷盤方式為fileChannel.force


疑問:同步刷盤線程類GroupCommitService每執(zhí)行一次都會交換讀寫容器,那刷盤請求什么時候放到寫容器(requestsWrite)呢?


四、消息追加與線程類的交互

分析完線程類后,把鏡頭切換到消息追加,看看消息進(jìn)來后是如何跟線程類交互的?


1.調(diào)用鏈

@1 CommitLog#putMessage
//同步刷盤或者異步刷盤
handleDiskFlush(result, putMessageResult, msg);
@2 CommitLog#handleDiskFlush

2.同步刷盤主要代碼

同步刷盤時構(gòu)造刷盤請求,將請求提交給線程類GroupCommitService,service.putRequest(request),并獲取刷盤結(jié)果。

if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
//等待MappedFile刷盤成功狀態(tài)通過countDownLatch來控制
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
}
}

3.異步刷盤主要代碼

未開啟堆外內(nèi)存喚醒FlushRealTimeServicee,開啟堆外內(nèi)存喚醒CommitRealTimeService。

if (!this.defaultMessageStore.getMessageStoreConfig()
.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}

五、刷盤方式示意圖
1.同步刷盤示意圖

RocketMQ存儲中如何實現(xiàn)同步刷盤和異步刷盤


2.異步刷盤未開啟堆外緩存示意圖

RocketMQ存儲中如何實現(xiàn)同步刷盤和異步刷盤


3.異步刷盤開啟堆外緩存示意圖

RocketMQ存儲中如何實現(xiàn)同步刷盤和異步刷盤

關(guān)于“RocketMQ存儲中如何實現(xiàn)同步刷盤和異步刷盤”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI