溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Node中的Stream是什么

發(fā)布時間:2023-01-30 17:28:21 來源:億速云 閱讀:132 作者:iii 欄目:web開發(fā)

本篇內(nèi)容主要講解“Node中的Stream是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Node中的Stream是什么”吧!

stream 是一個抽象的數(shù)據(jù)接口,它繼承了 EventEmitter,它能夠發(fā)送/接受數(shù)據(jù),本質(zhì)就是讓數(shù)據(jù)流動起來,如下圖:Node中的Stream是什么

流不是 Node 中獨有的概念,是操作系統(tǒng)最基本的操作方式,在 Linux 中 | 就是 Stream,只是 Node 層面對其做了封裝,提供了對應的 API

為啥要一點一點?

首先使用下面的代碼創(chuàng)建一個文件,大概在 400MB 左右

Node中的Stream是什么

當我們使用 readFile 去讀取的時候,如下代碼

Node中的Stream是什么

正常啟動服務時,占用 10MB 左右的內(nèi)存

Node中的Stream是什么

使用curl http://127.0.0.1:8000發(fā)起請求時,內(nèi)存變?yōu)榱?420MB 左右,和我們創(chuàng)建的文件大小差不多

Node中的Stream是什么

改為使用使用 stream 的寫法,代碼如下

Node中的Stream是什么

再次發(fā)起請求時,發(fā)現(xiàn)內(nèi)存只占用了 35MB 左右,相比 readFile 大幅減少

Node中的Stream是什么

如果我們不采用流的模式,等待大文件加載完成在操作,會有如下的問題:

  • 內(nèi)存暫用過多,導致系統(tǒng)崩潰

  • CPU 運算速度有限制,且服務于多個程序,大文件加載過大且時間久

總結來說就是,一次性讀取大文件,內(nèi)存和網(wǎng)絡都吃不消

如何才能一點一點?

我們讀取文件的時候,可以采用讀取完成之后在輸出數(shù)據(jù)

Node中的Stream是什么

上述說到 stream 繼承了 EventEmitter 可以是實現(xiàn)監(jiān)聽數(shù)據(jù)。首先將讀取數(shù)據(jù)改為流式讀取,使用 on("data", ()?{}) 接收數(shù)據(jù),最后通過 on("end", ()?{}) 最后的結果

Node中的Stream是什么

有數(shù)據(jù)傳遞過來的時候就會觸發(fā) data 事件,接收這段數(shù)據(jù)做處理,最后等待所有的數(shù)據(jù)全部傳遞完成之后觸發(fā) end 事件。

數(shù)據(jù)的流轉過程

數(shù)據(jù)從哪里來—source

數(shù)據(jù)是從一個地方流向另一個地方,先看看數(shù)據(jù)的來源。

  • http 請求,請求接口來的數(shù)據(jù)

    Node中的Stream是什么

  • console 控制臺,標準輸入 stdin

    Node中的Stream是什么

  • file 文件,讀取文件內(nèi)容,例如上面的例子

連接的管道—pipe

在 source 和 dest 中有一個連接的管道 pipe,基本語法為 source.pipe(dest) ,source 和 dest 通過 pipe 連接,讓數(shù)據(jù)從 source 流向 dest

我們不需要向上面的代碼那樣手動監(jiān)聽 data/end 事件.

pipe 使用時有嚴格的要求,source 必須是一個可讀流,dest 必須是一個可寫流

??? 流動的數(shù)據(jù)到底是一個什么東西?代碼中的 chunk 是什么?

到哪里去—dest

stream 常見的三種輸出方式

  • console 控制臺,標準輸出 stdout

    Node中的Stream是什么

  • http 請求,接口請求中的 response

    Node中的Stream是什么

  • file 文件,寫入文件

    Node中的Stream是什么

流的種類

Node中的Stream是什么

可讀流 Readable Streams

可讀流是對提供數(shù)據(jù)的源頭(source)的抽象

所有的 Readable 都實現(xiàn)了 stream.Readable 類定義的接口

Node中的Stream是什么

? 讀取文件流創(chuàng)建

fs.createReadStream 創(chuàng)建一個 Readable 對象

Node中的Stream是什么

讀取模式

可讀流有兩種模式,流動模式(flowing mode)暫停模式(pause mode),這個決定了 chunk 數(shù)據(jù)的流動方式:自動流動和手工流動

在 ReadableStream 中有一個 _readableState 屬性,在其中有一個 flowing 的一個屬性來判斷流的模式,他有三種狀態(tài)值:

  • ture:表示為流動模式

  • false:表示為暫停模式

  • null:初始狀態(tài)

Node中的Stream是什么

可以使用熱水器模型來模擬數(shù)據(jù)的流動。熱水器水箱(buffer 緩存區(qū))存儲著熱水(需要的數(shù)據(jù)),當我們打開水龍頭的時候,熱水就會從水箱中不斷流出來,并且自來水也會不斷的流入水箱,這就是流動模式。當我們關閉水龍頭時,水箱會暫停進水,水龍頭則會暫停出水,這就是暫停模式。

流動模式

數(shù)據(jù)自動地從底層讀取,形成流動現(xiàn)象,并通過事件提供給應用程序。

  • 監(jiān)聽 data 事件即可進入該模式
    當 data 事件被添加后,可寫流中有數(shù)據(jù)后會將數(shù)據(jù)推到該事件回調(diào)函數(shù)中,需要自己去消費數(shù)據(jù)塊,如果不處理則該數(shù)據(jù)會丟失

  • 調(diào)用 stream.pipe 方法將數(shù)據(jù)發(fā)送到 Writeable

  • 調(diào)用 stream.resume 方法

    Node中的Stream是什么

暫停模式

數(shù)據(jù)會堆積在內(nèi)部緩沖器中,必須顯式調(diào)用 stream.read() 讀取數(shù)據(jù)塊

  • 監(jiān)聽 readable 事件 可寫流在數(shù)據(jù)準備好后會觸發(fā)該事件回調(diào),此時需要在回調(diào)函數(shù)中使用 stream.read() 來主動消費數(shù)據(jù)。readable 事件表明流有新的動態(tài):要么有新的數(shù)據(jù),要么流已經(jīng)讀取所有數(shù)據(jù)

    Node中的Stream是什么

兩種模式之間如何進行轉換呢
  • 可讀流在創(chuàng)建完成之后處于初始狀態(tài)   //TODO:和網(wǎng)上的分享不一致

  • 暫停模式切換到流動模式


    - 監(jiān)聽 data 事件
    - 調(diào)用 stream.resume 方法
    - 調(diào)用 stream.pipe 方法將數(shù)據(jù)發(fā)送到 Writable


    Node中的Stream是什么

  • 流動模式切換到暫停模式


    - 移除 data 事件
    - 調(diào)用 stream.pause 方法
    - 調(diào)用 stream.unpipe 移除管道目標


實現(xiàn)原理

創(chuàng)建可讀流的時候,需要繼承 Readable 對象,并且實現(xiàn) _read 方法

Node中的Stream是什么

創(chuàng)建一個自定義可讀流

Node中的Stream是什么

當我們調(diào)用 read 方法時,整體的流程如下:Node中的Stream是什么

  • doRead

    流中維護了一個緩存,當調(diào)用 read 方法的時候來判斷是否需要向底層請求數(shù)據(jù)

    當緩存區(qū)長度為0或者小于 highWaterMark 這個值得時候就會調(diào)用 _read 去底層獲取數(shù)據(jù) 源碼鏈接

    Node中的Stream是什么

可寫流 Writeable Stream

可寫流 是對數(shù)據(jù)寫入目的地的一種抽象,是用來消費上游流過來的數(shù)據(jù),通過可寫流把數(shù)據(jù)寫入設備,常見的寫入流就是本地磁盤的寫入

Node中的Stream是什么

可寫流的特點
  • 通過 write 寫入數(shù)據(jù)

    Node中的Stream是什么

  • 通過 end 寫數(shù)據(jù)并且關閉流,end = write + close

    Node中的Stream是什么Node中的Stream是什么

  • 當寫入數(shù)據(jù)達到 highWaterMark 的大小時,會觸發(fā) drain 事件

    Node中的Stream是什么

    調(diào)用 ws.write(chunk) 返回 false,表示當前緩沖區(qū)數(shù)據(jù)大于或等于 highWaterMark 的值,就會觸發(fā) drain 事件。其實是起到一個警示作用,我們依舊可以寫入數(shù)據(jù),只是未處理的數(shù)據(jù)會一直積壓在可寫流的內(nèi)部緩沖區(qū)中,直到積壓沾滿 Node.js 緩沖區(qū)后,才會被強行中斷

自定義可寫流

所有的 Writeable 都實現(xiàn)了 stream.Writeable 類定義的接口

只需要實現(xiàn) _write 方法就能夠?qū)?shù)據(jù)寫入底層

Node中的Stream是什么

  • 通過調(diào)用調(diào)用 writable.write 方法將數(shù)據(jù)寫入流中,會調(diào)用 _write 方法將數(shù)據(jù)寫入底層

  • 當 _write 數(shù)據(jù)成功后,需要調(diào)用 next 方法去處理下一個數(shù)據(jù)

  • 必須調(diào)用 writable.end(data) 來結束可寫流,data 是可選的。此后,不能再調(diào)用 write 新增數(shù)據(jù),否則會報錯

  • 在 end 方法調(diào)用后,當所有底層的寫操作均完成時,會觸發(fā) finish 事件

雙工流 Duplex Stream

雙工流,既可讀,也可寫。實際上繼承了 Readable 和 Writable 的一種流,那它既可以當做可讀流來用又可以當做可寫流來用

自定義的雙工流需要實現(xiàn) Readable 的 _read 方法和 Writable 的 _write 方法

Node中的Stream是什么

net 模塊可以用來創(chuàng)建 socket,socket 在 NodeJS 中是一個典型的 Duplex,看一個 TCP 客戶端的例子

Node中的Stream是什么

client 就是一個 Duplex,可寫流用于向服務器發(fā)送消息,可讀流用于接受服務器消息,兩個流內(nèi)的數(shù)據(jù)并沒有直接的關系

轉換流 Transform Stream

上述的例子中,可讀流中的數(shù)據(jù)(0/1)和可寫流中的數(shù)據(jù)(’F’,’B’,’B’)是隔離的,兩者并沒有產(chǎn)生關系,但對于 Transform 來說在可寫端寫入的數(shù)據(jù)經(jīng)過變換后會自動添加到可讀端。

Transform 繼承于 Duplex,并且已經(jīng)實現(xiàn)了 _write 和 _read 方法,只需要實現(xiàn) _tranform 方法即可

Node中的Stream是什么

gulp 基于 Stream 的自動化構建工具,看一段官網(wǎng)的示例代碼

Node中的Stream是什么

less → less 轉為 css → 執(zhí)行 css 壓縮 → 壓縮后的 css

其實 less() 和 minifyCss() 都是對輸入的數(shù)據(jù)做了一些處理,然后交給了輸出數(shù)據(jù)

Duplex 和 Transform 的選擇

和上面的示例對比起來,我們發(fā)現(xiàn)一個流同時面向生產(chǎn)者和消費者服務的時候我們會選擇 Duplex,當只是對數(shù)據(jù)做一些轉換工作的時候我們便會選擇使用 Tranform

背壓問題

什么是背壓

背壓問題來源于生產(chǎn)者消費者模式中,消費者處理速度過慢

比如說,我們下載過程,處理速度為3Mb/s,而壓縮過程,處理速度為1Mb/s,這樣的話,很快緩沖區(qū)隊列就會形成堆積

要么導致整個過程內(nèi)存消耗增加,要么導致整個緩沖區(qū)慢,部分數(shù)據(jù)丟失

Node中的Stream是什么

什么是背壓處理

背壓處理可以理解為一個向上”喊話”的過程

當壓縮處理發(fā)現(xiàn)自己的緩沖區(qū)數(shù)據(jù)擠壓超過閾值的時候,就對下載處理“喊話”,我忙不過來了,不要再發(fā)了

下載處理收到消息就暫停向下發(fā)送數(shù)據(jù)

Node中的Stream是什么

如何處理背壓

我們有不同的函數(shù)將數(shù)據(jù)從一個進程傳入另外一個進程。在 Node.js 中,有一個內(nèi)置函數(shù)稱為 .pipe(),同樣地最終,在這個進程的基本層面上我們有二個互不相關的組件:數(shù)據(jù)的_源頭_,和_消費者_

當 .pipe() 被源調(diào)用之后,它通知消費者有數(shù)據(jù)需要傳輸。管道函數(shù)為事件觸發(fā)建立了合適的積壓封裝

在數(shù)據(jù)緩存超出了 highWaterMark 或者寫入的列隊處于繁忙狀態(tài),.write() 會返回 false

當 false 返回之后,積壓系統(tǒng)介入了。它將暫停從任何發(fā)送數(shù)據(jù)的數(shù)據(jù)流中進入的 Readable。一旦數(shù)據(jù)流清空了,drain 事件將被觸發(fā),消耗進來的數(shù)據(jù)流

一旦隊列全部處理完畢,積壓機制將允許數(shù)據(jù)再次發(fā)送。在使用中的內(nèi)存空間將自我釋放,同時準備接收下一次的批量數(shù)據(jù)

Node中的Stream是什么

我們可以看到 pipe 的背壓處理:

  • 將數(shù)據(jù)按照chunk進行劃分,寫入

  • 當chunk過大,或者隊列忙碌時,暫停讀取

  • 當隊列為空時,繼續(xù)讀取數(shù)據(jù)

到此,相信大家對“Node中的Stream是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關內(nèi)容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI