溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Node.Js中實(shí)現(xiàn)端口重用原理詳解

發(fā)布時(shí)間:2020-09-24 03:26:25 來(lái)源:腳本之家 閱讀:115 作者:G_Doe 欄目:web開(kāi)發(fā)

本文介紹了Node.Js中實(shí)現(xiàn)端口重用原理詳解,分享給大家,具體如下:

起源,從官方實(shí)例中看多進(jìn)程共用端口

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
 console.log(`Master ${process.pid} is running`);

 for (let i = 0; i < numCPUs; i++) {
  cluster.fork();
 }

 cluster.on('exit', (worker, code, signal) => {
  console.log(`worker ${worker.process.pid} died`);
 });
} else {
 http.createServer((req, res) => {
  res.writeHead(200);
  res.end('hello world\n');
 }).listen(8000);

 console.log(`Worker ${process.pid} started`);
}

執(zhí)行結(jié)果:

$ node server.js
Master 3596 is running
Worker 4324 started
Worker 4520 started
Worker 6056 started
Worker 5644 started

了解http.js模塊:

我們都只有要?jiǎng)?chuàng)建一個(gè)http服務(wù),必須引用http模塊,http模塊最終會(huì)調(diào)用net.js實(shí)現(xiàn)網(wǎng)絡(luò)服務(wù)

// lib/net.js
'use strict';

 ...
Server.prototype.listen = function(...args) {
  ...
 if (options instanceof TCP) {
   this._handle = options;
   this[async_id_symbol] = this._handle.getAsyncId();
   listenInCluster(this, null, -1, -1, backlogFromArgs); // 注意這個(gè)方法調(diào)用了cluster模式下的處理辦法
   return this;
  }
  ...
};

function listenInCluster(server, address, port, addressType,backlog, fd, exclusive) {
// 如果是master 進(jìn)程或者沒(méi)有開(kāi)啟cluster模式直接啟動(dòng)listen
if (cluster.isMaster || exclusive) {
  //_listen2,細(xì)心的人一定會(huì)發(fā)現(xiàn)為什么是listen2而不直接使用listen
 // _listen2 包裹了listen方法,如果是Worker進(jìn)程,會(huì)調(diào)用被hack后的listen方法,從而避免出錯(cuò)端口被占用的錯(cuò)誤
  server._listen2(address, port, addressType, backlog, fd);
  return;
 }
 const serverQuery = {
  address: address,
  port: port,
  addressType: addressType,
  fd: fd,
  flags: 0
 };

// 是fork 出來(lái)的進(jìn)程,獲取master上的handel,并且監(jiān)聽(tīng),
// 現(xiàn)在是不是很好奇_getServer方法做了什么
 cluster._getServer(server, serverQuery, listenOnMasterHandle);
}
 ...

答案很快就可以通過(guò)cluster._getServer 這個(gè)函數(shù)找到

  1. 代理了server._listen2 這個(gè)方法在work進(jìn)程的執(zhí)行操作
  2. 向master發(fā)送queryServer消息,向master注冊(cè)一個(gè)內(nèi)部TCP服務(wù)器
// lib/internal/cluster/child.js
cluster._getServer = function(obj, options, cb) {
 // ...
 const message = util._extend({
  act: 'queryServer',  // 關(guān)鍵點(diǎn):構(gòu)建一個(gè)queryServer的消息
  index: indexes[indexesKey],
  data: null
 }, options);

 message.address = address;

// 發(fā)送queryServer消息給master進(jìn)程,master 在收到這個(gè)消息后,會(huì)創(chuàng)建一個(gè)開(kāi)始一個(gè)server,并且listen
 send(message, (reply, handle) => {
   rr(reply, indexesKey, cb);       // Round-robin.
 });

 obj.once('listening', () => {
  cluster.worker.state = 'listening';
  const address = obj.address();
  message.act = 'listening';
  message.port = address && address.port || options.port;
  send(message);
 });
};
 //...
 // Round-robin. Master distributes handles across workers.
function rr(message, indexesKey, cb) {
  if (message.errno) return cb(message.errno, null);
  var key = message.key;
  // 這里hack 了listen方法
  // 子進(jìn)程調(diào)用的listen方法,就是這個(gè),直接返回0,所以不會(huì)報(bào)端口被占用的錯(cuò)誤
  function listen(backlog) {
    return 0;
  }
  // ...
  const handle = { close, listen, ref: noop, unref: noop };
  handles[key] = handle;
  // 這個(gè)cb 函數(shù)是net.js 中的listenOnMasterHandle 方法
  cb(0, handle);
}
// lib/net.js
/*
function listenOnMasterHandle(err, handle) {
  err = checkBindError(err, port, handle);
  server._handle = handle;
  // _listen2 函數(shù)中,調(diào)用的handle.listen方法,也就是上面被hack的listen
  server._listen2(address, port, addressType, backlog, fd);
 }
*/

master進(jìn)程收到queryServer消息后進(jìn)行啟動(dòng)服務(wù)

  1. 如果地址沒(méi)被監(jiān)聽(tīng)過(guò),通過(guò)RoundRobinHandle監(jiān)聽(tīng)開(kāi)啟服務(wù)
  2. 如果地址已經(jīng)被監(jiān)聽(tīng),直接綁定handel到已經(jīng)監(jiān)聽(tīng)到服務(wù)上,去消費(fèi)請(qǐng)求
// lib/internal/cluster/master.js
function queryServer(worker, message) {

  const args = [
    message.address,
    message.port,
    message.addressType,
    message.fd,
    message.index
  ];

  const key = args.join(':');
  var handle = handles[key];

  // 如果地址沒(méi)被監(jiān)聽(tīng)過(guò),通過(guò)RoundRobinHandle監(jiān)聽(tīng)開(kāi)啟服務(wù)
  if (handle === undefined) {
    var constructor = RoundRobinHandle;
    if (schedulingPolicy !== SCHED_RR ||
      message.addressType === 'udp4' ||
      message.addressType === 'udp6') {
      constructor = SharedHandle;
    }

    handles[key] = handle = new constructor(key,
      address,
      message.port,
      message.addressType,
      message.fd,
      message.flags);
  }

  // 如果地址已經(jīng)被監(jiān)聽(tīng),直接綁定handel到已經(jīng)監(jiān)聽(tīng)到服務(wù)上,去消費(fèi)請(qǐng)求
  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    reply = util._extend({
      errno: errno,
      key: key,
      ack: message.seq,
      data: handles[key].data
    }, reply);

    if (errno)
      delete handles[key]; // Gives other workers a chance to retry.

    send(worker, reply, handle);
  });
}

看到這一步,已經(jīng)很明顯,我們知道了多進(jìn)行端口共享的實(shí)現(xiàn)原理

  1. 其實(shí)端口僅由master進(jìn)程中的內(nèi)部TCP服務(wù)器監(jiān)聽(tīng)了一次
  2. 因?yàn)閚et.js 模塊中會(huì)判斷當(dāng)前的進(jìn)程是master還是Worker進(jìn)程
  3. 如果是Worker進(jìn)程調(diào)用cluster._getServer 去hack原生的listen 方法
  4. 所以在child調(diào)用的listen方法,是一個(gè)return 0 的空方法,所以不會(huì)報(bào)端口占用錯(cuò)誤

那現(xiàn)在問(wèn)題來(lái)了,既然Worker進(jìn)程是如何獲取到master進(jìn)程監(jiān)聽(tīng)服務(wù)接收到的connect呢?

  1. 監(jiān)聽(tīng)master進(jìn)程啟動(dòng)的TCP服務(wù)器的connection事件
  2. 通過(guò)輪詢(xún)挑選出一個(gè)worker
  3. 向其發(fā)送newconn內(nèi)部消息,消息體中包含了客戶(hù)端句柄
  4. 有了句柄,誰(shuí)都知道要怎么處理了哈哈
// lib/internal/cluster/round_robin_handle.js

function RoundRobinHandle(key, address, port, addressType, fd) {

  this.server = net.createServer(assert.fail);

  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0)
    this.server.listen(port, address);
  else
    this.server.listen(address); // UNIX socket path.

  this.server.once('listening', () => {
    this.handle = this.server._handle;
    // 監(jiān)聽(tīng)onconnection方法
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

RoundRobinHandle.prototype.add = function (worker, send) {
  // ...
};

RoundRobinHandle.prototype.remove = function (worker) {
  // ...
};

RoundRobinHandle.prototype.distribute = function (err, handle) {
  // 負(fù)載均衡地挑選出一個(gè)worker
  this.handles.push(handle);
  const worker = this.free.shift();
  if (worker) this.handoff(worker);
};

RoundRobinHandle.prototype.handoff = function (worker) {
  const handle = this.handles.shift();
  const message = { act: 'newconn', key: this.key };
  // 向work進(jìn)程其發(fā)送newconn內(nèi)部消息和客戶(hù)端的句柄handle
  sendHelper(worker.process, message, handle, (reply) => {
  // ...
    this.handoff(worker);
  });
};

下面讓我們看看Worker進(jìn)程接收到newconn消息后進(jìn)行了哪些操作

// lib/child.js
function onmessage(message, handle) {
  if (message.act === 'newconn')
   onconnection(message, handle);
  else if (message.act === 'disconnect')
   _disconnect.call(worker, true);
 }

// Round-robin connection.
// 接收連接,并且處理
function onconnection(message, handle) {
 const key = message.key;
 const server = handles[key];
 const accepted = server !== undefined;

 send({ ack: message.seq, accepted });

 if (accepted) server.onconnection(0, handle);
}

總結(jié)

  1. net模塊會(huì)對(duì)進(jìn)程進(jìn)行判斷,是worker 還是master, 是worker的話進(jìn)行hack net.Server實(shí)例的listen方法
  2. worker 調(diào)用的listen 方法是hack掉的,直接return 0,不過(guò)會(huì)向master注冊(cè)一個(gè)connection接手的事件
  3. master 收到客戶(hù)端connection事件后,會(huì)輪詢(xún)向worker發(fā)送connection上來(lái)的客戶(hù)端句柄
  4. worker收到master發(fā)送過(guò)來(lái)客戶(hù)端的句柄,這時(shí)候就可以處理客戶(hù)端請(qǐng)求了

分享出于共享學(xué)習(xí)的目的,如有錯(cuò)誤,歡迎大家留言指導(dǎo),不喜勿噴。也希望大家多多支持億速云。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI