您好,登錄后才能下訂單哦!
這篇文章主要介紹了Node.js中stream模塊怎么用,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
Node.js stream
提供了四種類型的流
可讀流(Readable Streams)
可寫流(Writable Streams)
雙工流(Duplex Streams)
轉(zhuǎn)換流(Transform Streams)
更多詳情請查看 Node.js 官方文檔
https://nodejs.org/api/stream.html#stream_types_of_streams
讓我們在高層面來看看每一種流類型吧。
可讀流可以從一個特定的數(shù)據(jù)源中讀取數(shù)據(jù),最常見的是從一個文件系統(tǒng)中讀取。Node.js 應(yīng)用中其他常見的可讀流用法有:
process.stdin
-通過 stdin
在終端應(yīng)用中讀取用戶輸入。
http.IncomingMessage
- 在 HTTP 服務(wù)中讀取傳入的請求內(nèi)容或者在 HTTP 客戶端中讀取服務(wù)器的 HTTP 響應(yīng)。
你可以使用可寫流將來自應(yīng)用的數(shù)據(jù)寫入到特定的地方,比如一個文件。
process.stdout
可以用來將數(shù)據(jù)寫成標(biāo)準(zhǔn)輸出且被 console.log
內(nèi)部使用。
接下來是雙工流和轉(zhuǎn)換流,可以被定義為基于可讀流和可寫流的混合流類型。
雙工流是可讀流和可寫流的結(jié)合,它既可以將數(shù)據(jù)寫入到特定的地方也可以從數(shù)據(jù)源讀取數(shù)據(jù)。最常見的雙工流案例是 net.Socket
,它被用來從 socket 讀寫數(shù)據(jù)。
有一點很重要,雙工流中的可讀端和可寫端的操作是相互獨立的,數(shù)據(jù)不會從一端流向另一端。
轉(zhuǎn)換流與雙工流略有相似,但在轉(zhuǎn)換流中,可讀端和可寫端是相關(guān)聯(lián)的。
crypto.Cipher
類是一個很好的例子,它實現(xiàn)了加密流。通過 crypto.Cipher
流,應(yīng)用可以往流的可寫端寫入純文本數(shù)據(jù)并從流的可讀端讀取加密后的密文。之所以將這種類型的流稱之為轉(zhuǎn)換流就是因為其轉(zhuǎn)換性質(zhì)。
附注:另一個轉(zhuǎn)換流是 stream.PassThrough
。stream.PassThrough
從可寫端傳遞數(shù)據(jù)到可讀端,沒有任何轉(zhuǎn)換。這聽起來可能有點多余,但 Passthrough 流對構(gòu)建自定義流以及流管道非常有幫助。(比如創(chuàng)建一個流的數(shù)據(jù)的多個副本)
一旦可讀流連接到生產(chǎn)數(shù)據(jù)的源頭,比如一個文件,就可以用幾種方法通過該流讀取數(shù)據(jù)。
首先,先創(chuàng)建一個名為 myfile
的簡單的 text 文件,85 字節(jié)大小,包含以下字符串:
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur nec mauris turpis.
現(xiàn)在,我們看下從可讀流讀取數(shù)據(jù)的兩種不同方式。
data
事件從可讀流讀取數(shù)據(jù)的最常見方式是監(jiān)聽流發(fā)出的 data
事件。以下代碼演示了這種方式:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); readable.on('data', (chunk) => { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); })
highWaterMark
屬性作為一個選項傳遞給 fs.createReadStream
,用于決定該流中有多少數(shù)據(jù)緩沖。然后數(shù)據(jù)被沖到讀取機制(在這個案例中,是我們的 data
處理程序)。默認(rèn)情況下,可讀 fs
流的 highWaterMark
值是 64kb。我們刻意重寫該值為 20 字節(jié)用于觸發(fā)多個 data
事件。
如果你運行上述程序,它會在五個迭代內(nèi)從 myfile
中讀取 85 個字節(jié)。你會在 console 看到以下輸出:
Read 20 bytes "Lorem ipsum dolor si" Read 20 bytes "t amet, consectetur " Read 20 bytes "adipiscing elit. Cur" Read 20 bytes "abitur nec mauris tu" Read 5 bytes "rpis."
從可讀流中讀取數(shù)據(jù)的另一種方法是使用異步迭代器:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); (async () => { for await (const chunk of readable) { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); } })()
如果你運行這個程序,你會得到和前面例子一樣的輸出。
當(dāng)一個監(jiān)聽器監(jiān)聽到可讀流的 data
事件時,流的狀態(tài)會切換成”流動”狀態(tài)(除非該流被顯式的暫停了)。你可以通過流對象的 readableFlowing
屬性檢查流的”流動”狀態(tài)
我們可以稍微修改下前面的例子,通過 data
處理器來示范:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); let bytesRead = 0 console.log(`before attaching 'data' handler. is flowing: ${readable.readableFlowing}`); readable.on('data', (chunk) => { console.log(`Read ${chunk.length} bytes`); bytesRead += chunk.length // 在從可讀流中讀取 60 個字節(jié)后停止閱讀 if (bytesRead === 60) { readable.pause() console.log(`after pause() call. is flowing: ${readable.readableFlowing}`); // 在等待 1 秒后繼續(xù)讀取 setTimeout(() => { readable.resume() console.log(`after resume() call. is flowing: ${readable.readableFlowing}`); }, 1000) } }) console.log(`after attaching 'data' handler. is flowing: ${readable.readableFlowing}`);
在這個例子中,我們從一個可讀流中讀取 myfile
,但在讀取 60 個字節(jié)后,我們臨時暫停了數(shù)據(jù)流 1 秒。我們也在不同的時間打印了 readableFlowing
屬性的值去理解他是如何變化的。
如果你運行上述程序,你會得到以下輸出:
before attaching 'data' handler. is flowing: null after attaching 'data' handler. is flowing: true Read 20 bytes Read 20 bytes Read 20 bytes after pause() call. is flowing: false after resume() call. is flowing: true Read 20 bytes Read 5 bytes
我們可以用以下來解釋輸出:
當(dāng)我們的程序開始時,readableFlowing
的值是 null
,因為我們沒有提供任何消耗流的機制。
在連接到 data
處理器后,可讀流變?yōu)椤傲鲃印蹦J剑?code>readableFlowing 變?yōu)?true
。
一旦讀取 60 個字節(jié),通過調(diào)用 pause()
來暫停流,readableFlowing
也轉(zhuǎn)變?yōu)?false
。
在等待 1 秒后,通過調(diào)用 resume()
,流再次切換為“流動”模式,readableFlowing
改為 `true'。然后剩下的文件內(nèi)容在流中流動。
因為有流,應(yīng)用不需要在內(nèi)存中保留大型的二進制對象:小型的數(shù)據(jù)塊可以接收到就進行處理。
在這部分,讓我們組合不同的流來構(gòu)建一個可以處理大量數(shù)據(jù)的真實應(yīng)用。我們會使用一個小型的工具程序來生成一個給定文件的 SHA-256。
但首先,我們需要創(chuàng)建一個大型的 4GB 的假文件來測試。你可以通過一個簡單的 shell 命令來完成:
On macOS: mkfile -n 4g 4gb_file
On Linux: xfs_mkfile 4096m 4gb_file
在我們創(chuàng)建了假文件 4gb_file
后,讓我們在不使用 stream
模塊的情況下來生成來文件的 SHA-256 hash。
const fs = require("fs"); const crypto = require("crypto"); fs.readFile("./4gb_file", (readErr, data) => { if (readErr) return console.log(readErr) const hash = crypto.createHash("sha256").update(data).digest("base64"); fs.writeFile("./checksum.txt", hash, (writeErr) => { writeErr && console.error(err) }); });
如果你運行以上代碼,你可能會得到以下錯誤:
RangeError [ERR_FS_FILE_TOO_LARGE]: File size (4294967296) is greater than 2 GB at FSReqCallback.readFileAfterStat [as oncomplete] (fs.js:294:11) { code: 'ERR_FS_FILE_TOO_LARGE' }
以上報錯之所以發(fā)生是因為 JavaScript 運行時無法處理隨機的大型緩沖。運行時可以處理的最大尺寸的緩沖取決于你的操作系統(tǒng)結(jié)構(gòu)。你可以通過使用內(nèi)建的 buffer
模塊里的 buffer.constants.MAX_LENGTH
變量來查看你操作系統(tǒng)緩存的最大尺寸。
即使上述報錯沒有發(fā)生,在內(nèi)存中保留大型文件也是有問題的。我們所擁有的可用的物理內(nèi)存會限制我們應(yīng)用能使用的內(nèi)存量。高內(nèi)存使用率也會造成應(yīng)用在 CPU 使用方面性能低下,因為垃圾回收會變得昂貴。
pipeline()
減少 APP 的內(nèi)存占用現(xiàn)在,讓我們看看如何修改應(yīng)用去使用流且避免遇到這個報錯:
const fs = require("fs"); const crypto = require("crypto"); const { pipeline } = require("stream"); const hashStream = crypto.createHash("sha256"); hashStream.setEncoding('base64') const inputStream = fs.createReadStream("./4gb_file"); const outputStream = fs.createWriteStream("./checksum.txt"); pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) } )
在這個例子中,我們使用 crypto.createHash
函數(shù)提供的流式方法。它返回一個“轉(zhuǎn)換”流對象 hashStream
,為隨機的大型文件生成 hash。
為了將文件內(nèi)容傳輸?shù)竭@個轉(zhuǎn)換流中,我們使用 fs.createReadStream
為 4gb_file
創(chuàng)建了一個可讀流 inputStream
。我們將 hashStream
轉(zhuǎn)換流的輸出傳遞到可寫流 outputStream
中,而 checksum.txt
通過 fs.createWriteStream
創(chuàng)建的。
如果你運行以上程序,你將看見在 checksum.txt
文件中看見 4GB 文件的 SHA-256 hash。
pipeline()
和 pipe()
的對比在前面的案例中,我們使用 pipeline
函數(shù)來連接多個流。另一種常見的方法是使用 .pipe()
函數(shù),如下所示:
inputStream .pipe(hashStream) .pipe(outputStream)
但這里有幾個原因,所以并不推薦在生產(chǎn)應(yīng)用中使用 .pipe()
。如果其中一個流被關(guān)閉或者出現(xiàn)報錯,pipe()
不會自動銷毀連接的流,這會導(dǎo)致應(yīng)用內(nèi)存泄露。同樣的,pipe()
不會自動跨流轉(zhuǎn)發(fā)錯誤到一個地方處理。
因為這些問題,所以就有了 pipeline()
,所以推薦你使用 pipeline()
而不是 pipe()
來連接不同的流。 我們可以重寫上述的 pipe()
例子來使用 pipeline()
函數(shù),如下:
pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) } )
pipeline()
接受一個回調(diào)函數(shù)作為最后一個參數(shù)。任何來自被連接的流的報錯都將觸發(fā)該回調(diào)函數(shù),所以可以很輕松的在一個地方處理報錯。
在 Node.js 中使用流有助于我們構(gòu)建可以處理大型數(shù)據(jù)的高性能應(yīng)用。
在這篇文章中,我們覆蓋了:
四種類型的 Node.js 流(可讀流、可寫流、雙工流以及轉(zhuǎn)換流)。
如何通過監(jiān)聽 data
事件或者使用異步迭代器來從可讀流中讀取數(shù)據(jù)。
通過使用 pipeline
連接多個流來減少內(nèi)存占用。
一個簡短的警告:你很可能不會遇到太多必須使用流的場景,而基于流的方案會提高你的應(yīng)用的復(fù)雜性。務(wù)必確保使用流的好處勝于它所帶來的復(fù)雜性。
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Node.js中stream模塊怎么用”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。