溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ延遲消息怎么實現

發(fā)布時間:2022-08-23 11:15:22 來源:億速云 閱讀:140 作者:iii 欄目:開發(fā)技術

今天小編給大家分享一下RocketMQ延遲消息怎么實現的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

前言

場景可以是這樣的,雙11搶手機,一個新手機4000-5000,到0點的時候,沖著興奮勁,搶到了。但是摸了摸錢包,又冷靜下來了,好像不是很必要換手機。就放在那里沒有支付,過了30分鐘,自動取消了。這里就是使用延遲消息的場景,當下單之后,向消息隊列發(fā)送一條延遲30分鐘消費的消息。等到30分鐘過了,然后消費消息,執(zhí)行檢查任務,要是對應的訂單支付了,就什么都不做,要是沒支付,就取消訂單。

RocketMQ的延遲消息是org.apache.rocketmq.broker.schedule.ScheduleMessageService類實現的

核心屬性

RMQ_SYS_SCHEDULE_TOPIC

在之前的版本中叫SCHEDULE_TOPIC,是系統(tǒng)內置的Topic,用來保存所有的定時消息。沒有執(zhí)行的定時消息都會被保存在這個topic中。

FIRST_DELAY_TIME

第一次執(zhí)行定時任務的延遲時間,默認是1秒。

private static final long FIRST_DELAY_TIME = 1000L;

DELAY_FOR_A_WHILE

第二次以及之后每次定時任務執(zhí)行的間隔時間,默認100ms。

private static final long DELAY_FOR_A_WHILE = 100L;

DELAY_FOR_A_PERIOD

若是延遲消息投遞失敗,則在這個時間過后繼續(xù)投遞,默認10秒。

private static final long DELAY_FOR_A_PERIOD = 10000L;

delayLevelTable

這是保存延遲級別和延遲時間映射關系的地方

private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
    new ConcurrentHashMap<Integer, Long>(32);

offsetTable

保存延遲級別和對應的消費位點

private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
    new ConcurrentHashMap<Integer, Long>(32);

核心方法

queueId2DelayLevel

將queueId轉換為延遲級別

public static int queueId2DelayLevel(final int queueId) {
    return queueId + 1;
}

delayLevel2QueueId

將延遲級別轉換為queueId

public static int delayLevel2QueueId(final int delayLevel) {
    return delayLevel - 1;
}

updateOffset

更新延遲消息topic的消費位點

private void updateOffset(int delayLevel, long offset) {
    this.offsetTable.put(delayLevel, offset);
    if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getDelayOffsetUpdateVersionStep() == 0) {
        long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
        dataVersion.nextVersion(stateMachineVersion);
    }
}

computeDeliverTimestamp

根據延遲消息級別和消息的存儲時間計算該延遲消息的投遞時間

public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
    Long time = this.delayLevelTable.get(delayLevel);
    if (time != null) {
        return time + storeTimestamp;
    }
    return storeTimestamp + 1000;
}

start()

啟動延遲消息服務

shutdown()

關閉start方法中啟動的額timer任務

load()

加載消息的消費位點信息和全部的延遲級別信息。延遲級別信息默認如下。

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

parseDelayLevel

格式化所有的延遲級別信息,保存到內存中。

以上就是“RocketMQ延遲消息怎么實現”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI