您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
lpush (left push)
由隊(duì)列的左邊存放進(jìn)去
rpush (right push)
由隊(duì)列的右邊存放進(jìn)去
lpop (left pop)
由隊(duì)列的左邊取出來
rpop (right pop)
由隊(duì)列的右邊取出來
以上的四個(gè)命令,可以讓 list 幫我們實(shí)現(xiàn)隊(duì)列 或者 棧,隊(duì)列的特性是先進(jìn)先出,棧的特性是先進(jìn)后出,
所以隊(duì)列的實(shí)現(xiàn)可以使用 lpush + rpop 或者 rpush + lpop,
棧的實(shí)現(xiàn)則是lpush + lpop 或者 rpush + rpop。
生產(chǎn)者發(fā)布消息
首先我們使用 rpush 對(duì)一個(gè)叫做notify-queue的隊(duì)列,增加五個(gè)元素,即 1 2 3 4 5,也就是作為生產(chǎn)者發(fā)布消息啦
消費(fèi)者消費(fèi)消息
既然生產(chǎn)者使用的是rpush,那么消費(fèi)者就要用lpop,可以看下下圖,我們不停對(duì)notify-queue進(jìn)行消息消費(fèi),并且是按照順序的,從1一直到5,按順序讀出,最終隊(duì)列中沒有消息了,彈出的則一直是空
在上面使用lpop消費(fèi)消息時(shí),可以看到,消息消費(fèi)完后,我們每次再去pop時(shí),讀到的都是一個(gè)空的信息,
上面是手動(dòng)執(zhí)行命令,但是如果是寫好的代碼程序不停的去pop數(shù)據(jù)(拉取數(shù)據(jù))的話,會(huì)造成空輪詢(無用的讀取),
既拉高了客戶端的CPU消耗,又拉高了redis的QPS,并且還是無用操作,這些無用操作可能會(huì)造成其他客戶端對(duì)redis的訪問變得響應(yīng)緩慢。
解決方案A (休眠)
既然空輪詢會(huì)讓客戶端和redis的資源消耗都會(huì)變得較高,那么我們可以讓客戶端在收到空數(shù)據(jù)的時(shí)候,進(jìn)行1s的休眠,1s后再進(jìn)行數(shù)據(jù)拉取,這樣可以降低消耗
Thread.sleep(1000)
這個(gè)方案也是存在瑕疵的,即消息消費(fèi)延遲性增大了,如果只有一個(gè)消費(fèi)者的話,延遲就是1s,即空輪詢后,正好休眠了,但是這時(shí)候剛好有消息過來了,還是要等到1s醒來后才能消費(fèi),
如果有多個(gè)消費(fèi)者的話,由于每個(gè)消費(fèi)者的睡眠時(shí)間是岔開的,會(huì)降低一些延遲性,但是有沒有辦法更好的方法,可以做到幾乎 0 延遲?
解決方案B (阻塞讀)
redis中關(guān)于隊(duì)列取數(shù)據(jù)其實(shí)還有兩個(gè)命令,即阻塞讀取,
blpop (blocking left pop)
brpop (blocking right pop)
阻塞讀在隊(duì)列沒有數(shù)據(jù)的時(shí)候,會(huì)進(jìn)入休眠狀態(tài),一旦有消息來了以后,則立刻做出反應(yīng),讀取數(shù)據(jù),因此使用 blpop/brpop 替換 lpop/rpop 則可以解決消息延遲性的問題,
繼續(xù)入隊(duì)3個(gè)屬性,6、7、8
使用 blpop 進(jìn)行隊(duì)列的讀取,最后一個(gè)參數(shù)是阻塞讀的等待時(shí)間,如果超過這個(gè)時(shí)間還沒有消息,將會(huì)返回nil,此時(shí)可以繼續(xù)重復(fù)blpop操作,
阻塞讀的空閑連接自動(dòng)斷開問題
客戶端使用阻塞讀時(shí),如果阻塞的時(shí)間過長(zhǎng),服務(wù)一般會(huì)當(dāng)成空閑連接,從而對(duì)其進(jìn)行主動(dòng)斷開,減少無用的連接占用資源,這個(gè)時(shí)候客戶端會(huì)拋出異常,
所以請(qǐng)注意,在客戶端使用阻塞讀的時(shí)候,要進(jìn)行異常的捕獲,從而做出相應(yīng)的處理,例如重試。
java客戶端實(shí)現(xiàn)消息隊(duì)列
思路和上述一樣,只是由命令行客戶端redis-cli變成了java語言,一個(gè)線程或多個(gè)線程進(jìn)行 rpush 的發(fā)布,
另外一個(gè)或多個(gè)線程進(jìn)行 blpop 消費(fèi),完成的代碼在:https://github.com/qiaomengnan16/redis-demo/tree/main/redis-queue
發(fā)布者
訂閱者
延時(shí)隊(duì)列指的是,消息發(fā)送的一段時(shí)間后,再由消費(fèi)者進(jìn)行消費(fèi),而不是發(fā)送過去后,消費(fèi)者就能立即讀取到,
zset的可以幫我們做到這個(gè)事情,首先zset可以通過score進(jìn)行排序,score可以存一個(gè)時(shí)間戳,所以我們每次發(fā)布消息的時(shí)候,用當(dāng)前時(shí)間戳加上延時(shí)的時(shí)間戳,
隨后消費(fèi)者取消息的時(shí)候,通過截取zset的數(shù)據(jù),取到已經(jīng)滿足當(dāng)前時(shí)間的消息(即取score小于等于當(dāng)前時(shí)間戳的數(shù)據(jù),score小于等于當(dāng)前時(shí)間戳代表消息已經(jīng)到時(shí)間了,如果大于的話,說明還得等一會(huì)才能消費(fèi))。
關(guān)鍵命令 zadd (發(fā)布者),zrangebyscore(訂閱者),zrem (訂閱者消費(fèi)完數(shù)據(jù)后刪除)
命令實(shí)現(xiàn)
我們使用zadd添加了4個(gè)數(shù)據(jù),分別是1、2、3秒(偽說法,這里其實(shí)只是個(gè)score)后才能消費(fèi)的數(shù)據(jù),還有一個(gè)10秒后才能消費(fèi)的kafka,
假如現(xiàn)在已經(jīng)到了第三秒,我們?nèi)set中大于等于1秒的和小于等于3秒的數(shù)據(jù),因?yàn)檫@個(gè)區(qū)間的數(shù)據(jù)正好是我們可以消費(fèi)的,可以看到,我們?nèi)〕隽朔蠗l件的3條數(shù)據(jù),
如果每次只能消費(fèi)一個(gè)數(shù)據(jù)的話,可以加一個(gè)limit限制條件,可以看下圖取出了第一個(gè)可以消費(fèi)的數(shù)據(jù),redis
同時(shí)注意,和list的lpop/和blpop不同(它們彈出即自動(dòng)刪除原始隊(duì)列里的該數(shù)據(jù)),
雖然獲取到了數(shù)據(jù),但是如果不使用zrem進(jìn)行刪除的話,這條數(shù)據(jù)還會(huì)被其他人讀到,因?yàn)樗€一直存在zset中,
不過zrem可能會(huì)發(fā)生已經(jīng)被別人搶先一步刪除(消費(fèi))的情況,所以代碼中還需要根據(jù)zrem的返回值是否大于0判斷,本次消息我們是否搶占成功,成功后再進(jìn)行正確消費(fèi)。
代碼實(shí)現(xiàn)
發(fā)布者
訂閱者
測(cè)試延遲效果
完整代碼地址:https://github.com/qiaomengnan16/redis-demo/tree/main/redis-delayed-queue
優(yōu)化, 使用lua實(shí)現(xiàn)
上面實(shí)現(xiàn)的延遲隊(duì)列中,有一個(gè)問題,就是使用zrem判斷是否搶到這個(gè)數(shù)據(jù)時(shí),很有可能沒有搶到,這樣繼續(xù)進(jìn)行讀取,可能幾輪都搶不到,資源白白浪費(fèi)了,所以可以通過lua腳本,來進(jìn)行優(yōu)化,
讓zrangebyscore 和 zrem成為一個(gè)原子化操作,這就可以避免多線程爭(zhēng)搶,搶不到的資源浪費(fèi)了。
關(guān)于“Redis中如何實(shí)現(xiàn)消息隊(duì)列和延時(shí)消息隊(duì)列”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。