您好,登錄后才能下訂單哦!
這篇文章主要介紹“如何使用Redis的streams”,在日常操作中,相信很多人在如何使用Redis的streams問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何使用Redis的streams”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
在 Redis 4.0 中引入模塊之后,用戶開始考慮他們自己怎么去修復(fù)這些問題。其中一個(gè)用戶 Timothy Downs 通過 IRC 和我說道:
\<forkfork> 我計(jì)劃給這個(gè)模塊增加一個(gè)事務(wù)日志式的數(shù)據(jù)類型 —— 這意味著大量的訂閱者可以在不導(dǎo)致 redis 內(nèi)存激增的情況下做一些像發(fā)布/訂閱那樣的事情\<forkfork> 訂閱者持有他們在消息隊(duì)列中的位置,而不是讓 Redis 必須維護(hù)每個(gè)消費(fèi)者的位置和為每個(gè)訂閱者復(fù)制消息
他的思路啟發(fā)了我。我想了幾天,并且意識到這可能是我們馬上同時(shí)解決上面所有問題的契機(jī)。我需要去重新構(gòu)思 “日志” 的概念是什么。日志是個(gè)基本的編程元素,每個(gè)人都使用過它,因?yàn)樗皇呛唵蔚匾宰芳幽J酱蜷_一個(gè)文件,并以一定的格式寫入數(shù)據(jù)。然而 Redis 數(shù)據(jù)結(jié)構(gòu)必須是抽象的。它們在內(nèi)存中,并且我們使用內(nèi)存并不是因?yàn)槲覀儜校且驗(yàn)槭褂靡恍┲羔槪覀兛梢愿拍罨瘮?shù)據(jù)結(jié)構(gòu)并把它們抽象,以使它們擺脫明確的限制。例如,一般來說日志有幾個(gè)問題:偏移不是邏輯化的,而是真實(shí)的字節(jié)偏移,如果你想要與條目插入的時(shí)間相關(guān)的邏輯偏移應(yīng)該怎么辦?我們有范圍查詢可用。同樣,日志通常很難進(jìn)行垃圾回收:在一個(gè)只能進(jìn)行追加操作的數(shù)據(jù)結(jié)構(gòu)中怎么去刪除舊的元素?好吧,在我們理想的日志中,我們只需要說,我想要數(shù)字***的那個(gè)條目,而舊的元素一個(gè)也不要,等等。
當(dāng)我從 Timothy 的想法中受到啟發(fā),去嘗試著寫一個(gè)規(guī)范的時(shí)候,我使用了 Redis 集群中的 radix 樹去實(shí)現(xiàn),優(yōu)化了它內(nèi)部的某些部分。這為實(shí)現(xiàn)一個(gè)有效利用空間的日志提供了基礎(chǔ),而且仍然有可能在對數(shù)時(shí)間內(nèi)訪問范圍。同時(shí),我開始去讀關(guān)于 Kafka 的流相關(guān)的內(nèi)容以獲得另外的靈感,它也非常適合我的設(shè)計(jì),***借鑒了 Kafka消費(fèi)組的概念,并且再次針對 Redis 進(jìn)行優(yōu)化,以適用于 Redis 在內(nèi)存中使用的情況。然而,該規(guī)范僅停留在紙面上,在一段時(shí)間后我?guī)缀醢阉鼜念^到尾重寫了一遍,以便將我與別人討論的所得到的許多建議一起增加到 Redis 升級中。我希望 Redis 流能成為對于時(shí)間序列有用的特性,而不僅是一個(gè)常見的事件和消息類的應(yīng)用程序。
從 Redis 大會回來后,整個(gè)夏天我都在實(shí)現(xiàn)一個(gè)叫 listpack 的庫。這個(gè)庫是 ziplist.c
的繼任者,那是一個(gè)表示在單個(gè)分配中的字符串元素列表的數(shù)據(jù)結(jié)構(gòu)。它是一個(gè)非常特殊的序列化格式,其特點(diǎn)在于也能夠以逆序(從右到左)解析:以便在各種用例中替代 ziplists。
結(jié)合 radix 樹和 listpacks 的特性,它可以很容易地去構(gòu)建一個(gè)空間高效的日志,并且還是可索引的,這意味著允許通過 ID 和時(shí)間進(jìn)行隨機(jī)訪問。自從這些就緒后,我開始去寫一些代碼以實(shí)現(xiàn)流數(shù)據(jù)結(jié)構(gòu)。我還在完成這個(gè)實(shí)現(xiàn),不管怎樣,現(xiàn)在在 Github 上的 Redis 的 streams 分支里它已經(jīng)可以跑起來了。我并沒有聲稱那個(gè) API 是 100% 的最終版本,但是,這有兩個(gè)有意思的事實(shí):一,在那時(shí)只有消費(fèi)群組是缺失的,加上一些不太重要的操作流的命令,但是,所有的大的方面都已經(jīng)實(shí)現(xiàn)了。二,一旦各個(gè)方面比較穩(wěn)定了之后,我決定大概用兩個(gè)月的時(shí)間將所有的流的特性向后移植到 4.0 分支。這意味著 Redis 用戶想要使用流,不用等待 Redis 4.2 發(fā)布,它們在生產(chǎn)環(huán)境馬上就可用了。這是可能的,因?yàn)樽鳛橐粋€(gè)新的數(shù)據(jù)結(jié)構(gòu),幾乎所有的代碼改變都出現(xiàn)在新的代碼里面。除了阻塞列表操作之外:該代碼被重構(gòu)了,我們對于流和列表阻塞操作共享了相同的代碼,而極大地簡化了 Redis 內(nèi)部實(shí)現(xiàn)。
在某些方面,你可以認(rèn)為流是 Redis 列表的一個(gè)增強(qiáng)版本。流元素不再是一個(gè)單一的字符串,而是一個(gè)字段和值組成的對象。范圍查詢更適用而且更快。在流中,每個(gè)條目都有一個(gè) ID,它是一個(gè)邏輯偏移量。不同的客戶端可以阻塞等待比指定的 ID 更大的元素。Redis 流的一個(gè)基本的命令是 XADD
。是的,所有的 Redis 流命令都是以一個(gè) X
為前綴的。
> XADD mystream * sensor-id 1234 temperature 10.51506871964177.0
這個(gè) XADD
命令將追加指定的條目作為一個(gè)指定的流 —— “mystream” 的新元素。上面示例中的這個(gè)條目有兩個(gè)字段:sensor-id
和 temperature
,每個(gè)條目在同一個(gè)流中可以有不同的字段。使用相同的字段名可以更好地利用內(nèi)存。有意思的是,字段的排序是可以保證順序的。XADD
僅返回插入的條目的 ID,因?yàn)樵诘谌齻€(gè)參數(shù)中是星號(*
),表示由命令自動(dòng)生成 ID。通常這樣做就夠了,但是也可以去強(qiáng)制指定一個(gè) ID,這種情況用于復(fù)制這個(gè)命令到從服務(wù)器和AOF文件。
這個(gè) ID 是由兩部分組成的:一個(gè)毫秒時(shí)間和一個(gè)序列號。1506871964177
是毫秒時(shí)間,它只是一個(gè)毫秒級的 UNIX 時(shí)間戳。圓點(diǎn)(.
)后面的數(shù)字 0
是一個(gè)序號,它是為了區(qū)分相同毫秒數(shù)的條目增加上去的。這兩個(gè)數(shù)字都是 64 位的無符號整數(shù)。這意味著,我們可以在流中增加所有想要的條目,即使是在同一毫秒中。ID 的毫秒部分使用 Redis 服務(wù)器的當(dāng)前本地時(shí)間生成的 ID 和流中的***一個(gè)條目 ID 兩者間的***的一個(gè)。因此,舉例來說,即使是計(jì)算機(jī)時(shí)間回跳,這個(gè) ID 仍然是增加的。在某些情況下,你可以認(rèn)為流條目的 ID 是完整的 128 位數(shù)字。然而,事實(shí)上它們與被添加到的實(shí)例的本地時(shí)間有關(guān),這意味著我們可以在毫秒級的精度的范圍隨意查詢。
正如你想的那樣,快速添加兩個(gè)條目后,結(jié)果是僅一個(gè)序號遞增了。我們可以用一個(gè) MULTI
/EXEC
塊來簡單模擬“快速插入”:
> MULTIOK> XADD mystream * foo 10QUEUED> XADD mystream * bar 20QUEUED> EXEC1) 1506872463535.02) 1506872463535.1
在上面的示例中,也展示了無需指定任何初始模式的情況下,對不同的條目使用不同的字段。會發(fā)生什么呢?就像前面提到的一樣,只有每個(gè)塊(它通常包含 50-150 個(gè)消息內(nèi)容)的***個(gè)消息被使用。并且,相同字段的連續(xù)條目都使用了一個(gè)標(biāo)志進(jìn)行了壓縮,這個(gè)標(biāo)志表示與“它們與這個(gè)塊中的***個(gè)條目的字段相同”。因此,使用相同字段的連續(xù)消息可以節(jié)省許多內(nèi)存,即使是字段集隨著時(shí)間發(fā)生緩慢變化的情況下也很節(jié)省內(nèi)存。
為了從流中檢索數(shù)據(jù),這里有兩種方法:范圍查詢,它是通過 XRANGE
命令實(shí)現(xiàn)的;流播,它是通過 XREAD
命令實(shí)現(xiàn)的。XRANGE
命令僅取得包括從開始到停止范圍內(nèi)的全部條目。因此,舉例來說,如果我知道它的 ID,我可以使用如下的命名取得單個(gè)條目:
> XRANGE mystream 1506871964177.0 1506871964177.01) 1) 1506871964177.0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "10.5"
不管怎樣,你都可以使用指定的開始符號 -
和停止符號 +
表示最小和***的 ID。為了限制返回條目的數(shù)量,也可以使用 COUNT
選項(xiàng)。下面是一個(gè)更復(fù)雜的 XRANGE
示例:
> XRANGE mystream - + COUNT 21) 1) 1506871964177.0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "10.5"2) 1) 1506872463535.0 2) 1) "foo" 2) "10"
這里我們講的是 ID 的范圍,然后,為了取得在一個(gè)給定時(shí)間范圍內(nèi)的特定范圍的元素,你可以使用 XRANGE
,因?yàn)?ID 的“序號” 部分可以省略。因此,你可以只指定“毫秒”時(shí)間即可,下面的命令的意思是:“從 UNIX 時(shí)間 1506872463 開始給我 10 個(gè)條目”:
127.0.0.1:6379> XRANGE mystream 1506872463000 + COUNT 101) 1) 1506872463535.0 2) 1) "foo" 2) "10"2) 1) 1506872463535.1 2) 1) "bar" 2) "20"
關(guān)于 XRANGE
需要注意的最重要的事情是,假設(shè)我們在回復(fù)中收到 ID,隨后連續(xù)的 ID 只是增加了序號部分,所以可以使用 XRANGE
遍歷整個(gè)流,接收每個(gè)調(diào)用的指定個(gè)數(shù)的元素。Redis 中的*SCAN
系列命令允許迭代 Redis 數(shù)據(jù)結(jié)構(gòu),盡管事實(shí)上它們不是為迭代設(shè)計(jì)的,但這樣可以避免再犯相同的錯(cuò)誤。
當(dāng)我們想通過 ID 或時(shí)間去訪問流中的一個(gè)范圍或者是通過 ID 去獲取單個(gè)元素時(shí),使用 XRANGE
是非常***的。然而,在使用流的案例中,當(dāng)數(shù)據(jù)到達(dá)時(shí),它必須由不同的客戶端來消費(fèi)時(shí),這就不是一個(gè)很好的解決方案,這需要某種形式的匯聚池。(對于 某些 應(yīng)用程序來說,這可能是個(gè)好主意,因?yàn)樗鼈儍H是偶爾連接查詢的)
XREAD
命令是為讀取設(shè)計(jì)的,在同一個(gè)時(shí)間,從多個(gè)流中僅指定我們從該流中得到的***條目的 ID。此外,如果沒有數(shù)據(jù)可用,我們可以要求阻塞,當(dāng)數(shù)據(jù)到達(dá)時(shí),就解除阻塞。類似于阻塞列表操作產(chǎn)生的效果,但是這里并沒有消費(fèi)從流中得到的數(shù)據(jù),并且多個(gè)客戶端可以同時(shí)訪問同一份數(shù)據(jù)。
這里有一個(gè)典型的 XREAD
調(diào)用示例:
> XREAD BLOCK 5000 STREAMS mystream otherstream $ $
它的意思是:從 mystream
和 otherstream
取得數(shù)據(jù)。如果沒有數(shù)據(jù)可用,阻塞客戶端 5000 毫秒。在 STREAMS
選項(xiàng)之后指定我們想要監(jiān)聽的關(guān)鍵字,***的是指定想要監(jiān)聽的 ID,指定的 ID 為 $
的意思是:假設(shè)我現(xiàn)在需要流中的所有元素,因此,只需要從下一個(gè)到達(dá)的元素開始給我。
如果我從另一個(gè)客戶端發(fā)送這樣的命令:
> XADD otherstream * message “Hi There”
在 XREAD
側(cè)會出現(xiàn)什么情況呢?
1) 1) "otherstream" 2) 1) 1) 1506935385635.0 2) 1) "message" 2) "Hi There"
與收到的數(shù)據(jù)一起,我們也得到了數(shù)據(jù)的關(guān)鍵字。在下次調(diào)用中,我們將使用接收到的***消息的 ID:
> XREAD BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0
依次類推。然而需要注意的是使用方式,客戶端有可能在一個(gè)非常大的延遲之后再次連接(因?yàn)樗幚硐⑿枰獣r(shí)間,或者其它什么原因)。在這種情況下,期間會有很多消息堆積,為了確??蛻舳瞬槐幌⒀蜎],以及服務(wù)器不會因?yàn)榻o單個(gè)客戶端提供大量消息而浪費(fèi)太多的時(shí)間,使用 XREAD
的 COUNT
選項(xiàng)是非常明智的。
目前看起來還不錯(cuò)……然而,有些時(shí)候,流需要?jiǎng)h除一些舊的消息。幸運(yùn)的是,這可以使用 XADD
命令的 MAXLEN
選項(xiàng)去做:
> XADD mystream MAXLEN 1000000 * field1 value1 field2 value2
它是基本意思是,如果在流中添加新元素后發(fā)現(xiàn)消息數(shù)量超過了 1000000
個(gè),那么就刪除舊的消息,以便于元素總量重新回到 1000000
以內(nèi)。它很像是在列表中使用的 RPUSH
+ LTRIM
,但是,這次我們是使用了一個(gè)內(nèi)置機(jī)制去完成的。然而,需要注意的是,上面的意思是每次我們增加一個(gè)新的消息時(shí),我們還需要另外的工作去從流中刪除舊的消息。這將消耗一些 CPU 資源,所以在計(jì)算 MAXLEN
之前,盡可能使用 ~
符號,以表明我們不要求非常 精確 的 1000000 個(gè)消息,就是稍微多一些也不是大問題:
> XADD mystream MAXLEN ~ 1000000 * foo bar
這種方式的 XADD 僅當(dāng)它可以刪除整個(gè)節(jié)點(diǎn)的時(shí)候才會刪除消息。相比普通的 XADD
,這種方式幾乎可以自由地對流進(jìn)行封頂。
這是***個(gè) Redis 中尚未實(shí)現(xiàn)而在開發(fā)中的特性。靈感也是來自 Kafka,盡管在這里是以不同的方式實(shí)現(xiàn)的。重點(diǎn)是使用了 XREAD
,客戶端也可以增加一個(gè) GROUP <name>
選項(xiàng)。相同組的所有客戶端將自動(dòng)得到 不同的 消息。當(dāng)然,同一個(gè)流可以被多個(gè)組讀取。在這種情況下,所有的組將收到流中到達(dá)的消息的相同副本。但是,在每個(gè)組內(nèi),消息是不會重復(fù)的。
當(dāng)指定組時(shí),能夠指定一個(gè) RETRY <milliseconds>
選項(xiàng)去擴(kuò)展組:在這種情況下,如果消息沒有通過 XACK
進(jìn)行確認(rèn),它將在指定的毫秒數(shù)后進(jìn)行再次投遞。這將為消息投遞提供更佳的可靠性,這種情況下,客戶端沒有私有的方法將消息標(biāo)記為已處理。這一部分也正在開發(fā)中。
因?yàn)橛脕斫?Redis 流的設(shè)計(jì),內(nèi)存使用率是非常低的。這取決于它們的字段、值的數(shù)量和長度,對于簡單的消息,每使用 100MB 內(nèi)存可以有幾百萬條消息。此外,該格式設(shè)想為需要極少的序列化:listpack 塊以 radix 樹節(jié)點(diǎn)方式存儲,在磁盤上和內(nèi)存中都以相同方式表示的,因此它們可以很輕松地存儲和讀取。例如,Redis 可以在 0.3 秒內(nèi)從 RDB 文件中讀取 500 萬個(gè)條目。這使流的復(fù)制和持久存儲非常高效。
我還計(jì)劃允許從條目中間進(jìn)行部分刪除。現(xiàn)在僅實(shí)現(xiàn)了一部分,策略是在條目在標(biāo)記中標(biāo)識條目為已刪除,并且,當(dāng)已刪除條目占全部條目的比例達(dá)到指定值時(shí),這個(gè)塊將被回收重寫,如果需要,它將被連到相鄰的另一個(gè)塊上,以避免碎片化。
到此,關(guān)于“如何使用Redis的streams”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。