溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Redis中Stream類型怎么用

發(fā)布時(shí)間:2021-11-10 10:44:33 來源:億速云 閱讀:158 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要介紹Redis中Stream類型怎么用,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

一、背景

最近在看redis這方面的知識(shí),發(fā)現(xiàn)在redis5中產(chǎn)生了一種新的數(shù)據(jù)類型Stream,它和kafka的設(shè)計(jì)有些類似,可以當(dāng)作一個(gè)簡(jiǎn)單的消息隊(duì)列來使用。

二、redis中Stream類型的特點(diǎn)

  • 是可持久化的,可以保證數(shù)據(jù)不丟失。

  • 支持消息的多播、分組消費(fèi)。

  • 支持消息的有序性。

三、Stream的結(jié)構(gòu)

Redis中Stream類型怎么用

解釋:

消費(fèi)者組: Consumer Group,即使用 XGROUP CREATE 命令創(chuàng)建的,一個(gè)消費(fèi)者組中可以存在多個(gè)消費(fèi)者,這些消費(fèi)者之間是競(jìng)爭(zhēng)關(guān)系。

  • 同一條消息,只能被這個(gè)消費(fèi)者組中的某個(gè)消費(fèi)者獲取。

  • 多個(gè)消費(fèi)者之間是相互獨(dú)立的,互不干擾。

消費(fèi)者: Consumer 消費(fèi)消息。

last_delivered_id: 這個(gè)id保證了在同一個(gè)消費(fèi)者組中,一個(gè)消息只能被一個(gè)消費(fèi)者獲取。每當(dāng)消費(fèi)者組的某個(gè)消費(fèi)者讀取到了這個(gè)消息后,這個(gè)last_delivered_id的值會(huì)往后移動(dòng)一位,保證消費(fèi)者不會(huì)讀取到重復(fù)的消息。

pending_ids:記錄了消費(fèi)者讀取到的消息id列表,但是這些消息可能還沒有處理,如果認(rèn)為某個(gè)消息處理,需要調(diào)用ack命令。這樣就確保了某個(gè)消息一定會(huì)被執(zhí)行一次。

消息內(nèi)容:是一個(gè)鍵值對(duì)的格式。

Stream 中 消息的 ID: 默認(rèn)情況下,ID使用 * ,redis可以自動(dòng)生成一個(gè),格式為 時(shí)間戳-序列號(hào),也可以自己指定,一般使用默認(rèn)生成的即可,且后生成的id號(hào)要比之前生成的大。

四、Stream的命令

1、XADD 往Stream末尾添加消息

1、命令格式
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

Redis中Stream類型怎么用

2、舉例

xadd 命令 返回的是數(shù)據(jù)的id, xx-yy (xx指的是毫秒數(shù),yy指的是在這個(gè)毫秒內(nèi)的第幾條消息)

1、向流中增加一條數(shù)據(jù),

127.0.0.1:6379> xadd stream-key * username zhangsan # 向stream-key這個(gè)流中增加一個(gè) username 是zhangsan的數(shù)據(jù) *表示自動(dòng)生成id
"1635999858912-0" # 返回的是ID
127.0.0.1:6379> keys *
1) "stream-key" # 可以看到stream自動(dòng)創(chuàng)建了
127.0.0.1:6379>

2、向流中增加數(shù)據(jù),不自動(dòng)創(chuàng)建流

127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi # 因?yàn)橹付薾omkstream參數(shù),而not-exists-stream之前不存在,所以加入失敗
(nil)
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379>

3、手動(dòng)指定ID的值

127.0.0.1:6379> xadd stream-key 1-1 username lisi # 此處id的值是自己傳遞的1-1,而不是使用*自動(dòng)生成
"1-1" # 返回的是id的值
127.0.0.1:6379>

4、設(shè)置一個(gè)固定大小的Stream1、精確指定Stream的大小

指定指定Stream的大小比模糊指定Stream的大小會(huì)稍微多少消耗一些性能。

Redis中Stream類型怎么用

2、模糊指定Stream的大小

127.0.0.1:6379> xadd stream-key maxlen ~ 1 * first first
"1636001034141-0"
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * second second
"1636001044506-0"
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * third third
"1636001057846-0"
127.0.0.1:6379> xinfo stream stream-key
 1) "length"
 2) (integer) 3
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1636001057846-0"
 9) "groups"
10) (integer) 0
11) "first-entry"
12) 1) "1636001034141-0"
    2) 1) "first"
       2) "first"
13) "last-entry"
14) 1) "1636001057846-0"
    2) 1) "third"
       2) "third"
127.0.0.1:6379>

~ 模糊指定流的大小,可以看到指定的是1,實(shí)際上已經(jīng)到了3.

2、XRANGE查看Stream中的消息

1、命令格式
xrange key start end [COUNT count]

Redis中Stream類型怎么用

2、準(zhǔn)備數(shù)據(jù)
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * username zhangsan
QUEUED
127.0.0.1:6379(TX)> xadd stream-key * username lisi
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636003481706-0"
2) "1636003481706-1"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636003499055-0"
127.0.0.1:6379>

使用redis的事務(wù)操作,獲取到同一毫秒產(chǎn)生的多條數(shù)據(jù),時(shí)間戳一樣,序列號(hào)不一樣

3、舉例

1、獲取所有的數(shù)據(jù)(-+的使用)

127.0.0.1:6379> xrange stream-key - +
1) 1) "1636003481706-0"
   2) 1) "username"
      2) "zhangsan"
2) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
3) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

-: 表示最小id的值

+:表示最大id的值

2、獲取指定id范圍內(nèi)的數(shù)據(jù),閉區(qū)間

127.0.0.1:6379> xrange stream-key 1636003481706-1 1636003499055-0
1) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
2) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

3、獲取指定id范圍內(nèi)的數(shù)據(jù),開區(qū)間

127.0.0.1:6379> xrange stream-key (1636003481706-0 (1636003499055-0
1) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
127.0.0.1:6379>

(:表示開區(qū)間

4、獲取某個(gè)毫秒后所有的數(shù)據(jù)

127.0.0.1:6379> xrange stream-key 1636003481706 +
1) 1) "1636003481706-0"
   2) 1) "username"
      2) "zhangsan"
2) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
3) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

直接寫毫秒不寫后面的序列號(hào)即可。

5、獲取單條數(shù)據(jù)

127.0.0.1:6379> xrange stream-key 1636003499055-0 1636003499055-0
1) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

startend的值寫的一樣即可獲取單挑數(shù)據(jù)。

6、獲取固定條數(shù)的數(shù)據(jù)

127.0.0.1:6379> xrange stream-key - + count 1
1) 1) "1636003481706-0"
   2) 1) "username"
      2) "zhangsan"
127.0.0.1:6379>

使用 count進(jìn)行限制

3、XREVRANGE反向查看Stream中的消息

XREVRANGE key end start [COUNT count]

使用方式和XRANGE類似,略。

4、XDEL刪除消息

1、命令格式
xdel key ID [ID ...]
2、準(zhǔn)備數(shù)據(jù)
127.0.0.1:6379> xadd stream-key * username zhangsan
"1636004176924-0"
127.0.0.1:6379> xadd stream-key * username lisi
"1636004183638-0"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636004189211-0"
127.0.0.1:6379>
3、舉例

需求:往Stream中加入3條消息,然后刪除第2條消息

127.0.0.1:6379> xdel stream-key 1636004183638-0
(integer) 1 # 返回的是刪除記錄的數(shù)量
127.0.0.1:6379> xrang stream -key - +
127.0.0.1:6379> xrange stream-key - +
1) 1) "1636004176924-0"
   2) 1) "username"
      2) "zhangsan"
2) 1) "1636004189211-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

注意:

需要注意的是,我們從Stream中刪除一個(gè)消息,這個(gè)消息并不是被真正的刪除了,而是被標(biāo)記為刪除,這個(gè)時(shí)候這個(gè)消息還是占據(jù)著內(nèi)容空間的。如果所有Stream中所有的消息都被標(biāo)記刪除,這個(gè)時(shí)候才會(huì)回收內(nèi)存空間。但是這個(gè)Stream并不會(huì)被刪除。

5、XLEN查看Stream中元素的長(zhǎng)度

1、命令格式
xlen key
2、舉例

查看Stream中元素的長(zhǎng)度

127.0.0.1:6379> xadd stream-key * username zhangsan
"1636004690578-0"
127.0.0.1:6379> xlen stream-key
(integer) 1
127.0.0.1:6379> xlen not-exists-stream-key
(integer) 0
127.0.0.1:6379>

注意:

如果xlen后方的key不存在則返回0,否則返回元素的個(gè)數(shù)。

6、XTRIM對(duì)Stream中的元素進(jìn)行修剪

1、命令格式
xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]
2、準(zhǔn)備數(shù)據(jù)
127.0.0.1:6379>  xadd stream-key * username zhangsan
"1636009745401-0"
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * username lisi
QUEUED
127.0.0.1:6379(TX)> xadd stream-key * username wangwu
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636009763955-0"
2) "1636009763955-1"
127.0.0.1:6379> xadd stream-key * username zhaoliu
"1636009769625-0"
127.0.0.1:6379>
3、舉例

1、maxlen精確限制

127.0.0.1:6379> xtrim stream-key maxlen 2 # 保留最后的2個(gè)消息
(integer) 2
127.0.0.1:6379> xrange stream-key - + # 可以看到之前加入的2個(gè)消息被刪除了
1) 1) "1636009763955-1"
   2) 1) "username"
      2) "wangwu"
2) 1) "1636009769625-0"
   2) 1) "username"
      2) "zhaoliu"
127.0.0.1:6379>

上方的意思是,保留stream-key這個(gè)Stream中最后的2個(gè)消息。

2、minid模糊限制

minid 是刪除比這個(gè)id小的數(shù)據(jù),本地測(cè)試的時(shí)候沒有測(cè)試出來,略。

7、XREAD獨(dú)立消費(fèi)消息

XREAD只是讀取消息,讀取完之后并不會(huì)刪除消息。 使用XREAD讀取消息,是完全獨(dú)立與消費(fèi)者組的,多個(gè)客戶端可以同時(shí)讀取消息。

1、命令格式
xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

Redis中Stream類型怎么用

2、準(zhǔn)備數(shù)據(jù)
127.0.0.1:6379> xadd stream-key * username zhangsan
"1636011801365-0"
127.0.0.1:6379> xadd stream-key * username lisi
"1636011806261-0"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636011810905-0"
127.0.0.1:6379>
3、舉例

1、獲取用戶名是wangwu的數(shù)據(jù)

127.0.0.1:6379> xread streams stream-key 1636011806261-0 # 此處寫的是lisi的id,即讀取到的數(shù)據(jù)需要是 > 1636011806261-0
1) 1) "stream-key"
   2) 1) 1) "1636011810905-0"
         2) 1) "username"
            2) "wangwu"

2、獲取2條數(shù)據(jù)

127.0.0.1:6379> xread count 2 streams stream-key 0-0
1) 1) "stream-key"
   2) 1) 1) "1636011801365-0"
         2) 1) "username"
            2) "zhangsan"
      2) 1) "1636011806261-0"
         2) 1) "username"
            2) "lisi"
127.0.0.1:6379>

count限制單次讀取最后的消息,因?yàn)楫?dāng)前讀取可能沒有這么多。

3、非阻塞讀取Stream對(duì)尾的數(shù)據(jù)

即讀取隊(duì)列尾的下一個(gè)消息,在非阻塞模式下始終是nil

127.0.0.1:6379> xread streams stream-key $
(nil)

4、阻塞讀取Stream對(duì)尾的數(shù)據(jù)

Redis中Stream類型怎么用

注意:

  • $表示讀取隊(duì)列最新進(jìn)來的一個(gè)消息,不是Stream的最后一個(gè)消息。是xread block執(zhí)行后,再次使用xadd添加消息后,xread block才會(huì)返回。

  • block 0表示永久阻塞,當(dāng)消息到來時(shí),才接觸阻塞。block 1000表示阻塞1000ms,如果1000ms還沒有消息到來,則返回nil

  • xread進(jìn)行順序消費(fèi) 當(dāng)使用xread進(jìn)行順序消息時(shí),需要記住返回的消息id,同時(shí)下次調(diào)用xread時(shí),需要將上次返回的消息id傳遞進(jìn)去。

  • xread讀取消息,完全無視消費(fèi)組,此時(shí)Stream就可以理解為一個(gè)普通的list。

8、消費(fèi)者組相關(guān)操作

1、消費(fèi)者組命令

Redis中Stream類型怎么用

2、準(zhǔn)備數(shù)據(jù)

1、創(chuàng)建Stream的名稱是 stream-key

2、創(chuàng)建2個(gè)消息,aa和bb

127.0.0.1:6379> xadd stream-key * aa aa
"1636362619125-0"
127.0.0.1:6379> xadd stream-key * bb bb
"1636362623191-0"
3、創(chuàng)建消費(fèi)者組

1、創(chuàng)建一個(gè)從頭開始消費(fèi)的消費(fèi)者組

xgroup create stream-key(Stream 名) g1(消費(fèi)者組名) 0-0(表示從頭開始消費(fèi))

2、創(chuàng)建一個(gè)從Stream最新的一個(gè)消息消費(fèi)的消費(fèi)者組

xgroup create stream-key g2 $

$表示從最后一個(gè)元素消費(fèi),不包括Stream中的最后一個(gè)元素,即消費(fèi)最新的消息。

4、創(chuàng)建一個(gè)從某個(gè)消息之后消費(fèi)的消費(fèi)者組
xgroup create stream-key g3 1636362619125-0  #1636362619125-0 這個(gè)是上方aa消息的id的值

1636362619125-0某個(gè)消息的具體的ID,這個(gè)g3消費(fèi)者組中的消息都是大于>這個(gè)id的消息。

3、從消費(fèi)者中讀取消息

127.0.0.1:6379> xreadgroup group g1(消費(fèi)組名) c1(消費(fèi)者名,自動(dòng)創(chuàng)建) count 3(讀取3條) streams stream-key(Stream 名) >(從該消費(fèi)者組中還未分配給另外的消費(fèi)者的消息開始讀取)
1) 1) "stream-key"
   2) 1) 1) "1636362619125-0"
         2) 1) "aa"
            2) "aa"
      2) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key >
(nil) # 返回 nil 是因?yàn)?nbsp;g2消費(fèi)組是從最新的一條信息開始讀取(創(chuàng)建消費(fèi)者組時(shí)使用了$),需要在另外的窗口執(zhí)行`xadd`命令,才可以再次讀取到消息
127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key >  #只讀取到一條消息是因?yàn)?,在?chuàng)建消費(fèi)者組時(shí),指定了aa消息的id,bb消息的id大于aa,所以讀取出來了。
1) 1) "stream-key"
   2) 1) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
127.0.0.1:6379>

4、讀取消費(fèi)者的pending消息

127.0.0.1:6379> xgroup create stream-key g4 0-0
OK
127.0.0.1:6379> xinfo consumers stream-key g1
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 88792
127.0.0.1:6379> xinfo consumers stream-key g4
(empty array)
127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-0
1) 1) "stream-key"
   2) 1) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-0
1) 1) "stream-key"
   2) (empty array)
127.0.0.1:6379>

Redis中Stream類型怎么用

5、轉(zhuǎn)移消費(fèi)者的消息

127.0.0.1:6379> xpending stream-key g1 - + 10 c1
1) 1) "1636362619125-0"
   2) "c1"
   3) (integer) 2686183
   4) (integer) 1
2) 1) "1636362623191-0"
   2) "c1"
   3) (integer) 102274
   4) (integer) 7
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
(empty array)
127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-0
1) 1) "1636362623191-0"
   2) 1) "bb"
      2) "bb"
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
1) 1) "1636362623191-0"
   2) "c2"
   3) (integer) 17616
   4) (integer) 8
127.0.0.1:6379>

Redis中Stream類型怎么用

也可以通過xautoclaim來實(shí)現(xiàn)。

6、一些監(jiān)控命令

1、查看消費(fèi)組中消費(fèi)者的pending消息

127.0.0.1:6379> xpending stream-key g1 - + 10 c2
1) 1) "1636362623191-0"
   2) "c2"
   3) (integer) 1247680
   4) (integer) 8
127.0.0.1:6379>

2、查看消費(fèi)組中的消費(fèi)者信息

127.0.0.1:6379> xinfo consumers stream-key g1
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 1474864
2) 1) "name"
   2) "c2"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 1290069
127.0.0.1:6379>

3、查看消費(fèi)組信息

127.0.0.1:6379> xinfo groups stream-key
1) 1) "name"
   2) "g1"
   3) "consumers"
   4) (integer) 2
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1636362623191-0"
2) 1) "name"
   2) "g2"
   3) "consumers"
......

4、查看Stream信息

127.0.0.1:6379> xinfo stream stream-key
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1636362623191-0"
 9) "groups"
10) (integer) 4
11) "first-entry"
12) 1) "1636362619125-0"
    2) 1) "aa"
       2) "aa"
13) "last-entry"
14) 1) "1636362623191-0"
    2) 1) "bb"
       2) "bb"
127.0.0.1:6379>

以上是“Redis中Stream類型怎么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI