溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RocketMQ出現(xiàn)消息重試的場景分析以及代碼實現(xiàn)

發(fā)布時間:2021-09-07 07:57:37 來源:億速云 閱讀:133 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要講解了“RocketMQ出現(xiàn)消息重試的場景分析以及代碼實現(xiàn)”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“RocketMQ出現(xiàn)消息重試的場景分析以及代碼實現(xiàn)”吧!

1、分析

我們分析下什么場景下會出現(xiàn)消息的重試

  • 業(yè)務(wù)消費方明確返回ConsumeConcurrentlyStatus.RECONSUME_LATER,即消費者對消息業(yè)務(wù)處理時自己的業(yè)務(wù)邏輯明確要求重新發(fā)送消息

  • 業(yè)務(wù)消費方主動/被動拋出異常

  • 由于網(wǎng)絡(luò)問題導(dǎo)致消息一直得不到確認

注意 對于拋出異常的情況,只要我們在業(yè)務(wù)邏輯中顯式拋出異常或者非顯式拋出異常,broker也會重新投遞消息,如果業(yè)務(wù)對異常做了捕獲,那么該消息將不會發(fā)起重試。因此對于需要重試的業(yè)務(wù),消費方在捕獲異常時要注意返回ConsumeConcurrentlyStatus.RECONSUME_LATER或null,輸出日志并打印當前重試次數(shù)。推薦返回ConsumeConcurrentlyStatus.RECONSUME_LATER。

只有當消費模式為 MessageModel.CLUSTERING(集群模式) 時,Broker才會自動進行重試,對于廣播消息是不會重試的

對于一直無法消費成功的消息,RocketMQ會在達到最大重試次數(shù)之后默認最大是16,將該消息投遞至死信隊列。然后我們需要關(guān)注死信隊列,并對死信隊列中的消息做人工的業(yè)務(wù)補償操作

重試次數(shù)就是延遲級別中的,重試次數(shù)增加其間隔時間也不同

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

可以在brocker配置 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,自定義其時間級別。

2、代碼實現(xiàn)

2.1、生產(chǎn)者

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("gumx_test_delay");
        producer.setNamesrvAddr("10.10.15.205:9876;10.10.15.206:9876");
        producer.start();
        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("TopicDelayTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("測試延遲消息==Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

2.2、消費者

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("gumx_test_delay_1");
        consumer.setNamesrvAddr("10.10.15.205:9876;10.10.15.206:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicDelayTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            	try{
            		
	            	SimpleDateFormat sf = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
	                System.out.printf("當前時間:%s 延遲級別:%s 重試次數(shù):%s 主題:%s 延遲主題:%s 消息內(nèi)容:%s %n",sf.format(new Date()),msgs.get(0).getDelayTimeLevel(),msgs.get(0).getReconsumeTimes(),msgs.get(0).getTopic(),msgs.get(0).getProperties().get("REAL_TOPIC"), new String(msgs.get(0).getBody(),"UTF-8"));
	                int i = 1/0; //故意報錯
	                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            	}catch (Exception e) {
            		return ConsumeConcurrentlyStatus.RECONSUME_LATER;
				}
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

查看結(jié)果:

RocketMQ出現(xiàn)消息重試的場景分析以及代碼實現(xiàn)

分析其結(jié)果其時間規(guī)則1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h就是默認配置的對應(yīng)延遲級別。發(fā)現(xiàn)有個問題延遲級別從0直接到3,我們知道普通消息的延遲級別默認是0,第二條才是真正開始重試的消息。為什么從3開始呢?下面我們分析下源碼,一探究竟。

3、源碼分析

我們先看一下其處理流程

RocketMQ出現(xiàn)消息重試的場景分析以及代碼實現(xiàn)

3.1、客戶端代碼分析

在RocketMQ的客戶端源碼DefaultMQPushConsumerImpl.java中,對重試機制做了說明,源碼如下:

private int getMaxReconsumeTimes() {
    // default reconsume times: 16
    if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
        return 16;
    } else {
        return this.defaultMQPushConsumer.getMaxReconsumeTimes();
    }
}

消費者可以設(shè)置其最大的消費次數(shù)MaxReconsumeTimes,如果沒有設(shè)置則默認的消費次數(shù)是16次為最大重試次數(shù),我們查看客戶端代碼

ConsumeMessageConcurrentlyService的內(nèi)部類方法ConsumeRequest.run()入口方法

long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
    ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
    if (msgs != null && !msgs.isEmpty()) {
        for (MessageExt msg : msgs) {
            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
        }
    }
    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
        RemotingHelper.exceptionSimpleDesc(e),
        ConsumeMessageConcurrentlyService.this.consumerGroup,
        msgs,
        messageQueue);
    hasException = true;
}

獲取這批消息的狀態(tài)調(diào)用ConsumeMessageConcurrentlyService.processConsumeResult()核心方法處理其返回的狀態(tài)信息。

//ackIndex = Integer.MAX_VALUE
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
    return;
//消費狀態(tài)
switch (status) {
    case CONSUME_SUCCESS:
    	//設(shè)置成功消息的下標
        if (ackIndex >= consumeRequest.getMsgs().size()) {
            ackIndex = consumeRequest.getMsgs().size() - 1;
        }
        int ok = ackIndex + 1;
        int failed = consumeRequest.getMsgs().size() - ok;
        this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
        break;
    case RECONSUME_LATER:
        ackIndex = -1;
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
            consumeRequest.getMsgs().size());
        break;
    default:
        break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            //給broker反饋消費的進度
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }
        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);

            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
}

如果返回結(jié)果是 CONSUME_SUCCESS,此時 ackIndex = msg.size() - 1,,再看發(fā)送sendMessageBack 循環(huán)的條件,for (int i = ackIndex + 1; i < msg.size() ;;)從這里可以看出如果消息成功,則無需發(fā)送sendMsgBack給broker 如果返回結(jié)果是RECONSUME_LATER, 此時 ackIndex = -1 ,則這批所有的消息都會發(fā)送消息給Broker,也就是這一批消息都得重新消費。

如果發(fā)送ack消息失敗,則會延遲5S后重新在消費端重新消費。 首先消費者向Broker發(fā)送ACK消息,如果發(fā)生成功,重試機制由broker處理,如果發(fā)送ack消息失敗,則將該任務(wù)直接在消費者這邊,再次將本次消費任務(wù),默認演出5S后在消費者重新消費。

1)根據(jù)消費結(jié)果,設(shè)置ackIndex的值 2)如果是消費失敗,根據(jù)消費模式(集群消費還是廣播消費),廣播模式,直接丟棄,集群模式發(fā)送sendMessageBack 3) 更新消息消費進度,不管消費成功與否,上述這些消息消費成功,其實就是修改消費偏移量。(失敗的,會進行重試,會創(chuàng)建新的消息)

this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue())給broker發(fā)送消費狀態(tài)失敗則將本次失敗的消息放入msgBackFailed集合中,5秒后供消費端消費。

private void submitConsumeRequestLater(final List<MessageExt> msgs, 
		final ProcessQueue processQueue,  final MessageQueue messageQueue) {
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
        }
    }, 5000, TimeUnit.MILLISECONDS);
}

3.2、服務(wù)端代碼分析

當消息消費失敗,客戶端會反饋其消費狀態(tài),Broker服務(wù)端會接收其反饋的消息消費狀態(tài)的處理邏輯代碼在 SendMessageProcessor.consumerSendMsgBack()方法,我們查看部分的核心源碼:

//設(shè)置主題%RETRY% + consumerGroup
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
    newTopic,
    subscriptionGroupConfig.getRetryQueueNums(),
    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("topic[" + newTopic + "] not exist");
    return response;
}
if (!PermName.isWriteable(topicConfig.getPerm())) {
    response.setCode(ResponseCode.NO_PERMISSION);
    response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
    return response;
}
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("look message by offset failed, " + requestHeader.getOffset());
    return response;
}

final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
    MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
//延遲級別
int delayLevel = requestHeader.getDelayLevel();

int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
    maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
//最大等于消息的最大重試次數(shù),消息丟入到死信隊列中
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
    || delayLevel < 0) {
	//重新設(shè)置其主題: %DLQ% + consumerGroup
    newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
    queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
    //基礎(chǔ)參數(shù)設(shè)置
    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
        DLQ_NUMS_PER_GROUP,
        PermName.PERM_WRITE, 0
    );
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return response;
    }
} else {
	//第一次delayLevel==0時則下一次默認的延遲級別是3
    if (0 == delayLevel) {
        delayLevel = 3 + msgExt.getReconsumeTimes();
    }
    msgExt.setDelayTimeLevel(delayLevel);
}

判斷消息當前重試次數(shù)是否大于等于最大重試次數(shù),如果達到最大重試次數(shù),或者配置的重試級別小于0,則重新創(chuàng)建Topic,規(guī)則是 %DLQ% + consumerGroup,后續(xù)處理消息send到死信隊列中。

正常的消息會進入else分支,對于首次重試的消息,默認的delayLevel是0,rocketMQ會將給該level + 3,也就是加到3,這就是說,如果沒有顯示的配置延時級別,消息消費重試首次,是延遲了第三個級別發(fā)起的重試,也就是距離首次發(fā)送10s后重,其主題的默認規(guī)則是**%RETRY% + consumerGroup**。

當延時級別設(shè)置完成,刷新消息的重試次數(shù)為當前次數(shù)加1,broker將該消息刷盤,邏輯如下:

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
//刷新消息的重試次數(shù)為當前次數(shù)加
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
//將消息持久化到commitlog文件中
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

那么什么是msgInner呢,即:MessageExtBrokerInner,也就是對重試的消息,rocketMQ會創(chuàng)建一個新的 MessageExtBrokerInner 對象,它實際上是繼承了MessageExt。

我們繼續(xù)進入消息刷盤邏輯,即putMessage(msgInner)方法,實現(xiàn)類為:DefaultMessageStore.java, 核心代碼如下:

PutMessageResult result = this.commitLog.putMessage(msg);

主要關(guān)注 this.commitLog.putMessage(msg); 這句代碼,通過commitLog我們可以認為這里是真實刷盤操作,也就是消息被持久化了。

我們繼續(xù)進入commitLog的putMessage方法,看到如下核心代碼段:

final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // Delay Delivery消息的延遲級別是否大于0
    if (msg.getDelayTimeLevel() > 0) {
    	//如果消息的延遲級別大于最大的延遲級別則置為最大延遲級別
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }
        //將消息主題設(shè)置為SCHEDULE_TOPIC_XXXX
        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        //將消息隊列設(shè)置為延遲的消息隊列的ID
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
        //消息的原有的主題和消息隊列存入屬性中
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}

可以看到,如果是重試消息,在進行延時級別判斷時候,返回true,則進入分支邏輯,通過這段邏輯我們可以知道,對于重試的消息,rocketMQ并不會從原隊列中獲取消息,而是創(chuàng)建了一個新的Topic進行消息存儲的。也就是代碼中的SCHEDULE_TOPIC,看一下具體是什么內(nèi)容:

public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

主題名稱改為: SCHEDULE_TOPIC_XXXX。

到這里我們可以得到一個結(jié)論:

對于所有消費者消費失敗的消息,rocketMQ都會把重試的消息 重新new出來(即上文提到的MessageExtBrokerInner對象),然后投遞到主題 SCHEDULE_TOPIC_XXXX 下的隊列中,然后由定時任務(wù)進行調(diào)度重試,而重試的周期符合我們在上文中提到的delayLevel周期,也就是:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

同時為了保證消息可被找到,也會將原先的topic存儲到properties中,也就是如下這段代碼

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

這里將原先的topic和隊列id做了備份。

參照《RocketMQ延遲消息》一文,里面有具體的分析,消息重試和延遲消息的處理流程是一樣的都需要創(chuàng)建一個延遲消息的主題隊列。后臺啟動定時任務(wù)定時掃描需要的發(fā)送的消息將其發(fā)送到原有的主題和消息隊列中供消費,只是其重試消息的主題是%RETRY_TOPIC%+ consumerGroup并且其隊列只有一個queue0,延遲消息和普通消息一樣發(fā)送到原主題的原隊列中。

3.3、死信的業(yè)務(wù)處理

默認的處理機制中,如果我們只對消息做重復(fù)消費,達到最大重試次數(shù)之后消息就進入死信隊列了。

我們也可以根據(jù)業(yè)務(wù)的需要,定義消費的最大重試次數(shù),每次消費的時候判斷當前消費次數(shù)是否等于最大重試次數(shù)的閾值。

如:重試三次就認為當前業(yè)務(wù)存在異常,繼續(xù)重試下去也沒有意義了,那么我們就可以將當前的這條消息進行提交,返回broker狀態(tài)ConsumeConcurrentlyStatus.CONSUME_SUCCES,讓消息不再重發(fā),同時將該消息存入我們業(yè)務(wù)自定義的死信消息表,將業(yè)務(wù)參數(shù)入庫,相關(guān)的運營通過查詢死信表來進行對應(yīng)的業(yè)務(wù)補償操作。

RocketMQ 的處理方式為將達到最大重試次數(shù)(16次)的消息標記為死信消息,將該死信消息投遞到 DLQ 死信隊列中,業(yè)務(wù)需要進行人工干預(yù)。實現(xiàn)的邏輯在 SendMessageProcessor 的 consumerSendMsgBack 方法中,大致思路為首先判斷重試次數(shù)是否超過16或者消息發(fā)送延時級別是否小于0,如果已經(jīng)超過16或者發(fā)送延時級別小于0,則將消息設(shè)置為新的死信。死信 topic 為:%DLQ%+consumerGroup。

RocketMQ出現(xiàn)消息重試的場景分析以及代碼實現(xiàn)

圖中展示的就是整個消息重試涉及的消息在相關(guān)主題之間的流轉(zhuǎn)

感謝各位的閱讀,以上就是“RocketMQ出現(xiàn)消息重試的場景分析以及代碼實現(xiàn)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對RocketMQ出現(xiàn)消息重試的場景分析以及代碼實現(xiàn)這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI