溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何使用Redis的streams

發(fā)布時(shí)間:2022-01-15 17:04:02 來源:億速云 閱讀:135 作者:iii 欄目:數(shù)據(jù)庫

這篇文章主要介紹“如何使用Redis的streams”,在日常操作中,相信很多人在如何使用Redis的streams問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何使用Redis的streams”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

起源

在 Redis 4.0 中引入模塊之后,用戶開始考慮他們自己怎么去修復(fù)這些問題。其中一個(gè)用戶 Timothy Downs 通過 IRC 和我說道:

\<forkfork> 我計(jì)劃給這個(gè)模塊增加一個(gè)事務(wù)日志式的數(shù)據(jù)類型 &mdash;&mdash; 這意味著大量的訂閱者可以在不導(dǎo)致 redis 內(nèi)存激增的情況下做一些像發(fā)布/訂閱那樣的事情\<forkfork> 訂閱者持有他們在消息隊(duì)列中的位置,而不是讓 Redis 必須維護(hù)每個(gè)消費(fèi)者的位置和為每個(gè)訂閱者復(fù)制消息

他的思路啟發(fā)了我。我想了幾天,并且意識到這可能是我們馬上同時(shí)解決上面所有問題的契機(jī)。我需要去重新構(gòu)思 “日志”  的概念是什么。日志是個(gè)基本的編程元素,每個(gè)人都使用過它,因?yàn)樗皇呛唵蔚匾宰芳幽J酱蜷_一個(gè)文件,并以一定的格式寫入數(shù)據(jù)。然而 Redis  數(shù)據(jù)結(jié)構(gòu)必須是抽象的。它們在內(nèi)存中,并且我們使用內(nèi)存并不是因?yàn)槲覀儜校且驗(yàn)槭褂靡恍┲羔槪覀兛梢愿拍罨瘮?shù)據(jù)結(jié)構(gòu)并把它們抽象,以使它們擺脫明確的限制。例如,一般來說日志有幾個(gè)問題:偏移不是邏輯化的,而是真實(shí)的字節(jié)偏移,如果你想要與條目插入的時(shí)間相關(guān)的邏輯偏移應(yīng)該怎么辦?我們有范圍查詢可用。同樣,日志通常很難進(jìn)行垃圾回收:在一個(gè)只能進(jìn)行追加操作的數(shù)據(jù)結(jié)構(gòu)中怎么去刪除舊的元素?好吧,在我們理想的日志中,我們只需要說,我想要數(shù)字***的那個(gè)條目,而舊的元素一個(gè)也不要,等等。

當(dāng)我從 Timothy 的想法中受到啟發(fā),去嘗試著寫一個(gè)規(guī)范的時(shí)候,我使用了 Redis 集群中的 radix 樹去實(shí)現(xiàn),優(yōu)化了它內(nèi)部的某些部分。這為實(shí)現(xiàn)一個(gè)有效利用空間的日志提供了基礎(chǔ),而且仍然有可能在對數(shù)時(shí)間logarithmic time內(nèi)訪問范圍。同時(shí),我開始去讀關(guān)于 Kafka 的流相關(guān)的內(nèi)容以獲得另外的靈感,它也非常適合我的設(shè)計(jì),***借鑒了 Kafka消費(fèi)組consumer groups的概念,并且再次針對  Redis 進(jìn)行優(yōu)化,以適用于 Redis  在內(nèi)存中使用的情況。然而,該規(guī)范僅停留在紙面上,在一段時(shí)間后我?guī)缀醢阉鼜念^到尾重寫了一遍,以便將我與別人討論的所得到的許多建議一起增加到  Redis 升級中。我希望 Redis 流能成為對于時(shí)間序列有用的特性,而不僅是一個(gè)常見的事件和消息類的應(yīng)用程序。

讓我們寫一些代碼吧

從 Redis 大會回來后,整個(gè)夏天我都在實(shí)現(xiàn)一個(gè)叫 listpack 的庫。這個(gè)庫是 ziplist.c 的繼任者,那是一個(gè)表示在單個(gè)分配中的字符串元素列表的數(shù)據(jù)結(jié)構(gòu)。它是一個(gè)非常特殊的序列化格式,其特點(diǎn)在于也能夠以逆序(從右到左)解析:以便在各種用例中替代 ziplists。

結(jié)合 radix 樹和 listpacks 的特性,它可以很容易地去構(gòu)建一個(gè)空間高效的日志,并且還是可索引的,這意味著允許通過 ID  和時(shí)間進(jìn)行隨機(jī)訪問。自從這些就緒后,我開始去寫一些代碼以實(shí)現(xiàn)流數(shù)據(jù)結(jié)構(gòu)。我還在完成這個(gè)實(shí)現(xiàn),不管怎樣,現(xiàn)在在 Github 上的 Redis 的  streams 分支里它已經(jīng)可以跑起來了。我并沒有聲稱那個(gè) API 是 100%  的最終版本,但是,這有兩個(gè)有意思的事實(shí):一,在那時(shí)只有消費(fèi)群組是缺失的,加上一些不太重要的操作流的命令,但是,所有的大的方面都已經(jīng)實(shí)現(xiàn)了。二,一旦各個(gè)方面比較穩(wěn)定了之后,我決定大概用兩個(gè)月的時(shí)間將所有的流的特性向后移植backport到  4.0 分支。這意味著 Redis 用戶想要使用流,不用等待 Redis 4.2  發(fā)布,它們在生產(chǎn)環(huán)境馬上就可用了。這是可能的,因?yàn)樽鳛橐粋€(gè)新的數(shù)據(jù)結(jié)構(gòu),幾乎所有的代碼改變都出現(xiàn)在新的代碼里面。除了阻塞列表操作之外:該代碼被重構(gòu)了,我們對于流和列表阻塞操作共享了相同的代碼,而極大地簡化了  Redis 內(nèi)部實(shí)現(xiàn)。

教程:歡迎使用 Redis 的 streams

在某些方面,你可以認(rèn)為流是 Redis 列表的一個(gè)增強(qiáng)版本。流元素不再是一個(gè)單一的字符串,而是一個(gè)字段fieldvalue組成的對象。范圍查詢更適用而且更快。在流中,每個(gè)條目都有一個(gè) ID,它是一個(gè)邏輯偏移量。不同的客戶端可以阻塞等待blocking-wait比指定的 ID 更大的元素。Redis 流的一個(gè)基本的命令是 XADD。是的,所有的 Redis 流命令都是以一個(gè) X 為前綴的。

> XADD mystream * sensor-id 1234 temperature 10.51506871964177.0

這個(gè) XADD 命令將追加指定的條目作為一個(gè)指定的流 &mdash;&mdash; “mystream” 的新元素。上面示例中的這個(gè)條目有兩個(gè)字段:sensor-idtemperature,每個(gè)條目在同一個(gè)流中可以有不同的字段。使用相同的字段名可以更好地利用內(nèi)存。有意思的是,字段的排序是可以保證順序的。XADD 僅返回插入的條目的 ID,因?yàn)樵诘谌齻€(gè)參數(shù)中是星號(*),表示由命令自動(dòng)生成 ID。通常這樣做就夠了,但是也可以去強(qiáng)制指定一個(gè) ID,這種情況用于復(fù)制這個(gè)命令到服務(wù)器slave serverAOFappend-only file文件。

這個(gè) ID 是由兩部分組成的:一個(gè)毫秒時(shí)間和一個(gè)序列號。1506871964177 是毫秒時(shí)間,它只是一個(gè)毫秒級的 UNIX 時(shí)間戳。圓點(diǎn)(.)后面的數(shù)字 0  是一個(gè)序號,它是為了區(qū)分相同毫秒數(shù)的條目增加上去的。這兩個(gè)數(shù)字都是 64  位的無符號整數(shù)。這意味著,我們可以在流中增加所有想要的條目,即使是在同一毫秒中。ID 的毫秒部分使用 Redis 服務(wù)器的當(dāng)前本地時(shí)間生成的  ID 和流中的***一個(gè)條目 ID 兩者間的***的一個(gè)。因此,舉例來說,即使是計(jì)算機(jī)時(shí)間回跳,這個(gè) ID  仍然是增加的。在某些情況下,你可以認(rèn)為流條目的 ID 是完整的 128  位數(shù)字。然而,事實(shí)上它們與被添加到的實(shí)例的本地時(shí)間有關(guān),這意味著我們可以在毫秒級的精度的范圍隨意查詢。

正如你想的那樣,快速添加兩個(gè)條目后,結(jié)果是僅一個(gè)序號遞增了。我們可以用一個(gè) MULTI/EXEC 塊來簡單模擬“快速插入”:

> MULTIOK> XADD mystream * foo 10QUEUED> XADD mystream * bar 20QUEUED> EXEC1) 1506872463535.02) 1506872463535.1

在上面的示例中,也展示了無需指定任何初始模式schema的情況下,對不同的條目使用不同的字段。會發(fā)生什么呢?就像前面提到的一樣,只有每個(gè)塊(它通常包含  50-150  個(gè)消息內(nèi)容)的***個(gè)消息被使用。并且,相同字段的連續(xù)條目都使用了一個(gè)標(biāo)志進(jìn)行了壓縮,這個(gè)標(biāo)志表示與“它們與這個(gè)塊中的***個(gè)條目的字段相同”。因此,使用相同字段的連續(xù)消息可以節(jié)省許多內(nèi)存,即使是字段集隨著時(shí)間發(fā)生緩慢變化的情況下也很節(jié)省內(nèi)存。

為了從流中檢索數(shù)據(jù),這里有兩種方法:范圍查詢,它是通過 XRANGE 命令實(shí)現(xiàn)的;流播streaming,它是通過 XREAD 命令實(shí)現(xiàn)的。XRANGE 命令僅取得包括從開始到停止范圍內(nèi)的全部條目。因此,舉例來說,如果我知道它的 ID,我可以使用如下的命名取得單個(gè)條目:

> XRANGE mystream 1506871964177.0 1506871964177.01) 1) 1506871964177.0   2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "10.5"

不管怎樣,你都可以使用指定的開始符號 - 和停止符號 + 表示最小和***的 ID。為了限制返回條目的數(shù)量,也可以使用 COUNT 選項(xiàng)。下面是一個(gè)更復(fù)雜的 XRANGE 示例:

> XRANGE mystream - + COUNT 21) 1) 1506871964177.0   2) 1) "sensor-id"      2) "1234"      3) "temperature"      4) "10.5"2) 1) 1506872463535.0   2) 1) "foo"      2) "10"

這里我們講的是 ID 的范圍,然后,為了取得在一個(gè)給定時(shí)間范圍內(nèi)的特定范圍的元素,你可以使用 XRANGE,因?yàn)?ID 的“序號” 部分可以省略。因此,你可以只指定“毫秒”時(shí)間即可,下面的命令的意思是:“從 UNIX 時(shí)間 1506872463 開始給我 10 個(gè)條目”:

127.0.0.1:6379> XRANGE mystream 1506872463000 + COUNT 101) 1) 1506872463535.0   2) 1) "foo"      2) "10"2) 1) 1506872463535.1   2) 1) "bar"      2) "20"

關(guān)于 XRANGE 需要注意的最重要的事情是,假設(shè)我們在回復(fù)中收到 ID,隨后連續(xù)的 ID 只是增加了序號部分,所以可以使用 XRANGE 遍歷整個(gè)流,接收每個(gè)調(diào)用的指定個(gè)數(shù)的元素。Redis 中的*SCAN 系列命令允許迭代 Redis 數(shù)據(jù)結(jié)構(gòu),盡管事實(shí)上它們不是為迭代設(shè)計(jì)的,但這樣可以避免再犯相同的錯(cuò)誤。

使用 XREAD 處理流播:阻塞新的數(shù)據(jù)

當(dāng)我們想通過 ID 或時(shí)間去訪問流中的一個(gè)范圍或者是通過 ID 去獲取單個(gè)元素時(shí),使用 XRANGE 是非常***的。然而,在使用流的案例中,當(dāng)數(shù)據(jù)到達(dá)時(shí),它必須由不同的客戶端來消費(fèi)時(shí),這就不是一個(gè)很好的解決方案,這需要某種形式的匯聚池pooling。(對于 某些 應(yīng)用程序來說,這可能是個(gè)好主意,因?yàn)樗鼈儍H是偶爾連接查詢的)

XREAD 命令是為讀取設(shè)計(jì)的,在同一個(gè)時(shí)間,從多個(gè)流中僅指定我們從該流中得到的***條目的  ID。此外,如果沒有數(shù)據(jù)可用,我們可以要求阻塞,當(dāng)數(shù)據(jù)到達(dá)時(shí),就解除阻塞。類似于阻塞列表操作產(chǎn)生的效果,但是這里并沒有消費(fèi)從流中得到的數(shù)據(jù),并且多個(gè)客戶端可以同時(shí)訪問同一份數(shù)據(jù)。

這里有一個(gè)典型的 XREAD 調(diào)用示例:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ $

它的意思是:從 mystreamotherstream 取得數(shù)據(jù)。如果沒有數(shù)據(jù)可用,阻塞客戶端 5000 毫秒。在 STREAMS 選項(xiàng)之后指定我們想要監(jiān)聽的關(guān)鍵字,***的是指定想要監(jiān)聽的 ID,指定的 ID 為 $ 的意思是:假設(shè)我現(xiàn)在需要流中的所有元素,因此,只需要從下一個(gè)到達(dá)的元素開始給我。

如果我從另一個(gè)客戶端發(fā)送這樣的命令:

> XADD otherstream * message “Hi There”

XREAD 側(cè)會出現(xiàn)什么情況呢?

1) 1) "otherstream"   2) 1) 1) 1506935385635.0         2) 1) "message"            2) "Hi There"

與收到的數(shù)據(jù)一起,我們也得到了數(shù)據(jù)的關(guān)鍵字。在下次調(diào)用中,我們將使用接收到的***消息的 ID:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0

依次類推。然而需要注意的是使用方式,客戶端有可能在一個(gè)非常大的延遲之后再次連接(因?yàn)樗幚硐⑿枰獣r(shí)間,或者其它什么原因)。在這種情況下,期間會有很多消息堆積,為了確??蛻舳瞬槐幌⒀蜎],以及服務(wù)器不會因?yàn)榻o單個(gè)客戶端提供大量消息而浪費(fèi)太多的時(shí)間,使用 XREADCOUNT 選項(xiàng)是非常明智的。

流封頂

目前看起來還不錯(cuò)&hellip;&hellip;然而,有些時(shí)候,流需要?jiǎng)h除一些舊的消息。幸運(yùn)的是,這可以使用 XADD 命令的 MAXLEN 選項(xiàng)去做:

> XADD mystream MAXLEN 1000000 * field1 value1 field2 value2

它是基本意思是,如果在流中添加新元素后發(fā)現(xiàn)消息數(shù)量超過了 1000000 個(gè),那么就刪除舊的消息,以便于元素總量重新回到 1000000 以內(nèi)。它很像是在列表中使用的 RPUSH + LTRIM,但是,這次我們是使用了一個(gè)內(nèi)置機(jī)制去完成的。然而,需要注意的是,上面的意思是每次我們增加一個(gè)新的消息時(shí),我們還需要另外的工作去從流中刪除舊的消息。這將消耗一些 CPU 資源,所以在計(jì)算 MAXLEN 之前,盡可能使用 ~ 符號,以表明我們不要求非常 精確 的 1000000 個(gè)消息,就是稍微多一些也不是大問題:

> XADD mystream MAXLEN ~ 1000000 * foo bar

這種方式的 XADD 僅當(dāng)它可以刪除整個(gè)節(jié)點(diǎn)的時(shí)候才會刪除消息。相比普通的 XADD,這種方式幾乎可以自由地對流進(jìn)行封頂。

消費(fèi)組(開發(fā)中)

這是***個(gè) Redis 中尚未實(shí)現(xiàn)而在開發(fā)中的特性。靈感也是來自 Kafka,盡管在這里是以不同的方式實(shí)現(xiàn)的。重點(diǎn)是使用了 XREAD,客戶端也可以增加一個(gè) GROUP <name> 選項(xiàng)。相同組的所有客戶端將自動(dòng)得到 不同的 消息。當(dāng)然,同一個(gè)流可以被多個(gè)組讀取。在這種情況下,所有的組將收到流中到達(dá)的消息的相同副本。但是,在每個(gè)組內(nèi),消息是不會重復(fù)的。

當(dāng)指定組時(shí),能夠指定一個(gè) RETRY <milliseconds> 選項(xiàng)去擴(kuò)展組:在這種情況下,如果消息沒有通過 XACK 進(jìn)行確認(rèn),它將在指定的毫秒數(shù)后進(jìn)行再次投遞。這將為消息投遞提供更佳的可靠性,這種情況下,客戶端沒有私有的方法將消息標(biāo)記為已處理。這一部分也正在開發(fā)中。

內(nèi)存使用和節(jié)省加載時(shí)間

因?yàn)橛脕斫?Redis 流的設(shè)計(jì),內(nèi)存使用率是非常低的。這取決于它們的字段、值的數(shù)量和長度,對于簡單的消息,每使用 100MB  內(nèi)存可以有幾百萬條消息。此外,該格式設(shè)想為需要極少的序列化:listpack 塊以 radix  樹節(jié)點(diǎn)方式存儲,在磁盤上和內(nèi)存中都以相同方式表示的,因此它們可以很輕松地存儲和讀取。例如,Redis 可以在 0.3 秒內(nèi)從 RDB 文件中讀取  500 萬個(gè)條目。這使流的復(fù)制和持久存儲非常高效。

我還計(jì)劃允許從條目中間進(jìn)行部分刪除。現(xiàn)在僅實(shí)現(xiàn)了一部分,策略是在條目在標(biāo)記中標(biāo)識條目為已刪除,并且,當(dāng)已刪除條目占全部條目的比例達(dá)到指定值時(shí),這個(gè)塊將被回收重寫,如果需要,它將被連到相鄰的另一個(gè)塊上,以避免碎片化。

到此,關(guān)于“如何使用Redis的streams”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI