溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RocketMQ中broker消息存儲之如何實(shí)現(xiàn)拉取消息

發(fā)布時間:2021-12-17 14:22:02 來源:億速云 閱讀:252 作者:小新 欄目:大數(shù)據(jù)

這篇文章給大家分享的是有關(guān)RocketMQ中broker消息存儲之如何實(shí)現(xiàn)拉取消息的內(nèi)容。小編覺得挺實(shí)用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

在consumer拉取消息時,broker首先會根據(jù)待拉取的topic+queueId得到對應(yīng)的ConsumeQueue,再根據(jù)消費(fèi)offset從ConsumeQueue相應(yīng)的偏移位置中獲取該消息在commitlog里真實(shí)的offset/msgsize/tagscode信息,最后再從commitlog查出消息體。

消息拉取在broker存儲層的調(diào)用入口為DefaultMessageStore.getMessage方法。核心邏輯如下:

    // DefaultMessageStore.java
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final MessageFilter messageFilter) {
        // ...

        // 1. 定位ConsumeQueue
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();

            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            } else if (offset < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else if (offset == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(offset, offset);
            } else if (offset > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (0 == minOffset) {
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else {
                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                }
            } else {
                // 2. 從ConsumeQueue中讀取消費(fèi)偏移offset處的內(nèi)容
                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;

                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0;

                        int i = 0;
                        final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); // 單個請求最大拉取數(shù)據(jù)量
                        final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();  // commitlog offset 8bytes
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // msg size 4bytes
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // tags hashcode 8bytes

                            // ...

                            // 3. 通過tagscode快速過濾
                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }

                                continue;
                            }

                            // 4. 從commitlog獲取消息體
                            SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                            if (null == selectResult) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                }

                                nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                continue;
                            }

                            // 5. 通過消息體過濾
                            if (messageFilter != null
                                && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                // release...
                                selectResult.release();
                                continue;
                            }

                            this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                            // 6.添加到返回結(jié)果
                            getResult.addMessage(selectResult);
                            status = GetMessageStatus.FOUND;
                            nextPhyFileStartOffset = Long.MIN_VALUE;
                        }

                        // ...
                    } finally {

                        bufferConsumeQueue.release();
                    }
                } else {
                    // ...
                }
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }

        // ...
        return getResult;
    }

ConsumeQueue中存儲的是固定長度(每個消息20字節(jié))的內(nèi)容,因此訪問比較簡單:

    // ConsumeQueue.java    
    public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        int mappedFileSize = this.mappedFileSize;
        long offset = startIndex * CQ_STORE_UNIT_SIZE; // 消費(fèi)者offset * 固定20字節(jié)長度
        if (offset >= this.getMinLogicOffset()) {
            // 定位到所屬的MappedFile
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) {
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); // 從MappedFile中讀取實(shí)際的數(shù)據(jù)
                return result;
            }
        }
        return null;
    }

通過ConsumeQueue獲取消息在commitlog中的偏移量以及消息大小之后,獲取消息體的方法如下

    // CommitLog.java
    public SelectMappedBufferResult getMessage(final long offset, final int size) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        // 定位消息所在的MappedFile
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (mappedFile != null) {
            int pos = (int) (offset % mappedFileSize);
            return mappedFile.selectMappedBuffer(pos, size); // 從MappedFile中獲取消息體
        }
        return null;
    }

消息拉取整體流程如下

RocketMQ中broker消息存儲之如何實(shí)現(xiàn)拉取消息

感謝各位的閱讀!關(guān)于“RocketMQ中broker消息存儲之如何實(shí)現(xiàn)拉取消息”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學(xué)到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI