溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

node中如何實(shí)現(xiàn)RPC通信

發(fā)布時(shí)間:2022-11-04 09:44:40 來(lái)源:億速云 閱讀:166 作者:iii 欄目:web開(kāi)發(fā)

本篇內(nèi)容主要講解“node中如何實(shí)現(xiàn)RPC通信”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“node中如何實(shí)現(xiàn)RPC通信”吧!

什么是RPC?

RPC:Remote Procedure Call(遠(yuǎn)程過(guò)程調(diào)用)是指遠(yuǎn)程過(guò)程調(diào)用,也就是說(shuō)兩臺(tái)服務(wù)器A,B,一個(gè)應(yīng)用部署在A服務(wù)器上,想要調(diào)用B服務(wù)器上應(yīng)用提供的函數(shù)/方法,由于不在一個(gè)內(nèi)存空間,不能直接調(diào)用,需要通過(guò)網(wǎng)絡(luò)來(lái)表達(dá)調(diào)用的語(yǔ)義和傳達(dá)調(diào)用的數(shù)據(jù)。

服務(wù)器和服務(wù)器之間的通信

RPC vs HTTP

相同點(diǎn)

  • 都是兩臺(tái)計(jì)算機(jī)之間的網(wǎng)絡(luò)通信。ajax是瀏覽器和服務(wù)器之間的通行,RPC是服務(wù)器與服務(wù)器之間的通行

  • 需要雙方約定一個(gè)數(shù)據(jù)格式

不同點(diǎn)

  • 尋址服務(wù)器不同

ajax 是使用 DNS作為尋址服務(wù)獲取域名所對(duì)應(yīng)的ip地址,瀏覽器拿到ip地址之后發(fā)送請(qǐng)求獲取數(shù)據(jù)。

RPC一般是在內(nèi)網(wǎng)里面相互請(qǐng)求,所以它一般不用DNS做尋址服務(wù)。因?yàn)樵趦?nèi)網(wǎng),所以可以使用規(guī)定的id或者一個(gè)虛擬vip,比如v5:8001,然后到尋址服務(wù)器獲取v5所對(duì)應(yīng)的ip地址。

  • 應(yīng)用層協(xié)議不同

ajax使用http協(xié)議,它是一個(gè)文本協(xié)議,我們交互數(shù)據(jù)的時(shí)候文件格式要么是html,要么是json對(duì)象,使用json的時(shí)候就是key-value的形式。

RPC采用二進(jìn)制協(xié)議。采用二進(jìn)制傳輸,它傳輸?shù)陌沁@樣子的[0001 0001 0111 0110 0010],里面都是二進(jìn)制,一般采用那幾位表示一個(gè)字段,比如前6位是一個(gè)字段,依次類推。

這樣就不需要http傳輸json對(duì)象里面的key,所以有更小的數(shù)據(jù)體積。

因?yàn)閭鬏數(shù)氖嵌M(jìn)制,更適合于計(jì)算機(jī)來(lái)理解,文本協(xié)議更適合人類理解,所以計(jì)算機(jī)去解讀各個(gè)字段的耗時(shí)是比文本協(xié)議少很多的。

RPC采用二進(jìn)制有更小的數(shù)據(jù)體積,及更快的解讀速度。

  • TCP通訊方式

  • 單工通信:只能客戶端給服務(wù)端發(fā)消息,或者只能服務(wù)端給客戶端發(fā)消息

  • 半雙工通信:在某個(gè)時(shí)間段內(nèi)只能客戶端給服務(wù)端發(fā)消息,過(guò)了這個(gè)時(shí)間段服務(wù)端可以給客戶端發(fā)消息。如果把時(shí)間分成很多時(shí)間片,在一個(gè)時(shí)間片內(nèi)就屬于單工通信

  • 全雙工通信:客戶端和服務(wù)端能相互通信

選擇這三種通信方式的哪一種主要考慮的因素是:實(shí)現(xiàn)難度和成本。全雙工通信是要比半雙工通信的成本要高的,在某些場(chǎng)景下還是可以考慮使用半雙工通信。

ajax是一種半雙工通信。http是文本協(xié)議,但是它底層是tcp協(xié)議,http文本在tcp這一層會(huì)經(jīng)歷從二進(jìn)制數(shù)據(jù)流到文本的轉(zhuǎn)換過(guò)程。

理解RPC只是在更深入地理解前端技術(shù)。

buffer編解碼二進(jìn)制數(shù)據(jù)包

創(chuàng)建buffer

buffer.from: 從已有的數(shù)據(jù)創(chuàng)建二進(jìn)制

const buffer1 = Buffer.from('geekbang')
const buffer2 = Buffer.from([0, 1, 2, 3, 4])


<Buffer 67 65 65 6b 62 61 6e 67>
<Buffer 00 01 02 03 04>

buffer.alloc: 創(chuàng)建一個(gè)空的二進(jìn)制

const buffer3 = Buffer.alloc(20)

<Buffer 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00>

往buffer里面寫(xiě)東西

  • buffer.write(string, offset): 寫(xiě)入字符串

  • buffer.writeInt8(value, offset): int8表示二進(jìn)制8位(8位表示一個(gè)字節(jié))所能表示的整數(shù),offset開(kāi)始寫(xiě)入之前要跳過(guò)的字節(jié)數(shù)。

  • buffer.writeInt16BE(value, offset): int16(兩個(gè)字節(jié)數(shù)),表示16個(gè)二進(jìn)制位所能表示的整數(shù),即32767。超過(guò)這個(gè)數(shù)程序會(huì)報(bào)錯(cuò)。

const buffer = Buffer.from([1, 2, 3, 4]) // <Buffer 01 02 03 04>

// 往第二個(gè)字節(jié)里面寫(xiě)入12
buffer.writeInt8(12, 1) // <Buffer 01 0c 03 04>

大端BE與小端LE:主要是對(duì)于2個(gè)以上字節(jié)的數(shù)據(jù)排列方式不同(writeInt8因?yàn)橹挥幸粋€(gè)字節(jié),所以沒(méi)有大端和小端),大端的話就是低位地址放高位,小端就是低位地址放低位。如下:

const buffer = Buffer.from([1, 2, 3, 4])

buffer.writeInt16BE(512, 2) // <Buffer 01 02 02 00>
buffer.writeInt16LE(512, 2) // <Buffer 01 02 00 02>

RPC傳輸?shù)亩M(jìn)制如何表示傳遞的字段

PC傳輸?shù)亩M(jìn)制是如何表示字段的呢?現(xiàn)在有個(gè)二進(jìn)制包[00, 00, 00, 00, 00, 00, 00],我們假定前三個(gè)字節(jié)表示一個(gè)字段值,后面兩個(gè)表示一個(gè)字段的值,最后兩個(gè)也表示一個(gè)字段的值。那寫(xiě)法如下:

writeInt16BE(value, 0)
writeInt16BE(value, 2)
writeInt16BE(value, 4)

發(fā)現(xiàn)像這樣寫(xiě),不僅要知道寫(xiě)入的值,還要知道值的數(shù)據(jù)類型,這樣就很麻煩。不如json格式那么方便。針對(duì)這種情況業(yè)界也有解決方案。npm有個(gè)庫(kù)protocol-buffers,把我們寫(xiě)的參數(shù)轉(zhuǎn)化為buffer

// test.proto 定義的協(xié)議文件
message Column {
  required float num  = 1;
  required string payload = 2;
}
// index.js
const fs = require('fs')
var protobuf = require('protocol-buffers')
var messages = protobuf(fs.readFileSync('test.proto'))

var buf = messages.Column.encode({
	num: 42,
	payload: 'hello world'
})
console.log(buf)
// <Buffer 0d 00 00 28 42 12 0b 68 65 6c 6c 6f 20 77 6f 72 6c 64>

var obj = messages.Column.decode(buf)
console.log(obj)
// { num: 42, payload: 'hello world' }

net建立RPC通道

半雙工通信

服務(wù)端代碼:

const net = require('net')

const LESSON_DATA = {
  136797: '01 | 課程介紹',
  136798: '02 | 內(nèi)容綜述',
  136799: '03 | Node.js是什么?',
  136800: '04 | Node.js可以用來(lái)做什么?',
  136801: '05 | 課程實(shí)戰(zhàn)項(xiàng)目介紹',
  136803: '06 | 什么是技術(shù)預(yù)研?',
  136804: '07 | Node.js開(kāi)發(fā)環(huán)境安裝',
  136806: '08 | 第一個(gè)Node.js程序:石頭剪刀布游戲',
  136807: '09 | 模塊:CommonJS規(guī)范',
  136808: '10 | 模塊:使用模塊規(guī)范改造石頭剪刀布游戲',
  136809: '11 | 模塊:npm',
  141994: '12 | 模塊:Node.js內(nèi)置模塊',
  143517: '13 | 異步:非阻塞I/O',
  143557: '14 | 異步:異步編程之callback',
  143564: '15 | 異步:事件循環(huán)',
  143644: '16 | 異步:異步編程之Promise',
  146470: '17 | 異步:異步編程之a(chǎn)sync/await',
  146569: '18 | HTTP:什么是HTTP服務(wù)器?',
  146582: '19 | HTTP:簡(jiǎn)單實(shí)現(xiàn)一個(gè)HTTP服務(wù)器'
}

const server = net.createServer(socket => {
  // 監(jiān)聽(tīng)客戶端發(fā)送的消息
  socket.on('data', buffer => {
    const lessonId = buffer.readInt32BE()
    setTimeout(() => {
      // 往客戶端發(fā)送消息
      socket.write(LESSON_DATA[lessonId])
    }, 1000)
  })
})

server.listen(4000)

客戶端代碼:

const net = require('net')

const socket = new net.Socket({})

const LESSON_IDS = [
  '136797',
  '136798',
  '136799',
  '136800',
  '136801',
  '136803',
  '136804',
  '136806',
  '136807',
  '136808',
  '136809',
  '141994',
  '143517',
  '143557',
  '143564',
  '143644',
  '146470',
  '146569',
  '146582'
]

socket.connect({
  host: '127.0.0.1',
  port: 4000
})

let buffer = Buffer.alloc(4)
buffer.writeInt32BE(LESSON_IDS[Math.floor(Math.random() * LESSON_IDS.length)])

// 往服務(wù)端發(fā)送消息
socket.write(buffer)

// 監(jiān)聽(tīng)從服務(wù)端傳回的消息
socket.on('data', buffer => {
  console.log(buffer.toString())

  // 獲取到數(shù)據(jù)之后再次發(fā)送消息
  buffer = Buffer.alloc(4)
  buffer.writeInt32BE(LESSON_IDS[Math.floor(Math.random() * LESSON_IDS.length)])

  socket.write(buffer)
})

以上半雙工通信步驟如下:

  • 客戶端發(fā)送消息 socket.write(buffer)

  • 服務(wù)端接受消息后往客戶端發(fā)送消息 socket.write(buffer)

  • 客戶端接受消息后再次發(fā)送消息

這樣在一個(gè)時(shí)間端之內(nèi),只有一個(gè)端往另一個(gè)端發(fā)送消息,這樣就實(shí)現(xiàn)了半雙工通信。那如何實(shí)現(xiàn)全雙工通信呢,也就是在客戶端往服務(wù)端發(fā)送消息的同時(shí),服務(wù)端還沒(méi)有消息返回給客戶端之前,客戶端又發(fā)送了一個(gè)消息給服務(wù)端。

全雙工通信

先來(lái)看一個(gè)場(chǎng)景:

node中如何實(shí)現(xiàn)RPC通信

客戶端發(fā)送了一個(gè)id1的請(qǐng)求,但是服務(wù)端還來(lái)不及返回,接著客戶端又發(fā)送了一個(gè)id2的請(qǐng)求。

等了一個(gè)之后,服務(wù)端先把id2的結(jié)果返回了,然后再把id1的結(jié)果返回。

那如何結(jié)果匹配到對(duì)應(yīng)的請(qǐng)求上呢?

如果按照時(shí)間順序,那么id1的請(qǐng)求對(duì)應(yīng)了id2的結(jié)果,因?yàn)閕d2是先返回的;id2的請(qǐng)求對(duì)應(yīng)了id1的結(jié)果,這樣就導(dǎo)致請(qǐng)求包和返回包錯(cuò)位的情況。

怎么辦呢?

我們可以給請(qǐng)求包和返回包都帶上序號(hào),這樣就能對(duì)應(yīng)上。

錯(cuò)位處理

客戶端代碼:

socket.on('data', buffer => {
  // 包序號(hào)
  const seqBuffer = buffer.slice(0, 2)
  // 服務(wù)端返回的內(nèi)容
  const titleBuffer = buffer.slice(2)
    
  console.log(seqBuffer.readInt16BE(), titleBuffer.toString())
})

// 包序號(hào)
let seq = 0
function encode(index) {
  // 請(qǐng)求包的長(zhǎng)度現(xiàn)在是6 = 2(包序號(hào)) + 4(課程id)
  buffer = Buffer.alloc(6)
  buffer.writeInt16BE(seq)
  buffer.writeInt32BE(LESSON_IDS[index], 2)

  seq++
  return buffer
}

// 每50ms發(fā)送一次請(qǐng)求
setInterval(() => {
  id = Math.floor(Math.random() * LESSON_IDS.length)
  socket.write(encode(id))
}, 50)

服務(wù)端代碼:

const server = net.createServer(socket => {
  socket.on('data', buffer => {
    // 把包序號(hào)取出
    const seqBuffer = buffer.slice(0, 2)
    // 從第2個(gè)字節(jié)開(kāi)始讀取
    const lessonId = buffer.readInt32BE(2)
    setTimeout(() => {
      const buffer = Buffer.concat([
        seqBuffer,
        Buffer.from(LESSON_DATA[lessonId])
      ])
      socket.write(buffer)
      // 這里返回時(shí)間采用隨機(jī)的,這樣就不會(huì)按順序返回,就可以測(cè)試錯(cuò)位的情況
    }, 10 + Math.random() * 1000)
  })
})

  • 客戶端把包序號(hào)和對(duì)應(yīng)的id給服務(wù)端

  • 服務(wù)端取出包序號(hào)和對(duì)應(yīng)的id,然后把包序號(hào)和id對(duì)應(yīng)的內(nèi)容返回給客戶端,同時(shí)設(shè)置返回的時(shí)間是隨機(jī)的,這樣就不會(huì)按照順序返回。

粘包處理

如果我們這樣發(fā)送請(qǐng)求:

for (let i = 0; i < 100; i++) {
  id = Math.floor(Math.random() * LESSON_IDS.length)
  socket.write(encode(id))
}

我們發(fā)現(xiàn)服務(wù)端接收到的信息如下:

<Buffer 00 00 00 02 16 64 00 01 00 02 16 68 00 02 00 02 31 1c 00 03 00 02 3c 96 00 04 00 02 16 68 00 05 00 02 16 5e 00 06 00 02 16 66 00 07 00 02 16 67 00 08 ... 550 more bytes>

這是因?yàn)?code>TCP自己做的一個(gè)優(yōu)化,它會(huì)把所有的請(qǐng)求包拼接在一起,這樣就會(huì)產(chǎn)生粘包的現(xiàn)象。

服務(wù)端需要把包進(jìn)行拆分,拆分成100個(gè)小包。

那如何拆分呢?

首先客戶端發(fā)送的數(shù)據(jù)包包括兩部分:定長(zhǎng)的包頭和不定長(zhǎng)的包體。

包頭又分為兩部分:包序號(hào)及包體的長(zhǎng)度。只有知道包體的長(zhǎng)度,才能知道從哪里進(jìn)行分割。

let seq = 0
function encode(data) {
    // 正常情況下,這里應(yīng)該是使用 protocol-buffers 來(lái)encode一段代表業(yè)務(wù)數(shù)據(jù)的數(shù)據(jù)包
    // 為了不要混淆重點(diǎn),這個(gè)例子比較簡(jiǎn)單,就直接把課程id轉(zhuǎn)buffer發(fā)送
    const body = Buffer.alloc(4);
    body.writeInt32BE(LESSON_IDS[data.id]);

    // 一般來(lái)說(shuō),一個(gè)rpc調(diào)用的數(shù)據(jù)包會(huì)分為定長(zhǎng)的包頭和不定長(zhǎng)的包體兩部分
    // 包頭的作用就是用來(lái)記載包的序號(hào)和包的長(zhǎng)度,以實(shí)現(xiàn)全雙工通信
    const header = Buffer.alloc(6); // 包序號(hào)占2個(gè)字節(jié),包體長(zhǎng)度占4個(gè)字節(jié),共6個(gè)字節(jié)
    header.writeInt16BE(seq)
    header.writeInt32BE(body.length, 2);

    // 包頭和包體拼起來(lái)發(fā)送
    const buffer = Buffer.concat([header, body])

    console.log(`包${seq}傳輸?shù)恼n程id為${LESSON_IDS[data.id]}`);
    seq++;
    return buffer;
}

// 并發(fā)
for (let i = 0; i < 100; i++) {
    id = Math.floor(Math.random() * LESSON_IDS.length)
    socket.write(encode({ id }))
}

服務(wù)端進(jìn)行拆包

const server = net.createServer(socket => {
  let oldBuffer = null
  socket.on('data', buffer => {
    // 把上一次data事件使用殘余的buffer接上來(lái)
    if (oldBuffer) {
      buffer = Buffer.concat([oldBuffer, buffer])
    }
    let packageLength = 0
    // 只要還存在可以解成完整包的包長(zhǎng)
    while ((packageLength = checkComplete(buffer))) {
      // 確定包的長(zhǎng)度后進(jìn)行slice分割
      const package = buffer.slice(0, packageLength)
      // 剩余的包利用循環(huán)繼續(xù)分割
      buffer = buffer.slice(packageLength)

      // 把這個(gè)包解成數(shù)據(jù)和seq
      const result = decode(package)

      // 計(jì)算得到要返回的結(jié)果,并write返回
      socket.write(encode(LESSON_DATA[result.data], result.seq))
    }

    // 把殘余的buffer記下來(lái)
    oldBuffer = buffer
  })
})

checkComplete 函數(shù)的作用來(lái)確定一個(gè)數(shù)據(jù)包的長(zhǎng)度,然后進(jìn)行分割:

function checkComplete(buffer) {
  // 如果包的長(zhǎng)度小于6個(gè)字節(jié)說(shuō)明只有包頭,沒(méi)有包體,那么直接返回0
  if (buffer.length <= 6) {
    return 0
  }
  // 讀取包頭的第二個(gè)字節(jié),取出包體的長(zhǎng)度
  const bodyLength = buffer.readInt32BE(2)
  // 請(qǐng)求包包括包頭(6個(gè)字節(jié))和包體body
  return 6 + bodyLength
}

decode對(duì)包進(jìn)行解密:

function decode(buffer) {
  // 讀取包頭
  const header = buffer.slice(0, 6)
  const seq = header.readInt16BE()
    
  // 讀取包體  
  // 正常情況下,這里應(yīng)該是使用 protobuf 來(lái)decode一段代表業(yè)務(wù)數(shù)據(jù)的數(shù)據(jù)包
  // 為了不要混淆重點(diǎn),這個(gè)例子比較簡(jiǎn)單,就直接讀一個(gè)Int32即可
  const body = buffer.slice(6).readInt32BE()

  // 這里把seq和數(shù)據(jù)返回出去
  return {
    seq,
    data: body
  }
}

encode把客戶端想要的數(shù)據(jù)轉(zhuǎn)化為二進(jìn)制返回,這個(gè)包同樣包括包頭和包體,包頭又包括包需要包序號(hào)和包體的長(zhǎng)度。

function encode(data, seq) {
  // 正常情況下,這里應(yīng)該是使用 protobuf 來(lái)encode一段代表業(yè)務(wù)數(shù)據(jù)的數(shù)據(jù)包
  // 為了不要混淆重點(diǎn),這個(gè)例子比較簡(jiǎn)單,就直接把課程標(biāo)題轉(zhuǎn)buffer返回
  const body = Buffer.from(data)

  // 一般來(lái)說(shuō),一個(gè)rpc調(diào)用的數(shù)據(jù)包會(huì)分為定長(zhǎng)的包頭和不定長(zhǎng)的包體兩部分
  // 包頭的作用就是用來(lái)記載包的序號(hào)和包的長(zhǎng)度,以實(shí)現(xiàn)全雙工通信
  const header = Buffer.alloc(6)
  header.writeInt16BE(seq)
  header.writeInt32BE(body.length, 2)

  const buffer = Buffer.concat([header, body])

  return buffer
}

當(dāng)客戶端收到服務(wù)端發(fā)送的包之后,同樣也要進(jìn)行拆包,因?yàn)樗械陌瑯佣颊吃谝黄鹆?

 <Buffer 00 00 00 00 00 1d 30 36 20 7c 20 e4 bb 80 e4 b9 88 e6 98 af e6 8a 80 e6 9c af e9 a2 84 e7 a0 94 ef bc 9f 00 01 00 00 00 1d 30 36 20 7c 20 e4 bb 80 e4 ... 539 more bytes>

因此,客戶端也需要拆包,拆包策略與服務(wù)端的拆包策略是一致的:

let oldBuffer = null
socket.on('data', buffer => {
  // 把上一次data事件使用殘余的buffer接上來(lái)
  if (oldBuffer) {
    buffer = Buffer.concat([oldBuffer, buffer])
  }
  let completeLength = 0

  // 只要還存在可以解成完整包的包長(zhǎng)
  while ((completeLength = checkComplete(buffer))) {
    const package = buffer.slice(0, completeLength)
    buffer = buffer.slice(completeLength)

    // 把這個(gè)包解成數(shù)據(jù)和seq
    const result = decode(package)
    console.log(`包${result.seq},返回值是${result.data}`)
  }

  // 把殘余的buffer記下來(lái)
  oldBuffer = buffer
})

到這里就實(shí)現(xiàn)了雙全工通行,這樣客戶端和服務(wù)端隨時(shí)都可以往對(duì)方發(fā)小消息了。

到此,相信大家對(duì)“node中如何實(shí)現(xiàn)RPC通信”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI