您好,登錄后才能下訂單哦!
這篇文章主要介紹“RocketMQ broker消息投遞流程處理PULL_MESSAGE請(qǐng)求的方法是什么”,在日常操作中,相信很多人在RocketMQ broker消息投遞流程處理PULL_MESSAGE請(qǐng)求的方法是什么問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”RocketMQ broker消息投遞流程處理PULL_MESSAGE請(qǐng)求的方法是什么”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
RocketMq
消息處理整個(gè)流程如下:
消息接收:消息接收是指接收producer
的消息,處理類是SendMessageProcessor
,將消息寫(xiě)入到commigLog
文件后,接收流程處理完畢;
消息分發(fā):broker
處理消息分發(fā)的類是ReputMessageService
,它會(huì)啟動(dòng)一個(gè)線程,不斷地將commitLong
分到到對(duì)應(yīng)的consumerQueue
,這一步操作會(huì)寫(xiě)兩個(gè)文件:consumerQueue
與indexFile
,寫(xiě)入后,消息分發(fā)流程處理 完畢;
消息投遞:消息投遞是指將消息發(fā)往consumer
的流程,consumer
會(huì)發(fā)起獲取消息的請(qǐng)求,broker
收到請(qǐng)求后,調(diào)用PullMessageProcessor
類處理,從consumerQueue
文件獲取消息,返回給consumer
后,投遞流程處理完畢。
以上就是rocketMq
處理消息的流程了,接下來(lái)我們就從源碼來(lái)分析消息投遞的實(shí)現(xiàn)。
與producer
不同,consumer
從broker
拉取消息時(shí),發(fā)送的請(qǐng)求code
為PULL_MESSAGE
,processor
為PullMessageProcessor
,我們直接進(jìn)入它的processRequest
方法:
@Override public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { // 調(diào)用方法 return this.processRequest(ctx.channel(), request, true); }
這個(gè)方法就只是調(diào)用了一個(gè)重載方法,多出來(lái)的參數(shù)true
表示允許broker
掛起請(qǐng)求,我們繼續(xù),
/** * 繼續(xù)處理 */ private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)throws RemotingCommandException { RemotingCommand response = RemotingCommand .createResponseCommand(PullMessageResponseHeader.class); final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); response.setOpaque(request.getOpaque()); // 省略權(quán)限校驗(yàn)流程 // 1. rocketMq 可以設(shè)置校驗(yàn)信息,以阻擋非法客戶端的連接 // 2. 同時(shí),對(duì)topic可以設(shè)置DENY(拒絕)、ANY(PUB 或者 SUB 權(quán)限)、PUB(發(fā)送權(quán)限)、SUB(訂閱權(quán)限)等權(quán)限, // 可以細(xì)粒度控制客戶端對(duì)topic的操作內(nèi)容 ... // 獲取訂閱組 SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager() .findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); ... // 獲取訂閱主題 TopicConfig topicConfig = this.brokerController.getTopicConfigManager() .selectTopicConfig(requestHeader.getTopic()); ... // 處理filter // consumer在訂閱消息時(shí),可以對(duì)訂閱的消息進(jìn)行過(guò)濾,過(guò)濾方法有兩種:tag與sql92 // 這里我們重點(diǎn)關(guān)注拉取消息的流程,具體的過(guò)濾細(xì)節(jié)后面再分析 ... // 獲取消息 // 1. 根據(jù) topic 與 queueId 獲取 ConsumerQueue 文件 // 2. 根據(jù) ConsumerQueue 文件的信息,從 CommitLog 中獲取消息內(nèi)容 final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage( requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter); if (getMessageResult != null) { // 省略一大堆的校驗(yàn)過(guò)程 ... switch (response.getCode()) { // 表示消息可以處理,這里會(huì)把消息內(nèi)容寫(xiě)入到 response 中 case ResponseCode.SUCCESS: ... // 處理消息消息內(nèi)容,就是把消息從 getMessageResult 讀出來(lái),放到 response 中 if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { final long beginTimeMills = this.brokerController.getMessageStore().now(); // 將消息內(nèi)容轉(zhuǎn)為byte數(shù)組 final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); ... response.setBody(r); } else { try { // 消息轉(zhuǎn)換 FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader( getMessageResult.getBufferTotalSize()), getMessageResult); channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { ... }); } catch (Throwable e) { ... } response = null; } break; // 未找到滿足條件的消息 case ResponseCode.PULL_NOT_FOUND: // 如果支持掛起,就掛起當(dāng)前請(qǐng)求 if (brokerAllowSuspend && hasSuspendFlag) { ... PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); // 沒(méi)有找到相關(guān)的消息,掛起操作 this.brokerController.getPullRequestHoldService() .suspendPullRequest(topic, queueId, pullRequest); response = null; break; } // 省略其他類型的處理 ... break; default: assert false; } } else { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("store getMessage return null"); } ... return response; }
在源碼中,這個(gè)方法也是非常長(zhǎng),這里我抹去了各種細(xì)枝末節(jié),僅留下了一些重要的流程,整個(gè)處理流程如下:
權(quán)限校驗(yàn):rocketMq
可以設(shè)置校驗(yàn)信息,以阻擋非法客戶端的連接,同時(shí)也可以設(shè)置客戶端的發(fā)布、訂閱權(quán)限,細(xì)節(jié)度控制訪問(wèn)權(quán)限;
獲取訂閱組、訂閱主題等,這塊主要是通過(guò)請(qǐng)求消息里的內(nèi)容獲取broker
中對(duì)應(yīng)的記錄
創(chuàng)建過(guò)濾組件:consumer
在訂閱消息時(shí),可以對(duì)訂閱的消息進(jìn)行過(guò)濾,過(guò)濾方法有兩種:tag
與sql92
獲取消息:先是根據(jù) topic
與 queueId
獲取 ConsumerQueue
文件,根據(jù) ConsumerQueue
文件的信息,從 CommitLog
中獲取消息內(nèi)容,消息的過(guò)濾操作也是發(fā)生在這一步
轉(zhuǎn)換消息:如果獲得了消息,就是把具體的消息內(nèi)容,復(fù)制到reponse
中
掛起請(qǐng)求:如果沒(méi)獲得消息,而當(dāng)前請(qǐng)求又支持掛起,就掛起當(dāng)前請(qǐng)求
以上代碼還是比較清晰的,相關(guān)流程代碼中都作了注釋。
以上流程就是整個(gè)消息的獲取流程了,在本文中,我們僅關(guān)注與獲取消息相關(guān)的步驟,重點(diǎn)關(guān)注以下兩個(gè)操作:
獲取消息
掛起請(qǐng)求
獲取消息的方法為DefaultMessageStore#getMessage
,代碼如下:
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter) { // 省略一些判斷 ... // 根據(jù)topic與queueId一個(gè)ConsumeQueue,consumeQueue記錄的是消息在commitLog的位置 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { minOffset = consumeQueue.getMinOffsetInQueue(); maxOffset = consumeQueue.getMaxOffsetInQueue(); if (...) { // 判斷 offset 是否符合要求 ... } else { // 從 consumerQueue 文件中獲取消息 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); if (bufferConsumeQueue != null) { ... for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 省略一大堆的消息過(guò)濾操作 ... // 從 commitLong 獲取消息 SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); continue; } // 省略一大堆的消息過(guò)濾操作 ... } } } else { status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } if (GetMessageStatus.FOUND == status) { this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet(); } else { this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet(); } long elapsedTime = this.getSystemClock().now() - beginTime; this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime); getResult.setStatus(status); // 又是處理 offset getResult.setNextBeginOffset(nextBeginOffset); getResult.setMaxOffset(maxOffset); getResult.setMinOffset(minOffset); return getResult; }
這個(gè)方法不是比較長(zhǎng)的,這里僅保留了關(guān)鍵流程,獲取消息的關(guān)鍵流程如下:
根據(jù)topic
與queueId
找到ConsumerQueue
從ConsumerQueue
對(duì)應(yīng)的文件中獲取消息信息,如tag
的hashCode
、消息在commitLog
中的位置信息
根據(jù)位置信息,從commitLog
中獲取完整的消息
經(jīng)過(guò)以上步驟,消息就能獲取到了,不過(guò)在獲取消息的前后,會(huì)進(jìn)行消息過(guò)濾操作,即根據(jù)tag
或sql
語(yǔ)法來(lái)過(guò)濾消息,關(guān)于消息過(guò)濾的一些細(xì)節(jié),我們留到后面消息過(guò)濾相關(guān)章節(jié)作進(jìn)一步分析。
當(dāng)broker
無(wú)新消息時(shí),consumer
拉取消息的請(qǐng)求就會(huì)掛起,方法為PullRequestHoldService#suspendPullRequest
:
public class PullRequestHoldService extends ServiceThread { private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable = new ConcurrentHashMap<String, ManyPullRequest>(1024); public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (null == mpr) { mpr = new ManyPullRequest(); ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr); if (prev != null) { mpr = prev; } } mpr.addPullRequest(pullRequest); } ... }
在suspendPullRequest
方法中,所做的工作僅是把當(dāng)前請(qǐng)求放入pullRequestTable
中了。從代碼中可以看到,pullRequestTable
是一個(gè)ConcurrentMap
,key
是 topic@queueId
,value
就是掛起的請(qǐng)求了。
請(qǐng)求掛起后,何時(shí)處理呢?這就是PullRequestHoldService
線程的工作了。
看完PullRequestHoldService#suspendPullRequest
方法后,我們?cè)賮?lái)看看PullRequestHoldService
。
PullRequestHoldService
是ServiceThread
的子類(上一次看到ServiceThread
的子類還是ReputMessageService
),它也會(huì)啟動(dòng)一個(gè)新線程來(lái)處理掛起操作。
我們先來(lái)看看它是在哪里啟動(dòng)PullRequestHoldService
的線程的,在BrokerController
的啟動(dòng)方法start()
中有這么一行:
BrokerController#start
public void start() throws Exception { ... if (this.pullRequestHoldService != null) { this.pullRequestHoldService.start(); } ... }
這里就是啟動(dòng)pullRequestHoldService
的線程操作了。
為了探究這個(gè)線程做了什么,我們進(jìn)入PullRequestHoldService#run
方法:
@Override public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { // 等待中 if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning( this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); // 檢查操作 this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); }
從代碼來(lái)看,這個(gè)線程先是進(jìn)行等待,然后調(diào)用PullRequestHoldService#checkHoldRequest
方法,看來(lái)關(guān)注就是這個(gè)方法了,它的代碼如下:
private void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); final long offset = this.brokerController.getMessageStore() .getMaxOffsetInQueue(topic, queueId); try { // 調(diào)用notifyMessageArriving方法操作 this.notifyMessageArriving(topic, queueId, offset); } catch (Throwable e) { log.error(...); } } } }
這個(gè)方法調(diào)用了PullRequestHoldService#notifyMessageArriving(...)
,我們繼續(xù)進(jìn)入:
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) { // 繼續(xù)調(diào)用 notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null); } /** * 這個(gè)方法就是最終調(diào)用的了 */ public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { List<PullRequest> requestList = mpr.cloneListAndClear(); if (requestList != null) { List<PullRequest> replayList = new ArrayList<PullRequest>(); for (PullRequest request : requestList) { // 判斷是否有新消息到達(dá),要根據(jù) comsumerQueue 的偏移量與request的偏移量判斷 long newestOffset = maxOffset; if (newestOffset <= request.getPullFromThisOffset()) { newestOffset = this.brokerController.getMessageStore() .getMaxOffsetInQueue(topic, queueId); } if (newestOffset > request.getPullFromThisOffset()) { boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); if (match && properties != null) { match = request.getMessageFilter().isMatchedByCommitLog(null, properties); } if (match) { try { // 喚醒操作 this.brokerController.getPullMessageProcessor() .executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } } // 超時(shí)時(shí)間到了 if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { // 喚醒操作 this.brokerController.getPullMessageProcessor() .executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } replayList.add(request); } if (!replayList.isEmpty()) { mpr.addPullRequest(replayList); } } } }
這個(gè)方法就是用來(lái)檢查是否有新消息送達(dá)的操作了,方法雖然有點(diǎn)長(zhǎng),但可以用一句話來(lái)總結(jié):如果有新消息送達(dá),或者pullRquest
hold
住的時(shí)間到了,就喚醒pullRquest
(即調(diào)用PullMessageProcessor#executeRequestWhenWakeup
方法)。
在判斷是否有新消息送達(dá)時(shí),會(huì)獲取comsumerQueue
文件中的最大偏移量,與當(dāng)前pullRquest
中的偏移量進(jìn)行比較,如果前者大,就表示有新消息送達(dá)了,需要喚醒pullRquest
前面說(shuō)過(guò),當(dāng)consumer
請(qǐng)求沒(méi)獲取到消息時(shí),broker
會(huì)hold
這個(gè)請(qǐng)求一段時(shí)間(30s),當(dāng)這個(gè)時(shí)間到了,也會(huì)喚醒pullRquest
,之后就不會(huì)再hold
住它了
我們?cè)賮?lái)看看 PullMessageProcessor#executeRequestWhenWakeup
方法:
public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { // 關(guān)注 Runnable#run() 方法即可 Runnable run = new Runnable() { @Override public void run() { try { // 再一次調(diào)用 PullMessageProcessor#processRequest(...) 方法 final RemotingCommand response = PullMessageProcessor.this .processRequest(channel, request, false); ... } catch (RemotingCommandException e1) { log.error("excuteRequestWhenWakeup run", e1); } } }; // 提交任務(wù) this.brokerController.getPullMessageExecutor() .submit(new RequestTask(run, channel, request)); }
這個(gè)方法準(zhǔn)備了一個(gè)任務(wù),然后將其提交到線程池中執(zhí)行,任務(wù)內(nèi)容很簡(jiǎn)單,僅是調(diào)用了PullMessageProcessor#processRequest(...)
方法,這個(gè)方法就是本節(jié)一始提到的處理consumer
拉取消息的方法了。
在分析消息分發(fā)流程時(shí),DefaultMessageStore.ReputMessageService#doReput
方法中有這么一段:
private void doReput() { ... // 分發(fā)消息 DefaultMessageStore.this.doDispatch(dispatchRequest); // 長(zhǎng)輪詢:如果有消息到了主節(jié)點(diǎn),并且開(kāi)啟了長(zhǎng)輪詢 if (BrokerRole.SLAVE != DefaultMessageStore.this .getMessageStoreConfig().getBrokerRole() &&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){ // 調(diào)用NotifyMessageArrivingListener的arriving方法 DefaultMessageStore.this.messageArrivingListener.arriving( dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } ... }
這段就是用來(lái)主動(dòng)喚醒hold
住的consumer
請(qǐng)求的,我們進(jìn)入NotifyMessageArrivingListener#arriving
方法:
@Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties); }
最終它也是調(diào)用了 PullRequestHoldService#notifyMessageArriving(...)
方法。
到此,關(guān)于“RocketMQ broker消息投遞流程處理PULL_MESSAGE請(qǐng)求的方法是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。