溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

RocketMQ?Broker怎么實(shí)現(xiàn)高可用高并發(fā)的消息中轉(zhuǎn)服務(wù)

發(fā)布時(shí)間:2023-05-05 15:52:07 來源:億速云 閱讀:127 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹“RocketMQ Broker怎么實(shí)現(xiàn)高可用高并發(fā)的消息中轉(zhuǎn)服務(wù)”的相關(guān)知識(shí),小編通過實(shí)際案例向大家展示操作過程,操作方法簡(jiǎn)單快捷,實(shí)用性強(qiáng),希望這篇“RocketMQ Broker怎么實(shí)現(xiàn)高可用高并發(fā)的消息中轉(zhuǎn)服務(wù)”文章能幫助大家解決問題。

RocketMq-broker

broker主要作用就是存儲(chǔ)消息。所以重點(diǎn)就放在它對(duì)于消息的處理上面。我提出幾個(gè)問題,后續(xù)看代碼解答。

  • broker啟動(dòng)的時(shí)候是怎么向nameserv進(jìn)行注冊(cè)的?

  • productor發(fā)送過來的消息是怎么儲(chǔ)存的?

  • comsumer是怎么在broker拉取數(shù)據(jù)的?

  • 高可用怎么做的?broker掛了怎么辦,數(shù)據(jù)肯定要有備份的

注冊(cè)

注冊(cè)的時(shí)候,就是在啟動(dòng)的時(shí)候,向所有的nameService注冊(cè)自己的信息。其中nameService的地址是可以在啟動(dòng)的時(shí)候配置的。代碼在org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll。這里我省略了其他代碼

    public List<RegisterBrokerResult> registerBrokerAll(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final boolean oneway,
            final int timeoutMills,
            final boolean enableActingMaster,
            final boolean compressed,
            final Long heartbeatTimeoutMillis,
            final BrokerIdentity brokerIdentity) {
        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
                    @Override
                    public void run2() {
                        try {
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }
                            LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);
                        } catch (Exception e) {
                            LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }
            try {
                if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
                    LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);
                }
            } catch (InterruptedException ignore) {
            }
        }
        return registerBrokerResultList;
    }

這里用了countDownLatch來判斷一下所有broker注冊(cè)完成是否超時(shí),超時(shí)就打印一個(gè)warn。

消息存儲(chǔ)

具體可以看官網(wǎng)的文檔設(shè)計(jì)。我這里貼一部分內(nèi)容。

消息存儲(chǔ)架構(gòu)圖中主要有下面三個(gè)跟消息存儲(chǔ)相關(guān)的文件構(gòu)成。

(1) CommitLog:消息主體以及元數(shù)據(jù)的存儲(chǔ)主體,存儲(chǔ)Producer端寫入的消息主體內(nèi)容,消息內(nèi)容不是定長(zhǎng)的。單個(gè)文件大小默認(rèn)1G, 文件名長(zhǎng)度為20位,左邊補(bǔ)零,剩余為起始偏移量,比如00000000000000000000代表了第一個(gè)文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)?shù)谝粋€(gè)文件寫滿了,第二個(gè)文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序?qū)懭肴罩疚募?,?dāng)文件滿了,寫入下一個(gè)文件;

(2) ConsumeQueue:消息消費(fèi)索引,引入的目的主要是提高消息消費(fèi)的性能。ConsumeQueue作為消費(fèi)消息的索引,保存了指定Topic下的隊(duì)列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu)

(3) IndexFile:IndexFile(索引文件)提供了一種可以通過key或時(shí)間區(qū)間來查詢消息的方法。Index文件的存儲(chǔ)位置是:$HOME/store/index/{fileName},文件名fileName是以創(chuàng)建時(shí)的時(shí)間戳命名的,固定的單個(gè)IndexFile文件大小約為400M,一個(gè)IndexFile可以保存 2000W個(gè)索引,IndexFile的底層存儲(chǔ)設(shè)計(jì)為在文件系統(tǒng)中實(shí)現(xiàn)HashMap結(jié)構(gòu),故RocketMQ的索引文件其底層實(shí)現(xiàn)為hash索引。

具體請(qǐng)求是通過netty來處理的

NettyRemotingAbstract#processRequestCommand里面會(huì)根據(jù)請(qǐng)求code拿到具體的processor。

其中

  • SendMessageProcessor 負(fù)責(zé)處理 Producer 發(fā)送消息的請(qǐng)求;

  • PullMessageProcessor 負(fù)責(zé)處理 Consumer 消費(fèi)消息的請(qǐng)求;

  • QueryMessageProcessor 負(fù)責(zé)處理按照消息 Key 等查詢消息的請(qǐng)求。

數(shù)據(jù)寫入主要是在DefaultMessageStore#asyncPutMessage里面

    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        ......
        topicQueueLock.lock(topicQueueKey);
        try {
            boolean needAssignOffset = true;
            if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()
                && defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
                needAssignOffset = false;
            }
            if (needAssignOffset) {
                defaultMessageStore.assignOffset(msg, getMessageNum(msg));
            }
            PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
            if (encodeResult != null) {
                return CompletableFuture.completedFuture(encodeResult);
            }
            msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
            PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);
            putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
            try {
                long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
                this.beginTimeInLock = beginLockTimestamp;
                // Here settings are stored timestamp, in order to ensure an orderly
                // global
                if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                    msg.setStoreTimestamp(beginLockTimestamp);
                }
                if (null == mappedFile || mappedFile.isFull()) {
                    // 首先獲取mappedFile
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
                }
                if (null == mappedFile) {
                    log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
                }
                // 寫入數(shù)據(jù)
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                switch (result.getStatus()) {
                    case PUT_OK:
                        onCommitLogAppend(msg, result, mappedFile);
                        break;
                    case END_OF_FILE:
                        onCommitLogAppend(msg, result, mappedFile);
                        unlockMappedFile = mappedFile;
                        // Create a new file, re-write the message
                        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                        if (null == mappedFile) {
                            // XXX: warn and notify me
                            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                            beginTimeInLock = 0;
                            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
                        }
                        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                        if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
                            onCommitLogAppend(msg, result, mappedFile);
                        }
                        break;
                    case MESSAGE_SIZE_EXCEEDED:
                    case PROPERTIES_SIZE_EXCEEDED:
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                    case UNKNOWN_ERROR:
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                    default:
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                }
                elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
                beginTimeInLock = 0;
            } finally {
                putMessageLock.unlock();
            }
        } finally {
            topicQueueLock.unlock(topicQueueKey);
        }
        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
        }
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
        // 刷盤策略
        return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);
    }

首先獲取mappedFile,可以理解就是commitLog文件的一個(gè)映射。創(chuàng)建mappedFile會(huì)同時(shí)提前創(chuàng)建兩個(gè)文件,避免了下次創(chuàng)建文件等待。

org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation

private boolean mmapOperation() {
        boolean isSuccess = false;
        AllocateRequest req = null;
        try {
            req = this.requestQueue.take();
            AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
            if (null == expectedRequest) {
                log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize());
                return true;
            }
            if (expectedRequest != req) {
                log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
                return true;
            }
            if (req.getMappedFile() == null) {
                long beginTime = System.currentTimeMillis();
                MappedFile mappedFile;
                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    try {
                        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    } catch (RuntimeException e) {
                        log.warn("Use default implementation.");
                        mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    }
                } else {
                    mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());
                }
                long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
                if (elapsedTime > 10) {
                    int queueSize = this.requestQueue.size();
                    log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
                        + " " + req.getFilePath() + " " + req.getFileSize());
                }
                // pre write mappedFile
                if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                    .getMappedFileSizeCommitLog()
                    &&
                    this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                    mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                        this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                }
                req.setMappedFile(mappedFile);
                this.hasException = false;
                isSuccess = true;
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
            this.hasException = true;
            return false;
        } catch (IOException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.hasException = true;
            if (null != req) {
                requestQueue.offer(req);
                try {
                    Thread.sleep(1);
                } catch (InterruptedException ignored) {
                }
            }
        } finally {
            if (req != null && isSuccess)
                req.getCountDownLatch().countDown();
        }
        return true;
    }

這里會(huì)去初始化mapperFile

org.apache.rocketmq.store.logfile.DefaultMappedFile#init

    private void init(final String fileName, final int fileSize) throws IOException {
        ......
        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("Failed to create file " + this.fileName, e);
            throw e;
        } catch (IOException e) {
            log.error("Failed to map file " + this.fileName, e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }

這里其實(shí)就是用java的map創(chuàng)建文件。

如果開啟了堆外對(duì)象池,會(huì)用writeBuffer來寫入數(shù)據(jù)。讀取文件還是用mappedByteBuffer。

    @Override
    public void init(final String fileName, final int fileSize,
                     final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize);
        this.writeBuffer = transientStorePool.borrowBuffer();
        this.transientStorePool = transientStorePool;
    }

在創(chuàng)建好maperFile后,還有個(gè)預(yù)熱的操作

    public void warmMappedFile(FlushDiskType type, int pages) {
        this.mappedByteBufferAccessCountSinceLastSwap++;
        long beginTime = System.currentTimeMillis();
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        int flush = 0;
        long time = System.currentTimeMillis();
        //通過寫入 1G 的字節(jié) 0 來讓操作系統(tǒng)分配物理內(nèi)存空間,如果沒有填充值,操作系統(tǒng)不會(huì)實(shí)際分配物理內(nèi)存,防止在寫入消息時(shí)發(fā)生缺頁異常
        for (int i = 0, j = 0; i < this.fileSize; i += DefaultMappedFile.OS_PAGE_SIZE, j++) {
            byteBuffer.put(i, (byte) 0);
            // force flush when flush disk type is sync
            if (type == FlushDiskType.SYNC_FLUSH) {
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }
            // 這里就是每隔一段時(shí)間sleep一下,這樣讓其他線程有執(zhí)行的機(jī)會(huì),這其中也包括gc線程,讓gc線程有機(jī)會(huì)在循環(huán)的中途可以執(zhí)行g(shù)c。避免很久才執(zhí)行一次gc
            // prevent gc
            if (j % 1000 == 0) {
                log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    log.error("Interrupted", e);
                }
            }
        }
        // force flush when prepare load finished
        if (type == FlushDiskType.SYNC_FLUSH) {
            log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
                    this.getFileName(), System.currentTimeMillis() - beginTime);
            mappedByteBuffer.force();
        }
        log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
                System.currentTimeMillis() - beginTime);
        this.mlock();
    }

因?yàn)橥ㄟ^ mmap 映射,只是建立了進(jìn)程虛擬內(nèi)存地址與物理內(nèi)存地址之間的映射關(guān)系,并沒有將 Page Cache 加載至內(nèi)存。讀寫數(shù)據(jù)時(shí)如果沒有命中寫 Page Cache 則發(fā)生缺頁中斷,從磁盤重新加載數(shù)據(jù)至內(nèi)存,這樣會(huì)影響讀寫性能。為了防止缺頁異常,阻止操作系統(tǒng)將相關(guān)的內(nèi)存頁調(diào)度到交換空間(swap space),RocketMQ 通過對(duì)文件預(yù)熱,將對(duì)應(yīng)page cache提前加載到內(nèi)存中。

然后中間循環(huán)會(huì)sleep一下,就是讓gc可以運(yùn)行。我復(fù)制一下chatGpt的回答:

這段代碼中的if (j % 1000 == 0)語句是為了防止頻繁的GC。在每次循環(huán)中,當(dāng)j的值是1000的倍數(shù)時(shí),會(huì)執(zhí)行一次Thread.sleep(0),這個(gè)操作會(huì)讓當(dāng)前線程暫停一小段時(shí)間,從而讓JVM有機(jī)會(huì)回收一些不再使用的對(duì)象。這樣做的目的是為了減少GC的頻率,從而提高程序的性能。

最后還有一個(gè)鎖定

    public void mlock() {
        final long beginTime = System.currentTimeMillis();
        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
        Pointer pointer = new Pointer(address);
        {
            // 通過系統(tǒng)調(diào)用 mlock 鎖定該文件的 Page Cache,防止其被交換到 swap 空間
            int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
            log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }
        {
            // 通過系統(tǒng)調(diào)用 madvise 給操作系統(tǒng)建議,說明該文件在不久的將來要被訪問
            int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
            log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }
    }

然后就是對(duì)mapperFile進(jìn)行寫入消息。就是拿著buffer寫入具體的數(shù)據(jù)。

接著就是處理刷盤方式和高可用。

org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA

    private CompletableFuture<PutMessageResult> handleDiskFlushAndHA(PutMessageResult putMessageResult,
        MessageExt messageExt, int needAckNums, boolean needHandleHA) {
        // 處理刷盤機(jī)制
        CompletableFuture<PutMessageStatus> flushResultFuture = handleDiskFlush(putMessageResult.getAppendMessageResult(), messageExt);
        CompletableFuture<PutMessageStatus> replicaResultFuture;
        if (!needHandleHA) {
            replicaResultFuture = CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        } else {
            // 處理HA
            replicaResultFuture = handleHA(putMessageResult.getAppendMessageResult(), putMessageResult, needAckNums);
        }
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
            }
            return putMessageResult;
        });
    }

處理刷盤

org.apache.rocketmq.store.CommitLog.DefaultFlushManager#handleDiskFlush

 @Override
        public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
            // Synchronization flush
            if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
                if (messageExt.isWaitStoreMsgOK()) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    flushDiskWatcher.add(request);
                    service.putRequest(request);
                    return request.future();
                } else {
                    service.wakeup();
                    return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
                }
            }
            // Asynchronous flush
            else {
                if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    flushCommitLogService.wakeup();
                } else {
                    commitLogService.wakeup();
                }
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }

根據(jù)配置的同步刷盤或者異步刷盤的機(jī)制來決定具體的刷盤策略。

處理高可用

org.apache.rocketmq.store.CommitLog#handleHA

    private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,
        int needAckNums) {
        if (needAckNums >= 0 && needAckNums <= 1) {
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
        HAService haService = this.defaultMessageStore.getHaService();
        long nextOffset = result.getWroteOffset() + result.getWroteBytes();
        // Wait enough acks from different slaves
        GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);
        haService.putRequest(request);
        haService.getWaitNotifyObject().wakeupAll();
        return request.future();
    }

其實(shí)后臺(tái)一直有一個(gè)同步線程去處理消息同步的事情,只要比較一下master和salve的commitLog的offset就可以比較出來差多少數(shù)據(jù)了。所以把slave沒有的數(shù)據(jù)同步過去就可以了,這塊后面再寫一篇文章細(xì)講。

那還有一個(gè)問題,consumeQueue和indexFile是怎么處理的呢?

ReputMessageService里面會(huì)去讀取commitLog的數(shù)據(jù),寫入到comsunerQueue和IndexFile

根據(jù)各個(gè)dispatch,分別處理兩個(gè)文件。這里就不細(xì)講了。

ConsumeQueue的處理是在這里面

org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch

文件的名字其實(shí)就是topic/queueid。寫入的數(shù)據(jù)是

        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

其實(shí)就是commitLog的一個(gè)offset,根據(jù)這個(gè)值就可以拿到具體的消息了。

org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildIndex

indexFile就是寫入這些數(shù)據(jù)

                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

包括key的hash值,還有物理偏移,還有時(shí)間等信息。首先文件是按照每個(gè)毫秒創(chuàng)建的,所以天然就是按照時(shí)間順序排列。根據(jù)key查詢的話,寫入文件的位置是根據(jù)key的hash來的,所以可以馬上知道是哪個(gè)位置。

好了,到這里數(shù)據(jù)存儲(chǔ)就差不多了。來看看怎么讀消息的

消息讀取

消費(fèi)者拉取消息

拉取消息有自己的處理器:

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest

里面有很多額外的邏輯,具體在下面的方法中:

org.apache.rocketmq.store.DefaultMessageStore#getMessage

消息讀取很簡(jiǎn)單,就是從根據(jù)topic和queueId去consumeQueue里面讀,消費(fèi)者知道上次拉取到了哪里,所以就直接根據(jù)consumeQueue的offset去讀內(nèi)容,consumeQueue里面存的是commitLog的offset和size,根據(jù)這兩個(gè)值就可以從commitLog里面拿到消息,返回。然后更新下次的offset,返回給productor。

按照key查詢

org.apache.rocketmq.store.DefaultMessageStore#queryMessage

主要是查的indexFile,前面提到indexFile就是按照時(shí)間來創(chuàng)建文件的,所以先按照時(shí)間篩選出符合條件的indexFile,然后根據(jù)key的hash,找到文件對(duì)應(yīng)的寫入位置,因?yàn)閷?duì)應(yīng)的hash會(huì)有沖突,就一個(gè)個(gè)遍歷,找到所有hash值相等的數(shù)據(jù)。然后再根據(jù)indexFile記錄的offset,去commitLog里面去查消息。

關(guān)于“RocketMQ Broker怎么實(shí)現(xiàn)高可用高并發(fā)的消息中轉(zhuǎn)服務(wù)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI