溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Node.js高級編程之UDP可靠性源碼分析

發(fā)布時(shí)間:2023-03-27 15:38:32 來源:億速云 閱讀:227 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Node.js高級編程之UDP可靠性源碼分析”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

不可靠的 UDP

實(shí)驗(yàn)前,我們先介紹一下需要用到的工具(Mac 環(huán)境,其他環(huán)境請自行搜索相關(guān)工具):

  • Network Link Conditioner:模擬丟包場景,可以去蘋果開發(fā)者網(wǎng)站上下載

  • Wireshark:抓包分析工具

  • 云主機(jī):因?yàn)閷?shí)現(xiàn)發(fā)現(xiàn) Network Link Conditioner 對本地回環(huán)地址不起作用,如果有更好的方法求大佬指出

然后我們準(zhǔn)備兩段代碼,一段作為 UDP Server,一段作為 UDP Client,Client 會向 Server 發(fā)送 26 個英文大寫字母,Server 會將他們存到文件:

// udp-server.js
const udp = require('dgram')
const server = udp.createSocket('udp4')
const fs = require('fs')
server.on('listening', function () {
  var address = server.address()
  var port = address.port
  console.log('Server is listening at port ' + port)
})
server.on('message', function (msg, info) {
  console.log(
    `Data received from ${info.address}:${info.port}: ${msg.toString()}`
  )
  fs.appendFileSync('./out', msg.toString())
})
server.on('error', function (error) {
  console.log('Error: ' + error)
  server.close()
})
server.bind(7788)
// udp-client.js
const udp = require('dgram')
const client = udp.createSocket('udp4')
for (let i = 0; i < 26; i++) {
  const char = String.fromCharCode(0x41 + i)
  client.send(Buffer.from(char), 7788, '********', function (error) {
    if (error) {
      console.log(error)
    }
  })
}

接著我們按照下面步驟開始實(shí)驗(yàn):

  • 通過 Network Link Conditioner 把丟包率設(shè)置為 50%:

Node.js高級編程之UDP可靠性源碼分析

  • 設(shè)置好 Wireshark 的抓包參數(shù):

Node.js高級編程之UDP可靠性源碼分析

  • 在云主機(jī)上啟動 Server,在本地啟動 Client。

接著,我們來看一下實(shí)驗(yàn)結(jié)果:

  • 首先,我們可以看到服務(wù)端接收到的字母少了很多,只有 14 個:

Node.js高級編程之UDP可靠性源碼分析

  • 服務(wù)端接收到的字母順序是亂序的,比如 U 跑到了 T 的前面:

Node.js高級編程之UDP可靠性源碼分析

為了進(jìn)行對比,我們可以換成 TCP 試試,代碼如下,結(jié)果就不貼了:

// tcp-server.js
const net = require('net')
const server = net.createServer()
const fs = require('fs')
server.on('connection', function (conn) {
  conn.on('data', (msg) => {
    console.log(
      `Data received from ${conn.address().address}:${
        conn.address().port
      }: ${msg.toString()}`
    )
    fs.appendFileSync('./out', msg.toString())
  })
})
server.listen(8899, () => {
  console.log('server listening to %j', server.address().port)
})
// tcp-client.js
var net = require('net')
var client = new net.Socket()
client.connect(8899, '********', function () {
  for (let i = 0; i < 26; i++) {
    const char = String.fromCharCode(0x41 + i)
    client.write(char)
  }
})

接下我們試試基于 UDP 來實(shí)現(xiàn)一個可靠的傳輸協(xié)議,主要解決上面的丟包和亂序問題。

基于 UDP 的簡單可靠傳輸協(xié)議

首先,需要設(shè)計(jì)一下我們的協(xié)議格式。為了簡單起見,我們只在原來 UDP 的數(shù)據(jù)部分分別新增 4 個字節(jié)的 SEQ 和 ACK:

+-------------------------------+
|      64 個字節(jié)的 UDP 首部       |
+-------------------------------+
|  SEQ(4 個字節(jié)) |  ACK(4 個字節(jié)) |
+-------------------------------+
|             Data              |
+-------------------------------+

其中 SEQ 表示當(dāng)前包的序號,ACK 表示回復(fù)序號。

接下來看看,我們?nèi)绾谓鉀Q前面的兩個問題。

亂序問題

接收方需要維護(hù)一個變量 expectedSeq 的變量表示期待接收到的包序號。為了簡單起見,我們制定如下規(guī)則:如果當(dāng)前接收到的包序號等于 expectedSeq,則把包交給應(yīng)用層處理,并發(fā)送 ACK 給發(fā)送方;否則我們都直接丟棄。當(dāng)然更好的做法是維護(hù)一個接收窗口,這樣可以批量的提交數(shù)據(jù)給應(yīng)用層,也可以用來緩存大于 expectedSeq 的包。

假設(shè)現(xiàn)在發(fā)送方發(fā)送了 1 2 3 兩個包,但是到達(dá)接收方的順序是 3 2 1,按照我們的規(guī)則接收方會丟棄 3 和 2,接收 1。好家伙,順序倒是不亂了,但是包沒了。

所以還得把丟包問題也解決了才行。

丟包問題

發(fā)送方維護(hù)一個發(fā)送窗口用來存儲已發(fā)送但是還未被確認(rèn)的包:

+---+---+---+---+
| 1 | 2 | 3 | 4 |
+---+---+---+---+

發(fā)送方每發(fā)送一個包的同時(shí)還需要將包放入發(fā)送窗口,并設(shè)置一個定時(shí)器用來重發(fā)這個包。當(dāng)發(fā)送方接收到來自接收方的 ACK 時(shí),需要取消掉對應(yīng)包的定時(shí)器,并將發(fā)送窗口中小于 ACK 的包都刪除。

+---+---+---+---+
| 1 | 2 | 3 | 4 |
+---+---+---+---+
// ACK = 4,刪除 1 2 3,并取消掉他們的定時(shí)器
+---+
| 4 |
+---+

完整代碼及使用 Demo 見文末,現(xiàn)在可以正常按順序輸出 26 個字母了,但是離“可靠”協(xié)議還差得遠(yuǎn)。比如第一次輸出完 26 個字母后,我們再次啟動客戶端時(shí)發(fā)現(xiàn)就沒有任何輸出了。原因在于此時(shí)接收端的 expectedSeq 已經(jīng)是 20 多了,但是新啟動的 client 發(fā)送的 SEQ 還是從 1 開始的,結(jié)果就是接收端一直丟棄接收到的包,發(fā)送端一直重試。

要解決這個問題,可以參考 TCP 在傳輸兩端建立“連接”的概念,在開始發(fā)送前通過“三次握手”建立連接,也就是確定起始 SEQ,初始化窗口等工作,結(jié)束前通過“四次揮手”斷開連接,即清理窗口定時(shí)器等工作。這個就留到以后再說吧。

代碼

// packet.js
class Packet {
  constructor({seq, ack, data = ''}) {
    this.seq = seq // 序列號
    this.ack = ack // 確認(rèn)號
    this.data = data // 數(shù)據(jù)
  }
  // 將 Packet 轉(zhuǎn)換成 Buffer,以便通過網(wǎng)絡(luò)傳輸
  toBuffer() {
    const seqBuffer = Buffer.alloc(4)
    seqBuffer.writeUInt32BE(this.seq)
    const ackBuffer = Buffer.alloc(4)
    ackBuffer.writeUInt32BE(this.ack)
    const dataBuffer = Buffer.from(this.data)
    return Buffer.concat([seqBuffer, ackBuffer, dataBuffer])
  }
  // 從 Buffer 中解析出 Packet
  static fromBuffer(buffer) {
    const seq = buffer.readUInt32BE()
    const ack = buffer.readUInt32BE(4)
    const data = buffer.slice(8)
    return new Packet({seq, ack, data})
  }
}
module.exports = Packet
// reliableUDP.js
const dgram = require('dgram')
const Packet = require('./packet')
class ReliableUDP {
  constructor() {
    this.socket = dgram.createSocket('udp4')
    this.socket.on('message', this.handleMessage.bind(this))
    this.sendWindow = [] // 發(fā)送窗口,用于存放待確認(rèn)的數(shù)據(jù)包
    this.receiveWindow = [] // 接收窗口,用于存放已接收的數(shù)據(jù)包
    this.expectedSeq = 1 // 期望接收的數(shù)據(jù)包序列號
    this.nextSeq = 1 // 下一個要發(fā)送的數(shù)據(jù)包序列號
    this.timeout = 100 // 超時(shí)時(shí)間,單位為毫秒
    this.timeoutIds = {} // 用于存放定時(shí)器 ID
  }
  listen(port, address, fn) {
    this.socket.bind(port, address, fn)
  }
  // 發(fā)送數(shù)據(jù)包
  sendPacket(packet, address, port) {
    const buffer = packet.toBuffer()
    this.socket.send(buffer, port, address, (err) => {
      if (err) {
        console.error(err)
      }
    })
    if (packet.ack) return
    if (!this.sendWindow.includes((p) => p.seq === packet.seq))
      this.sendWindow.push(packet)
    // 設(shè)置超時(shí)定時(shí)器
    const timeoutId = setTimeout(() => {
      this.handleTimeout(packet.seq, address, port)
    }, this.timeout)
    this.timeoutIds[packet.seq] = timeoutId
  }
  // 處理接收到的數(shù)據(jù)包
  handleMessage(msg, rinfo) {
    const {address, port} = rinfo
    const packet = Packet.fromBuffer(msg)
    // 收到的是應(yīng)答的包
    if (packet.ack) {
      const ackNum = packet.ack - 1
      // 處理發(fā)送窗口中已經(jīng)確認(rèn)的數(shù)據(jù)包
      while (this.sendWindow.length > 0 && this.sendWindow[0].seq <= ackNum) {
        this.sendWindow.shift()
      }
      // 清除超時(shí)定時(shí)器
      if (this.timeoutIds[ackNum]) {
        clearTimeout(this.timeoutIds[ackNum])
        delete this.timeoutIds[ackNum]
      }
    } else {
      // 如果是重復(fù)的數(shù)據(jù)包,則忽略
      if (packet.seq < this.expectedSeq) {
        return
      }
      // 如果是期望接收的數(shù)據(jù)包
      if (packet.seq === this.expectedSeq) {
        this.receiveWindow.push(packet)
        this.expectedSeq++
        // 處理接收窗口中已經(jīng)確認(rèn)的數(shù)據(jù)包
        while (
          this.receiveWindow.length > 0 &&
          this.receiveWindow[0].seq <= this.expectedSeq
        ) {
          const packet = this.receiveWindow.shift()
          this.onPacketReceived(packet.data)
        }
        const ackPacket = new Packet({
          seq: this.nextSeq++,
          ack: this.expectedSeq,
        })
        this.sendPacket(ackPacket, address, port)
      } else {
        // 如果是未來的數(shù)據(jù)包,暫不做處理,更好的做法是緩存起來
      }
    }
  }
  // 應(yīng)用層調(diào)用該方法發(fā)送數(shù)據(jù)
  send(data, address, port) {
    const packet = new Packet({
      seq: this.nextSeq,
      ack: null,
      data,
    })
    this.sendPacket(packet, address, port)
    this.nextSeq++
  }
  // 應(yīng)用層調(diào)用該方法注冊回調(diào)函數(shù),接收數(shù)據(jù)
  onReceive(callback) {
    this.onPacketReceived = callback
  }
  // 處理超時(shí)
  handleTimeout(seq, address, port) {
    // 重傳超時(shí)的數(shù)據(jù)包
    const packet = this.sendWindow.find((p) => p.seq === seq)
    if (packet) {
      this.sendPacket(packet, address, port)
    }
  }
}
module.exports = ReliableUDP
// server.js
const ReliableUDP = require('./reliableUDP')
const server = new ReliableUDP()
server.listen(7788, 'localhost')
server.onReceive((data) => {
  console.log(data.toString())
})
// client.js
const ReliableUDP = require('./reliableUDP')
const client = new ReliableUDP()
for (let i = 0; i < 26; i++) {
  const char = String.fromCharCode(0x41 + i)
  client.send(char, 'localhost', 7788)
}

“Node.js高級編程之UDP可靠性源碼分析”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI