溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Node.js(十五)——Net模塊之Stream

發(fā)布時間:2020-07-25 03:21:27 來源:網(wǎng)絡(luò) 閱讀:719 作者:mazongfei 欄目:web開發(fā)

在Unix中流是一個標準的概念,有標準的輸入、輸出和標準的錯誤

例如:

打印出所有的js文件交給grep 來過濾出包含http文件的內(nèi)容,稱之為Unix的管道

cat *.js | grep http

從上節(jié)得知Buffer是保存字節(jié)的數(shù)據(jù),而流是用來暫存和移動數(shù)據(jù)的,它倆通常是結(jié)合起來來使用,我們來拷貝文件,像讀取logo,是全部的讀取入到內(nèi)存中,然后再寫入到文件中,對于體積比較大的的文件就不夠用了假設(shè)我們的服務(wù)器需要不斷的去讀取文件,然后返回給客戶端,同時又有好多人都在請求這個文件,這樣每個請求都去讀入一次內(nèi)存,然后內(nèi)存很快就爆掉了,最好的方式是邊讀邊寫

這就需要借助流來完成,那NodeJs中哪些模塊涉及到了流;比如http、文件系統(tǒng)、壓縮模塊、tcp socket并且流是以buffer的形式存在,這樣更高效。

改造logo圖片讀取操作

var fs = require('fs')

var source = fs.readFileSync('logo.png')

fs.writeFileSync('stream_copy_logo.png',source)


但是這樣的操作會不會太簡單了,而沒有辦法精細的控制數(shù)據(jù)在流里面的傳輸,

以上這些都不用擔心,Stream是基于事件機制進行工作的,

流在各個方面的變化都可以被我們監(jiān)聽到

var fs = require('fs')

//聲明一個可讀流
var readStream = fs.createReadStream('logo_stream.js')

//Stream在傳輸?shù)臅r候會觸發(fā)data事件
readStream
	.on('data',function(chunk){
		console.log('data emits')
		console.log(Buffer.isBuffer(chunk))
		console.log(chunk.toString('utf8'))
	})
	//還有readable事件,可讀的
	.on('readable',function(){
		console.log('data readable')
	})
	.on('end',function(){
		console.log('data ends')
	})
	.on('close',function(){
		console.log('data close')
	})
	.on('error',function(e){
		console.log('data read error:'+e)
	})

運行結(jié)果如下:

Node.js(十五)——Net模塊之Stream


借助于Stream的事件機制,我們就能實現(xiàn)更多個性化的定制,

從而對流里面的流程進行更精細化的控制,改造上述代碼:

var fs = require('fs')

//聲明一個可讀流
var readStream = fs.createReadStream('logo_stream.js')

var n = 0

//Stream在傳輸?shù)臅r候會觸發(fā)data事件
readStream
	.on('data',function(chunk){
		n++
		console.log('data emits')
		console.log(Buffer.isBuffer(chunk))
		//console.log(chunk.toString('utf8'))
		//流暫停
		readStream.pause()
		//設(shè)置定時器,模擬異步處理
		console.log('data pause')
		setTimeout(function(){
			console.log('data pause end')
			//再重新啟動
			readStream.resume()
		},3000)
	})
	//還有readable事件,可讀的
	.on('readable',function(){
		console.log('data readable')
	})
	.on('end',function(){
		console.log(n)
		console.log('data ends')
	})
	.on('close',function(){
		console.log('data close')
	})
	.on('error',function(e){
		console.log('data read error:'+e)
	})

運行效果如下:

Node.js(十五)——Net模塊之Stream

換一個大一點的文件,3M左右的;打印結(jié)果如下:

Node.js(十五)——Net模塊之Stream

大概每次是64kb

用事件的方式來重構(gòu)復(fù)制圖片的操作

var fs = require('fs')

//放入一個大文件
var readStream = fs.createReadStream('1.pdf')
var writeStream = fs.createWriteStream('1_stream.pdf');

//必然觸發(fā)一個事件
readStream.on('data',function(chunk){
	//寫入目標
	if(writeStream.write(chunk)=== false){
		//判斷是否已經(jīng)寫入到目標,來解決爆倉
		console.log('still cached')
		readStream.pause()
	}
})

readStream.on('end',function(){
	writeStream.end()
})

//耗盡方法
writeStream.on('drain',function(){
	console.log('data drains')
	readStream.resume()
})
/*
這是個標準的文件的拷貝操作,但是會有問題;
如果讀的快,寫的慢;因為讀寫的速度并不是恒定的,這個時候數(shù)據(jù)流內(nèi)部的
緩存可能會被爆倉,那應(yīng)該怎么辦
*/

運行結(jié)果如下:

Node.js(十五)——Net模塊之Stream

邊讀邊寫效果.



Stream的種類

Readable:可讀流,用來提供數(shù)據(jù);外部來源的數(shù)據(jù)會被存儲到buffer里緩存起來,兩種模式:流動模式,暫停模式

Writable:可寫流,消費數(shù)據(jù);

Duplex:雙通流,可讀可寫

Transform:轉(zhuǎn)換流,雙通

各自事件,屬性都大同小異.場景如下:請求一張圖片的數(shù)據(jù),在瀏覽器中顯示出來

var http = require('http')
var fs = require('fs')

http
	.createServer(function(req,res){
		/*fs.readFile('logo.png',function(err,data){
			if(err){
				res.end('file not exist')
			}else{
				res.wirteHeader(200,{'Context-Type':'text/html'})
				res.end(data)
			}
		})*/
		//利用pipe就能夠更簡約的實現(xiàn)這套邏輯
		fs.createReadStream('logo.png').pipe(res)
	})
	.listen(8090)	

運行效果如下:

Node.js(十五)——Net模塊之Stream

不止是本地圖片的讀取,也可以是網(wǎng)絡(luò)環(huán)境下的

使用NodeJs中的request模塊

//使用之前先安裝,npm install request
var request = require('request')
request('url').pipe(res)
//這樣就可以實現(xiàn)邊下載邊顯示

運行結(jié)果同上。


在這里pipe方法會自動幫我們監(jiān)聽data和end事件,還可以自動控制后端壓力,通過對內(nèi)存空間的調(diào)度就能自動控制流量、避免掉目標被快速讀取,只有末端真正需要數(shù)據(jù)的時候,數(shù)據(jù)才會從源頭被取出來然后順著管道一路走下去

再次重構(gòu)讀取pdf文件

//只需要2行代碼
var fs = require('fs')
fs.createReadStream('1.pdf').pipe(fs.createWriteStream('1_pipe.pdf'))


pipe做通道連接時的例子:

var Readable = require('stream').Readable
var Writable = require('stream').Writable

//拿到兩個實例
var readStream = new Readable()
var writStream = new Writable()

//push一些數(shù)據(jù)
readStream.push('I ')
readStream.push('Love ')
readStream.push('NodeJs ')
//讀取完畢
readStream.push(null)

//重寫方法
writStream._write = function(chunk,encode,cb){
	console.log(chunk.toString())
	cb()
}

//最后,使用 pipe連接起來
readStream.pipe(writStream)

運行結(jié)果如下:

Node.js(十五)——Net模塊之Stream


來實現(xiàn)一個定制的可讀流,可寫流、轉(zhuǎn)換流

var stream = require('stream')
var util = require('util')

//定制的可寫流
function ReadStream(){
	//首先改變它的上下文,讓它可以調(diào)用Stream里面可讀類的方法
	stream.Readable.call(this)
}

//來讓我們聲明的可讀流繼承流里面可讀的原型
util.inherits(ReadStream,stream.Readable)

//然后就可以為可讀流添加原型鏈上的read方法
ReadStream.prototype._read = function(){
	//只干一件事,push數(shù)據(jù)
	this.push('I ')
	this.push('Love ')
	this.push('NodeJs ')
	this.push(null)
}

//聲明可寫流
function WriteStream(){
	stream.Writable.call(this)
	//聲明cache
	this._cached = new Buffer('')
}
util.inherits(WriteStream,stream.Writable)

WriteStream.prototype._write = function(chunk,encode,cb){
	console.log(chunk.toString())
	cb()
}

//聲明轉(zhuǎn)換流
function TransformStream(){
	stream.Transform.call(this)
}
util.inherits(TransformStream,stream.Transform)

TransformStream.prototype._transform = function(chunk,encode,cb){
	this.push(chunk)
	cb()
}

//flush
TransformStream.prototype._flush = function(cb){
	this.push('Oh Year!')
	cb()
}

//生成實例
var rs = new ReadStream()
var ws = new WriteStream()
var ts = new TransformStream()

//讀到的數(shù)據(jù)pipe給轉(zhuǎn)換流
rs.pipe(ts).pipe(ws)

運行結(jié)果如下:

Node.js(十五)——Net模塊之Stream

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI