溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RocketMQ消費失敗重試機(jī)制的示例分析

發(fā)布時間:2021-10-20 16:36:46 來源:億速云 閱讀:160 作者:柒染 欄目:大數(shù)據(jù)

RocketMQ消費失敗重試機(jī)制的示例分析,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

現(xiàn)象:mq消費1次,重試3次,然后停止,如下實例展示

首次(reconsumeTimes=0)

MQ_CON_MSG gmcf-lsc-zhongbang-repu-calc-from-topic MSG MessageExt [queueId=1, storeSize=453, queueOffset=25, sysFlag=0, bornTimestamp=1566785215908, bornHost=/10.42.0.77:54608, storeTimestamp=1566785215908, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B77CE84, commitLogOffset=192401028, bodyCRC=53737244, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={MIN_OFFSET=0, _catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15071, HASH_CODE=690132963, MAX_OFFSET=26, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785215911, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15072, UNIQ_KEY=0A2A004D000938AF386882EAA5A40112, WAIT=true}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 48, 52, 54, 56, 57, 52, 48, 52, 48, 56, 48], transactionId='null’}]

第一次retry(reconsumeTimes=1,DELAY=3)

MQ_CON_MSG %RETRY%gmcf-lsc-consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1187, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785226241, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B785900, commitLogOffset=192436480, bodyCRC=893293938, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785226242, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1188, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=3, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]

第二次retry(reconsumeTimes=2, DELAY=4)

MQ_CON_MSG %RETRY%gmcf-lsc-consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1209, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785256680, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B791399, commitLogOffset=192484249, bodyCRC=893293938, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785256728, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1210, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=4, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]

第三次retry(reconsumeTimes=3, DELAY=5)

MQ_CON_MSG %RETRY%gmcf-lsc-consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1228, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785316978, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B79F598, commitLogOffset=192542104, bodyCRC=893293938, reconsumeTimes=3, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785316980, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1231, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=5, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]

根據(jù)現(xiàn)象我們提出2個疑問?

1.為什么只會重試4次?而不是一直重試?

   try {            try {                if (messageExtWrappers.size() > 0) {                    try {                        var22 = messageExtWrappers.iterator();                        while(var22.hasNext()) {                            messageExt = (MessageExt)var22.next();                            span.addEvent("MQConsumer.from", messageExt.getBornHostString());                        }                    } catch (Throwable var16) {                        ;                    }                    this.consume(messageExtWrappers, context);                }                LOGGER.info("MQ_CON_SUCCESS {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId});                span.addEvent("MQConsumer", ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());                span.success();                ConsumeConcurrentlyStatus var23 = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                return var23;            } catch (MessageListenerConcurrentlyException var17) {                LOGGER.error("MQ_CON_EX {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId, var17});                throw var17;            } catch (Throwable var18) {                LOGGER.error("MQ_CON_EX {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId, var18});                LOGGER.info("MQ_CON_RECONSUME {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId});                span.failed(var18);                span.addEvent("MQConsumer", ConsumeConcurrentlyStatus.RECONSUME_LATER.name());                if (CollectionUtils.isNotEmpty(msgs) && ((MessageExt)msgs.get(0)).getDelayTimeLevel() >= 2 + this.retryTimes) {                    context.setDelayLevelWhenNextConsume(-1);                }            }

從代碼可以看出,如果消費失敗了,我們自己的控制了重發(fā)次數(shù),代碼如下:

if (CollectionUtils.isNotEmpty(msgs) && ((MessageExt)msgs.get(0)).getDelayTimeLevel() >= 2 + this.retryTimes) {                    context.setDelayLevelWhenNextConsume(-1);                }

當(dāng)重試達(dá)到滿足條件的時候,不再重試,直接放到dlq隊列里面。如果不控制的,會一直重試到最高DelayLevel 18

2.DelayTimeLeve默認(rèn)的值為什么不是從0開始,而是從3開始?

我們知道RocketMQ的默認(rèn)的配置是messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,分別代表延遲level1-level18,為什么不是從1開始呢?

帶著疑問我們繼續(xù)深挖源碼,我們從DefaultMQPullConsumerImpl類里面找到一段代碼

   public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {        try {            String brokerAddr = null != brokerName ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());            if (UtilAll.isBlank(consumerGroup)) {                consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();            }            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000L, this.defaultMQPullConsumer.getMaxReconsumeTimes());        } catch (Exception var8) {            this.log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), var8);            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()), msg.getBody());            String originMsgId = MessageAccessor.getOriginMessageId(msg);            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);            newMsg.setFlag(msg.getFlag());            MessageAccessor.setProperties(newMsg, msg.getProperties());            MessageAccessor.putProperty(newMsg, "RETRY_TOPIC", msg.getTopic());            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes()));            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());            this.mQClientFactory.getDefaultMQProducer().send(newMsg);        }    }

從代碼中看到DelayTimeLevel =3+reconsumeTime

newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

所以默認(rèn)重試時,實際是從3開始的,從時間的角度,也驗證為什么會重試4次,而且每次間隔的時間是10s/30s/1m .

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI