您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“Node中的Stream是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Node中的Stream是什么”吧!
stream 是一個抽象的數(shù)據(jù)接口,它繼承了 EventEmitter,它能夠發(fā)送/接受數(shù)據(jù),本質(zhì)就是讓數(shù)據(jù)流動起來,如下圖:
流不是 Node 中獨有的概念,是操作系統(tǒng)最基本的操作方式,在 Linux 中 | 就是 Stream,只是 Node 層面對其做了封裝,提供了對應的 API
首先使用下面的代碼創(chuàng)建一個文件,大概在 400MB 左右
當我們使用 readFile 去讀取的時候,如下代碼
正常啟動服務時,占用 10MB 左右的內(nèi)存
使用curl http://127.0.0.1:8000
發(fā)起請求時,內(nèi)存變?yōu)榱?420MB 左右,和我們創(chuàng)建的文件大小差不多
改為使用使用 stream 的寫法,代碼如下
再次發(fā)起請求時,發(fā)現(xiàn)內(nèi)存只占用了 35MB 左右,相比 readFile 大幅減少
如果我們不采用流的模式,等待大文件加載完成在操作,會有如下的問題:
內(nèi)存暫用過多,導致系統(tǒng)崩潰
CPU 運算速度有限制,且服務于多個程序,大文件加載過大且時間久
總結來說就是,一次性讀取大文件,內(nèi)存和網(wǎng)絡都吃不消
我們讀取文件的時候,可以采用讀取完成之后在輸出數(shù)據(jù)
上述說到 stream 繼承了 EventEmitter 可以是實現(xiàn)監(jiān)聽數(shù)據(jù)。首先將讀取數(shù)據(jù)改為流式讀取,使用 on("data", ()?{})
接收數(shù)據(jù),最后通過 on("end", ()?{})
最后的結果
有數(shù)據(jù)傳遞過來的時候就會觸發(fā) data 事件,接收這段數(shù)據(jù)做處理,最后等待所有的數(shù)據(jù)全部傳遞完成之后觸發(fā) end 事件。
數(shù)據(jù)是從一個地方流向另一個地方,先看看數(shù)據(jù)的來源。
http 請求,請求接口來的數(shù)據(jù)
console 控制臺,標準輸入 stdin
file 文件,讀取文件內(nèi)容,例如上面的例子
在 source 和 dest 中有一個連接的管道 pipe,基本語法為 source.pipe(dest)
,source 和 dest 通過 pipe 連接,讓數(shù)據(jù)從 source 流向 dest
我們不需要向上面的代碼那樣手動監(jiān)聽 data/end 事件.
pipe 使用時有嚴格的要求,source 必須是一個可讀流,dest 必須是一個可寫流
??? 流動的數(shù)據(jù)到底是一個什么東西?代碼中的 chunk 是什么?
stream 常見的三種輸出方式
console 控制臺,標準輸出 stdout
http 請求,接口請求中的 response
file 文件,寫入文件
可讀流是對提供數(shù)據(jù)的源頭(source)的抽象
所有的 Readable 都實現(xiàn)了 stream.Readable 類定義的接口
? 讀取文件流創(chuàng)建
fs.createReadStream 創(chuàng)建一個 Readable 對象
可讀流有兩種模式,流動模式(flowing mode)和暫停模式(pause mode),這個決定了 chunk 數(shù)據(jù)的流動方式:自動流動和手工流動
在 ReadableStream 中有一個 _readableState 屬性,在其中有一個 flowing 的一個屬性來判斷流的模式,他有三種狀態(tài)值:
ture:表示為流動模式
false:表示為暫停模式
null:初始狀態(tài)
可以使用熱水器模型來模擬數(shù)據(jù)的流動。熱水器水箱(buffer 緩存區(qū))存儲著熱水(需要的數(shù)據(jù)),當我們打開水龍頭的時候,熱水就會從水箱中不斷流出來,并且自來水也會不斷的流入水箱,這就是流動模式。當我們關閉水龍頭時,水箱會暫停進水,水龍頭則會暫停出水,這就是暫停模式。
數(shù)據(jù)自動地從底層讀取,形成流動現(xiàn)象,并通過事件提供給應用程序。
監(jiān)聽 data 事件即可進入該模式
當 data 事件被添加后,可寫流中有數(shù)據(jù)后會將數(shù)據(jù)推到該事件回調(diào)函數(shù)中,需要自己去消費數(shù)據(jù)塊,如果不處理則該數(shù)據(jù)會丟失
調(diào)用 stream.pipe 方法將數(shù)據(jù)發(fā)送到 Writeable
調(diào)用 stream.resume 方法
數(shù)據(jù)會堆積在內(nèi)部緩沖器中,必須顯式調(diào)用 stream.read() 讀取數(shù)據(jù)塊
監(jiān)聽 readable 事件 可寫流在數(shù)據(jù)準備好后會觸發(fā)該事件回調(diào),此時需要在回調(diào)函數(shù)中使用 stream.read() 來主動消費數(shù)據(jù)。readable 事件表明流有新的動態(tài):要么有新的數(shù)據(jù),要么流已經(jīng)讀取所有數(shù)據(jù)
可讀流在創(chuàng)建完成之后處于初始狀態(tài) //TODO:和網(wǎng)上的分享不一致
暫停模式切換到流動模式
- 監(jiān)聽 data 事件
- 調(diào)用 stream.resume 方法
- 調(diào)用 stream.pipe 方法將數(shù)據(jù)發(fā)送到 Writable
流動模式切換到暫停模式
- 移除 data 事件
- 調(diào)用 stream.pause 方法
- 調(diào)用 stream.unpipe 移除管道目標
創(chuàng)建可讀流的時候,需要繼承 Readable 對象,并且實現(xiàn) _read 方法
創(chuàng)建一個自定義可讀流
當我們調(diào)用 read 方法時,整體的流程如下:
doRead
流中維護了一個緩存,當調(diào)用 read 方法的時候來判斷是否需要向底層請求數(shù)據(jù)
當緩存區(qū)長度為0或者小于 highWaterMark 這個值得時候就會調(diào)用 _read 去底層獲取數(shù)據(jù) 源碼鏈接
可寫流 是對數(shù)據(jù)寫入目的地的一種抽象,是用來消費上游流過來的數(shù)據(jù),通過可寫流把數(shù)據(jù)寫入設備,常見的寫入流就是本地磁盤的寫入
通過 write 寫入數(shù)據(jù)
通過 end 寫數(shù)據(jù)并且關閉流,end = write + close
當寫入數(shù)據(jù)達到 highWaterMark 的大小時,會觸發(fā) drain 事件
調(diào)用 ws.write(chunk) 返回 false,表示當前緩沖區(qū)數(shù)據(jù)大于或等于 highWaterMark 的值,就會觸發(fā) drain 事件。其實是起到一個警示作用,我們依舊可以寫入數(shù)據(jù),只是未處理的數(shù)據(jù)會一直積壓在可寫流的內(nèi)部緩沖區(qū)中,直到積壓沾滿 Node.js 緩沖區(qū)后,才會被強行中斷
所有的 Writeable 都實現(xiàn)了 stream.Writeable 類定義的接口
只需要實現(xiàn) _write 方法就能夠?qū)?shù)據(jù)寫入底層
通過調(diào)用調(diào)用 writable.write 方法將數(shù)據(jù)寫入流中,會調(diào)用 _write 方法將數(shù)據(jù)寫入底層
當 _write 數(shù)據(jù)成功后,需要調(diào)用 next 方法去處理下一個數(shù)據(jù)
必須調(diào)用 writable.end(data) 來結束可寫流,data 是可選的。此后,不能再調(diào)用 write 新增數(shù)據(jù),否則會報錯
在 end 方法調(diào)用后,當所有底層的寫操作均完成時,會觸發(fā) finish 事件
雙工流,既可讀,也可寫。實際上繼承了 Readable 和 Writable 的一種流,那它既可以當做可讀流來用又可以當做可寫流來用
自定義的雙工流需要實現(xiàn) Readable 的 _read 方法和 Writable 的 _write 方法
net 模塊可以用來創(chuàng)建 socket,socket 在 NodeJS 中是一個典型的 Duplex,看一個 TCP 客戶端的例子
client 就是一個 Duplex,可寫流用于向服務器發(fā)送消息,可讀流用于接受服務器消息,兩個流內(nèi)的數(shù)據(jù)并沒有直接的關系
上述的例子中,可讀流中的數(shù)據(jù)(0/1)和可寫流中的數(shù)據(jù)(’F’,’B’,’B’)是隔離的,兩者并沒有產(chǎn)生關系,但對于 Transform 來說在可寫端寫入的數(shù)據(jù)經(jīng)過變換后會自動添加到可讀端。
Transform 繼承于 Duplex,并且已經(jīng)實現(xiàn)了 _write 和 _read 方法,只需要實現(xiàn) _tranform 方法即可
gulp 基于 Stream 的自動化構建工具,看一段官網(wǎng)的示例代碼
less → less 轉為 css → 執(zhí)行 css 壓縮 → 壓縮后的 css
其實 less() 和 minifyCss() 都是對輸入的數(shù)據(jù)做了一些處理,然后交給了輸出數(shù)據(jù)
Duplex 和 Transform 的選擇
和上面的示例對比起來,我們發(fā)現(xiàn)一個流同時面向生產(chǎn)者和消費者服務的時候我們會選擇 Duplex,當只是對數(shù)據(jù)做一些轉換工作的時候我們便會選擇使用 Tranform
背壓問題來源于生產(chǎn)者消費者模式中,消費者處理速度過慢
比如說,我們下載過程,處理速度為3Mb/s,而壓縮過程,處理速度為1Mb/s,這樣的話,很快緩沖區(qū)隊列就會形成堆積
要么導致整個過程內(nèi)存消耗增加,要么導致整個緩沖區(qū)慢,部分數(shù)據(jù)丟失
背壓處理可以理解為一個向上”喊話”的過程
當壓縮處理發(fā)現(xiàn)自己的緩沖區(qū)數(shù)據(jù)擠壓超過閾值的時候,就對下載處理“喊話”,我忙不過來了,不要再發(fā)了
下載處理收到消息就暫停向下發(fā)送數(shù)據(jù)
我們有不同的函數(shù)將數(shù)據(jù)從一個進程傳入另外一個進程。在 Node.js 中,有一個內(nèi)置函數(shù)稱為 .pipe(),同樣地最終,在這個進程的基本層面上我們有二個互不相關的組件:數(shù)據(jù)的_源頭_,和_消費者_
當 .pipe() 被源調(diào)用之后,它通知消費者有數(shù)據(jù)需要傳輸。管道函數(shù)為事件觸發(fā)建立了合適的積壓封裝
在數(shù)據(jù)緩存超出了 highWaterMark 或者寫入的列隊處于繁忙狀態(tài),.write() 會返回 false
當 false 返回之后,積壓系統(tǒng)介入了。它將暫停從任何發(fā)送數(shù)據(jù)的數(shù)據(jù)流中進入的 Readable。一旦數(shù)據(jù)流清空了,drain 事件將被觸發(fā),消耗進來的數(shù)據(jù)流
一旦隊列全部處理完畢,積壓機制將允許數(shù)據(jù)再次發(fā)送。在使用中的內(nèi)存空間將自我釋放,同時準備接收下一次的批量數(shù)據(jù)
我們可以看到 pipe 的背壓處理:
將數(shù)據(jù)按照chunk進行劃分,寫入
當chunk過大,或者隊列忙碌時,暫停讀取
當隊列為空時,繼續(xù)讀取數(shù)據(jù)
到此,相信大家對“Node中的Stream是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關內(nèi)容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。