您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“Node.js高級編程之UDP可靠性源碼分析”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
實(shí)驗(yàn)前,我們先介紹一下需要用到的工具(Mac 環(huán)境,其他環(huán)境請自行搜索相關(guān)工具):
Network Link Conditioner:模擬丟包場景,可以去蘋果開發(fā)者網(wǎng)站上下載
Wireshark:抓包分析工具
云主機(jī):因?yàn)閷?shí)現(xiàn)發(fā)現(xiàn) Network Link Conditioner 對本地回環(huán)地址不起作用,如果有更好的方法求大佬指出
然后我們準(zhǔn)備兩段代碼,一段作為 UDP Server,一段作為 UDP Client,Client 會向 Server 發(fā)送 26 個英文大寫字母,Server 會將他們存到文件:
// udp-server.js const udp = require('dgram') const server = udp.createSocket('udp4') const fs = require('fs') server.on('listening', function () { var address = server.address() var port = address.port console.log('Server is listening at port ' + port) }) server.on('message', function (msg, info) { console.log( `Data received from ${info.address}:${info.port}: ${msg.toString()}` ) fs.appendFileSync('./out', msg.toString()) }) server.on('error', function (error) { console.log('Error: ' + error) server.close() }) server.bind(7788) // udp-client.js const udp = require('dgram') const client = udp.createSocket('udp4') for (let i = 0; i < 26; i++) { const char = String.fromCharCode(0x41 + i) client.send(Buffer.from(char), 7788, '********', function (error) { if (error) { console.log(error) } }) }
接著我們按照下面步驟開始實(shí)驗(yàn):
通過 Network Link Conditioner 把丟包率設(shè)置為 50%:
設(shè)置好 Wireshark 的抓包參數(shù):
在云主機(jī)上啟動 Server,在本地啟動 Client。
接著,我們來看一下實(shí)驗(yàn)結(jié)果:
首先,我們可以看到服務(wù)端接收到的字母少了很多,只有 14 個:
服務(wù)端接收到的字母順序是亂序的,比如 U 跑到了 T 的前面:
為了進(jìn)行對比,我們可以換成 TCP 試試,代碼如下,結(jié)果就不貼了:
// tcp-server.js const net = require('net') const server = net.createServer() const fs = require('fs') server.on('connection', function (conn) { conn.on('data', (msg) => { console.log( `Data received from ${conn.address().address}:${ conn.address().port }: ${msg.toString()}` ) fs.appendFileSync('./out', msg.toString()) }) }) server.listen(8899, () => { console.log('server listening to %j', server.address().port) }) // tcp-client.js var net = require('net') var client = new net.Socket() client.connect(8899, '********', function () { for (let i = 0; i < 26; i++) { const char = String.fromCharCode(0x41 + i) client.write(char) } })
接下我們試試基于 UDP 來實(shí)現(xiàn)一個可靠的傳輸協(xié)議,主要解決上面的丟包和亂序問題。
首先,需要設(shè)計(jì)一下我們的協(xié)議格式。為了簡單起見,我們只在原來 UDP 的數(shù)據(jù)部分分別新增 4 個字節(jié)的 SEQ 和 ACK:
+-------------------------------+ | 64 個字節(jié)的 UDP 首部 | +-------------------------------+ | SEQ(4 個字節(jié)) | ACK(4 個字節(jié)) | +-------------------------------+ | Data | +-------------------------------+
其中 SEQ 表示當(dāng)前包的序號,ACK 表示回復(fù)序號。
接下來看看,我們?nèi)绾谓鉀Q前面的兩個問題。
接收方需要維護(hù)一個變量 expectedSeq
的變量表示期待接收到的包序號。為了簡單起見,我們制定如下規(guī)則:如果當(dāng)前接收到的包序號等于 expectedSeq
,則把包交給應(yīng)用層處理,并發(fā)送 ACK 給發(fā)送方;否則我們都直接丟棄。當(dāng)然更好的做法是維護(hù)一個接收窗口,這樣可以批量的提交數(shù)據(jù)給應(yīng)用層,也可以用來緩存大于 expectedSeq
的包。
假設(shè)現(xiàn)在發(fā)送方發(fā)送了 1 2 3 兩個包,但是到達(dá)接收方的順序是 3 2 1,按照我們的規(guī)則接收方會丟棄 3 和 2,接收 1。好家伙,順序倒是不亂了,但是包沒了。
所以還得把丟包問題也解決了才行。
發(fā)送方維護(hù)一個發(fā)送窗口用來存儲已發(fā)送但是還未被確認(rèn)的包:
+---+---+---+---+ | 1 | 2 | 3 | 4 | +---+---+---+---+
發(fā)送方每發(fā)送一個包的同時(shí)還需要將包放入發(fā)送窗口,并設(shè)置一個定時(shí)器用來重發(fā)這個包。當(dāng)發(fā)送方接收到來自接收方的 ACK 時(shí),需要取消掉對應(yīng)包的定時(shí)器,并將發(fā)送窗口中小于 ACK 的包都刪除。
+---+---+---+---+ | 1 | 2 | 3 | 4 | +---+---+---+---+ // ACK = 4,刪除 1 2 3,并取消掉他們的定時(shí)器 +---+ | 4 | +---+
完整代碼及使用 Demo 見文末,現(xiàn)在可以正常按順序輸出 26 個字母了,但是離“可靠”協(xié)議還差得遠(yuǎn)。比如第一次輸出完 26 個字母后,我們再次啟動客戶端時(shí)發(fā)現(xiàn)就沒有任何輸出了。原因在于此時(shí)接收端的 expectedSeq
已經(jīng)是 20 多了,但是新啟動的 client 發(fā)送的 SEQ 還是從 1 開始的,結(jié)果就是接收端一直丟棄接收到的包,發(fā)送端一直重試。
要解決這個問題,可以參考 TCP 在傳輸兩端建立“連接”的概念,在開始發(fā)送前通過“三次握手”建立連接,也就是確定起始 SEQ,初始化窗口等工作,結(jié)束前通過“四次揮手”斷開連接,即清理窗口定時(shí)器等工作。這個就留到以后再說吧。
// packet.js class Packet { constructor({seq, ack, data = ''}) { this.seq = seq // 序列號 this.ack = ack // 確認(rèn)號 this.data = data // 數(shù)據(jù) } // 將 Packet 轉(zhuǎn)換成 Buffer,以便通過網(wǎng)絡(luò)傳輸 toBuffer() { const seqBuffer = Buffer.alloc(4) seqBuffer.writeUInt32BE(this.seq) const ackBuffer = Buffer.alloc(4) ackBuffer.writeUInt32BE(this.ack) const dataBuffer = Buffer.from(this.data) return Buffer.concat([seqBuffer, ackBuffer, dataBuffer]) } // 從 Buffer 中解析出 Packet static fromBuffer(buffer) { const seq = buffer.readUInt32BE() const ack = buffer.readUInt32BE(4) const data = buffer.slice(8) return new Packet({seq, ack, data}) } } module.exports = Packet // reliableUDP.js const dgram = require('dgram') const Packet = require('./packet') class ReliableUDP { constructor() { this.socket = dgram.createSocket('udp4') this.socket.on('message', this.handleMessage.bind(this)) this.sendWindow = [] // 發(fā)送窗口,用于存放待確認(rèn)的數(shù)據(jù)包 this.receiveWindow = [] // 接收窗口,用于存放已接收的數(shù)據(jù)包 this.expectedSeq = 1 // 期望接收的數(shù)據(jù)包序列號 this.nextSeq = 1 // 下一個要發(fā)送的數(shù)據(jù)包序列號 this.timeout = 100 // 超時(shí)時(shí)間,單位為毫秒 this.timeoutIds = {} // 用于存放定時(shí)器 ID } listen(port, address, fn) { this.socket.bind(port, address, fn) } // 發(fā)送數(shù)據(jù)包 sendPacket(packet, address, port) { const buffer = packet.toBuffer() this.socket.send(buffer, port, address, (err) => { if (err) { console.error(err) } }) if (packet.ack) return if (!this.sendWindow.includes((p) => p.seq === packet.seq)) this.sendWindow.push(packet) // 設(shè)置超時(shí)定時(shí)器 const timeoutId = setTimeout(() => { this.handleTimeout(packet.seq, address, port) }, this.timeout) this.timeoutIds[packet.seq] = timeoutId } // 處理接收到的數(shù)據(jù)包 handleMessage(msg, rinfo) { const {address, port} = rinfo const packet = Packet.fromBuffer(msg) // 收到的是應(yīng)答的包 if (packet.ack) { const ackNum = packet.ack - 1 // 處理發(fā)送窗口中已經(jīng)確認(rèn)的數(shù)據(jù)包 while (this.sendWindow.length > 0 && this.sendWindow[0].seq <= ackNum) { this.sendWindow.shift() } // 清除超時(shí)定時(shí)器 if (this.timeoutIds[ackNum]) { clearTimeout(this.timeoutIds[ackNum]) delete this.timeoutIds[ackNum] } } else { // 如果是重復(fù)的數(shù)據(jù)包,則忽略 if (packet.seq < this.expectedSeq) { return } // 如果是期望接收的數(shù)據(jù)包 if (packet.seq === this.expectedSeq) { this.receiveWindow.push(packet) this.expectedSeq++ // 處理接收窗口中已經(jīng)確認(rèn)的數(shù)據(jù)包 while ( this.receiveWindow.length > 0 && this.receiveWindow[0].seq <= this.expectedSeq ) { const packet = this.receiveWindow.shift() this.onPacketReceived(packet.data) } const ackPacket = new Packet({ seq: this.nextSeq++, ack: this.expectedSeq, }) this.sendPacket(ackPacket, address, port) } else { // 如果是未來的數(shù)據(jù)包,暫不做處理,更好的做法是緩存起來 } } } // 應(yīng)用層調(diào)用該方法發(fā)送數(shù)據(jù) send(data, address, port) { const packet = new Packet({ seq: this.nextSeq, ack: null, data, }) this.sendPacket(packet, address, port) this.nextSeq++ } // 應(yīng)用層調(diào)用該方法注冊回調(diào)函數(shù),接收數(shù)據(jù) onReceive(callback) { this.onPacketReceived = callback } // 處理超時(shí) handleTimeout(seq, address, port) { // 重傳超時(shí)的數(shù)據(jù)包 const packet = this.sendWindow.find((p) => p.seq === seq) if (packet) { this.sendPacket(packet, address, port) } } } module.exports = ReliableUDP // server.js const ReliableUDP = require('./reliableUDP') const server = new ReliableUDP() server.listen(7788, 'localhost') server.onReceive((data) => { console.log(data.toString()) }) // client.js const ReliableUDP = require('./reliableUDP') const client = new ReliableUDP() for (let i = 0; i < 26; i++) { const char = String.fromCharCode(0x41 + i) client.send(char, 'localhost', 7788) }
“Node.js高級編程之UDP可靠性源碼分析”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。