溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

發(fā)布時(shí)間:2021-12-10 10:04:05 來源:億速云 閱讀:313 作者:小新 欄目:關(guān)系型數(shù)據(jù)庫

這篇文章將為大家詳細(xì)講解有關(guān)Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

list的幾個(gè)命令

lpush (left push)

由隊(duì)列的左邊存放進(jìn)去

rpush (right push)

由隊(duì)列的右邊存放進(jìn)去

lpop  (left pop)

由隊(duì)列的左邊取出來

rpop (right pop)

由隊(duì)列的右邊取出來

以上的四個(gè)命令,可以讓 list 幫我們實(shí)現(xiàn)隊(duì)列 或者 棧,隊(duì)列的特性是先進(jìn)先出,棧的特性是先進(jìn)后出,

所以隊(duì)列的實(shí)現(xiàn)可以使用 lpush + rpop 或者 rpush + lpop,

棧的實(shí)現(xiàn)則是lpush + lpop 或者 rpush + rpop。

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

使用命令演示隊(duì)列

生產(chǎn)者發(fā)布消息

首先我們使用 rpush 對(duì)一個(gè)叫做notify-queue的隊(duì)列,增加五個(gè)元素,即 1 2 3 4 5,也就是作為生產(chǎn)者發(fā)布消息啦

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

消費(fèi)者消費(fèi)消息

既然生產(chǎn)者使用的是rpush,那么消費(fèi)者就要用lpop,可以看下下圖,我們不停對(duì)notify-queue進(jìn)行消息消費(fèi),并且是按照順序的,從1一直到5,按順序讀出,最終隊(duì)列中沒有消息了,彈出的則一直是空

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

空輪詢問題

在上面使用lpop消費(fèi)消息時(shí),可以看到,消息消費(fèi)完后,我們每次再去pop時(shí),讀到的都是一個(gè)空的信息,

上面是手動(dòng)執(zhí)行命令,但是如果是寫好的代碼程序不停的去pop數(shù)據(jù)(拉取數(shù)據(jù))的話,會(huì)造成空輪詢(無用的讀取),

既拉高了客戶端的CPU消耗,又拉高了redis的QPS,并且還是無用操作,這些無用操作可能會(huì)造成其他客戶端對(duì)redis的訪問變得響應(yīng)緩慢。

解決方案A (休眠)

既然空輪詢會(huì)讓客戶端和redis的資源消耗都會(huì)變得較高,那么我們可以讓客戶端在收到空數(shù)據(jù)的時(shí)候,進(jìn)行1s的休眠,1s后再進(jìn)行數(shù)據(jù)拉取,這樣可以降低消耗

Thread.sleep(1000)

這個(gè)方案也是存在瑕疵的,即消息消費(fèi)延遲性增大了,如果只有一個(gè)消費(fèi)者的話,延遲就是1s,即空輪詢后,正好休眠了,但是這時(shí)候剛好有消息過來了,還是要等到1s醒來后才能消費(fèi),

如果有多個(gè)消費(fèi)者的話,由于每個(gè)消費(fèi)者的睡眠時(shí)間是岔開的,會(huì)降低一些延遲性,但是有沒有辦法更好的方法,可以做到幾乎 0 延遲?

解決方案B (阻塞讀)

redis中關(guān)于隊(duì)列取數(shù)據(jù)其實(shí)還有兩個(gè)命令,即阻塞讀取,

blpop (blocking left pop)

brpop (blocking right pop)

阻塞讀在隊(duì)列沒有數(shù)據(jù)的時(shí)候,會(huì)進(jìn)入休眠狀態(tài),一旦有消息來了以后,則立刻做出反應(yīng),讀取數(shù)據(jù),因此使用 blpop/brpop 替換 lpop/rpop 則可以解決消息延遲性的問題,

繼續(xù)入隊(duì)3個(gè)屬性,6、7、8

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

使用 blpop 進(jìn)行隊(duì)列的讀取,最后一個(gè)參數(shù)是阻塞讀的等待時(shí)間,如果超過這個(gè)時(shí)間還沒有消息,將會(huì)返回nil,此時(shí)可以繼續(xù)重復(fù)blpop操作,

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

阻塞讀的空閑連接自動(dòng)斷開問題

客戶端使用阻塞讀時(shí),如果阻塞的時(shí)間過長(zhǎng),服務(wù)一般會(huì)當(dāng)成空閑連接,從而對(duì)其進(jìn)行主動(dòng)斷開,減少無用的連接占用資源,這個(gè)時(shí)候客戶端會(huì)拋出異常,

所以請(qǐng)注意,在客戶端使用阻塞讀的時(shí)候,要進(jìn)行異常的捕獲,從而做出相應(yīng)的處理,例如重試。

java客戶端實(shí)現(xiàn)消息隊(duì)列

思路和上述一樣,只是由命令行客戶端redis-cli變成了java語言,一個(gè)線程或多個(gè)線程進(jìn)行 rpush 的發(fā)布,

另外一個(gè)或多個(gè)線程進(jìn)行 blpop 消費(fèi),完成的代碼在:https://github.com/qiaomengnan16/redis-demo/tree/main/redis-queue

發(fā)布者

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

訂閱者

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

延時(shí)隊(duì)列的實(shí)現(xiàn)思路

延時(shí)隊(duì)列指的是,消息發(fā)送的一段時(shí)間后,再由消費(fèi)者進(jìn)行消費(fèi),而不是發(fā)送過去后,消費(fèi)者就能立即讀取到,

zset的可以幫我們做到這個(gè)事情,首先zset可以通過score進(jìn)行排序,score可以存一個(gè)時(shí)間戳,所以我們每次發(fā)布消息的時(shí)候,用當(dāng)前時(shí)間戳加上延時(shí)的時(shí)間戳,

隨后消費(fèi)者取消息的時(shí)候,通過截取zset的數(shù)據(jù),取到已經(jīng)滿足當(dāng)前時(shí)間的消息(即取score小于等于當(dāng)前時(shí)間戳的數(shù)據(jù),score小于等于當(dāng)前時(shí)間戳代表消息已經(jīng)到時(shí)間了,如果大于的話,說明還得等一會(huì)才能消費(fèi))。

關(guān)鍵命令 zadd (發(fā)布者),zrangebyscore(訂閱者),zrem (訂閱者消費(fèi)完數(shù)據(jù)后刪除)

命令實(shí)現(xiàn)

我們使用zadd添加了4個(gè)數(shù)據(jù),分別是1、2、3秒(偽說法,這里其實(shí)只是個(gè)score)后才能消費(fèi)的數(shù)據(jù),還有一個(gè)10秒后才能消費(fèi)的kafka,

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

假如現(xiàn)在已經(jīng)到了第三秒,我們?nèi)set中大于等于1秒的和小于等于3秒的數(shù)據(jù),因?yàn)檫@個(gè)區(qū)間的數(shù)據(jù)正好是我們可以消費(fèi)的,可以看到,我們?nèi)〕隽朔蠗l件的3條數(shù)據(jù),

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

如果每次只能消費(fèi)一個(gè)數(shù)據(jù)的話,可以加一個(gè)limit限制條件,可以看下圖取出了第一個(gè)可以消費(fèi)的數(shù)據(jù),redis

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

同時(shí)注意,和list的lpop/和blpop不同(它們彈出即自動(dòng)刪除原始隊(duì)列里的該數(shù)據(jù)),

雖然獲取到了數(shù)據(jù),但是如果不使用zrem進(jìn)行刪除的話,這條數(shù)據(jù)還會(huì)被其他人讀到,因?yàn)樗€一直存在zset中,

不過zrem可能會(huì)發(fā)生已經(jīng)被別人搶先一步刪除(消費(fèi))的情況,所以代碼中還需要根據(jù)zrem的返回值是否大于0判斷,本次消息我們是否搶占成功,成功后再進(jìn)行正確消費(fèi)。

代碼實(shí)現(xiàn)

發(fā)布者

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

訂閱者

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

測(cè)試延遲效果

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

完整代碼地址:https://github.com/qiaomengnan16/redis-demo/tree/main/redis-delayed-queue

優(yōu)化, 使用lua實(shí)現(xiàn)

上面實(shí)現(xiàn)的延遲隊(duì)列中,有一個(gè)問題,就是使用zrem判斷是否搶到這個(gè)數(shù)據(jù)時(shí),很有可能沒有搶到,這樣繼續(xù)進(jìn)行讀取,可能幾輪都搶不到,資源白白浪費(fèi)了,所以可以通過lua腳本,來進(jìn)行優(yōu)化,

讓zrangebyscore 和 zrem成為一個(gè)原子化操作,這就可以避免多線程爭(zhēng)搶,搶不到的資源浪費(fèi)了。

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列

關(guān)于“Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI