溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Node.js中怎么實現(xiàn)Stream流

發(fā)布時間:2021-12-23 09:40:32 來源:億速云 閱讀:135 作者:iii 欄目:web開發(fā)

本篇內(nèi)容主要講解“Node.js中怎么實現(xiàn)Stream流”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“Node.js中怎么實現(xiàn)Stream流”吧!

Node.js中怎么實現(xiàn)Stream流

引入 Stream

假設(shè)我們有這么一個需求,我們需要復(fù)制一個文件中的內(nèi)容到另一個文件中,我們會寫出以下代碼

const fs = require('fs');
const path = require('path');

const copy = (source, target) => {
    fs.readFile(path.resolve(source), (err, data) => {
        if(err) {
            throw new Error(err.toString());
            return;
        }
        fs.writeFile(path.resolve(target), data, (err) => {
            if(!err) {
                console.log("復(fù)制成功!");
            }
        })
    })
}

上面的代碼很簡單,就是先讀取 source 文件里面的內(nèi)容,然后將內(nèi)容寫入到 target 文件中。它的特點是需要讀取完 source 里面的所有內(nèi)容,然后將內(nèi)容寫入到 target 中。

這樣做就有一個缺點,當(dāng)我們讀取大文件時,可能會發(fā)生內(nèi)存不夠用的情況,因為它會先將文件的所有內(nèi)容都讀取到內(nèi)存;另外還就是時間,一次性讀取一個大文件到內(nèi)存,是需要比較長的時間的,用戶可能會有卡頓的感覺。

另一種解決辦法就是邊讀邊寫,讀取部分文件內(nèi)容,然后將內(nèi)容寫入到新文件中,這樣在內(nèi)存中的數(shù)據(jù)只是部分內(nèi)容,不會占有太多的內(nèi)存,由于是邊讀編寫,用戶可以很快的得到響應(yīng),提高用戶體驗。

Node.js 給我們提供 Stream 的 API,它是專門用來處理大文件的。因為數(shù)據(jù)是一部分一部分的處理,就像是水流一樣,所以這個模塊的名稱就稱為 Stream。

const fs = require('fs');

function copy(source, target) {
    const rs = fs.createReadStream(source);
    const ws = fs.createWriteStream(target);

    rs.on('data', data => {
        ws.write(data);
    });

    rs.on('end', () => {
        ws.end();
    });
}

上面代碼的細(xì)節(jié)將在后文揭曉。

Stream 的分類

Stream 可以分為四類

  • Readable:可讀流,數(shù)據(jù)的提供者

  • Writeable:可寫流,數(shù)據(jù)的消費者

  • Duplex:可寫可讀流(雙工流)

  • Transform:是 Duplex 的特殊情況,轉(zhuǎn)換流,對輸入的數(shù)據(jù)進(jìn)行處理,然后輸出

可讀流與可寫流是基礎(chǔ),常見的可讀流與可寫流如下

可讀流可寫流
HTTP RequestHTTP Reponse
fs read streamsfs write streams
process.stdinprocess.stdout
TCP socketsTCP sockets
zlib streamszlib streams
crypto streamscrypto streams

Stream 是 EventEmitter 的實例,有自定義的事件。

Readable Stream

可讀流有兩個模式,暫停模式與流動模式。當(dāng)我們創(chuàng)建一個流時,如果我們監(jiān)聽了 readable 事件,它就會來到暫停模式,在暫停模式下,它會不斷的讀取數(shù)據(jù)到緩沖區(qū),當(dāng)讀取到的數(shù)據(jù)超過預(yù)設(shè)的大小時,它由屬性 highWaterMark 指定(默認(rèn)為 64kB),便會觸發(fā) readable 事件,readable 事件的觸發(fā)有兩種情況:

  • 緩存區(qū)中的數(shù)據(jù)達(dá)到 highWaterMark 預(yù)設(shè)的大小

  • 數(shù)據(jù)源的數(shù)據(jù)已經(jīng)被讀取完畢

const fs = require('fs');

const rs = fs.createReadStream('a.txt', {
    highWaterMark: 1 // 緩存區(qū)最多存儲 1 字節(jié)
});

rs.on('readable', () => {
    let data;
    while(data=rs.read()) {
        console.log(data.toString());
    }
})

上面的程序設(shè)置 highWaterMark 為 1,即每次讀取到一個字節(jié)便會觸發(fā) readable 命令,每次當(dāng)觸發(fā) readable 命令時,我們調(diào)用可讀流的 read([size]) 方法從緩沖區(qū)中讀取數(shù)據(jù)(讀取到的數(shù)據(jù)為 Buffer),然后打印到控制臺。

當(dāng)我們?yōu)榭勺x流綁定 data 事件時,可讀流便會切換到流動狀態(tài),當(dāng)位于流動狀態(tài)時,可讀流會自動的從文件中讀取內(nèi)容到緩沖區(qū),當(dāng)緩沖區(qū)中的內(nèi)容大于設(shè)定的 highWaterMark 的大小時,便會觸發(fā) data 事件,將緩沖區(qū)中的數(shù)據(jù)傳遞給 data 事件綁定的函數(shù)。以上過程會自動不斷進(jìn)行。當(dāng)文件中的所有內(nèi)容都被讀取完成時,那么就會觸發(fā) end 事件。

const fs = require('fs');

const rs = fs.createReadStream('a.txt', {
    highWaterMark: 2
});

rs.on('data', data => {
    console.log(data.toString());
});

rs.on('end', () => {
    console.log("文件讀取完畢!");
});

暫停模式像是手動步槍,而流動模式則像是自動步槍。暫停模式與流動模式也可以相互切換,通過 pause() 可以從流動狀態(tài)切換到暫停狀態(tài),通過 resume() 則可以從暫停模式切換到流動模式。

可讀流的一個經(jīng)典實例就是 http 中的請求對象 req,下面的程序展示了通過監(jiān)聽 reqdata 事件來讀取 HTTP 請求體中的內(nèi)容

const http = require('http');

const app = http.createServer();

app.on('request', (req, res) => {
    let datas = [];
    req.on('data', data => {
        datas.push(data);
    });

    req.on('end', () => {
        req.body = Buffer.concat(datas);
        // 當(dāng)讀取完 body 中的內(nèi)容之后,將內(nèi)容返回給客戶端
        res.end(req.body);
    });
})

app.listen(3000, () => {
    console.log("服務(wù)啟動在 3000 端口... ...");
})

Node.js中怎么實現(xiàn)Stream流

Writable Stream

可寫流與可讀流相似,當(dāng)我們向可寫流寫入數(shù)據(jù)時(通過可寫流的 write() 方法寫數(shù)據(jù)),會直接將數(shù)據(jù)寫入到文件中,如果寫入的數(shù)據(jù)比較慢的話,那就就會將數(shù)據(jù)寫入到緩沖區(qū),當(dāng)緩沖區(qū)中的內(nèi)容達(dá)到 highWaterMark 設(shè)定的大小時,write 方法就會返回一個 false,表明不能接受更多的數(shù)據(jù)了。

當(dāng)緩沖區(qū)中的數(shù)據(jù)全部被消費完了(寫入了文件中或者被別的流消費了),那么就會觸發(fā) drain 事件。

const fs = require('fs');

const ws = fs.createWriteStream('b.txt', {
    highWaterMark: 16 * 1024
});

function writeMillionTimes(writer, data, encoding, callback) {
    let i = 10000;
    
    write();

    function write() {
        // 表示是否可以向可寫流中寫入數(shù)據(jù)
        let ok = true;
        while(i-- > 0 && ok) {
            // 當(dāng) writer.write() 方法返回  false 表示不可寫入數(shù)據(jù)
            ok = writer.write(data, encoding, i === 0 ? callback : null);
        }

        if(i > 0) {
            // 說明 ok 為 false,即不能向緩沖區(qū)中寫入內(nèi)容了
            console.log("drain", i);
            // 監(jiān)聽 drain 事件,當(dāng)隊列消費完畢時繼續(xù)調(diào)用 write() 方法寫入
            writer.once('drain', write);
        }
    }
}

writeMillionTimes(ws, 'simple', 'utf-8', () => {
    console.log("end");
})

輸出為

drain 7268
drain 4536
drain 1804
end

說明有三次緩沖區(qū)中的內(nèi)容達(dá)到了 16KB,可以驗算上面的數(shù)字之間的差值,在乘以 6(simple 的字節(jié)數(shù)),大小大約為 16 * 1024 左右,如

(7268?4536)?6=1639216384=16?1024(7268 - 4536) * 6 = 16392 \approx 16384 = 16 * 1024

我們還可以調(diào)用可寫流的 end() 方法,表示將緩存中的內(nèi)容清空寫入文件,并關(guān)閉文件,此時會觸發(fā) close 事件

const fs = require('fs');

const ws = fs.createWriteStream('b.txt');

ws.write('Hello');
ws.write('World');
ws.end('!');

ws.on('close', () => {
    console.log("close"); // close
})

當(dāng)調(diào)用 end() 方法之后就不能調(diào)用 write() 方法了,否則會報錯

const fs = require('fs');

const ws = fs.createWriteStream('b.txt');

ws.write('Hello');
ws.write('World');
ws.end('!');

ws.write('write again'); // Error [ERR_STREAM_WRITE_AFTER_END]: write after end

當(dāng)調(diào)用 end() 方法之后,并且數(shù)據(jù)緩沖區(qū)中的數(shù)據(jù)已經(jīng)寫入之后會觸發(fā)可寫流的 finish 事件

const fs = require('fs');

const ws = fs.createWriteStream('b.txt');

ws.write('Hello');
ws.write('World');
ws.end('!');

ws.on('close', () => {
    console.log("close");
});

ws.on('finish', () => {
    console.log("finish");
});

打印結(jié)果是

finish
close

說明 finish 事件會在 close 事件之前被觸發(fā)。

可寫流的經(jīng)典例子就是 http 模塊的響應(yīng)對象 res,下面的程序演示了當(dāng)請求到來時,我們讀取一個 html 頁面返回給客戶端

const http = require('http');
const fs = require('fs');

const app = http.createServer();

app.on('request', (req, res) => {
    const rs = fs.createReadStream('index.html');
    
    rs.on('data', data => {
        res.write(data);
    })

    rs.on('end', () => {
        res.end()
    });
});

app.listen(3000, () => {
    console.log("服務(wù)啟動在 3000 端口 ... ...");
})

Duplex Stream 與 Transform Stream

Duplex,即雙工的意思,它既可以接收數(shù)據(jù),也可以輸出數(shù)據(jù),它的輸入和輸出之間可以沒有任何的關(guān)系,就像是一個部件內(nèi)部有兩個獨立的系統(tǒng)。Duplex 繼承了可讀流(Readable),并且擁有可寫流(Writable)的所有方法。

Transform Stream 繼承了 Duplex Stream,它同樣具有可讀流與可寫流的能力,并且它的輸出與輸入之間是有關(guān)系的,中間做了一次轉(zhuǎn)換。常見的轉(zhuǎn)換流有 zlib,crypto。

出于文章結(jié)構(gòu)的考慮,在這里不詳細(xì)講解這兩個流,在后文中會實現(xiàn)這兩個流,以加深對這兩個流的理解。

pipe

我們可以混合使用可讀流與可寫流來進(jìn)行文件的復(fù)制

const fs = require('fs');

function copy(source, target) {
    const rs = fs.createReadStream(source);
    const ws = fs.createWriteStream(target);

    rs.on('data', data => {
        ws.write(data);
    });

    rs.on('end', () => {
        ws.end();
    });
}

copy('a.txt', 'b.txt');

但是上面的寫法卻不被建議使用,因為沒有考慮到可讀流與可寫流速度之間的差異,如果可讀流輸出數(shù)據(jù)的速度大于可寫流寫入數(shù)據(jù)的速度,這個時候就會有數(shù)據(jù)一直堆壓在緩存區(qū),導(dǎo)致占用過高的內(nèi)存,專業(yè)術(shù)語叫做積壓。

我們需要改善上面的程序,具體做法就是當(dāng) write() 方法返回  false 時,我們切換可讀流的模式為暫停模式,當(dāng)可寫流觸發(fā)了 drain 事件時,我們便將可讀流的狀態(tài)切換為流動模式

const fs = require('fs');

function copy(source, target) {
    const rs = fs.createReadStream(source);
    const ws = fs.createWriteStream(target);

    rs.on('data', data => {
        if (!ws.write(data)) {
            rs.pause();
        }
    });

    rs.on('end', () => {
        ws.end();
    });

    ws.on('drain', () => {
        rs.resume();
    })
}

那是不是每次我們使用流都需要寫這么多的代碼,當(dāng)然不是。官方為可讀流提供了一個 pipe(ws) 方法,pipe 方法接收一個可寫流,它的作用就是將可讀流中數(shù)據(jù)寫入到可寫流中去,并且它內(nèi)部有做速度差異的處理。所以上面的寫法可以改為下面的版本

const fs = require('fs');

function copy(source, target) {
    const rs = fs.createReadStream(source);
    const ws = fs.createWriteStream(target);

    rs.pipe(ws);
}

當(dāng)我們調(diào)用 pipe 方法時,會觸發(fā)可寫流的 pipe 事件。pipe 的實現(xiàn)參考如下

Readable.prototype.pipe = function(ws) {
    this.on('data', data => {
        if (!ws.write(data)) {
            this.pause();
        }
    });

    ws.on('drain', () => {
        this.resume();
    });

    // 觸發(fā) pipe 事件
    ws.emit('pipe', this);

    // 返回可寫流,以支持鏈?zhǔn)秸{(diào)用
    return ws;
}

這里給出官網(wǎng)畫的一個有關(guān) pipe 的流程圖

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

實現(xiàn)流

在本節(jié)中我們來實現(xiàn)具體的流,通過實現(xiàn)流可以進(jìn)一步加深對 Stream 內(nèi)部工作細(xì)節(jié)的理解。

實現(xiàn)可讀流

上面我們都是通過 fs.createReadableStream() 方法來得到一個可讀流的,在這里我們自己實現(xiàn)一個可讀流。實現(xiàn)可讀流只需要繼承 Readable 類,然后實現(xiàn) _read() 方法即可

const { Readable } = require('stream');

class IeteratorReadableStream extends Readable {
    constructor(iterator) {
        super();
        this.iterator = iterator;
    }

    _read() {
        let data = this.iterator.next();
        // console.log(data);
        if(data.done) {
            this.push(null);
        } else {
            // 必須 push 字符串或者 Buffer
            this.push(data.value+'');
        }
    }
}

module.exports = IeteratorReadableStream;

上述我們實現(xiàn)了一個可讀流,可讀流接收一個迭代器作為參數(shù),這個迭代器作為這個可讀流的數(shù)據(jù)源。可讀流會自動的調(diào)用 _read 獲取數(shù)據(jù),在 _read 方法中我們從迭代器中獲取數(shù)據(jù),并且調(diào)用了 push 方法,該方法的作用就是將數(shù)據(jù)放入到緩存區(qū)中,只能向其中 push 字符串或者 Buffer,當(dāng)我們向其中 push null 時就表示數(shù)據(jù)已經(jīng)被全部讀取完畢。

所以可讀流的執(zhí)行邏輯為,每次調(diào)用 _read 方法從數(shù)據(jù)源讀取數(shù)據(jù),并將數(shù)據(jù)存入緩存區(qū),然后觸發(fā) data 事件,將緩存區(qū)中的數(shù)據(jù)作為參數(shù)傳遞給 data 事件綁定的回調(diào)函數(shù),循環(huán)上述過程直到向緩存區(qū) push null 時,就表示數(shù)據(jù)源中的數(shù)據(jù)已經(jīng)被讀取完畢,此時會觸發(fā) end 事件。

我們創(chuàng)建一個迭代器作為數(shù)據(jù)源傳入

const IeteratorReadableStream = require('./IteratorReadableStream');

function *getData() {
    for(let i = 0; i < 5; i++) {
        yield i;
    }
}

let rs = new IeteratorReadableStream(getData());

rs.on('data', data => {
    console.log(data.toString());
});

rs.on('end', () => {
    console.log("迭代結(jié)束");
});

輸出為

0
1
2
3
4
迭代結(jié)束

實現(xiàn)可寫流

實現(xiàn)可寫流的過程同實現(xiàn)可讀流的過程類似,首先需要繼承 Writable 類,接著實現(xiàn) _write 方法即可

const fs = require('fs');
const { Writable } = require('stream');

class FileWritableStream extends Writable {
    constructor(filepath) {
        super();
        this.filepath = filepath;
    }

    _write(chunk, encoding, callback) {
        fs.appendFile(this.filepath, chunk, {
            encoding
        }, callback)
    }
}

上面我們實現(xiàn)了一個可寫流,這個可寫流接收一個文件路徑作為參數(shù),它的作用就是向這個文件中追加數(shù)據(jù),每次當(dāng)我們調(diào)用可寫流的 write() 方法時,它會向緩沖區(qū)寫入數(shù)據(jù),當(dāng)達(dá)到閾值時,便會調(diào)用 _write() 方法將數(shù)據(jù)新增到文件中。

process.stdin.pipe(new FileWritableStream('c.txt'));

上面這行代碼的作用就是將從標(biāo)準(zhǔn)輸入的字符輸出到 c.txt 中。

實現(xiàn)雙工流

Duplex Stream 既可以作為可讀流,也可以作為可寫流,并且它的輸入與輸出之間可以沒有關(guān)系。Duplex Stream 繼承了 Readable,并且擁有 Writable 的所有,我們只要分別實現(xiàn) _read()_write() 方法即可

const { Duplex } = require('stream');

class CustomDuplexStream extends Duplex {
    constructor() {
        super();
        this.currentCharCode = 65;
    }

    _read() {
        if(this.currentCharCode <= 90) {
            this.push(String.fromCharCode(this.currentCharCode++))
        } else {
            this.push(null);
        }
    }

    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
    }
}

上面雙工流的可讀流部分就是將大寫的 26 個字母添加進(jìn)了緩存區(qū),而可寫流部分就是直接將數(shù)據(jù)輸出到控制臺??梢婋p工流可讀流與可寫流之間并沒有任何的關(guān)系

const dp = new CustomDuplexStream();

dp.write("1");
dp.write("2");
dp.end();

dp.pipe(process.stdout);

輸出為

1
2
ABCDEFGHIJKLMNOPQRSTUVWXYZ

實現(xiàn)轉(zhuǎn)換流

Tranform Stream 是 Duplex 的特例,它也是一個雙工流,不過它的輸入和輸出之間有關(guān)聯(lián),它的內(nèi)部通過 _transform() 方法將可寫流接收到的數(shù)據(jù)經(jīng)過轉(zhuǎn)換后傳入到可讀流中,所以我們要實現(xiàn)轉(zhuǎn)換流,只需要實現(xiàn) _transform() 方法即可

const { Transform } = require('stream');

class UpperTransformStream extends Transform {
    _transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
}

上面我們實現(xiàn)了一個轉(zhuǎn)換流,它可以將輸入的小寫字符轉(zhuǎn)化為大寫字符然后輸出

const ts = new UpperTransformStream();
const rs = fs.createReadStream('a.txt');
rs.pipe(ts).pipe(process.stdout);

上面程序會讀取 a.txt 中的所有字符,將字符轉(zhuǎn)換為大寫然后輸出在控制臺。

轉(zhuǎn)換流在實際應(yīng)用中還是比較多的,這里介紹一個 Node.js 內(nèi)置的轉(zhuǎn)換流 zlib,它的作用對文件進(jìn)行解壓縮,將文件壓縮為壓縮文件,或者將壓縮文件解壓為正常文件,這不就是一個典型的轉(zhuǎn)換流嘛!

const zlib = require('zlib');
const fs = require('fs');

const args = process.argv.slice(2);

const source = fs.createReadStream(args[0]);
const target = fs.createWriteStream(args[1]);
const gzip = zlib.createGzip();

source.pipe(gzip).pipe(target);

我們可以通過

node gzip.js Graph.md Graph.md.gz

來運行上面的程序,它可以將 Graph.md 使用 gzip 壓縮為 Graph.md.gz。

Node.js中怎么實現(xiàn)Stream流

文件大小從 201KB 壓縮到了 51KB。

同樣的我們也可以通過 zlib.createGunzip() 來創(chuàng)建一個解壓縮的轉(zhuǎn)換流,具體細(xì)節(jié)同壓縮文件相同,不做介紹。

到此,相信大家對“Node.js中怎么實現(xiàn)Stream流”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI