溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

RocketMQ的刷盤(pán)策略以及實(shí)現(xiàn)同步刷盤(pán)和異步刷盤(pán)的實(shí)例代碼

發(fā)布時(shí)間:2021-09-07 07:56:15 來(lái)源:億速云 閱讀:537 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要介紹“RocketMQ的刷盤(pán)策略以及實(shí)現(xiàn)同步刷盤(pán)和異步刷盤(pán)的實(shí)例代碼”,在日常操作中,相信很多人在RocketMQ的刷盤(pán)策略以及實(shí)現(xiàn)同步刷盤(pán)和異步刷盤(pán)的實(shí)例代碼問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”RocketMQ的刷盤(pán)策略以及實(shí)現(xiàn)同步刷盤(pán)和異步刷盤(pán)的實(shí)例代碼”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

1、刷盤(pán)策略

RocketMQ提供了兩種刷盤(pán)策略同步刷盤(pán)、異步刷盤(pán)

同步刷盤(pán):在消息到達(dá)MQ后,RocketMQ需要將數(shù)據(jù)持久化,同步刷盤(pán)是指數(shù)據(jù)到達(dá)內(nèi)存之后,必須刷到commitlog日志之后才算成功,然后返回producer數(shù)據(jù)已經(jīng)發(fā)送成功。

異步刷盤(pán):,同步刷盤(pán)是指數(shù)據(jù)到達(dá)內(nèi)存之后,返回producer說(shuō)數(shù)據(jù)已經(jīng)發(fā)送成功。,然后再寫(xiě)入commitlog日志。

復(fù)制方式優(yōu)點(diǎn)缺點(diǎn)適應(yīng)場(chǎng)景
同步刷盤(pán)保證了消息不丟失吞吐率相對(duì)于異步刷盤(pán)要低消息可靠性要求較高的場(chǎng)景
異步刷盤(pán)系統(tǒng)的吞吐量提高系統(tǒng)斷電等異常時(shí)會(huì)有部分丟失對(duì)應(yīng)吞吐量要求較高的場(chǎng)景

下面我們從源碼的角度分析其實(shí)現(xiàn)的邏輯

RocketMQ的刷盤(pán)策略以及實(shí)現(xiàn)同步刷盤(pán)和異步刷盤(pán)的實(shí)例代碼

2、同步刷盤(pán)

CommitLog.putMessage()方法中的刷盤(pán)的核心方法handleDiskFlush()

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush  同步刷盤(pán)
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        //客戶端確認(rèn)要等待刷盤(pán)成功
        if (messageExt.isWaitStoreMsgOK()) {
        	//封裝刷盤(pán)請(qǐng)求對(duì)象 nextoffset : 當(dāng)前內(nèi)存寫(xiě)的位置 + 本次要寫(xiě)入的字節(jié)數(shù)
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            //添加刷盤(pán)請(qǐng)求(后臺(tái)定時(shí)任務(wù)進(jìn)行刷盤(pán),每隔10毫秒批量刷盤(pán)。10毫秒中如果有多個(gè)請(qǐng)求,則多個(gè)請(qǐng)求一塊刷盤(pán))
            service.putRequest(request);
            //等待刷盤(pán)請(qǐng)求結(jié)果(最長(zhǎng)等待5秒鐘,刷盤(pán)成功后馬上可以獲取結(jié)果。)
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }else {// Asynchronous flush 異步刷盤(pán)
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        	//喚醒FlushRealTimeService服務(wù)線程
            flushCommitLogService.wakeup();
        } else {
        	//喚醒CommitRealTimeService服務(wù)線程
            commitLogService.wakeup();
        }
    }
}

查看同步刷盤(pán)的核心類(lèi)GroupCommitService中的核心屬性

private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); requestsWrite : 寫(xiě)隊(duì)列,主要用于向該線程添加刷盤(pán)任務(wù) requestsRead : 讀隊(duì)列,主要用于執(zhí)行特定的刷盤(pán)任務(wù),這是是GroupCommitService 設(shè)計(jì)的一個(gè)亮點(diǎn),把讀寫(xiě)分離,每處理完requestsRead中的任務(wù),就交換這兩個(gè)隊(duì)列。

我們查看其run()方法

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
        	//等待通知,如果數(shù)據(jù)過(guò)來(lái),提前結(jié)束等待執(zhí)行onWaitEnd()方法交換讀寫(xiě)swapRequests()
        	//刷盤(pán)請(qǐng)求的requestsWrite->requestsRead
            this.waitForRunning(10);
            //執(zhí)行刷盤(pán)
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    //省略代碼...
}

waitForRunning方法中執(zhí)行了swapRequests()方法

private void swapRequests() {
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}

GroupCommitService接收到的刷盤(pán)請(qǐng)求通過(guò)putRequest()方法加入到requestsWrite集合中,swapRequests()方法將requestsWrite請(qǐng)求集合交換到requestsRead集合中供刷盤(pán)使用,我們重點(diǎn)查看doCommit()方法

private void doCommit() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
        	//循環(huán)每一個(gè)刷盤(pán)請(qǐng)求
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                	//判斷是否已經(jīng)刷盤(pán)過(guò)了,刷盤(pán)的位置和當(dāng)前消息下次刷盤(pán)需要的位置比較
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    if (!flushOK) {
                    	//0代碼立刻刷盤(pán),不管緩存中消息有多少
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
                //返回刷盤(pán)的結(jié)果
                req.wakeupCustomer(flushOK);
            }
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            //設(shè)置刷盤(pán)的時(shí)間點(diǎn)
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            //清空requestsRead對(duì)象
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}

mappedFileQueue.flush(0)立刻刷盤(pán)

public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        //刷盤(pán),返回刷寫(xiě)到磁盤(pán)指針
        int offset = mappedFile.flush(flushLeastPages);
        //計(jì)算當(dāng)前的刷盤(pán)指針,之前的所有數(shù)據(jù)已經(jīng)持久化到磁盤(pán)中
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }
    return result;
}

mappedFile.flush(0);保證立刻刷盤(pán)后面異步刷盤(pán)時(shí)也會(huì)調(diào)用mappedFile.flush()方法

3、異步刷盤(pán)

if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
	//喚醒FlushRealTimeService服務(wù)線程
    flushCommitLogService.wakeup();
} else {
	//喚醒CommitRealTimeService服務(wù)線程
    commitLogService.wakeup();
}

我們發(fā)現(xiàn)異步刷盤(pán)的時(shí)候有兩種方式,一種是堆外內(nèi)存池開(kāi)啟時(shí)啟動(dòng)CommitRealTimeService服務(wù)線程,另一個(gè)是默認(rèn)執(zhí)行的FlushRealTimeService服務(wù)線程進(jìn)行刷盤(pán)操作,關(guān)于TransientStorePoolEnable在《RocketMQ內(nèi)存映射》章節(jié)中的**“創(chuàng)建映射文件MappedFile”**中有介紹

RocketMQ的刷盤(pán)策略以及實(shí)現(xiàn)同步刷盤(pán)和異步刷盤(pán)的實(shí)例代碼

圖3-1

1、FlushRealTimeService

查看其run()方法

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
    	// 每次刷盤(pán)的間隔時(shí)間,默認(rèn) 200ms
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
        // 每次commit最少的頁(yè)數(shù) 默認(rèn)4頁(yè)
        int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
        // 如果上次刷新的時(shí)間+該值 小于當(dāng)前時(shí)間,則改變flushPhysicQueueLeastPages =0 默認(rèn)為200
        int commitDataThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

        long begin = System.currentTimeMillis();
        //距離上一次刷盤(pán)時(shí)間超過(guò)200ms則立刻刷盤(pán),commit最少的頁(yè)數(shù)置為0
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            commitDataLeastPages = 0;
        }
        try {
        	//刷盤(pán)
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            if (!result) {
                this.lastCommitTimestamp = end; // result = false means some data committed.
                //now wake up flush thread.
                flushCommitLogService.wakeup();
            }

            if (end - begin > 500) {
                log.info("Commit data to file costs {} ms", end - begin);
            }
            this.waitForRunning(interval);
        } catch (Throwable e) {
            CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
        }
    }

    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    CommitLog.log.info(this.getServiceName() + " service end");
    }
}

這種方式和同步刷盤(pán)一樣就是mappedFileQueue.commit(commitDataLeastPages)參數(shù)有限制,數(shù)據(jù)達(dá)到一定量的時(shí)候才進(jìn)行刷盤(pán)操作提高數(shù)據(jù)的刷盤(pán)性能。

2、CommitRealTimeService

查看其run()方法

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
    	// 每次刷盤(pán)的間隔時(shí)間,默認(rèn) 200ms
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
        // 每次commit最少的頁(yè)數(shù) 默認(rèn)4頁(yè)
        int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
        // 如果上次刷新的時(shí)間+該值 小于當(dāng)前時(shí)間,則改變flushPhysicQueueLeastPages =0 默認(rèn)為200
        int commitDataThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

        long begin = System.currentTimeMillis();
        //距離上一次刷盤(pán)時(shí)間超過(guò)200ms則立刻刷盤(pán),commit最少的頁(yè)數(shù)置為0
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            commitDataLeastPages = 0;
        }
        try {
        	//刷盤(pán)
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            //返回的是false說(shuō)明數(shù)據(jù)已經(jīng)commit到了fileChannel中
            if (!result) {
                this.lastCommitTimestamp = end; // result = false means some data committed.
                //now wake up flush thread.
                flushCommitLogService.wakeup();
            }
            if (end - begin > 500) {
                log.info("Commit data to file costs {} ms", end - begin);
            }
            this.waitForRunning(interval);
        } catch (Throwable e) {
            CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
        }
    }
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    CommitLog.log.info(this.getServiceName() + " service end");
    }
}

我們發(fā)現(xiàn)其刷盤(pán)方法不一樣mappedFileQueue.commit()調(diào)用MappedFile.commit()方法

public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    //如果提交的數(shù)據(jù)不滿commitLeastPages則不執(zhí)行本次的提交,待下一次提交
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // All dirty data has been committed to FileChannel.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}

查看其核心刷盤(pán)方法

protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();
    if (writePos - this.committedPosition.get() > 0) {
        try {
        	//創(chuàng)建writeBuffer的共享緩存區(qū)
            ByteBuffer byteBuffer = writeBuffer.slice();
            //將指針回退到上一次提交的位置
            byteBuffer.position(lastCommittedPosition);
            //設(shè)置limit為writePos
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            //將committedPosition指針到wrotePosition的數(shù)據(jù)復(fù)制(寫(xiě)入)到fileChannel中
            this.fileChannel.write(byteBuffer);
            //更新committedPosition指針為writePos
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

commit0()只是將緩存數(shù)據(jù)加入到fileChannel中,我們?cè)贑ommitRealTimeService.run()方法中看到喚醒flushCommitLogService線程需要將fileChannel中的數(shù)據(jù)flush到磁盤(pán)中,我們發(fā)現(xiàn)兩種方式都需要走flushCommitLogService.run()方法最后都執(zhí)行MappedFile.flush(int)

public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();
            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
            //設(shè)置刷盤(pán)后的指針
            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

兩種緩存方式走的刷盤(pán)邏輯也不同,可以查看**“圖3-1”**兩種方式的處理流程圖

我們還發(fā)現(xiàn)一個(gè)方法isAbleToFlush()判斷是否需要刷盤(pán)

private boolean isAbleToFlush(final int flushLeastPages) {
    int flush = this.flushedPosition.get();
    int write = getReadPosition();
    if (this.isFull()) {
        return true;
    }
    if (flushLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
    }
    return write > flush;
}

同步刷盤(pán)時(shí)flushLeastPages=0立刻刷盤(pán)

異步刷盤(pán)時(shí)flushLeastPages=4 ,默認(rèn)是4,需要刷盤(pán)的數(shù)據(jù)達(dá)到PageCache的頁(yè)數(shù)4倍時(shí)才會(huì)刷盤(pán),或者距上一次刷盤(pán)時(shí)間>=200ms則設(shè)置flushLeastPages=0立刻刷盤(pán)

同步刷盤(pán)時(shí)無(wú)論消息的大小都立刻刷盤(pán),線程阻塞等待刷盤(pán)結(jié)果

異步刷盤(pán)有兩種方式但是其邏輯都是需要刷盤(pán)的數(shù)據(jù)OS_PAGE_SIZE的4倍即(1024 * 4)*4=16k或者距上一次刷盤(pán)時(shí)間>=200ms時(shí)才刷盤(pán),提高數(shù)據(jù)的刷盤(pán)性能

到此,關(guān)于“RocketMQ的刷盤(pán)策略以及實(shí)現(xiàn)同步刷盤(pán)和異步刷盤(pán)的實(shí)例代碼”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI