溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RocketMQ延遲消息的實(shí)現(xiàn)方法

發(fā)布時間:2021-07-08 17:18:35 來源:億速云 閱讀:794 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要介紹“RocketMQ延遲消息的實(shí)現(xiàn)方法”,在日常操作中,相信很多人在RocketMQ延遲消息的實(shí)現(xiàn)方法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RocketMQ延遲消息的實(shí)現(xiàn)方法”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

延時消息即消息發(fā)送后并不立即對消費(fèi)者可見,而是在用戶指定的時間投遞給消費(fèi)者。比如我們現(xiàn)在發(fā)送一條延時30秒的消息,消息發(fā)送后立即發(fā)送給服務(wù)器,但是服務(wù)器在30秒后才將該消息交給消費(fèi)者。

RocketMQ通過配置的延遲級別延遲消息投遞到消費(fèi)者,其中不同的延遲級別對應(yīng)不同的延遲時間,可配置,默認(rèn)的延遲級別有18種,分別是1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,支持時間單位 s 秒 m分鐘 h小時 d天。

源碼 MessageStoreConfig.java 是定義如下:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

可以在brocker配置 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,自定義其時間級別。

1、代碼驗(yàn)證

前提:先啟動消費(fèi)者等待消息的發(fā)送,先發(fā)送消息,消費(fèi)者啟動需要時間,影響測試結(jié)果。

1.1、生產(chǎn)者Producer

public class DelayProducer {
	
	 public static void main(String[] args) throws MQClientException, InterruptedException {
		 
		DefaultMQProducer producer = new DefaultMQProducer("producer_test");
		producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
		producer.start();
		SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
		for (int i = 0; i < 10; i++) {
		    try {
		    	//構(gòu)建消息
			    Message msg = new Message("TopicTest" /* Topic */,
			        "TagA" /* Tag */,
			        ("延遲消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
			    );
			    //延時的級別為3 對應(yīng)的時間為10s 就是發(fā)送后延時10S在把消息投遞出去
			    msg.setDelayTimeLevel(3);
			    SendResult sendResult = producer.send(msg);
			    
			    System.out.printf("%s%n", sd.format(new Date())+" == "+sendResult);
		    } catch (Exception e) {
		        e.printStackTrace();
		        Thread.sleep(1000);
		    }
		}
		producer.shutdown();
	}
}

查看結(jié)果:

RocketMQ延遲消息的實(shí)現(xiàn)方法

1.2、消費(fèi)者Consumer

public class DelayConsumer {

	public static void main(String[] args) {
		try {
			DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
			consumer.setConsumerGroup("consumer_delay");
			consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
			consumer.subscribe("TopicTest", "*");
			consumer.registerMessageListener(new MessageListenerConcurrently(){

				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
						ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
					try {
					    for(MessageExt msg : paramList){
					    	String msgbody = new String(msg.getBody(), "utf-8");
					    	SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
					    	System.out.println("接收時間 :  "+ sd.format(new Date()) +" == MessageBody: "+ msgbody);//輸出消息內(nèi)容
					    }
					} catch (Exception e) {
					    e.printStackTrace();
					    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
					}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費(fèi)成功
				}
			});
			consumer.start();
			System.out.println("DelayConsumer===啟動成功!");
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

查看結(jié)果:

RocketMQ延遲消息的實(shí)現(xiàn)方法

2、內(nèi)部機(jī)制分析

查看其消息投遞的核心方法org.apache.rocketmq.store.CommitLog.putMessage

    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        //設(shè)置消息存儲到文件中的時間
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery消息的延遲級別是否大于0
            if (msg.getDelayTimeLevel() > 0) {
            	//如果消息的延遲級別大于最大的延遲級別則置為最大延遲級別
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                //將消息主題設(shè)置為SCHEDULE_TOPIC_XXXX
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                //將消息隊(duì)列設(shè)置為延遲的消息隊(duì)列的ID
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                //消息的原有的主題和消息隊(duì)列存入屬性中
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long eclipseTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        //獲取最后一個消息的映射文件,mappedFileQueue可看作是CommitLog文件夾下的一個個文件的映射
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        //寫入消息之前先申請putMessageLock,也就是保證消息寫入CommitLog文件中串行的
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            //設(shè)置消息的存儲時間
            msg.setStoreTimestamp(beginLockTimestamp);
            //mappedFile==null標(biāo)識CommitLog文件還未創(chuàng)建,第一次存消息則創(chuàng)建CommitLog文件
            //mappedFile.isFull()表示mappedFile文件已滿,需要重新創(chuàng)建CommitLog文件
            if (null == mappedFile || mappedFile.isFull()) {
            	//里面的參數(shù)0代表偏移量
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            //mappedFile==null說明創(chuàng)建CommitLog文件失敗拋出異常,創(chuàng)建失敗可能是磁盤空間不足或者權(quán)限不夠
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            //mappedFile文件后面追加消息
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
        	//釋放鎖
            putMessageLock.unlock();
        }

        if (eclipseTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
        //消息刷盤
        handleDiskFlush(result, putMessageResult, msg);
        //主從數(shù)據(jù)同步復(fù)制
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    }

我們發(fā)現(xiàn)在通過putMessage 延遲消息就被放存放到了主題為 SCHEDULE_TOPIC_XXXX的commitlog中,消息的原有的主題和消息隊(duì)列存入屬性中,后面再通過定時的方式對這這些消息進(jìn)行重新發(fā)送。

ScheduleMessageService.start()啟動會為每一個延遲隊(duì)列創(chuàng)建一個調(diào)度任務(wù)每一個調(diào)度任務(wù)對應(yīng)SCHEDULE_TOPIC_XXXX主題下的一個消息消費(fèi)隊(duì)列。

    public void start() {

        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }
            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }

定時任務(wù)的實(shí)現(xiàn)類DeliverDelayedMessageTimerTask,核心方法是executeOnTimeup

public void executeOnTimeup() {
        	//根據(jù)延遲級別獲取該延遲隊(duì)列信息
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;
            //未找到說明目前沒有該延遲級別的消息,忽略本次任務(wù)
            if (cq != null) {
            	//根據(jù)offset獲取隊(duì)列中獲取當(dāng)前隊(duì)列中有效的消息,
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        
                        //遍歷ConsumeQueue,每一個ConsumeQueue條目是20個字節(jié)解析消息
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        	//物理偏移量
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            //消息長度
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            //消息的tag的Hash值
                            long tagsCode = bufferCQ.getByteBuffer().getLong();
                            //
                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                        tagsCode, offsetPy, sizePy);
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }
                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                            long countdown = deliverTimestamp - now;
                            if (countdown <= 0) {
                            	//根據(jù)物理偏移量和消息的大小從Commitlog文件中查找消息
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);
                                if (msgExt != null) {
                                    try {
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        //消息存儲到Commitlog文件中,轉(zhuǎn)發(fā)到主題對應(yīng)的消息隊(duì)列上,供消費(fèi)者再次消費(fèi)。
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.defaultMessageStore
                                                .putMessage(msgInner);

                                        if (putMessageResult != null
                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                            continue;
                                        } else {
                                            // XXX: warn and notify me
                                            log.error(
                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                                msgExt.getTopic(), msgExt.getMsgId());
                                            ScheduleMessageService.this.timer.schedule(
                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) {
                                        log.error(
                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                                    }
                                }
                            } else {
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for
                        
                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {

                        bufferCQ.release();
                    }
                }else {
                	//未找到有效的消息,更新延遲隊(duì)列定時拉取進(jìn)度,并創(chuàng)建定時任務(wù)帶下一次繼續(xù)嘗試
                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) {
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            }
            //創(chuàng)建延遲任務(wù)
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }

圖解:

RocketMQ延遲消息的實(shí)現(xiàn)方法

1、消息生產(chǎn)者發(fā)送消息,如果發(fā)送的消息DelayTimeLevel大于0,則改變消息主題為SCHEDULE_TOPIC_XXXX,消息的隊(duì)列為DelayTimeLevel-1

2、消息經(jīng)由Commitlog轉(zhuǎn)發(fā)到消息隊(duì)列SCHEDULE_TOPIC_XXXX的消費(fèi)隊(duì)列1。

3、定時任務(wù)Timer每隔1秒根據(jù)上次拉取消息的偏移量從消費(fèi)隊(duì)列中取出所有消息。

4、根據(jù)消息的物理偏移量和消息大小從Commitlog中拉取消息。(PS:消息存儲章節(jié)中會重點(diǎn)講解)

5、根據(jù)消息的屬性重新創(chuàng)建消息,并恢復(fù)原主題TopicTest、原消息隊(duì)列ID,清除DelayTimeLevel屬性存入Commitlog中。

6、記錄原主題TopicTest的消息隊(duì)列的消息偏移量,供消費(fèi)者索引檢索消息進(jìn)行消費(fèi)。

到此,關(guān)于“RocketMQ延遲消息的實(shí)現(xiàn)方法”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI