溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何實現(xiàn)一個延遲隊列

發(fā)布時間:2021-10-26 11:51:27 來源:億速云 閱讀:151 作者:iii 欄目:web開發(fā)

本篇內(nèi)容介紹了“如何實現(xiàn)一個延遲隊列”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

延遲隊列定義

首先,隊列這種數(shù)據(jù)結(jié)構(gòu)相信大家都不陌生,它是一種先進先出的數(shù)據(jù)結(jié)構(gòu)。普通隊列中的元素是有序的,先進入隊列中的元素會被優(yōu)先取出進行消費;

延時隊列相比于普通隊列最大的區(qū)別就體現(xiàn)在其延時的屬性上,普通隊列的元素是先進先出,按入隊順序進行處理,而延時隊列中的元素在入隊時會指定一個延遲時間,表示其希望能夠在經(jīng)過該指定時間后處理。從某種意義上來講,延遲隊列的結(jié)構(gòu)并不像一個隊列,而更像是一種以時間為權(quán)重的有序堆結(jié)構(gòu)。

應用場景

我在開發(fā)業(yè)務需求時遇到的使用場景是這樣的,用戶可以在小程序中訂閱不同的微信或者 QQ 的模板消息,產(chǎn)品同學可以在小程序的管理端新建消息推送計劃,當?shù)竭_指定的時間節(jié)點的時候給所有訂閱模板消息的用戶進行消息推送。

如果僅僅是服務單一的小程序,那也許起個定時任務,或者甚至人工的定時去執(zhí)行能夠最便捷最快速的去完成這項需求,但我們希望能夠抽象出一個消息訂閱的模塊服務出來給所有業(yè)務使用,這時候就需要一種通用的系統(tǒng)的解決方案,這時候便需要使用到延遲隊列了。

除了上述我所遇到的這樣的典型的需求以外,延遲隊列的應用場景其實也非常的廣泛,比如說以下的場景:

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. 新建的訂單,如果用戶在 15 分鐘內(nèi)未支付,則自動取消。

  3. 公司的會議預定系統(tǒng),在會議預定成功后,會在會議開始前半小時通知所有預定該會議的用戶。

  4. 安全工單超過 24 小時未處理,則自動拉企業(yè)微信群提醒相關(guān)責任人。

  5. 用戶下單外賣以后,距離超時時間還有 10 分鐘時提醒外賣小哥即將超時。

對于數(shù)據(jù)量比較少并且時效性要求不那么高的場景,一種比較簡單的方式是輪詢數(shù)據(jù)庫,比如每秒輪詢一下數(shù)據(jù)庫中所有數(shù)據(jù),處理所有到期的數(shù)據(jù),比如如果我是公司內(nèi)部的會議預定系統(tǒng)的開發(fā)者,我可能就會采用這種方案,因為整個系統(tǒng)的數(shù)據(jù)量必然不會很大并且會議開始前提前 30 分鐘提醒與提前 29 分鐘提醒的差別并不大。

但是如果需要處理的數(shù)據(jù)量比較大實時性要求比較高,比如淘寶每天的所有新建訂單 15 分鐘內(nèi)未支付的自動超時,數(shù)量級高達百萬甚至千萬,這時候如果你還敢輪詢數(shù)據(jù)庫怕是要被你老板打死,不被老板打死估計也要被運維同學打死。

這種場景下,就需要使用到我們今天的主角 —— 延遲隊列了。延遲隊列為我們提供了一種高效的處理大量需要延遲消費消息的解決方案。那么話不多說,下面我們就來看一下幾種常見的延遲隊列的解決方案以及他們各自的優(yōu)缺點。

實現(xiàn)方案

Redis ZSet

我們知道 Redis 有一個有序集合的數(shù)據(jù)結(jié)構(gòu) ZSet,ZSet 中每個元素都有一個對應 Score,ZSet 中所有元素是按照其 Score 進行排序的。

那么我們可以通過以下這幾個操作使用 Redis 的 ZSet 來實現(xiàn)一個延遲隊列:

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. 入隊操作: ZADD KEY timestamp task , 我們將需要處理的任務,按其需要延遲處理時間作為 Score 加入到 ZSet 中。Redis 的 ZAdd 的時間復雜度是 O(logN) , N 是 ZSet 中元素個數(shù),因此我們能相對比較高效的進行入隊操作。

  3. 起一個進程定時(比如每隔一秒)通過 ZREANGEBYSCORE 方法查詢 ZSet 中 Score 最小的元素,具體操作為: ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES 。查詢結(jié)果有兩種情況: 
    a. 查詢出的分數(shù)小于等于當前時間戳,說明到這個任務需要執(zhí)行的時間了,則去異步處理該任務; 
    b. 查詢出的分數(shù)大于當前時間戳,由于剛剛的查詢操作取出來的是分數(shù)最小的元素,所以說明 ZSet 中所有的任務都還沒有到需要執(zhí)行的時間,則休眠一秒后繼續(xù)查詢; 
    同樣的, ZRANGEBYSCORE 操作的時間復雜度為 O(logN + M) ,其中 N 為 ZSet 中元素個數(shù), M 為查詢的元素個數(shù),因此我們定時查詢操作也是比較高效的。

這里從網(wǎng)上搬運了一套 Redis 實現(xiàn)延遲隊列的后端架構(gòu),其在原來 Redis 的 ZSet 實現(xiàn)上進行了一系列的優(yōu)化,使得整個系統(tǒng)更穩(wěn)定、更健壯,能夠應對高并發(fā)場景,并且具有更好的可擴展性,是一個挺不錯的架構(gòu)設(shè)計,其整體架構(gòu)圖如下:

如何實現(xiàn)一個延遲隊列

其核心設(shè)計思路:

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. 將延遲的消息任務通過 hash 算法路由至不同的 Redis Key 上,這樣做有兩大好處: 
    a. 避免了當一個 KEY 在存儲了較多的延時消息后,入隊操作以及查詢操作速度變慢的問題(兩個操作的時間復雜度均為 O(logN) )。 
    b. 系統(tǒng)具有了更好的橫向可擴展性,當數(shù)據(jù)量激增時,我們可以通過增加 Redis Key 的數(shù)量來快速的擴展整個系統(tǒng),來抗住數(shù)據(jù)量的增長。

  3. 每個 Redis Key 都對應建立一個處理進程,稱為 Event 進程,通過上述步驟 2 中所述的 ZRANGEBYSCORE 方法輪詢 Key,查詢是否有待處理的延遲消息。

  4. 所有的 Event 進程只負責分發(fā)消息,具體的業(yè)務邏輯通過一個額外的消息隊列異步處理,這么做的好處也是顯而易見的: 
    a. 一方面,Event 進程只負責分發(fā)消息,那么其處理消息的速度就會非???,就不太會出現(xiàn)因為業(yè)務邏輯復雜而導致消息堆積的情況。 
    b. 另一方面,采用一個額外的消息隊列后,消息處理的可擴展性也會更好,我們可以通過增加消費者進程數(shù)量來擴展整個系統(tǒng)的消息處理能力。

  5. Event 進程采用 Zookeeper 選主單進程部署的方式,避免 Event 進程宕機后,Redis Key 中消息堆積的情況。一旦 Zookeeper 的 leader 主機宕機,Zookeeper 會自動選擇新的 leader 主機來處理 Redis Key 中的消息。

從上述的討論中我們可以看到,通過 Redis Zset 實現(xiàn)延遲隊列是一種理解起來較為直觀,可以快速落地的方案。并且我們可以依賴 Redis 自身的持久化來實現(xiàn)持久化,使用 Redis 集群來支持高并發(fā)和高可用,是一種不錯的延遲隊列的實現(xiàn)方案。

RabbitMQ

RabbitMQ 本身并不直接提供對延遲隊列的支持,我們依靠 RabbitMQ 的 TTL 以及 死信隊列功能,來實現(xiàn)延遲隊列的效果。那就讓我們首先來了解一下,RabbitMQ 的死信隊列以及 TTL 功能。

死信隊列

死信隊列實際上是一種 RabbitMQ 的消息處理機制,當 RabbmitMQ 在生產(chǎn)和消費消息的時候,消息遇到如下的情況,就會變成“死信”:

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. 消息被拒絕 basic.reject/ basic.nack 并且不再重新投遞 requeue=false

  3. 消息超時未消費,也就是 TTL 過期了

  4. 消息隊列到達最大長度

消息一旦變成一條死信,便會被重新投遞到死信交換機(Dead-Letter-Exchange),然后死信交換機根據(jù)綁定規(guī)則轉(zhuǎn)發(fā)到對應的死信隊列上,監(jiān)聽該隊列就可以讓消息被重新消費。

消息生存時間 TTL

TTL(Time-To-Live)是 RabbitMQ 的一種高級特性,表示了一條消息的最大生存時間,單位為毫秒。如果一條消息在 TTL 設(shè)置的時間內(nèi)沒有被消費,那么它就會變成一條死信,進入我們上面所說的死信隊列。

有兩種不同的方式可以設(shè)置消息的 TTL 屬性,一種方式是直接在創(chuàng)建隊列的時候設(shè)置整個隊列的 TTL 過期時間,所有進入隊列的消息,都被設(shè)置成了統(tǒng)一的過期時間,一旦消息過期,馬上就會被丟棄,進入死信隊列,參考代碼如下:

Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

在延遲隊列的延遲時間為固定值的時候,比較適合使用這種方式。

另一種方式是針對單條消息設(shè)置,參考代碼如下,該消息被設(shè)置了 6 秒的過期時間:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("6000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg content".getBytes());

如果需要不同的消息設(shè)置不同的延遲時間,上面針對隊列的 TTL 設(shè)置便無法滿足我們的需求,需要使用這種針對單個消息的 TTL 設(shè)置。

不過需要注意的是,使用這種方式設(shè)置的 TTL,消息可能不會按時死亡,因為 RabbitMQ 只會檢查第一個消息是否過期。比如這種情況,第一個消息設(shè)置了 20s 的 TTL,第二個消息設(shè)置了 10s 的 TTL,那么 RabbitMQ 會等到第一個消息過期之后,才會讓第二個消息過期。

解決這個問題的方法也很簡單,只需要安裝 RabbitMQ 的一個插件即可:

https://www.rabbitmq.com/community-plugins.html

安裝好這個插件后,所有的消息就都能按照被設(shè)置的 TTL 過期了。

RabbitMQ 實現(xiàn)延遲隊列

好了,介紹完 RabbitMQ 的死信隊列以及 TTL 這兩種特性之后,我們離實現(xiàn)延遲隊列就只差一步之遙了。

聰明的讀者可能已經(jīng)發(fā)現(xiàn)了,TTL 不就是延遲隊列中消息要延遲的時間么?如果我們把需要延遲的消息,將 TTL 設(shè)置為其延遲時間,投遞到 RabbitMQ 的普通隊列中,一直不去消費它,那么經(jīng)過 TTL 的時間后,消息就會自動被投遞到死信隊列,這時候我們使用消費者進程實時地去消費死信隊列中的消息,不就實現(xiàn)了延遲隊列的效果。

從下圖可以直觀的看出使用 RabbitMQ 實現(xiàn)延遲隊列的整體流程:

如何實現(xiàn)一個延遲隊列

使用 RabbitMQ 來實現(xiàn)延遲隊列,我們可以很好的利用一些 RabbitMQ 的特性,比如消息可靠發(fā)送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正確處理的消息不會被丟棄。另外,通過 RabbitMQ 集群的特性,可以很好的解決單點故障問題,不會因為單個節(jié)點掛掉導致延遲隊列不可用或者消息丟失。

TimeWheel

TimeWheel 時間輪算法,是一種實現(xiàn)延遲隊列的巧妙且高效的算法,被應用在 Netty,Zookeeper,Kafka 等各種框架中。

時間輪

如何實現(xiàn)一個延遲隊列

如上圖所示,時間輪是一個存儲延遲消息的環(huán)形隊列,其底層采用數(shù)組實現(xiàn),可以高效循環(huán)遍歷。這個環(huán)形隊列中的每個元素對應一個延遲任務列表,這個列表是一個雙向環(huán)形鏈表,鏈表中每一項都代表一個需要執(zhí)行的延遲任務。

時間輪會有表盤指針,表示時間輪當前所指時間,隨著時間推移,該指針會不斷前進,并處理對應位置上的延遲任務列表。

添加延遲任務

由于時間輪的大小固定,并且時間輪中每個元素都是一個雙向環(huán)形鏈表,我們可以在 O(1) 的時間復雜度下向時間輪中添加延遲任務。

如下圖,例如我們有一個這樣的時間輪,在表盤指針指向當前時間為 2 時,我們需要新添加一個延遲 3 秒的任務,我們可以快速計算出延遲任務在時間輪中所對應的位置為 5,并添加到位置 5 上任務列表尾部。

如何實現(xiàn)一個延遲隊列

多層時間輪

到現(xiàn)在為止一切都非常棒,但是細心的同學可能發(fā)現(xiàn)了,上面的時間輪的大小是固定的,只有 12 秒。如果此時我們有一個需要延遲 200 秒的任務,我們應該怎么處理呢?直接擴充整個時間輪的大小嗎?這顯然不可取,因為這樣做的話我們就需要維護一個非常非常大的時間輪,內(nèi)存是不可接受的,而且底層數(shù)組大了之后尋址效率也會降低,影響性能。

為此,Kafka 引入了多層時間輪的概念。其實多層時間輪的概念和我們的機械表上時針、分針、秒針的概念非常類似,當僅使用秒針無法表示當前時間時,就使用分針結(jié)合秒針一起表示。同樣的,當任務的到期時間超過了當前時間輪所表示的時間范圍時,就會嘗試添加到上層時間輪中,如下圖所示:

如何實現(xiàn)一個延遲隊列

第一層時間輪整個時間輪所表示時間范圍是 0-12 秒,第二層時間輪每格能表示的時間范圍是整個第一層時間輪所表示的范圍也就是 12 秒,所以整個第二層時間輪能表示的時間范圍即 12*12=144 秒,依次類推第三層時間輪能表示的范圍是 1728 秒,第四層為 20736 秒等等。

比如現(xiàn)在我們需要添加一個延時為 200 秒的延遲消息,我們發(fā)現(xiàn)其已經(jīng)超過了第一層時間輪能表示的時間范圍,我們就需要繼續(xù)往上層時間輪看,將其添加在第二層時間輪 200/12 = 17 的位置,然后我們發(fā)現(xiàn) 17 也超過了第二次時間輪的表示范圍,那么我們就需要繼續(xù)往上層看,將其添加在第三層時間輪的 17/12 = 2 的位置。

Kafka 中時間輪算法添加延遲任務以及推動時間輪滾動的核心流程如下,其中 Bucket 即時間輪中的延遲任務隊列,并且 Kafka 引入的 DelayQueue 解決了多數(shù) Bucket 為空導致的時間輪滾動效率低下的問題:

如何實現(xiàn)一個延遲隊列

使用時間輪實現(xiàn)的延遲隊列,能夠支持大量任務的高效觸發(fā)。并且在 Kafka 的時間輪算法的實現(xiàn)方案中,還引入了 DelayQueue,使用 DelayQueue 來推送時間輪滾動,而延遲任務的添加與刪除操作都放在時間輪中,這樣的設(shè)計大幅提升了整個延遲隊列的執(zhí)行效率。

總結(jié)

延遲隊列在我們?nèi)粘i_發(fā)中應用非常廣泛,本文介紹了三種不同的實現(xiàn)延遲隊列的方案,三種方案各自有各自的特點,例如 Redis 的實現(xiàn)方案理解起來最為簡單,能夠快速落地,但 Redis 畢竟是基于內(nèi)存的,雖然有數(shù)據(jù)持久化方案,但還是有數(shù)據(jù)丟失的可能性。而 RabbitMQ 的實現(xiàn)方案,由于 RabbitMQ 本身的消息可靠發(fā)送、消息可靠投遞、死信隊列等特性,可以保障消息至少被消費一次以及未被正確處理的消息不會被丟棄,讓消息的可靠性有了保障。最后 Kafka 的時間輪算法,個人覺得是三種實現(xiàn)方案中最難理解但也不失為一種非常巧妙實現(xiàn)方案。

“如何實現(xiàn)一個延遲隊列”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI