您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關(guān)consumer數(shù)量變化會(huì)怎樣,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
ConsumerManager public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null == consumerGroupInfo) { ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); boolean r2 = consumerGroupInfo.updateSubscription(subList); if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { //通知同組內(nèi)的其他consumer this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList); return r1 || r2; } public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null != consumerGroupInfo) { consumerGroupInfo.unregisterChannel(clientChannelInfo); if (consumerGroupInfo.getChannelInfoTable().isEmpty()) { ConsumerGroupInfo remove = this.consumerTable.remove(group); if (remove != null) { log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group); this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group); } } if (isNotifyConsumerIdsChangedEnable) { //單向通知channel this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } }
DefaultConsumerIdsChangeListener @Override public void handle(ConsumerGroupEvent event, String group, Object... args) { case CHANGE: if (args == null || args.length < 1) { return; } List<Channel> channels = (List<Channel>) args[0]; if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) { //對組內(nèi)的其他consumer的channel連接發(fā)送單向通知(不管對方有木有收到) for (Channel chl : channels) { this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group); } } break; }
Broker2Client public void notifyConsumerIdsChanged( final Channel channel, final String consumerGroup) { if (null == consumerGroup) { log.error("notifyConsumerIdsChanged consumerGroup is null"); return; } NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); try { this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); } catch (Exception e) { //發(fā)送異常,只是打印log log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage()); } }
通知channel是單向的,也就是不管對方有沒有答復(fù),都認(rèn)為發(fā)送成功了,這樣會(huì)有兩種情況發(fā)生:
channel收到消息:收到消息后,channel會(huì)觸發(fā)rebalance,正常邏輯
channel沒收到消息:該consumer不會(huì)觸發(fā)rebalance,存在問題!
register:該consumer不知道已經(jīng)有新的consumer加入,造成同一個(gè)mq會(huì)有多個(gè)consumer進(jìn)行消費(fèi)
unregister:該consumer不知道有consumer下線,造成部分mq沒有consumer負(fù)責(zé)消費(fèi)
我們先看unregister這種情況
在consumer啟動(dòng)時(shí),會(huì)同時(shí)啟動(dòng)一個(gè)RebalanceService線程,這個(gè)線程做的事就是每隔20秒主動(dòng)進(jìn)行一次rebalance,這樣就能把unregister這種影響降低,最多導(dǎo)致該mq的消息會(huì)延遲20秒之后才有consumer負(fù)責(zé)消費(fèi)
RebalanceService private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000")); @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); }
接下來分析比較大條的Register
同一個(gè)mq在同一組內(nèi)有不同的consumer消費(fèi),這種情況在clustering模式下是有大問題的,會(huì)造成重復(fù)消費(fèi),消費(fèi)進(jìn)度錯(cuò)誤等問題,帶著rocketmq應(yīng)該不至于犯如此低級錯(cuò)誤的想法再繼續(xù)看代碼,果然別有洞天
RebalanceImpl private void rebalanceByTopic(final String topic, final boolean isOrder) { //rebalance過程 //關(guān)鍵點(diǎn)在這,在上面rebalance完之后, 就能知道自己該負(fù)責(zé)哪些mq的消費(fèi) boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); } private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { for (MessageQueue mq : mqSet) { //如果是新增的mq,會(huì)嘗試調(diào)用遠(yuǎn)程broker lock mq,獲取鎖失敗,則說明有其他consumer獲取了鎖,自己應(yīng)該放棄消費(fèi)該mq if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } } } }
以上就是consumer數(shù)量變化會(huì)怎樣,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。