溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

nodejs中怎么利用cluster實(shí)現(xiàn)多進(jìn)程

發(fā)布時(shí)間:2021-07-08 16:08:48 來(lái)源:億速云 閱讀:270 作者:Leah 欄目:大數(shù)據(jù)

本篇文章為大家展示了nodejs中怎么利用cluster實(shí)現(xiàn)多進(jìn)程,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

cluster
  • node進(jìn)行多進(jìn)程的模塊;

屬性和方法
  1. isMaster屬性,返回是不是主進(jìn)程,boolean值;

  2. isWorker屬性, 返回該進(jìn)程是不是工作進(jìn)程;

  3. fork()方法,只能通過(guò)主進(jìn)程調(diào)用,衍生出一個(gè)新的worker子進(jìn)程,返回worker對(duì)象;

  4. setupMaster([setting])方法,用于修改fork的默認(rèn)行為,一旦調(diào)用,將會(huì)按照cluster.settings進(jìn)行設(shè)置;

  5. settings屬性;用于配置;

  • 參數(shù)exec:worker文件路徑,

  • args:傳遞給worker的參數(shù);

  • execArgv: 傳遞給nodejs可執(zhí)行文件的參數(shù)列表;

const cluster = require('cluster');
const cpuNums = require('os').cpus().length;
const http = require('http');

if (cluster.isMaster) {
  console.log(cpuNums);
  for (var i = 0; i < cpuNums; i++) {
    cluster.fork(); // 相當(dāng)于node main.js,重新執(zhí)行自己
    // 和process_child相比,不用重新創(chuàng)建child.js,
  }
  
  cluster.on('fork', worker => {
    console.log(`主進(jìn)程fork了一個(gè)worker,pid為${worker.process.pid}`)
  })
  
  cluster.on('listening', worker => {
    console.log(`主進(jìn)程fork了一個(gè)worker,pid為${worker.process.pid}`)
  })
  
  cluster.on('message', data => {
      console.log('主進(jìn)程接收到了子進(jìn)程消息為:',data)
  })
  
  Object.keys(cluster.workers).forEach(item => {
    cluster.workers[item].on('message', data => {
      console.log(data);
    });
  });
  
  cluster.on('disconnect', (worker) => {
    console.log('有工作進(jìn)程退出了',worker.process.pid)
  })

} else {

  http.createServer((req, res) => {
    res.end('hello')
  }).listen(8001, () =>{
      console.log('child server is runing')
  })    
  
  console.log('我是子進(jìn)程');
  
  process.send('子說(shuō):你好');
}
事件
  1. fork事件,當(dāng)新的工作進(jìn)程被fork時(shí)觸發(fā),可以用來(lái)記錄工作進(jìn)程活動(dòng);

  2. listening事件,當(dāng)一個(gè)工作進(jìn)程調(diào)用listen()后觸發(fā),事件處理器兩個(gè)參數(shù):worker:工作進(jìn)程對(duì)象, address:包含了鏈接屬性

  • 只有http的服務(wù)的listen事件才能觸發(fā)此事件

  1. message事件,監(jiān)聽(tīng)子進(jìn)程的消息;當(dāng)cluster主進(jìn)程接收任何工作進(jìn)程發(fā)送的消息時(shí)觸發(fā);

  • 比較特殊需要在單獨(dú)的worker上監(jiān)聽(tīng);

  1. online事件,

  2. disconnect事件,當(dāng)工作進(jìn)程斷開(kāi)時(shí)調(diào)用;

  3. exit事件,

  4. setup事件,cluster.setupMaster()執(zhí)行后觸發(fā);

cluster多進(jìn)程模型
  • 每個(gè)worker進(jìn)程通過(guò)使用cluster.fork()函數(shù),基于IPC(Inter-Process-Communication),實(shí)現(xiàn)與master進(jìn)程間通信;

那通過(guò)child_process.fork()直接創(chuàng)建不就可以了,為什么要通過(guò)cluster

這種方式只實(shí)現(xiàn)了多進(jìn)程,多進(jìn)程運(yùn)行還涉及父子進(jìn)程通信,子進(jìn)程管理,以及負(fù)載均衡等問(wèn)題,這些特性cluster已經(jīng)做了處理了;

驚群現(xiàn)象
  • 多個(gè)進(jìn)程間會(huì)競(jìng)爭(zhēng)一個(gè)accept連接,產(chǎn)生驚群現(xiàn)象,效率比較低;

  • 由于無(wú)法控制一個(gè)新的連接由哪個(gè)進(jìn)程來(lái)處理,導(dǎo)致worker進(jìn)程間負(fù)載不均衡;

master.js

const net = require('net'); // 是最基礎(chǔ)的網(wǎng)絡(luò)模塊,http的基礎(chǔ)就是網(wǎng)絡(luò)模塊,最底層是socket
const fork = require('child_process').fork; // 驚群
var handle = net._createServerHandle('0.0.0.0', 5000); // net模塊創(chuàng)建一個(gè)服務(wù),綁定到3000端口,返回一個(gè)callback
for (var i = 0; i < 4; i++) {
  console.log('fork', i);
  fork('./worket.js').send({}, handle); // 主進(jìn)程fork子進(jìn)程,send信息
}

worker.js

const net = require('net');
process.on('message', (m, handle) => { // 子進(jìn)程接收到master信息
  // master接收客戶(hù)端的請(qǐng)求,worker去響應(yīng)
  start(handle);
});

var buf = 'hello nodejs';
var res =
  ['HTTP/1.1 200 OK', 'content-length' + buf.length].join('\r\n') +
  ' \r\n\r\n' +
  buf;

var data = {};

function start(server) {
  // 響應(yīng)邏輯,重點(diǎn)關(guān)注驚群效果,計(jì)數(shù)
  server.listen();
  server.onconnection = function(err, hand) {
    var pid = process.pid;
    if (!data[pid]) {
      data[pid] = 0;
    }
    data[pid]++;
    console.log('get a connection on worker,pid = %d', process.pid, data[pid]);
    var socket = net.Socket({
      handle: hand
    });
    socket.readable = socket.writable = true; // 修改socket的讀寫(xiě)屬性
    socket.end(res);
  };
}
nginx proxy
  • Nginx 是俄羅斯人編寫(xiě)的十分輕量級(jí)的http服務(wù)器,是一個(gè)高性能的HTTP和反向代理服務(wù)器,異步非阻塞I/O,而且能夠高并發(fā);

  • 正向代理:客戶(hù)端為代理,服務(wù)端不知道代理是誰(shuí);

  • 反向代理:服務(wù)器為代理,客戶(hù)端不知道代理是誰(shuí);

  • nginx的實(shí)際應(yīng)用場(chǎng)景: 比較適合穩(wěn)定的服務(wù)

    • 靜態(tài)資源服務(wù)器:js,css, html

    • 企業(yè)級(jí)集群

守護(hù)進(jìn)程:退出命令窗口之后,服務(wù)一直處于運(yùn)行狀態(tài);

cluster多進(jìn)程調(diào)度模型
  • cluster是由master監(jiān)聽(tīng)請(qǐng)求,在通過(guò)round-robin算法分發(fā)給各個(gè)worker,避免了驚群現(xiàn)象的發(fā)生;

round-robin 輪詢(xún)調(diào)度算法的原理是每一次把來(lái)自用戶(hù)的請(qǐng)求輪流分配給內(nèi)部中的服務(wù)器;

cluster-model.js

const net = require('net');
const fork = require('child_process').fork; // cluster 簡(jiǎn)單版本,cluster就是基于child_process去封裝的;

var workers = [];
for (var i = 0; i < 4; i++) {
  workers.push(fork('./child')); // cluster workers
}
var handle = net._createServerHandle('0.0.0.0', 3001); // master
handle.listen();
handle.onconnection = function(err, handle) {
  var worker = workers.pop();
  worker.send({}, handle);
  workers.unshift(worker); // 通過(guò)pop 和 unshift實(shí)現(xiàn)一個(gè)簡(jiǎn)單的輪詢(xún)
};

child.js

const net = require('net');
process.on('message', (m, handle) => {
  debugger;
  start(handle);
});

var buf = 'hello cluster';
var res =
  ['HTTP/1.1 200 OK', 'content-length' + buf.length].join('\r\n') +
  '\r\n\r\n' +
  buf;

function start(handle) {
  console.log('get a worker on server,pid = ' + process.pid);
  var socket = net.Socket({
    handle
  });
  socket.readable = socket.writable = true; // 修改socket的讀寫(xiě)屬性
  socket.end(res);
}
cluster中的優(yōu)雅退出
  1. 關(guān)閉異常worker進(jìn)程所有的TCP server(將已有的快速斷開(kāi),且不再接受新的連接),斷開(kāi)和Master的IPC通道,不再接受新的用戶(hù)請(qǐng)求;

  2. Master立刻fork一個(gè)新的worker進(jìn)程,保證總的進(jìn)程數(shù)量不變;

  3. 異常worker等待一段時(shí)間,處理完已接受的請(qǐng)求后退出;

    if(cluster.isMaster){
        cluster.fork();
    }else {
        // 出錯(cuò)之后
        try{
            res.end(dddd);  // 報(bào)錯(cuò),整個(gè)線(xiàn)程掛掉,不能提供服務(wù),
        }catch(err){
         // 斷開(kāi)連接,斷開(kāi)和Master的連接,守護(hù)進(jìn)程其實(shí)就是重啟;
            process.disconnect(); // or exit()
        }
    }
進(jìn)程守護(hù)
  • Master進(jìn)程除了接收新的連接,分發(fā)給各worker處理之外,還像天使一樣默默守護(hù)著這些進(jìn)程,保障應(yīng)用的穩(wěn)定性,一旦某個(gè)worker進(jìn)程退出就fork一個(gè)新的子進(jìn)程頂替上去;

  • 這一切cluster模塊已經(jīng)處理好了,當(dāng)某個(gè)worker進(jìn)程發(fā)生異常退出或者與Master進(jìn)程失去聯(lián)系(disconnected)時(shí),master進(jìn)程都會(huì)收到相應(yīng)的事件通知;

cluster.on('exit',function(){
    cluster.fork();
})

cluster.on('disconnect',function(){
    cluster.fork();
})
IPC通信
  • IPC通信就是進(jìn)程間的通信;

  • 雖然每個(gè)worker進(jìn)程是相對(duì)獨(dú)立的,但是他們之間還是需要通信的;叫進(jìn)程間通信(IPC)通信;

  • worker和worker之間的通信通過(guò)Master轉(zhuǎn)發(fā):通過(guò)worker的pid

const cluster = require('cluster');
if(cluster.isMaster){
    var worker = cluster.fork();
    worker.send('hi, i am master');
    worker.on('message', (msg) =>{
        console.log(`${msg} is from worker ${worker.id}`)   
    })
}else if(cluster.isWorker){
   process.on('message', (msg) =>{
        process.send(msg);
   }) 
}

上述內(nèi)容就是nodejs中怎么利用cluster實(shí)現(xiàn)多進(jìn)程,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI