溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么理解Nodejs中的流

發(fā)布時間:2022-05-10 15:57:52 來源:億速云 閱讀:155 作者:iii 欄目:web開發(fā)

這篇文章主要講解了“怎么理解Nodejs中的流”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“怎么理解Nodejs中的流”吧!

怎么理解Nodejs中的流

如何理解流

  • 對于流的使用者來說,可以將流看作一個數(shù)組,我們只需要關(guān)注從中獲取(消費)和寫入(生產(chǎn))就可以了。

  • 對于流的開發(fā)者(使用stream模塊創(chuàng)建一個新實例),關(guān)注的是如何實現(xiàn)流中的一些方法,通常關(guān)注兩點,目標(biāo)資源是誰和如何操作目標(biāo)資源,確定了后就需要根據(jù)流的不同狀態(tài)和事件來對目標(biāo)資源進(jìn)行操作

緩存池

NodeJs 中所有的流都有緩沖池,緩沖池存在的目的是增加流的效率,當(dāng)數(shù)據(jù)的生產(chǎn)和消費都需要時間時,我們可以在下一次消費前提前生產(chǎn)數(shù)據(jù)存放到緩沖池。但是緩沖池并不是時刻都處于使用狀態(tài),例如緩存池為空時,數(shù)據(jù)生產(chǎn)后就不會放入緩存池而是直接消費。 。

如果數(shù)據(jù)生產(chǎn)的速度大于數(shù)據(jù)的消費速度,多余的數(shù)據(jù)會在某個地方等待。如果數(shù)據(jù)的生產(chǎn)速度小于進(jìn)程數(shù)據(jù)的消費速度,那么數(shù)據(jù)會在某個地方累計到一定的數(shù)量,然后在進(jìn)行消費。(開發(fā)者無法控制數(shù)據(jù)的生產(chǎn)和消費速度,只能盡量在何時的時機(jī)生產(chǎn)數(shù)據(jù)或者消費數(shù)據(jù))

那個數(shù)據(jù)等待,累計數(shù)據(jù),然后發(fā)生出去的地方。就是緩沖池。緩沖池通常位于電腦的RAM(內(nèi)存)中。

舉一個常見的緩沖區(qū)的例子,我們在觀看在線視頻的時候,如果你的網(wǎng)速很快,緩沖區(qū)總是會被立即填充,然后發(fā)送給系統(tǒng)播放,然后立即緩沖下一段視頻。觀看的過程中,不會有卡頓。如果網(wǎng)速很慢,則會看到loading,表示緩沖區(qū)正在被填充,當(dāng)填充完成后數(shù)據(jù)被發(fā)送給系統(tǒng),才能看到這段視頻。

NodeJs 流的緩存池是一個 Buffer 鏈表,每一次想緩存池中加入數(shù)據(jù)都會重新創(chuàng)建一個 Buffer 節(jié)點插入到鏈表尾部。

EventEmitter

NodeJs 中對 Stream 是一個實現(xiàn)了 EventEmitter 的抽象接口,所以我會先簡單的介紹一下 EventEmitter。

EventEmitter 是一個實現(xiàn)事件發(fā)布訂閱功能的類,其中常用的幾個方法(on, once, off, emit)相信大家都耳熟能詳了,就不一一介紹了。

const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

// 為 eventA 事件綁定處理函數(shù)
eventEmitter.on('eventA', () => {
    console.log('eventA active 1');
});

// 為 eventB 事件綁定處理函數(shù)
eventEmitter.on('eventB', () => {
    console.log('eventB active 1');
});

eventEmitter.once('eventA', () => {
    console.log('eventA active 2');
});

// 觸發(fā) eventA
eventEmitter.emit('eventA')
// eventA active 1
// eventA active 2

值得注意的是, EventEmitter 有兩個叫做 newListenerremoveListener 的事件,當(dāng)你向一個事件對象中添加任何事件監(jiān)聽函數(shù)后,都會觸發(fā) newListener(eventEmitter.emit('newListener')),當(dāng)一個處理函數(shù)被移除時同理會觸發(fā) removeListener。

還需要注意的是, once 綁定的處理函數(shù)只會執(zhí)行一次,removeListener 將在其執(zhí)行前被觸發(fā),這意味著 once 綁定的監(jiān)聽函數(shù)是先被移除才被觸發(fā)的。

const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

eventEmitter.on('newListener', (event, listener)=>{
    console.log('newListener', event, listener)
})

eventEmitter.on('removeListener', (event, listener) => {
    console.log('removeListener', event, listener)
})
//newListener removeListener[Function(anonymous)]


eventEmitter.on('eventA', () => {
    console.log('eventA active 1');
});
//newListener eventA [Function (anonymous)]

function listenerB() { console.log('eventB active 1'); }
eventEmitter.on('eventB', listenerB);
// newListener eventB [Function (anonymous)]

eventEmitter.once('eventA', () => {
    console.log('eventA active 2');
});
// newListener eventA [Function (anonymous)]

eventEmitter.emit('eventA')
// eventA active 1
// removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] }
// eventA active 2

eventEmitter.off('eventB', listenerB)
// removeListener eventB[Function: listenerB]

不過這對于我們后面的內(nèi)容來說并不重要。

Stream

Stream 是在 Node.js 中處理流數(shù)據(jù)的抽象接口。Stream 并不是一個實際的接口,而是對所有流的一種統(tǒng)稱。實際的接口有 ReadableStream、 WritableStream、ReadWriteStream 這幾個。

interface ReadableStream extends EventEmitter {
    readable: boolean;
    read(size?: number): string | Buffer;
    setEncoding(encoding: BufferEncoding): this;
    pause(): this;
    resume(): this;
    isPaused(): boolean;
    pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
    unpipe(destination?: WritableStream): this;
    unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
    wrap(oldStream: ReadableStream): this;
    [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}

interface WritableStream extends EventEmitter {
    writable: boolean;
    write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
    write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
    end(cb?: () => void): this;
    end(data: string | Uint8Array, cb?: () => void): this;
    end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}

interface ReadWriteStream extends ReadableStream, WritableStream { }

可以看出 ReadableStream 和 WritableStream 都是繼承 EventEmitter 類的接口(ts中接口是可以繼承類的,因為他們只是在進(jìn)行類型的合并)。

上面這些接口對應(yīng)的實現(xiàn)類分別是 Readable、Writable 和 Duplex

NodeJs中的流有4種:

  • Readable 可讀流(實現(xiàn)ReadableStream)

  • Writable 可寫流(實現(xiàn)WritableStream)

  • Duplex 可讀可寫流(繼承Readable后實現(xiàn)WritableStream)

  • Transform 轉(zhuǎn)換流(繼承Duplex)

背壓問題

磁盤寫入數(shù)據(jù)的速度是遠(yuǎn)低于內(nèi)存的,我們想象內(nèi)存和磁盤之間有一個“管道”,“管道”中是“流”,內(nèi)存的數(shù)據(jù)流入管道是非??斓?,當(dāng)管道塞滿時,內(nèi)存中就會產(chǎn)生數(shù)據(jù)背壓,數(shù)據(jù)積壓在內(nèi)存中,占用資源。

怎么理解Nodejs中的流

NodeJs Stream 的解決辦法是為每一個流的 緩存池(就是圖中寫入隊列)設(shè)置一個浮標(biāo)值,當(dāng)其中數(shù)據(jù)量達(dá)到這個浮標(biāo)值后,往緩存池再次 push 數(shù)據(jù)時就會返回 false,表示當(dāng)前流中緩存池內(nèi)容已經(jīng)達(dá)到浮標(biāo)值,不希望再有數(shù)據(jù)寫入了,這時我們應(yīng)該立即停止數(shù)據(jù)的生產(chǎn),防止緩存池過大產(chǎn)生背壓。

Readable

可讀流(Readable)是流的一種類型,他有兩種模式三種狀態(tài)

兩種讀取模式:

  • 流動模式:數(shù)據(jù)會從底層系統(tǒng)讀取寫入到緩沖區(qū),當(dāng)緩沖區(qū)被寫滿后自動通過 EventEmitter 盡快的將數(shù)據(jù)傳遞給所注冊的事件處理程序中

  • 暫停模式:在這種模式下將不會主動觸發(fā) EventEmitter 傳輸數(shù)據(jù),必須顯示的調(diào)用 Readable.read() 方法來從緩沖區(qū)中讀取數(shù)據(jù),read 會觸發(fā)響應(yīng)到 EventEmitter 事件。

三種狀態(tài):

  • readableFlowing === null(初始狀態(tài))

  • readableFlowing === false(暫停模式)

  • readableFlowing === true(流動模式)

初始時流的 readable.readableFlowingnull

添加data事件后變?yōu)?true 。調(diào)用 pause()、unpipe()、或接收到背壓或者添加 readable 事件,則 readableFlowing 會被設(shè)為 false ,在這個狀態(tài)下,為 data 事件綁定監(jiān)聽器不會使 readableFlowing 切換到 true

調(diào)用 resume() 可以讓可讀流的 readableFlowing 切換到 true

移除所有的 readable 事件是使 readableFlowing 變?yōu)?null 的唯一方法。

事件名說明
readable當(dāng)緩沖區(qū)有新的可讀取數(shù)據(jù)時觸發(fā)(每一個想緩存池插入節(jié)點都會觸發(fā))
data每一次消費數(shù)據(jù)后都會觸發(fā),參數(shù)是本次消費的數(shù)據(jù)
close流關(guān)閉時觸發(fā)
error流發(fā)生錯誤時觸發(fā)
方法名說明
read(size)消費長度為size的數(shù)據(jù),返回null表示當(dāng)前數(shù)據(jù)不足size,否則返回本次消費的數(shù)據(jù)。size不傳遞時表示消費緩存池中所有數(shù)據(jù)
const fs = require('fs');

const readStreams = fs.createReadStream('./EventEmitter.js', {
    highWaterMark: 100// 緩存池浮標(biāo)值
})

readStreams.on('readable', () => {
    console.log('緩沖區(qū)滿了')
    readStreams.read()// 消費緩存池的所有數(shù)據(jù),返回結(jié)果并且觸發(fā)data事件
})


readStreams.on('data', (data) => {
    console.log('data')
})

https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527

當(dāng) size 為 0 會觸發(fā) readable 事件。

當(dāng)緩存池中的數(shù)據(jù)長度達(dá)到浮標(biāo)值 highWaterMark 后,就不會在主動請求生產(chǎn)數(shù)據(jù),而是等待數(shù)據(jù)被消費后在生產(chǎn)數(shù)據(jù)

暫停狀態(tài)的流如果不調(diào)用 read 來消費數(shù)據(jù)時,后續(xù)也不會在觸發(fā) datareadable,當(dāng)調(diào)用 read 消費時會先判斷本次消費后剩余的數(shù)據(jù)長度是否低于 浮標(biāo)值,如果低于 浮標(biāo)值 就會在消費前請求生產(chǎn)數(shù)據(jù)。這樣在 read 后的邏輯執(zhí)行完成后新的數(shù)據(jù)大概率也已經(jīng)生產(chǎn)完成,然后再次觸發(fā) readable,這種提前生產(chǎn)下一次消費的數(shù)據(jù)存放在緩存池的機(jī)制也是緩存流為什么快的原因

流動狀態(tài)下的流有兩種情況

  • 生產(chǎn)速度慢于消費速度時:這種情況下每一個生產(chǎn)數(shù)據(jù)后一般緩存池中都不會有剩余數(shù)據(jù),直接將本次生產(chǎn)的數(shù)據(jù)傳遞給 data 事件即可(因為沒有進(jìn)入緩存池,所以也不用調(diào)用 read 來消費),然后立即開始生產(chǎn)新數(shù)據(jù),待上一次數(shù)據(jù)消費完后新數(shù)據(jù)才生產(chǎn)好,再次觸發(fā) data ,一只到流結(jié)束。

  • 生產(chǎn)速度快于消費速度時:此時每一次生產(chǎn)完數(shù)據(jù)后一般緩存池都還存在未消費的數(shù)據(jù),這種情況一般會在消費數(shù)據(jù)時開始生產(chǎn)下一次消費的數(shù)據(jù),待舊數(shù)據(jù)消費完后新數(shù)據(jù)已經(jīng)生產(chǎn)完并且放入緩存池

他們的區(qū)別僅僅在于數(shù)據(jù)生產(chǎn)后緩存池是否還存在數(shù)據(jù),如果存在數(shù)據(jù)則將生產(chǎn)的數(shù)據(jù) push 到緩存池等待消費,如果不存在則直接將數(shù)據(jù)交給 data 而不加入緩存池。

值得注意的是當(dāng)一個緩存池中存在數(shù)據(jù)的流從暫停模式進(jìn)入的流動模式時,會先循環(huán)調(diào)用 read 來消費數(shù)據(jù)只到返回 null

暫停模式

怎么理解Nodejs中的流

暫停模式下,一個可讀流讀創(chuàng)建時,模式是暫停模式,創(chuàng)建后會自動調(diào)用 _read 方法,把數(shù)據(jù)從數(shù)據(jù)源 push 到緩沖池中,直到緩沖池中的數(shù)據(jù)達(dá)到了浮標(biāo)值。每當(dāng)數(shù)據(jù)到達(dá)浮標(biāo)值時,可讀流會觸發(fā)一個 " readable " 事件,告訴消費者有數(shù)據(jù)已經(jīng)準(zhǔn)備好了,可以繼續(xù)消費。

一般來說, 'readable' 事件表明流有新的動態(tài):要么有新的數(shù)據(jù),要么到達(dá)流的盡頭。所以,數(shù)據(jù)源的數(shù)據(jù)被讀完前,也會觸發(fā)一次 'readable' 事件;

消費者 " readable " 事件的處理函數(shù)中,通過 stream.read(size) 主動消費緩沖池中的數(shù)據(jù)。

const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
    highWaterMark: 300,
    // 參數(shù)的 read 方法會作為流的 _read 方法,用于獲取源數(shù)據(jù)
    read(size) {
        // 假設(shè)我們的源數(shù)據(jù)上 1000 個1
        let chunk = null
        // 讀取數(shù)據(jù)的過程一般是異步的,例如IO操作
        setTimeout(() => {
            if (count > 0) {
                let chunkLength = Math.min(count, size)
                chunk = '1'.repeat(chunkLength)
                count -= chunkLength
            }
            this.push(chunk)
        }, 500)
    }
})
// 每一次成功 push 數(shù)據(jù)到緩存池后都會觸發(fā) readable
myReadable.on('readable', () => {
    const chunk = myReadable.read()//消費當(dāng)前緩存池中所有數(shù)據(jù)
    console.log(chunk.toString())
})

值得注意的是, 如果 read(size) 的 size 大于浮標(biāo)值,會重新計算新的浮標(biāo)值,新浮標(biāo)值是size的下一個二次冪(size <= 2^n,n取最小值)

//  hwm 不會大于 1GB.
const MAX_HWM = 0x40000000;
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // 1GB限制
    n = MAX_HWM;
  } else {
    //取下一個2最高冪,以防止過度增加hwm
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}

流動模式

怎么理解Nodejs中的流

所有可讀流開始的時候都是暫停模式,可以通過以下方法可以切換至流動模式:

  • 添加 " data " 事件句柄;

  • 調(diào)用 “ resume ”方法;

  • 使用 " pipe " 方法把數(shù)據(jù)發(fā)送到可寫流

流動模式下,緩沖池里面的數(shù)據(jù)會自動輸出到消費端進(jìn)行消費,同時,每次輸出數(shù)據(jù)后,會自動回調(diào) _read 方法,把數(shù)據(jù)源的數(shù)據(jù)放到緩沖池中,如果此時緩存池中不存在數(shù)據(jù)則會直接吧數(shù)據(jù)傳遞給 data 事件,不會經(jīng)過緩存池;直到流動模式切換至其他暫停模式,或者數(shù)據(jù)源的數(shù)據(jù)被讀取完了( push(null) );

可讀流可以通過以下方式切換回暫停模式:

  • 如果沒有管道目標(biāo),則調(diào)用 stream.pause() 。

  • 如果有管道目標(biāo),則移除所有管道目標(biāo)。調(diào)用 stream.unpipe() 可以移除多個管道目標(biāo)。

const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
    highWaterMark: 300,
    read(size) {
        let chunk = null
        setTimeout(() => {
            if (count > 0) {
                let chunkLength = Math.min(count, size)
                chunk = '1'.repeat(chunkLength)
                count -= chunkLength
            }
            this.push(chunk)
        }, 500)
    }
})

myReadable.on('data', data => {
    console.log(data.toString())
})

Writable

相對可讀流來說,可寫流要簡單一些。

怎么理解Nodejs中的流

當(dāng)生產(chǎn)者調(diào)用 write(chunk) 時,內(nèi)部會根據(jù)一些狀態(tài)(corked,writing等)選擇是否緩存到緩沖隊列中或者調(diào)用 _write,每次寫完數(shù)據(jù)后,會嘗試清空緩存隊列中的數(shù)據(jù)。如果緩沖隊列中的數(shù)據(jù)大小超出了浮標(biāo)值(highWaterMark),消費者調(diào)用 write(chunk) 后會返回 false,這時候生產(chǎn)者應(yīng)該停止繼續(xù)寫入。

那么什么時候可以繼續(xù)寫入呢?當(dāng)緩沖中的數(shù)據(jù)都被成功 _write 之后,清空了緩沖隊列后會觸發(fā) drain 事件,這時候生產(chǎn)者可以繼續(xù)寫入數(shù)據(jù)。

當(dāng)生產(chǎn)者需要結(jié)束寫入數(shù)據(jù)時,需要調(diào)用 stream.end 方法通知可寫流結(jié)束。

const { Writable, Duplex } = require('stream')
let fileContent = ''
const myWritable = new Writable({
    highWaterMark: 10,
    write(chunk, encoding, callback) {// 會作為_write方法
        setTimeout(()=>{
            fileContent += chunk
            callback()// 寫入結(jié)束后調(diào)用
        }, 500)
    }
})

myWritable.on('close', ()=>{
    console.log('close', fileContent)
})
myWritable.write('123123')// true
myWritable.write('123123')// false
myWritable.end()

注意,在緩存池中數(shù)據(jù)到達(dá)浮標(biāo)值后,此時緩存池中可能存在多個節(jié)點,在清空緩存池的過程中(循環(huán)調(diào)用_read),并不會向可讀流一樣盡量一次消費長度為浮標(biāo)值的數(shù)據(jù),而是每次消費一個緩沖區(qū)節(jié)點,即使這個緩沖區(qū)長度于浮標(biāo)值不一致也是如此

const { Writable } = require('stream')


let fileContent = ''
const myWritable = new Writable({
    highWaterMark: 10,
    write(chunk, encoding, callback) {
        setTimeout(()=>{
            fileContent += chunk
            console.log('消費', chunk.toString())
            callback()// 寫入結(jié)束后調(diào)用
        }, 100)
    }
})

myWritable.on('close', ()=>{
    console.log('close', fileContent)
})

let count = 0
function productionData(){
    let flag = true
    while (count <= 20 && flag){
        flag = myWritable.write(count.toString())
        count++
    }
    if(count > 20){
        myWritable.end()
    }
}
productionData()
myWritable.on('drain', productionData)

上述是一個浮標(biāo)值為 10 的可寫流,現(xiàn)在數(shù)據(jù)源是一個 0——20 到連續(xù)的數(shù)字字符串,productionData 用于寫入數(shù)據(jù)。

  • 首先第一次調(diào)用 myWritable.write("0") 時,因為緩存池不存在數(shù)據(jù),所以 "0" 不進(jìn)入緩存池,而是直接交給 _wirtemyWritable.write("0") 返回值為 true

  • 當(dāng)執(zhí)行 myWritable.write("1") 時,因為 _wirtecallback 還未調(diào)用,表明上一次數(shù)據(jù)還未寫入完,位置保證數(shù)據(jù)寫入的有序性,只能創(chuàng)建一個緩沖區(qū)將 "1" 加入緩存池中。后面 2-9 都是如此

  • 當(dāng)執(zhí)行 myWritable.write("10") 時,此時緩沖區(qū)長度為 9(1-9),還未到達(dá)浮標(biāo)值, "10" 繼續(xù)作為一個緩沖區(qū)加入緩存池中,此時緩存池長度變?yōu)?11,所以 myWritable.write("1") 返回 false,這意味著緩沖區(qū)的數(shù)據(jù)已經(jīng)足夠,我們需要等待 drain 事件通知時再生產(chǎn)數(shù)據(jù)。

  • 100ms過后,_write("0", encoding, callback)callback 被調(diào)用,表明 "0" 已經(jīng)寫入完成。然后會檢查緩存池中是否存在數(shù)據(jù),如果存在則會先調(diào)用 _read 消費緩存池的頭節(jié)點("1"),然后繼續(xù)重復(fù)這個過程直到緩存池為空后觸發(fā) drain 事件,再次執(zhí)行 productionData

  • 調(diào)用 myWritable.write("11"),觸發(fā)第1步開始的過程,直到流結(jié)束。

Duplex

在理解了可讀流與可寫流后,雙工流就好理解了,雙工流事實上是繼承了可讀流然后實現(xiàn)了可寫流(源碼是這么寫的,但是應(yīng)該說是同時實現(xiàn)了可讀流和可寫流更加好)。

怎么理解Nodejs中的流

Duplex 流需要同時實現(xiàn)下面兩個方法

  • 實現(xiàn) _read() 方法,為可讀流生產(chǎn)數(shù)據(jù)

  • 實現(xiàn) _write() 方法,為可寫流消費數(shù)據(jù)

上面兩個方法如何實現(xiàn)在上面可寫流可讀流的部分已經(jīng)介紹過了,這里需要注意的是,雙工流是存在兩個獨立的緩存池分別提供給兩個流,他們的數(shù)據(jù)源也不一樣

以 NodeJs 的標(biāo)準(zhǔn)輸入輸出流為例:

  • 當(dāng)我們在控制臺輸入數(shù)據(jù)時會觸發(fā)其 data 事件,這證明他有可讀流的功能,每一次用戶鍵入回車相當(dāng)于調(diào)用可讀的 push 方法推送生產(chǎn)的數(shù)據(jù)。

  • 當(dāng)我們調(diào)用其 write 方法時也可以向控制臺輸出內(nèi)容,但是不會觸發(fā) data 事件,這說明他有可寫流的功能,而且有獨立的緩沖區(qū),_write 方法的實現(xiàn)內(nèi)容就是讓控制臺展示文字。

// 每當(dāng)用戶在控制臺輸入數(shù)據(jù)(_read),就會觸發(fā)data事件,這是可讀流的特性
process.stdin.on('data', data=>{
    process.stdin.write(data);
})

// 每隔一秒向標(biāo)準(zhǔn)輸入流生產(chǎn)數(shù)據(jù)(這是可寫流的特性,會直接輸出到控制臺上),不會觸發(fā)data
setInterval(()=>{
    process.stdin.write('不是用戶控制臺輸入的數(shù)據(jù)')
}, 1000)

Transform

怎么理解Nodejs中的流

可以將 Duplex 流視為具有可寫流的可讀流。兩者都是獨立的,每個都有獨立的內(nèi)部緩沖區(qū)。讀寫事件獨立發(fā)生。

                             Duplex Stream
                          ------------------|
                    Read  <-----               External Source
            You           ------------------|  
                    Write ----->               External Sink
                          ------------------|

Transform 流是雙工的,其中讀寫以因果關(guān)系進(jìn)行。雙工流的端點通過某種轉(zhuǎn)換鏈接。讀取要求發(fā)生寫入。

                                 Transform Stream
                           --------------|--------------
            You     Write  ---->                   ---->  Read  You
                           --------------|--------------

對于創(chuàng)建 Transform 流,最重要的是要實現(xiàn) _transform 方法而不是 _write 或者 _read。 _transform 中對可寫流寫入的數(shù)據(jù)做處理(消費)然后為可讀流生產(chǎn)數(shù)據(jù)。

轉(zhuǎn)換流還經(jīng)常會實現(xiàn)一個 `_flush` 方法,他會在流結(jié)束前被調(diào)用,一般用于對流的末尾追加一些東西,例如壓縮文件時的一些壓縮信息就是在這里加上的
const { write } = require('fs')
const { Transform, PassThrough } = require('stream')

const reurce = '1312123213124341234213423428354816273513461891468186499126412'

const transform = new Transform({
    highWaterMark: 10,
    transform(chunk ,encoding, callback){// 轉(zhuǎn)換數(shù)據(jù),調(diào)用push將轉(zhuǎn)換結(jié)果加入緩存池
        this.push(chunk.toString().replace('1', '@'))
        callback()
    },
    flush(callback){// end觸發(fā)前執(zhí)行
        this.push('<<<')
        callback()
    }
})


// write 不斷寫入數(shù)據(jù)
let count = 0
transform.write('>>>')
function productionData() {
    let flag = true
    while (count <= 20 && flag) {
        flag = transform.write(count.toString())
        count++
    }
    if (count > 20) {
        transform.end()
    }
}
productionData()
transform.on('drain', productionData)


let result = ''
transform.on('data', data=>{
    result += data.toString()
})
transform.on('end', ()=>{
    console.log(result)
    // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<<
})

Pipe

管道是將上一個程序的輸出作為下一個程序的輸入,這是管道在 Linux 中管道的作用。NodeJs 中的管道其實也類似,它管道用于連接兩個流,上游的流的輸出會作為下游的流的輸入。

怎么理解Nodejs中的流

管道 sourec.pipe(dest, options) 要求 sourec 是可讀的,dest是可寫的。其返回值是 dest。

對于處于管道中間的流既是下一個流的上游也是上一個流的下游,所以其需要時一個可讀可寫的雙工流,一般我們會使用轉(zhuǎn)換流來作為管道中間的流。

Stream.prototype.pipe = function(dest, options) {
  const source = this;

  function ondata(chunk) {
    if (dest.writable && dest.write(chunk) === false && source.pause) {
      source.pause();
    }
  }

  source.on('data', ondata);

  function ondrain() {
    if (source.readable && source.resume) {
      source.resume();
    }
  }

  dest.on('drain', ondrain);
  // ...后面的代碼省略
}

pipe 的實現(xiàn)非常清晰,當(dāng)上游的流發(fā)出 data 事件時會調(diào)用下游流的 write 方法寫入數(shù)據(jù),然后立即調(diào)用 source.pause() 使得上游變?yōu)闀和顟B(tài),這主要是為了防止背壓。

當(dāng)下游的流將數(shù)據(jù)消費完成后會調(diào)用 source.resume() 使上游再次變?yōu)榱鲃訝顟B(tài)。

我們實現(xiàn)一個將 data 文件中所有 1 替換為 @ 然后輸出到 result 文件到管道。

const { Transform } = require('stream')
const { createReadStream, createWriteStream } = require('fs')

// 一個位于管道中的轉(zhuǎn)換流
function createTransformStream(){
    return new Transform({
        transform(chunk, encoding, callback){
            this.push(chunk.toString().replace(/1/g, '@'))
            callback()
        }
    })
}
createReadStream('./data')
.pipe(createTransformStream())
.pipe(createWriteStream('./result'))

在管道中只存在兩個流時,其功能和轉(zhuǎn)換流有點類似,都是將一個可讀流與一個可寫流串聯(lián)起來,但是管道可以串聯(lián)多個流。

感謝各位的閱讀,以上就是“怎么理解Nodejs中的流”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對怎么理解Nodejs中的流這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI