您好,登錄后才能下訂單哦!
這篇文章主要介紹“Redis特殊數(shù)據(jù)類型之stream怎么應(yīng)用”,在日常操作中,相信很多人在Redis特殊數(shù)據(jù)類型之stream怎么應(yīng)用問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Redis特殊數(shù)據(jù)類型之stream怎么應(yīng)用”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
Redis Stream 是 Redis 5.0 版本新增加的數(shù)據(jù)類型,Redis 專門為消息隊(duì)列設(shè)計(jì)的數(shù)據(jù)類型。
在 Redis 5.0 Stream 沒(méi)出來(lái)之前,消息隊(duì)列的實(shí)現(xiàn)方式都有著各自的缺陷,例如:
發(fā)布訂閱模式,不能持久化也就無(wú)法可靠的保存消息,并且對(duì)于離線重連的客戶端不能讀取歷史消息的缺陷;
List 實(shí)現(xiàn)消息隊(duì)列的方式不能重復(fù)消費(fèi),一個(gè)消息消費(fèi)完就會(huì)被刪除,而且生產(chǎn)者需要自行實(shí)現(xiàn)全局唯一 ID。
基于以上問(wèn)題,Redis 5.0 便推出了 Stream 類型也是此版本最重要的功能,用于完美地實(shí)現(xiàn)消息隊(duì)列,它支持消息的持久化、支持自動(dòng)生成全局唯一 ID、支持 ack 確認(rèn)消息的模式、支持消費(fèi)組模式等,讓消息隊(duì)列更加的穩(wěn)定和可靠。
Stream 消息隊(duì)列操作命令:
XADD : 插入消息,保證有序,可以自動(dòng)生成全局唯一 ID
XDEL : 根據(jù)消息 ID 刪除消息;
DEL : 刪除整個(gè) Stream;
# XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
127.0.0.1:6379> XADD s1 * name sid10t
"1665047636078-0"
127.0.0.1:6379> XADD s1 * name sidiot
"1665047646214-0"
# XDEL key id [id ...]
127.0.0.1:6379> XDEL s1 1665047646214-0
(integer) 1
# DEL key [key ...]
127.0.0.1:6379> DEL s1
(integer) 1
XLEN : 查詢消息長(zhǎng)度;
XREAD : 用于讀取消息,可以按 ID 讀取數(shù)據(jù);
XRANGE : 讀取區(qū)間消息;
XTRIM : 裁剪隊(duì)列消息個(gè)數(shù);
# XLEN key
127.0.0.1:6379> XLEN s1
(integer) 2
# XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
127.0.0.1:6379> XREAD streams s1 0-0
1) 1) "s1"
2) 1) 1) "1665047636078-0"
2) 1) "name"
2) "sid10t"
2) 1) "1665047646214-0"
2) 1) "name"
2) "sidiot"
127.0.0.1:6379> XREAD count 1 streams s1 0-0
1) 1) "s1"
2) 1) 1) "1665047636078-0"
2) 1) "name"
2) "sid10t"
# XADD 了一條消息之后的擴(kuò)展
127.0.0.1:6379> XREAD streams s1 1665047636078-0
1) 1) "s1"
2) 1) 1) "1665047646214-0"
2) 1) "name"
2) "sidiot"
2) 1) "1665053702766-0"
2) 1) "age"
2) "18"
# XRANGE key start end [COUNT count]
127.0.0.1:6379> XRANGE s1 - +
1) 1) "1665047636078-0"
2) 1) "name"
2) "sid10t"
2) 1) "1665047646214-0"
2) 1) "name"
2) "sidiot"
3) 1) "1665053702766-0"
2) 1) "age"
2) "18"
127.0.0.1:6379> XRANGE s1 1665047636078-0 1665047646214-0
1) 1) "1665047636078-0"
2) 1) "name"
2) "sid10t"
2) 1) "1665047646214-0"
2) 1) "name"
2) "sidiot"
# XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
127.0.0.1:6379> XLEN s1
(integer) 3
127.0.0.1:6379> XTRIM s1 maxlen 2
(integer) 1
127.0.0.1:6379> XLEN s1
(integer) 2
XGROUP CREATE : 創(chuàng)建消費(fèi)者組;
XREADGROUP : 按消費(fèi)組形式讀取消息;
XPENDING 和 XACK :
XPENDING 命令可以用來(lái)查詢每個(gè)消費(fèi)組內(nèi)所有消費(fèi)者「已讀取、但尚未確認(rèn)」的消息;
XACK 命令用于向消息隊(duì)列確認(rèn)消息處理已完成;
# XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]
# 需要注意的是,XGROUP CREATE 的 streams 必須是一個(gè)存在的 streams,否則會(huì)報(bào)錯(cuò);
127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
# 0-0 從頭開(kāi)始消費(fèi),$ 從尾開(kāi)始消費(fèi);
127.0.0.1:6379> XADD myStream * name sid10t
"1665057823181-0"
127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
OK
127.0.0.1:6379> XGROUP CREATE myStream cGroup-tail $
OK
# XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
127.0.0.1:6379> XREADGROUP Group cGroup-top name count 2 STREAMS myStream >
1) 1) "myStream"
2) 1) 1) "1665058086931-0"
2) 1) "name"
2) "sid10t"
2) 1) "1665058090167-0"
2) 1) "name"
2) "sidiot"
消息隊(duì)列
生產(chǎn)者通過(guò) XADD 命令插入一條消息:
# * 表示讓 Redis 為插入的數(shù)據(jù)自動(dòng)生成一個(gè)全局唯一的 ID
# 往名稱為 mymq 的消息隊(duì)列中插入一條消息,消息的鍵是 name,值是 sid10t
127.0.0.1:6379> XADD mymq * name sid10t
"1665058759764-0"
插入成功后會(huì)返回全局唯一的 ID:"1665058759764-0"。消息的全局唯一 ID 由兩部分組成:
第一部分 “1665058759764” 是數(shù)據(jù)插入時(shí),以毫秒為單位計(jì)算的當(dāng)前服務(wù)器時(shí)間;
第二部分表示插入消息在當(dāng)前毫秒內(nèi)的消息序號(hào),這是從 0 開(kāi)始編號(hào)的。例如,“1665058759764-0” 就表示在 “1665058759764” 毫秒內(nèi)的第 1 條消息。
消費(fèi)者通過(guò) XREAD 命令從消息隊(duì)列中讀取消息時(shí),可以指定一個(gè)消息 ID,并從這個(gè)消息 ID 的下一條消息開(kāi)始進(jìn)行讀?。ㄗ⒁馐禽斎胂?ID 的下一條信息開(kāi)始讀取,不是查詢輸入 ID 的消息)。
127.0.0.1:6379> XREAD STREAMS mymq 1665058759764-0
(nil)
127.0.0.1:6379> XREAD STREAMS mymq 1665058759763-0
1) 1) "mymq"
2) 1) 1) "1665058759764-0"
2) 1) "name"
2) "sid10t"
如果想要實(shí)現(xiàn)阻塞讀(當(dāng)沒(méi)有數(shù)據(jù)時(shí),阻塞?。?,可以調(diào)用 XRAED 時(shí)設(shè)定 BLOCK 配置項(xiàng),實(shí)現(xiàn)類似于 BRPOP 的阻塞讀取操作。
比如,下面這命令,設(shè)置了 BLOCK 10000 的配置項(xiàng),10000 的單位是毫秒,表明 XREAD 在讀取最新消息時(shí),如果沒(méi)有消息到來(lái),XREAD 將阻塞 10000 毫秒(即 10 秒),然后再返回。
# 命令最后的 $ 符號(hào)表示讀取最新的消息
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS mymq $
(nil)
(10.01s)
Stream 的基礎(chǔ)方法,使用 xadd 存入消息和 xread 循環(huán)阻塞讀取消息的方式可以實(shí)現(xiàn)簡(jiǎn)易版的消息隊(duì)列,交互流程如下圖所示:
前面介紹的這些操作 List 也支持的,接下來(lái)看看 Stream 特有的功能。
Stream 可以以使用 XGROUP 創(chuàng)建消費(fèi)組,創(chuàng)建消費(fèi)組之后,Stream 可以使用 XREADGROUP 命令讓消費(fèi)組內(nèi)的消費(fèi)者讀取消息。
創(chuàng)建兩個(gè)消費(fèi)組,這兩個(gè)消費(fèi)組消費(fèi)的消息隊(duì)列是 mymq,都指定從第一條消息開(kāi)始讀?。?/p>
# 創(chuàng)建一個(gè)名為 group1 的消費(fèi)組,0-0 表示從第一條消息開(kāi)始讀取。
127.0.0.1:6379> XGROUP CREATE mymq group1 0-0
OK
# 創(chuàng)建一個(gè)名為 group2 的消費(fèi)組,0-0 表示從第一條消息開(kāi)始讀取。
127.0.0.1:6379> XGROUP CREATE mymq group2 0-0
OK
消費(fèi)組 group1 內(nèi)的消費(fèi)者 consumer1 從 mymq 消息隊(duì)列中讀取所有消息的命令如下:
# 命令最后的參數(shù)“>”,表示從第一條尚未被消費(fèi)的消息開(kāi)始讀取。
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1665058759764-0"
2) 1) "name"
2) "sid10t"
消息隊(duì)列中的消息一旦被消費(fèi)組里的一個(gè)消費(fèi)者讀取了,就不能再被該消費(fèi)組內(nèi)的其他消費(fèi)者讀取了,即同一個(gè)消費(fèi)組里的消費(fèi)者不能消費(fèi)同一條消息。
比如說(shuō),我們執(zhí)行完剛才的 XREADGROUP 命令后,再執(zhí)行一次同樣的命令,此時(shí)讀到的就是空值了:
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
(nil)
但是,不同消費(fèi)組的消費(fèi)者可以消費(fèi)同一條消息(但是有前提條件,創(chuàng)建消息組的時(shí)候,不同消費(fèi)組指定了相同位置開(kāi)始讀取消息) 。
比如說(shuō),剛才 group1 消費(fèi)組里的 consumer1 消費(fèi)者消費(fèi)了一條 id 為 1665058759764-0 的消息,現(xiàn)在用 group2 消費(fèi)組里的 consumer1 消費(fèi)者消費(fèi)消息:
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1665058759764-0"
2) 1) "name"
2) "sid10t"
因?yàn)槲覄?chuàng)建兩組的消費(fèi)組都是從第一條消息開(kāi)始讀取,所以可以看到第二組的消費(fèi)者依然可以消費(fèi) id 為 1665058759764-0 的這一條消息。因此,不同的消費(fèi)組的消費(fèi)者可以消費(fèi)同一條消息。
使用消費(fèi)組的目的是讓組內(nèi)的多個(gè)消費(fèi)者共同分擔(dān)讀取消息,所以,我們通常會(huì)讓每個(gè)消費(fèi)者讀取部分消息,從而實(shí)現(xiàn)消息讀取負(fù)載在多個(gè)消費(fèi)者間是均衡分布的。
例如,我們執(zhí)行下列命令,讓 group2 中的 consumer1、2、3 各自讀取一條消息。
# 讓 group2 中的 consumer1 從 mymq 消息隊(duì)列中消費(fèi)一條消息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1665060632864-0"
2) 1) "name"
2) "sid10t"
# 讓 group2 中的 consumer2 從 mymq 消息隊(duì)列中消費(fèi)一條消息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1665060633903-0"
2) 1) "name"
2) "sid10t"
# 讓 group2 中的 consumer3 從 mymq 消息隊(duì)列中消費(fèi)一條消息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1665060634962-0"
2) 1) "name"
2) "sid10t"
基于 Stream 實(shí)現(xiàn)的消息隊(duì)列,如何保證消費(fèi)者在發(fā)生故障或宕機(jī)再次重啟后,仍然可以讀取未處理完的消息?
Streams 會(huì)自動(dòng)使用內(nèi)部隊(duì)列(也稱為 PENDING List)留存消費(fèi)組里每個(gè)消費(fèi)者讀取的消息,直到消費(fèi)者使用 XACK 命令通知 Streams “消息已經(jīng)處理完成”。
消費(fèi)確認(rèn)增加了消息的可靠性,一般在業(yè)務(wù)處理完成之后,需要執(zhí)行 XACK 命令確認(rèn)消息已經(jīng)被消費(fèi)完成,整個(gè)流程的執(zhí)行如下圖所示:
如果消費(fèi)者沒(méi)有成功處理消息,它就不會(huì)給 Streams 發(fā)送 XACK 命令,消息仍然會(huì)留存。此時(shí),消費(fèi)者可以在重啟后,用 XPENDING 命令查看已讀取、但尚未確認(rèn)處理完成的消息。
例如,我們來(lái)查看一下 group2 中各個(gè)消費(fèi)者已讀取、但尚未確認(rèn)的消息個(gè)數(shù),命令如下:
127.0.0.1:6379> XPENDING mymq group2
1) (integer) 4
2) "1665058759764-0"
3) "1665060634962-0"
4) 1) 1) "consumer1"
2) "2"
2) 1) "consumer2"
2) "1"
3) 1) "consumer3"
2) "1"
如果想查看某個(gè)消費(fèi)者具體讀取了哪些數(shù)據(jù),可以執(zhí)行下面的命令:
# 查看 group2 里 consumer2 已從 mymq 消息隊(duì)列中讀取了哪些消息
127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
1) 1) "1665060633903-0"
2) "consumer2"
3) (integer) 1888805
4) (integer) 1
可以看到,consumer2 已讀取的消息的 ID 是 1665060633903-0。
一旦消息 1665060633903-0 被 consumer2 處理了,consumer2 就可以使用 XACK 命令通知 Streams,然后這條消息就會(huì)被刪除。
127.0.0.1:6379> XACK mymq group2 1665060633903-0
(integer) 1
當(dāng)我們?cè)偈褂?XPENDING 命令查看時(shí),就可以看到,consumer2 已經(jīng)沒(méi)有已讀取、但尚未確認(rèn)處理的消息了。
127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
(empty array)
好了,基于 Stream 實(shí)現(xiàn)的消息隊(duì)列就說(shuō)到這里了,小結(jié)一下:
消息保序:XADD/XREAD
阻塞讀?。篨READ block
重復(fù)消息處理:Stream 在使用 XADD 命令,會(huì)自動(dòng)生成全局唯一 ID;
消息可靠性:內(nèi)部使用 PENDING List 自動(dòng)保存消息,使用 XPENDING 命令查看消費(fèi)組已經(jīng)讀取但是未被確認(rèn)的消息,消費(fèi)者使用 XACK 確認(rèn)消息;
支持消費(fèi)組形式消費(fèi)數(shù)據(jù)
Redis 基于 Stream 消息隊(duì)列與專業(yè)的消息隊(duì)列有哪些差距?
一個(gè)專業(yè)的消息隊(duì)列,必須要做到兩大塊:
消息不可丟。
消息可堆積。
1、Redis Stream 消息會(huì)丟失嗎?
使用一個(gè)消息隊(duì)列,其實(shí)就分為三大塊:生產(chǎn)者、隊(duì)列中間件、消費(fèi)者,所以要保證消息就是保證三個(gè)環(huán)節(jié)都不能丟失數(shù)據(jù)。
Redis Stream 消息隊(duì)列能不能保證三個(gè)環(huán)節(jié)都不丟失數(shù)據(jù)?
Redis 生產(chǎn)者會(huì)不會(huì)丟消息?生產(chǎn)者會(huì)不會(huì)丟消息,取決于生產(chǎn)者對(duì)于異常情況的處理是否合理。 從消息被生產(chǎn)出來(lái),然后提交給 MQ 的過(guò)程中,只要能正常收到 ( MQ 中間件) 的 ack 確認(rèn)響應(yīng),就表示發(fā)送成功,所以只要處理好返回值和異常,如果返回異常則進(jìn)行消息重發(fā),那么這個(gè)階段是不會(huì)出現(xiàn)消息丟失的。
Redis 消費(fèi)者會(huì)不會(huì)丟消息?不會(huì),因?yàn)?Stream ( MQ 中間件)會(huì)自動(dòng)使用內(nèi)部隊(duì)列(也稱為 PENDING List)留存消費(fèi)組里每個(gè)消費(fèi)者讀取的消息,但是未被確認(rèn)的消息。消費(fèi)者可以在重啟后,用 XPENDING 命令查看已讀取、但尚未確認(rèn)處理完成的消息。等到消費(fèi)者執(zhí)行完業(yè)務(wù)邏輯后,再發(fā)送消費(fèi)確認(rèn) XACK 命令,也能保證消息的不丟失。
Redis 消息中間件會(huì)不會(huì)丟消息?會(huì),Redis 在以下 2 個(gè)場(chǎng)景下,都會(huì)導(dǎo)致數(shù)據(jù)丟失:
AOF 持久化配置為每秒寫盤,但這個(gè)寫盤過(guò)程是異步的,Redis 宕機(jī)時(shí)會(huì)存在數(shù)據(jù)丟失的可能;
主從復(fù)制也是異步的,主從切換時(shí),也存在丟失數(shù)據(jù)的可能 (opens new window)。
可以看到,Redis 在隊(duì)列中間件環(huán)節(jié)無(wú)法保證消息不丟。像 RabbitMQ 或 Kafka 這類專業(yè)的隊(duì)列中間件,在使用時(shí)是部署一個(gè)集群,生產(chǎn)者在發(fā)布消息時(shí),隊(duì)列中間件通常會(huì)寫「多個(gè)節(jié)點(diǎn)」,也就是有多個(gè)副本,這樣一來(lái),即便其中一個(gè)節(jié)點(diǎn)掛了,也能保證集群的數(shù)據(jù)不丟失。
2、Redis Stream 消息可堆積嗎?
Redis 的數(shù)據(jù)都存儲(chǔ)在內(nèi)存中,這就意味著一旦發(fā)生消息積壓,則會(huì)導(dǎo)致 Redis 的內(nèi)存持續(xù)增長(zhǎng),如果超過(guò)機(jī)器內(nèi)存上限,就會(huì)面臨被 OOM 的風(fēng)險(xiǎn)。
所以 Redis 的 Stream 提供了可以指定隊(duì)列最大長(zhǎng)度的功能,就是為了避免這種情況發(fā)生。
當(dāng)指定隊(duì)列最大長(zhǎng)度時(shí),隊(duì)列長(zhǎng)度超過(guò)上限后,舊消息會(huì)被刪除,只保留固定長(zhǎng)度的新消息。這么來(lái)看,Stream 在消息積壓時(shí),如果指定了最大長(zhǎng)度,還是有可能丟失消息的。
但 Kafka、RabbitMQ 專業(yè)的消息隊(duì)列它們的數(shù)據(jù)都是存儲(chǔ)在磁盤上,當(dāng)消息積壓時(shí),無(wú)非就是多占用一些磁盤空間。
因此,把 Redis 當(dāng)作隊(duì)列來(lái)使用時(shí),會(huì)面臨的 2 個(gè)問(wèn)題:
Redis 本身可能會(huì)丟數(shù)據(jù);
面對(duì)消息擠壓,內(nèi)存資源會(huì)緊張;
所以,能不能將 Redis 作為消息隊(duì)列來(lái)使用,關(guān)鍵看你的業(yè)務(wù)場(chǎng)景:
如果你的業(yè)務(wù)場(chǎng)景足夠簡(jiǎn)單,對(duì)于數(shù)據(jù)丟失不敏感,而且消息積壓概率比較小的情況下,把 Redis 當(dāng)作隊(duì)列是完全可以的。
如果你的業(yè)務(wù)有海量消息,消息積壓的概率比較大,并且不能接受數(shù)據(jù)丟失,那么還是用專業(yè)的消息隊(duì)列中間件吧。
補(bǔ)充:Redis 發(fā)布/訂閱機(jī)制為什么不可以作為消息隊(duì)列?
發(fā)布訂閱機(jī)制存在以下缺點(diǎn),都是跟丟失數(shù)據(jù)有關(guān):
發(fā)布/訂閱機(jī)制沒(méi)有基于任何數(shù)據(jù)類型實(shí)現(xiàn),所以不具備「數(shù)據(jù)持久化」的能力,也就是發(fā)布/訂閱機(jī)制的相關(guān)操作,不會(huì)寫入到 RDB 和 AOF 中,當(dāng) Redis 宕機(jī)重啟,發(fā)布/訂閱機(jī)制的數(shù)據(jù)也會(huì)全部丟失。
發(fā)布訂閱模式是 “發(fā)后既忘” 的工作模式,如果有訂閱者離線重連之后不能消費(fèi)之前的歷史消息。
當(dāng)消費(fèi)端有一定的消息積壓時(shí),也就是生產(chǎn)者發(fā)送的消息,消費(fèi)者消費(fèi)不過(guò)來(lái)時(shí),如果超過(guò) 32M 或者是 60s 內(nèi)持續(xù)保持在 8M 以上,消費(fèi)端會(huì)被強(qiáng)行斷開(kāi),這個(gè)參數(shù)是在配置文件中設(shè)置的,默認(rèn)值是 client-output-buffer-limit pubsub 32mb 8mb 60。
所以,發(fā)布/訂閱機(jī)制只適合即使通訊的場(chǎng)景,比如構(gòu)建哨兵集群 (opens new window)的場(chǎng)景采用了發(fā)布/訂閱機(jī)制。
到此,關(guān)于“Redis特殊數(shù)據(jù)類型之stream怎么應(yīng)用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。