This article looks at how message dispatch is implemented in the RocketMQ broker's message store.
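After the broker receives a message from a producer, it first appends the message to the end of the CommitLog. An asynchronous dispatch thread, ReputMessageService, then dispatches the message to the ConsumeQueue and the IndexFile.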
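To make the "append first, dispatch later" split concrete, here is a minimal sketch. It is a toy model rather than RocketMQ code: the class `ReputSketch`, its in-memory list standing in for the CommitLog, and the `Consumer`-based dispatchers are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified model of "append first, dispatch later". */
final class ReputSketch {
    // Stand-in for the CommitLog: an append-only list of messages.
    private final List<String> commitLog = new ArrayList<>();
    // Stand-ins for the ConsumeQueue and IndexFile builders.
    private final List<Consumer<String>> dispatchers = new ArrayList<>();
    // Position of the next message that has not been dispatched yet.
    private int reputFromOffset = 0;

    void addDispatcher(Consumer<String> dispatcher) {
        dispatchers.add(dispatcher);
    }

    /** Producer path: only appends to the log; nothing is indexed here. */
    synchronized void append(String message) {
        commitLog.add(message);
    }

    /**
     * Dispatch path: forwards every message appended since the last call
     * to all dispatchers and returns how many messages were forwarded.
     * In a real broker a background thread would call this repeatedly.
     */
    synchronized int reputOnce() {
        int dispatched = 0;
        while (reputFromOffset < commitLog.size()) {
            String message = commitLog.get(reputFromOffset);
            for (Consumer<String> dispatcher : dispatchers) {
                dispatcher.accept(message);
            }
            reputFromOffset++;
            dispatched++;
        }
        return dispatched;
    }
}
```

The point of the design is that the write path stays cheap (one sequential append), while the derived structures catch up on their own schedule by tracking a single offset.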
The core dispatch logic lives in ReputMessageService.doReput:
// DefaultMessageStore.ReputMessageService
private void doReput() {
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
            this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }
        // 1. Fetch the data that reputFromOffset points to
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset();
                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    // 2. Parse the message
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // 3. Dispatch the message
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
                            // ...
                            this.reputFromOffset += size;
                            readSize += size;
                            // ...
                        } else if (size == 0) {
                            // End of the current file: move on to the next commit log file
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {
                        // ...
                    }
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}
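The inner loop of doReput walks a buffer one message at a time, advancing both `readSize` and `reputFromOffset` by each message's size and stopping when it sees a size of 0. The sketch below reproduces only that control flow. The record layout (`[int totalSize][payload]`, with a trailing 0 as the end marker) and the class `ScanLoopSketch` are assumptions made for illustration, not the real commit log format, and the sketch simply stops at the end marker instead of rolling to a next file.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical sketch of a size-driven scan loop over an append-only buffer. */
final class ScanLoopSketch {

    /** Encodes payloads as [int totalSize][payload bytes], followed by a 0 end marker. */
    static ByteBuffer encode(String... payloads) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        for (String payload : payloads) {
            byte[] body = payload.getBytes(StandardCharsets.UTF_8);
            buf.putInt(4 + body.length); // total size includes the 4-byte length field
            buf.put(body);
        }
        buf.putInt(0); // end marker: no more records in this buffer
        buf.flip();
        return buf;
    }

    /**
     * Scans the buffer from its current position, adds each payload to {@code out}
     * (the stand-in for dispatching), and returns the offset reached.
     */
    static long scan(ByteBuffer buf, long startOffset, List<String> out) {
        long reputFromOffset = startOffset;
        int total = buf.remaining();
        for (int readSize = 0; readSize < total; ) {
            int size = buf.getInt();
            if (size >= 4) {
                byte[] body = new byte[size - 4];
                buf.get(body);
                out.add(new String(body, StandardCharsets.UTF_8));
                reputFromOffset += size;
                readSize += size;
            } else {
                // End marker (or an invalid size): stop scanning this buffer.
                readSize = total;
            }
        }
        return reputFromOffset;
    }
}
```

Because the returned offset only moves forward by the bytes that were actually dispatched, a later scan can resume exactly where this one stopped.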
The insert operation on the ConsumeQueue looks like this:
// ConsumeQueue.java
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) {
    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }
    // 1. Write the commitlog offset / msg size / tags code into the in-memory buffer
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; // byte offset within the ConsumeQueue
    // 2. Get the last MappedFile
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        // ...
        this.maxPhysicOffset = offset + size;
        // 3. Append the index entry
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}
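The three put calls above write an 8-byte commit log offset, a 4-byte message size, and an 8-byte tags code, which adds up to a fixed 20-byte entry. A fixed width is what lets `cqOffset * CQ_STORE_UNIT_SIZE` locate any entry directly. The following sketch shows that layout on a plain byte array. `CqEntrySketch` and its methods are hypothetical helpers, and the value 20 for `CQ_STORE_UNIT_SIZE` is derived here from the field widths rather than quoted from the source.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of a fixed-width ConsumeQueue-style index entry. */
final class CqEntrySketch {
    // 8 bytes commit log offset + 4 bytes message size + 8 bytes tags code.
    static final int CQ_STORE_UNIT_SIZE = 20;

    /** Encodes one entry in the same field order as putMessagePositionInfo. */
    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        return buf.array();
    }

    /** Byte position of the entry with logical index cqOffset. */
    static long logicOffset(long cqOffset) {
        return cqOffset * CQ_STORE_UNIT_SIZE;
    }

    /** Reads entry number cqOffset back as {commitLogOffset, size, tagsCode}. */
    static long[] decode(byte[] queue, long cqOffset) {
        ByteBuffer buf = ByteBuffer.wrap(queue, (int) logicOffset(cqOffset), CQ_STORE_UNIT_SIZE);
        return new long[] { buf.getLong(), buf.getInt(), buf.getLong() };
    }
}
```

For example, entry number 3 starts at byte 60, so a consumer that knows its queue offset can find the matching commit log position with a single read.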
The write logic for the IndexFile looks like this:
// IndexService.java
public void buildIndex(DispatchRequest req) {
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile != null) {
        long endPhyOffset = indexFile.getEndPhyOffset();
        DispatchRequest msg = req;
        String topic = msg.getTopic();
        String keys = msg.getKeys();
        // Skip messages that have already been indexed
        if (msg.getCommitLogOffset() < endPhyOffset) {
            return;
        }
        // ...
        if (keys != null && keys.length() > 0) {
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
            for (int i = 0; i < keyset.length; i++) {
                // Write one index entry per key
                String key = keyset[i];
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                    if (indexFile == null) {
                        log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                        return;
                    }
                }
            }
        }
    } else {
        log.error("build index error, stop building index");
    }
}

private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
    for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
        log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
        indexFile = retryGetAndCreateIndexFile(); // the file is full, retry with a new one
        if (null == indexFile) {
            return null;
        }
        ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
    }
    return indexFile;
}
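buildIndex splits the message's keys string, skips empty fragments, and writes one index entry per remaining key, each combined with the topic. The sketch below isolates that key-expansion step. Two details are assumptions, since the source does not show them: that `MessageConst.KEY_SEPARATOR` is a single space, and that `buildKey` joins topic and key with `#`. The class `IndexKeySketch` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of how one message yields several index keys. */
final class IndexKeySketch {
    // Assumed value of MessageConst.KEY_SEPARATOR (not shown in the source).
    static final String KEY_SEPARATOR = " ";

    /** Assumed shape of buildKey: topic and key joined by '#'. */
    static String buildKey(String topic, String key) {
        return topic + "#" + key;
    }

    /** Returns one index key per non-empty fragment of the keys string. */
    static List<String> buildIndexKeys(String topic, String keys) {
        List<String> result = new ArrayList<>();
        if (keys == null || keys.isEmpty()) {
            return result;
        }
        for (String key : keys.split(KEY_SEPARATOR)) {
            if (!key.isEmpty()) { // consecutive separators produce empty fragments
                result.add(buildKey(topic, key));
            }
        }
        return result;
    }
}
```

Prefixing the topic keeps identical business keys from different topics apart in the index, which is why the lookup side has to supply both values.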
The overall flow of message dispatch is shown in the figure below: