溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

socket.io與pm2(cluster)集群搭配的解決方案

發(fā)布時(shí)間:2020-09-25 20:27:01 來(lái)源:腳本之家 閱讀:248 作者:RoyalRover 欄目:web開發(fā)

socket.io與cluster

在線上系統(tǒng)中,需要使用node的多進(jìn)程模型,我們可以自己實(shí)現(xiàn)簡(jiǎn)易的基于cluster模式的socket分發(fā)模型,也可以使用比較穩(wěn)定的pm2這樣進(jìn)程管理工具。在常規(guī)的http服務(wù)中,這套模式一切正常,可是一旦server中集成了socket.io服務(wù)就會(huì)導(dǎo)致ws通道建立失敗,即使通過(guò)backup的polling方式仍會(huì)出現(xiàn)時(shí)斷時(shí)連的現(xiàn)象,因此我們需要解決這種問(wèn)題,讓socket.io充分利用多核。

在這里之所以提到socket.io而未說(shuō)websocket服務(wù),是因?yàn)閟ocket.io在封裝websocket基礎(chǔ)上又保證了可用性。在客戶端未提供websocket功能的基礎(chǔ)上使用xhr polling、jsonp或forever iframe的方式進(jìn)行兼容,同時(shí)在建立ws連接前往往通過(guò)幾次http輪訓(xùn)確保ws服務(wù)可用,因此socket.io并不等于websocket。再往底層深入研究,socket.io其實(shí)并沒有做真正的websocket兼容,而是提供了上層的接口以及namespace服務(wù),真正的邏輯則是在“engine.io”模塊。該模塊實(shí)現(xiàn)握手的http代理、連接升級(jí)、心跳、傳輸方式等,因此研究engine.io模塊才能清楚的了解socket.io實(shí)現(xiàn)機(jī)制。

場(chǎng)景重現(xiàn)

服務(wù)端采用express+socket.io的組合方案,搭配pm2的cluster模式,實(shí)現(xiàn)一個(gè)簡(jiǎn)易的b/s通信demo:

app.js

var path = require('path');
var app = require('express')(),
 server = require('http').createServer(app),
 io = require('socket.io')(server);

io
 .on('connection', function(socket) {
  socket.on('disconnect', function() {
   console.log('/: disconnect-------->')
  });

  socket.on('b:message', function() {
   socket.emit('s:message', '/: '+port);
   console.log('/: '+port)
  });
 });

io.of('/ws')
 .on('connection', function(socket) {
 socket.on('disconnect', function() {
  console.log('/ws: disconnect-------->')
 });

 socket.on('b:message', function() {
  socket.emit('/ws: message', port);
 });
});

app.get('/page',function(req,res){
 res.sendFile(path.join(process.cwd(),'./index.html'));
});

server.listen(8080);

index.html

<script>
  var btn = document.getElementById('btn1');
  btn.addEventListener('click',function(){
   var socket = io.connect('http://127.0.0.1:8080/ws',{
    reconnection: false
   });
   socket.on('connect',function(){
    // 發(fā)起“腳手架安裝”請(qǐng)求
    socket.emit('b:message',{});

    socket.on('s:message',function(d){
     console.log(d);
    });

   });

   socket.on('error',function(err){
    console.log(err);
   })
  });
 </script>

pm2.json

{
 "apps": [
 {
  "name": "ws",
  "script": "./app.js",
  "env": {
  "NODE_ENV": "development"
  },
  "env_production": {
  "NODE_ENV": "production"
  },
  "instances": 4,
  "exec_mode": "cluster",
  "max_restarts" : 3,
  "restart_delay" : 5000,
  "log_date_format" : "YYYY-MM-DD HH:mm Z",
  "combine_logs" : true
 }
 ]
}

這樣,執(zhí)行命令pm2 start pm2.json即可開啟服務(wù),訪問(wèn)127.0.0.1:8080/page,點(diǎn)擊按鈕發(fā)起ws連接,觀察控制臺(tái)即可。

下圖清晰顯示了socket.io握手的錯(cuò)誤:

socket.io與pm2(cluster)集群搭配的解決方案

可見在websocket連接建立之前多出了3個(gè)xhr請(qǐng)求,而websocket連接建立失敗后又多出了幾個(gè)xhr請(qǐng)求,同時(shí)最后兩個(gè)xhr請(qǐng)求失敗了。

socket.io沒有采用直接建立websocket連接的粗暴方式,而是首先通過(guò)http請(qǐng)求(xhr)訪問(wèn)服務(wù)端的相關(guān)輪訓(xùn)配置信息以及sid。此處sid類似sessionID,但是它唯一標(biāo)識(shí)連接,可理解為socketId,以后每次http請(qǐng)求cookie中都必須攜帶sid(httponly);

socket.io與pm2(cluster)集群搭配的解決方案

第二、三個(gè)請(qǐng)求用于確認(rèn)連接,在socket.io中,post請(qǐng)求是客戶端發(fā)送消息給服務(wù)端的唯一形式,而且post響應(yīng)一定是“ok”,它的“content-length”一定為2;而get請(qǐng)求主要用于輪訓(xùn),同時(shí)獲取服務(wù)端的相關(guān)消息,這會(huì)在下文中有體現(xiàn);

第四個(gè)websocket連接請(qǐng)求失敗,這主要是由于與后端http握手失敗造成的;

第五個(gè)請(qǐng)求為xhr方式的post請(qǐng)求,它是作為websocket通道建立失敗后的一種兼容性處理,上文講述了socket.io的post請(qǐng)求只在客戶端需要發(fā)送消息給服務(wù)端時(shí)才會(huì)使用,因此,為了證實(shí)我們查看消息體:

socket.io與pm2(cluster)集群搭配的解決方案

可見,它攜帶了客戶端發(fā)出的消息類型b:message,同時(shí)包含消息體{}空對(duì)象。對(duì)應(yīng)的,服務(wù)端返回“OK”;

第六個(gè)請(qǐng)求為xhr方式的get請(qǐng)求,用來(lái)獲取服務(wù)端對(duì)第五個(gè)請(qǐng)求的響應(yīng)。

socket.io與pm2(cluster)集群搭配的解決方案

至此,大致分析了socket.io建立連接的大致過(guò)程以及連接建立失敗后如何兜底的方案,下面分析為何出現(xiàn)握手失敗的問(wèn)題。

原因何在

實(shí)例中pm2主進(jìn)程開啟了4個(gè)工作進(jìn)程,由主進(jìn)程偵聽8080端口并分發(fā)請(qǐng)求給工作進(jìn)程。pm2進(jìn)程在分發(fā)請(qǐng)求的階段采用了某種算法的均衡,如round-robin或者其他hash方式(但不是iphash),因此在socket.io客戶端連接建立階段發(fā)送的多個(gè)xhr請(qǐng)求,會(huì)被pm2定位到不同的worker進(jìn)程中。前文中提到每個(gè)xhr請(qǐng)求都會(huì)攜帶sid字段標(biāo)識(shí)當(dāng)前連接,因此當(dāng)一個(gè)攜帶sid字段的請(qǐng)求被pm2定位到另一個(gè)與該連接無(wú)關(guān)的worker時(shí),就會(huì)造成請(qǐng)求失敗,返回{"code":1,"message":"Session ID unknown"}錯(cuò)誤;即使前三次xhr握手成功,進(jìn)入websocket連接升級(jí)階段,負(fù)責(zé)偵聽update事件的worker也往往不是之前的那個(gè)worder,因此導(dǎo)致websocket連接建立失敗。

一言以蔽之,客戶端多次請(qǐng)求的服務(wù)端進(jìn)程不是同一個(gè)進(jìn)程才導(dǎo)致的ws連接無(wú)法成功建立。那么如何才能解決呢?最簡(jiǎn)單的方案就是確??蛻舳说拿看握?qǐng)求都可以定位到同一個(gè)服務(wù)進(jìn)程即可。當(dāng)然,分布式session同樣可以解決問(wèn)題,依托第三方緩存類似redis并配合一致性hash算法,確保所有服務(wù)進(jìn)程都可以獲取到連接信息,相互配合完成連接建立。但這也僅僅是作者在理論上分析的一種實(shí)現(xiàn)方式,并沒有測(cè)試通過(guò),因?yàn)檫@種分布式架構(gòu)不僅實(shí)現(xiàn)繁雜而且引入了相關(guān)依賴redis,不太可取。

那么下文主要針對(duì)確??蛻舳说拿看握?qǐng)求都可以定位到同一個(gè)服務(wù)進(jìn)程這一點(diǎn)實(shí)現(xiàn)解決方案。

多種實(shí)現(xiàn)

官方實(shí)現(xiàn)

官方提供了一種比較輕便的架構(gòu):nginx反向代理+iphash

我們的示例demo中的http服務(wù)器只偵聽8080端口,因此必須由pm2分發(fā)請(qǐng)求,否則會(huì)出現(xiàn)端口占用的錯(cuò)誤發(fā)生。但是,官方的解決方案是每個(gè)進(jìn)程的socket.io服務(wù)器創(chuàng)建不同端口的http服務(wù)器,專注用于http握手和升級(jí),由nginx做握手請(qǐng)求的代理。而且針對(duì)nginx必須設(shè)置iphash,保證同一個(gè)客戶端的多次請(qǐng)求定位到后端同一個(gè)服務(wù)進(jìn)程。

這樣,示例demo中會(huì)占用5個(gè)端口,其中8080端口為公用的http服務(wù)器使用,其他四個(gè)端口則只用于ws連接握手。但是這四個(gè)端口卻如何選取呢?為了保證擴(kuò)展性以及順序性,采用與pm2相兼容的方案。pm2會(huì)為每個(gè)worker進(jìn)程分配一個(gè)id,并且將該id綁定到進(jìn)程的環(huán)境變量中,那么我們就可以利用該worker id生成4個(gè)不同的端口號(hào)。

app.js

var path = require('path');
var app = require('express')(),
 server = require('http').createServer(app),
 port = 3131 + parseInt(process.env.NODE_APP_INSTANCE),
 io = require('socket.io')(port);

io
 .on('connection', function(socket) {
  socket.on('disconnect', function() {
   console.log('/: disconnect-------->')
  });

  socket.on('b:message', function() {
   socket.emit('s:message', '/: '+port);
   console.log('/: '+port)
  });
 });

io.of('/ws')
 .on('connection', function(socket) {
 socket.on('disconnect', function() {
  console.log('disconnect-------->')
 });

 socket.on('b:message', function() {
  socket.emit('s:message', port);
 });
});

app.get('/abc',function(req,res){
 res.sendFile(path.join(process.cwd(),'./index.html'));
});

server.listen(8080);

index.html

 <script>
  var btn = document.getElementById('btn1');
  btn.addEventListener('click',function(){
   var socket = io.connect('http://ws.vd.net/ws',{
    reconnection: false
   });
   socket.on('connect',function(){
    // 發(fā)起“腳手架安裝”請(qǐng)求
    socket.emit('b:message',{a:1});

    socket.on('s:message',function(d){
     console.log(d);
    });

   });

   socket.on('error',function(err){
    console.log(err);
   })
  });
 </script>

nginx.conf

 upstream io_nodes {
  ip_hash;
  server 127.0.0.1:3131;
  server 127.0.0.1:3132;
  server 127.0.0.1:3133;
  server 127.0.0.1:3134;
 }
 server {
  listen 80;
  server_name ws.vd.net;
  location / {
   proxy_set_header Upgrade $http_upgrade;
   proxy_set_header Connection "upgrade";
   proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
   proxy_set_header Host $host;
   proxy_http_version 1.1;
   proxy_pass http://io_nodes;
  }
 }

在本機(jī)綁定hosts地址后開啟nginx服務(wù),同時(shí)開啟服務(wù)器,點(diǎn)擊按鈕建立ws連接成功。

服務(wù)端路由

服務(wù)端路由,意義在于“服務(wù)端做worker的負(fù)載均衡,并將選擇的worker ip和端口渲染在頁(yè)面,之后瀏覽器的所有ws連接默認(rèn)連接到對(duì)應(yīng) ip:port的服務(wù)器中”。這樣只要是服務(wù)端渲染的頁(yè)面都可以采用這種方式實(shí)現(xiàn)。

如果頁(yè)面采用前端異步渲染,仍可以采用這種方式,不過(guò)首先通過(guò)xhr請(qǐng)求向服務(wù)端獲取需要握手的http服務(wù)器的ip和端口,然后在進(jìn)行ws連接。

服務(wù)端路由的前提仍然是需要針對(duì)每個(gè)ws服務(wù)器分配一個(gè)端口,只不過(guò)去掉nginx由服務(wù)端做ip hash。采用服務(wù)端路由架構(gòu)清晰,而且實(shí)現(xiàn)容易,兼容性好。

上帝進(jìn)程路由

此處的上帝進(jìn)程即為主進(jìn)程,類似pm2進(jìn)程。上帝進(jìn)程路由則是在上帝進(jìn)程層面上做請(qǐng)求的定向分發(fā),保證請(qǐng)求主機(jī)和進(jìn)程的一致性。在上帝進(jìn)程中,針對(duì)每個(gè)請(qǐng)求的ip做hash,并對(duì)每一個(gè)ws服務(wù)器創(chuàng)建單獨(dú)的http服務(wù)器用于握手升級(jí)。

簡(jiǎn)易代碼:

var express = require('express'),
 cluster = require('cluster'),
 net = require('net'),
 sio = require('socket.io');

var port = 3000,
 num_processes = require('os').cpus().length;

if (cluster.isMaster) {
 var workers = [];

 var spawn = function(i) {
  workers[i] = cluster.fork();
  workers[i].on('exit', function(code, signal) {
   console.log('respawning worker', i);
   spawn(i);
  });
 };

 for (var i = 0; i < num_processes; i++) {
  spawn(i);
 }

 // ip hash
 var worker_index = function(ip, len) {
  var s = '';
  for (var i = 0, _len = ip.length; i < _len; i++) {
   if (!isNaN(ip[i])) {
    s += ip[i];
   }
  }

  return Number(s) % len;
 };

 var server = net.createServer({ pauseOnConnect: true }, function(connection) {
  var worker = workers[worker_index(connection.remoteAddress, num_processes)];
  worker.send('sticky-session:connection', connection);
 }).listen(port);
} else {
 // worker
 var app = new express();

 // handshake server.
 var server = app.listen(0, 'localhost'),
  io = sio(server);

 process.on('message', function(message, connection) {
  if (message !== 'sticky-session:connection') {
   return;
  }

  server.emit('connection', connection);

  connection.resume();
 });
}

總結(jié)

本文實(shí)現(xiàn)了三種解決方案,歸根到底就是“ip hash”,不同點(diǎn)在于在請(qǐng)求處理的不同階段做ip hash。

可以在請(qǐng)求處理最前端做iphash,即nginx方式,這也就是第一種方案;

可以在請(qǐng)求處理的第二層分發(fā)處做iphash,即上帝進(jìn)程路由的方式,即第三種;

也可以在請(qǐng)求處理的終端做iphash,即服務(wù)端路由的方式,也就是第二種;

同時(shí)共享session也同樣可以實(shí)現(xiàn),借助socket.io-redis模塊也可以實(shí)現(xiàn)。

好了,以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作能帶來(lái)一定的幫助,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)億速云的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI