溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何進(jìn)行Redis5新特性中Streams作消息隊(duì)列的分析

發(fā)布時(shí)間:2021-11-30 09:45:44 來源:億速云 閱讀:107 作者:柒染 欄目:數(shù)據(jù)庫(kù)

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)如何進(jìn)行Redis5新特性中Streams作消息隊(duì)列的分析,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

前言

Redis 5 新特性中,Streams 數(shù)據(jù)結(jié)構(gòu)的引入,可以說它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作為消息隊(duì)列使用時(shí),得到更完善,更強(qiáng)大的原生支持,其中尤為明顯的是持久化消息隊(duì)列。同時(shí),stream 借鑒了 kafka 的消費(fèi)組模型概念和設(shè)計(jì),使消費(fèi)消息處理上更加高效快速。 數(shù)據(jù)結(jié)構(gòu)中常用 API 進(jìn)行分析。

準(zhǔn)備

本文所使用 Redis 版本為 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,與本文中描述略有不同。

添加消息

Streams 添加數(shù)據(jù)使用 XADD 指令進(jìn)行添加,消息中的數(shù)據(jù)以 K-V 鍵值對(duì)的形式進(jìn)行操作。一條消息可以存在多個(gè)鍵值對(duì),添加命令格式:

XADD key ID field string [field string ...]

其中 key 為 Streams 的名稱,ID 為消息的唯一標(biāo)志,不可重復(fù),field string 就為鍵值對(duì)。下面我們就添加以 person 為名稱的流,進(jìn)行操作。

XADD person * name ytao des https://ytao.top

上面添加案例中,ID 使用 * 號(hào)復(fù)制,這里代表著服務(wù)端自動(dòng)生成 Id,添加后返回?cái)?shù)據(jù) "1578238486193-0"

這里自動(dòng)生成的 Id 格式為 <millisecondstime>-<sequencenumber> Id 是由兩部分組成:

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2.  millisecondsTime 為當(dāng)前服務(wù)器時(shí)間毫秒時(shí)間戳。

  3.  sequenceNumber 當(dāng)前序列號(hào),取值來源于當(dāng)前毫秒內(nèi),生成消息的順序,默認(rèn)從 0 開始加 1 遞增。

比如:1578238486193-3 表示在 1578238486193 毫秒的時(shí)間戳?xí)r,添加的第 4 條消息。

除了服務(wù)端自動(dòng)生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下條件限制:

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2.  Id 中的前后部分必須為數(shù)字。

  3.  最小 Id 為 0-1,不能為 0-0,但是 2-0,3-0 .... 是被允許的。

  4.  添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小。

否則,當(dāng)不滿足上述條件時(shí),添加后會(huì)拋出異常:

(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

實(shí)際上,當(dāng)添加一條消息時(shí),會(huì)進(jìn)行兩部操作。第一步,先判斷如果不存在 Streams,則創(chuàng)建 Streams 的名稱,再添加消息到 Streams 中。即使添加消息時(shí),由于 Id 異常,也可以在 Redis 中存在以當(dāng)前 Streams 的名稱。 Streams 中 Id 也可作為指針使用,因?yàn)樗且粋€(gè)有序的標(biāo)記。

生產(chǎn)中,如果這樣使用添加消息,會(huì)存在一個(gè)問題,那就是消息數(shù)量太大時(shí),會(huì)使服務(wù)宕機(jī)。這里 Streams 的設(shè)計(jì)初期也有考慮到這個(gè)問題,那就是可以指定 Streams 的容量。如果容量操作這個(gè)設(shè)定的值,就會(huì)對(duì)調(diào)舊的消息。在添加消息時(shí),設(shè)置 MAXLEN 參數(shù)。

XADD person MAXLEN 5 * name ytao des https://ytao.top

這樣就指定該了 Streams 中的容量為 5 條消息。也可使用 XTRIM 截取消息,從小到大剔除多余的消息:

XTRIM person MAXLEN 8

消息數(shù)量

查看消息數(shù)量使用 XLEN 指令進(jìn)行操作。

XLEN key

例:查看 person 流中的消息數(shù)量:

> XLEN person  (integer) 5

查詢消息

查詢 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。

XRANGE

查詢數(shù)據(jù)時(shí),可以按照指定 Id 范圍進(jìn)行查詢,XRANGE 查詢指令格式:

XRANGE key start end [COUNT count]

參數(shù)說明:

  •  key 為 Streams 的名稱

  •  start 為范圍查詢開始 Id,包含本 Id。

  •  start 為范圍查詢結(jié)束 Id,包含本 Id。

  •  Count 為查詢返回最大的消息數(shù)量,非必填。

這里 start 和 end 有-和+兩個(gè)非指定值,他們分別表示無窮小和無窮大,所以當(dāng)使用這個(gè)兩個(gè)值時(shí),會(huì)查詢出全部的消息。

> XRANGE person - +  1) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"  2) 1) "0-2"     2) 1) "name"        2) "luffy"        3) "des"        4) "valiant!"  3) 1) "2-0"     2) 1) "name"        2) "gaga"        3) "des"        4) "fishion!"

上面查詢的消息數(shù)據(jù),可以看到是按照先進(jìn)先出的順序查詢出來的。

使用 COUNT 指定查詢返回的數(shù)量:

# 查詢所有的消息,并且返回一條數(shù)據(jù)  > XRANGE person - + COUNT 1  1) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"

在范圍查詢中,Id 的后半部分可省略,后半部分中的數(shù)據(jù)會(huì)全部查詢到。

XREVRANGE

XREVRANGE 的查詢和 XRANGE 指令中的使用類似,但查詢的 start 和 end 參數(shù)順序進(jìn)行了調(diào)換:

XREVRANGE key end start [COUNT count]

使用案例:

> XREVRANGE person +  -  1) 1) "2-0"     2) 1) "name"        2) "gaga"        3) "des"        4) "fishion!"  2) 1) "0-2"     2) 1) "name"        2) "luffy"        3) "des"        4) "valiant!"  3) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"

查詢后的結(jié)果與 XRANGE 的結(jié)果順序剛好相反,其他都一樣,這兩個(gè)指令可進(jìn)行消息的升序和降序的返回。

刪除消息

刪除消息使用 XDEL 指令操作,只需指定將要?jiǎng)h除的 Streams 名稱和 Id 即可,支持一次刪除多個(gè)消息 。

XDEL key ID [ID ...]

刪除案例:

# 查詢所有消息  > XRANGE person - +  1) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"  2) 1) "0-2"     2) 1) "name"        2) "luffy"        3) "des"        4) "valiant!"  3) 1) "2-0"     2) 1) "name"        2) "gaga"        3) "des"        4) "fishion!"  # 刪除消息        > XDEL person 2-0  (integer) 1  # 再次查詢刪除后的所有消息  > XRANGE person - +  1) 1) "0-1"     2) 1) "name"        2) "ytao"        3) "des"        4) "https://ytao.top"  2) 1) "0-2"     2) 1) "name"        2) "luffy"        3) "des"        4) "valiant!"  # 查詢刪除后的長(zhǎng)度        > XLEN person  (integer) 2

從上面可以看到,刪除消息后,長(zhǎng)度也會(huì)減少相應(yīng)的數(shù)量。

消費(fèi)消息

在 Redis 的 PUB/SUB 中,我們是通過訂閱來消費(fèi)消息,在 Streams 數(shù)據(jù)結(jié)構(gòu)中,同樣也能實(shí)現(xiàn)同等功能,當(dāng)沒有新的消息時(shí),可進(jìn)行阻塞等待。不僅支持單獨(dú)消費(fèi),而且還可以支持群組消費(fèi)。

單獨(dú)消費(fèi)

單獨(dú)消費(fèi)使用 XREAD 指令??梢钥吹?,下面命令中,STREAMS,key, 以及 ID 為必填項(xiàng)。ID 表示將要讀取大于該 ID 的消息。當(dāng) ID 值使用 $ 賦予時(shí),表示已存在消息的最大 Id 值。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

上面的 COUNT 參數(shù)用來指定讀取的最大數(shù)量,與 XRANGE 的用法一樣。

> XREAD COUNT 1 STREAMS person 0  1) 1) "person"     2) 1) 1) "0-1"           2) 1) "name"              2) "ytao"              3) "des"              4) "https://ytao.top"  > XREAD COUNT 2 STREAMS person 0  1) 1) "person"     2) 1) 1) "0-1"           2) 1) "name"              2) "ytao"              3) "des"              4) "https://ytao.top"        2) 1) "0-2"           2) 1) "name"              2) "luffy"              3) "des"              4) "valiant!"

在 XREAD 里面還有個(gè) BLOCK 參數(shù),這個(gè)是用來阻塞訂閱消息的,BLOCK 攜帶的參數(shù)為阻塞時(shí)間,單位為毫秒,如果在這個(gè)時(shí)間內(nèi)沒有新的消息消費(fèi),那么就會(huì)釋放該阻塞。當(dāng)這里的時(shí)間指定為 0 時(shí),會(huì)一直阻塞,直到有新的消息來消費(fèi)到。

# 窗口 1 開啟阻塞,等待新消息的到來  > XREAD BLOCK 0 STREAMS person $  # 另開一個(gè)連接窗口 2,添加一條新的消息  > XADD person 2-2 name tao des coder  "2-2"  # 窗口 1,獲取到有新的消息來消費(fèi),并且?guī)в凶枞臅r(shí)間  > XREAD BLOCK 0 STREAMS person $  1) 1) "person"     2) 1) 1) "2-2"           2) 1) "name"              2) "tao"              3) "des"              4) "coder"  (60.81s)

當(dāng)使用 XREAD 進(jìn)行順序消費(fèi)時(shí),需要額外記錄下讀取到位置的 Id,方便下次繼續(xù)消費(fèi)。

群組消費(fèi)

群組消費(fèi)的主要目的也就是為了分流消息給不同的客戶端處理,以更高效的速率處理消息。為達(dá)到這一肝功能需求,我們需要做三件事:創(chuàng)建群組,群組讀取消息,向服務(wù)端確認(rèn)消息以處理。

群組操作

操作群組使用 XGROUP 指令:

XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

上面命令中,包含操作有:

  •  CREATE 創(chuàng)建消費(fèi)組。

  •  SETID 修改下一個(gè)處理消息的 Id。

  •  DESTROY 銷毀消費(fèi)組。

  •  DELCONSUMER 刪除消費(fèi)組中指定的消費(fèi)者。

我們當(dāng)前需要使用的是創(chuàng)建消費(fèi)組:

# 以當(dāng)前存在的最大 Id 作為消費(fèi)起始   > XGROUP CREATE person group1 $  OK

群組讀取消息

群組讀取使用 XREADGROUP 指令,COUNT和BLOCK的使用類似 XREAD 的操作,只是多了個(gè)群組和消費(fèi)者的指定:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

由于群組消費(fèi)和單獨(dú)消費(fèi)類似,這里只進(jìn)行個(gè)阻塞分析,這里 Id 也有個(gè)特殊值>,表示還未進(jìn)行消費(fèi)的消息:

# 窗口 1,消費(fèi)群組中,taotao 消費(fèi)者建立阻塞監(jiān)聽  XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >  # 窗口 2,消費(fèi)群組中,yangyang 消費(fèi)者建立阻塞監(jiān)聽   XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >  # 窗口 3,添加消費(fèi)消息  > XADD person 3-1 name tony des 666  "3-1"  # 窗口 1,讀取到新消息,此時(shí) 窗口 2 沒有任何反應(yīng)  > XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >  1) 1) "person"     2) 1) 1) "3-1"           2) 1) "name"              2) "tony"              3) "des"              4) "666"  (77.54s)  # 窗口 3,再次添加消費(fèi)消息  > XADD person 3-2 name james des abc!  "3-2"  # 窗口 2,讀取到新消息,此時(shí) 窗口 1 沒有任何反應(yīng)  > XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >  1) 1) "person"     2) 1) 1) "3-2"           2) 1) "name"              2) "james"              3) "des"              4) "abc!"  (76.36s)

以上執(zhí)行流程中,group1 群組中有兩個(gè)消費(fèi)者,當(dāng)添加兩條消息后,這兩個(gè)消費(fèi)者輪流消費(fèi)。

消息ACK

消息消費(fèi)后,為避免再次重復(fù)消費(fèi),這是需要向服務(wù)端發(fā)送 ACK,確保消息被消費(fèi)后的標(biāo)記。 例如下列情況,我們上面我們將最新兩條消息已進(jìn)行了消費(fèi),但是當(dāng)我們?cè)俅巫x取消息時(shí),還是被讀到:

>  XREADGROUP GROUP group1 yangyang STREAMS person 0  1) 1) "person"     2) 1) 1) "3-2"           2) 1) "name"              2) "james"              3) "des"              4) "abc!"

這時(shí),我們使用 XACK 指令告訴服務(wù)器,我們已處理的消息:

XACK key group ID [ID ...]0

讓服務(wù)器標(biāo)記 3-2 已處理:

> XACK person group1 3-2  (integer) 1

再次獲取群組讀取消息:

>  XREADGROUP GROUP group1 yangyang STREAMS person 0  1) 1) "person"     2) (empty list or set)

隊(duì)列中沒有了可讀消息。 除了上面以講解到的 API 外,查看消費(fèi)群組信息可使用 XINFO 指令查看。

上面對(duì) Streams 常用 API 進(jìn)行了分析,我們可以感受到 Redis 在消息隊(duì)列支持的道路上,也越來越強(qiáng)大。如果使用過它的 PUB/SUB 功能的話,就會(huì)感受到 5.x 迭代正是將你的一些痛點(diǎn)進(jìn)行了優(yōu)化。

上述就是小編為大家分享的如何進(jìn)行Redis5新特性中Streams作消息隊(duì)列的分析了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI