溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么使用Node實(shí)現(xiàn)輕量化進(jìn)程池和線程池

發(fā)布時(shí)間:2022-10-17 10:02:00 來源:億速云 閱讀:152 作者:iii 欄目:web開發(fā)

今天小編給大家分享一下怎么使用Node實(shí)現(xiàn)輕量化進(jìn)程池和線程池的相關(guān)知識點(diǎn),內(nèi)容詳細(xì),邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

一、名詞定義

1. 進(jìn)程

學(xué)術(shù)上說,進(jìn)程是一個(gè)具有一定獨(dú)立功能的程序在一個(gè)數(shù)據(jù)集上的一次動(dòng)態(tài)執(zhí)行的過程,是操作系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位,是應(yīng)用程序運(yùn)行的載體。我們這里將進(jìn)程比喻為工廠的車間,它代表 CPU 所能處理的單個(gè)任務(wù)。任一時(shí)刻,CPU 總是運(yùn)行一個(gè)進(jìn)程,其他進(jìn)程處于非運(yùn)行狀態(tài)。

進(jìn)程具有以下特性:

  • 進(jìn)程是擁有資源的基本單位,資源分配給進(jìn)程,同一進(jìn)程的所有線程共享該進(jìn)程的所有資源;

  • 進(jìn)程之間可以并發(fā)執(zhí)行;

  • 在創(chuàng)建或撤消進(jìn)程時(shí),系統(tǒng)都要為之分配和回收資源,與線程相比系統(tǒng)開銷較大;

  • 一個(gè)進(jìn)程可以有多個(gè)線程,但至少有一個(gè)線程;

2. 線程

在早期的操作系統(tǒng)中并沒有線程的概念,進(jìn)程是能擁有資源和獨(dú)立運(yùn)行的最小單位,也是程序執(zhí)行的最小單位。任務(wù)調(diào)度采用的是時(shí)間片輪轉(zhuǎn)的搶占式調(diào)度方式,而進(jìn)程是任務(wù)調(diào)度的最小單位,每個(gè)進(jìn)程有各自獨(dú)立的一塊內(nèi)存,使得各個(gè)進(jìn)程之間內(nèi)存地址相互隔離。

后來,隨著計(jì)算機(jī)的發(fā)展,對 CPU 的要求越來越高,進(jìn)程之間的切換開銷較大,已經(jīng)無法滿足越來越復(fù)雜的程序的要求了。于是就發(fā)明了線程,線程是程序執(zhí)行中一個(gè)單一的順序控制流程,是程序執(zhí)行流的最小單元。這里把線程比喻一個(gè)車間的工人,即一個(gè)車間可以允許由多個(gè)工人協(xié)同完成一個(gè)任務(wù),即一個(gè)進(jìn)程中可能包含多個(gè)線程。

線程具有以下特性:

  • 線程作為調(diào)度和分配的基本單位;

  • 多個(gè)線程之間也可并發(fā)執(zhí)行;

  • 線程是真正用來執(zhí)行程序的,執(zhí)行計(jì)算的;

  • 線程不擁有系統(tǒng)資源,但可以訪問隸屬于進(jìn)程的資源,一個(gè)線程只能屬于一個(gè)進(jìn)程;

Node.js 的多進(jìn)程有助于充分利用 CPU 等資源,Node.js 的多線程提升了單進(jìn)程上任務(wù)的并行處理能力。

在 Node.js 中,每個(gè) worker 線程都有他自己的 V8 實(shí)例和事件循環(huán)機(jī)制 (Event Loop)。但是,和進(jìn)程不同,workers 之間是可以共享內(nèi)存的。

二、Node.js 異步機(jī)制

1. Node.js 內(nèi)部線程池、異步機(jī)制以及宏任務(wù)優(yōu)先級劃分

Node.js 的單線程是指程序的主要執(zhí)行線程是單線程,這個(gè)主線程同時(shí)也負(fù)責(zé)事件循環(huán)。而其實(shí)語言內(nèi)部也會(huì)創(chuàng)建線程池來處理主線程程序的 網(wǎng)絡(luò) IO / 文件 IO / 定時(shí)器 等調(diào)用產(chǎn)生的異步任務(wù)。一個(gè)例子就是定時(shí)器 Timer 的實(shí)現(xiàn):在 Node.js 中使用定時(shí)器時(shí),Node.js 會(huì)開啟一個(gè)定時(shí)器線程進(jìn)行計(jì)時(shí),計(jì)時(shí)結(jié)束時(shí),定時(shí)器回調(diào)函數(shù)會(huì)被放入位于主線程的宏任務(wù)隊(duì)列。當(dāng)事件循環(huán)系統(tǒng)執(zhí)行完主線程同步代碼和當(dāng)前階段的所有微任務(wù)時(shí),該回調(diào)任務(wù)最后再被取出執(zhí)行。所以 Node.js 的定時(shí)器其實(shí)是不準(zhǔn)確的,只能保證在預(yù)計(jì)時(shí)間時(shí)我們的回調(diào)任務(wù)被放入隊(duì)列等待執(zhí)行,而不是直接被執(zhí)行。

怎么使用Node實(shí)現(xiàn)輕量化進(jìn)程池和線程池

多線程機(jī)制配合 Node.js 的 evet loop 事件循環(huán)系統(tǒng)讓開發(fā)者在一個(gè)線程內(nèi)就能夠使用異步機(jī)制,包括定時(shí)器、IO、網(wǎng)絡(luò)請求。但為了實(shí)現(xiàn)高響應(yīng)度的高性能服務(wù)器,Node.js 的 Event Loop 在宏任務(wù)上進(jìn)一步劃分了優(yōu)先級。

怎么使用Node實(shí)現(xiàn)輕量化進(jìn)程池和線程池

Node.js 宏任務(wù)之間的優(yōu)先級劃分:Timers > Pending > Poll > Check > Close。

  • Timers Callback: 涉及到時(shí)間,肯定越早執(zhí)行越準(zhǔn)確,所以這個(gè)優(yōu)先級最高很容易理解。

  • Pending Callback:處理網(wǎng)絡(luò)、IO 等異常時(shí)的回調(diào),有的 unix 系統(tǒng)會(huì)等待發(fā)生錯(cuò)誤的上報(bào),所以得處理下。

  • Poll Callback:處理 IO 的 data,網(wǎng)絡(luò)的 connection,服務(wù)器主要處理的就是這個(gè)。

  • Check Callback:執(zhí)行 setImmediate 的回調(diào),特點(diǎn)是剛執(zhí)行完 IO 之后就能回調(diào)這個(gè)。

  • Close Callback:關(guān)閉資源的回調(diào),晚點(diǎn)執(zhí)行影響也不到,優(yōu)先級最低。

Node.js 微任務(wù)之間的優(yōu)化及劃分:process.nextTick > Promise。

2. Node.js 宏任務(wù)和微任務(wù)的執(zhí)行時(shí)機(jī)

node 11 之前,Node.js 的 Event Loop 并不是瀏覽器那種一次執(zhí)行一個(gè)宏任務(wù),然后執(zhí)行所有的微任務(wù),而是執(zhí)行完一定數(shù)量的 Timers 宏任務(wù),再去執(zhí)行所有微任務(wù),然后再執(zhí)行一定數(shù)量的 Pending 的宏任務(wù),然后再去執(zhí)行所有微任務(wù),剩余的 Poll、Check、Close 的宏任務(wù)也是這樣。node 11 之后改為了每個(gè)宏任務(wù)都執(zhí)行所有微任務(wù)了。

而 Node.js 的 宏任務(wù)之間也是有優(yōu)先級的,如果 Node.js 的 Event Loop 每次都是把當(dāng)前優(yōu)先級的所有宏任務(wù)跑完再去跑下一個(gè)優(yōu)先級的宏任務(wù),那么會(huì)導(dǎo)致 “饑餓” 狀態(tài)的發(fā)生。如果某個(gè)階段宏任務(wù)太多,下個(gè)階段就一直執(zhí)行不到了,所以每個(gè)類型的宏任務(wù)有個(gè)執(zhí)行數(shù)量上限的機(jī)制,剩余的交給之后的 Event Loop 再繼續(xù)執(zhí)行。

最終表現(xiàn)就是:也就是執(zhí)行一定數(shù)量的 Timers 宏任務(wù),每個(gè)宏任務(wù)之間執(zhí)行所有微任務(wù),再一定數(shù)量的 Pending Callback 宏任務(wù),每個(gè)宏任務(wù)之間再執(zhí)行所有微任務(wù)。

三、Node.js 的多進(jìn)程

1. 使用 child_process 方式手動(dòng)創(chuàng)建進(jìn)程

Node.js 程序通過 child_process 模塊提供了衍生子進(jìn)程的能力,child_process 提供多種子進(jìn)程的創(chuàng)建方式:

  • spawn 創(chuàng)建新進(jìn)程,執(zhí)行結(jié)果以流的形式返回,只能通過事件來獲取結(jié)果數(shù)據(jù),操作麻煩。

const spawn = require('child_process').spawn;
const ls = spawn('ls', ['-lh', '/usr']);

ls.stdout.on('data', (data) => {
  console.log(`stdout: ${data}`);
});

ls.stderr.on('data', (data) => {
  console.log(`stderr: ${data}`);
});

ls.on('close', (code) => {
  console.log(`child process exited with code ${code}`);
});

  • execFile 創(chuàng)建新進(jìn)程,按照其后面的 File 名字,執(zhí)行一個(gè)可執(zhí)行文件,可以帶選項(xiàng),以回調(diào)形式返回調(diào)用結(jié)果,可以得到完整數(shù)據(jù),方便了很多。

execFile('/path/to/node', ['--version'], function(error, stdout, stderr){
    if(error){
        throw error;
    }
    console.log(stdout);
});

  • exec 創(chuàng)建新進(jìn)程,可以直接執(zhí)行 shell 命令,簡化了 shell 命令執(zhí)行方式,執(zhí)行結(jié)果以回調(diào)方式返回。

exec('ls -al', function(error, stdout, stderr){
    if(error) {
        console.error('error:' + error);
        return;
    }
    console.log('stdout:' + stdout);
    console.log('stderr:' + typeof stderr);
});

  • fork 創(chuàng)建新進(jìn)程,執(zhí)行 node 程序,進(jìn)程擁有完整的 V8 實(shí)例,創(chuàng)建后自動(dòng)開啟主進(jìn)程到子進(jìn)程的 IPC 通信,資源占用最多。

var child = child_process.fork('./anotherSilentChild.js', {
    silent: true
});

child.stdout.setEncoding('utf8');
child.stdout.on('data', function(data){
    console.log(data);
});

其中,spawn 是所有方法的基礎(chǔ),exec 底層是調(diào)用了 execFile。

2. 使用 cluster 方式半自動(dòng)創(chuàng)建進(jìn)程

以下是使用 Cluster 模塊創(chuàng)建一個(gè) http 服務(wù)集群的簡單示例。示例中創(chuàng)建 Cluster 時(shí)使用同一個(gè) Js 執(zhí)行文件,在文件內(nèi)使用 cluster.isPrimary 判斷當(dāng)前執(zhí)行環(huán)境是在主進(jìn)程還是子進(jìn)程,如果是主進(jìn)程則使用當(dāng)前執(zhí)行文件創(chuàng)建子進(jìn)程實(shí)例,如果時(shí)子進(jìn)程則進(jìn)入子進(jìn)程的業(yè)務(wù)處理流程。

/*
  簡單示例:使用同一個(gè) JS 執(zhí)行文件創(chuàng)建子進(jìn)程集群 Cluster
*/
const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').cpus().length;
const process = require('node:process');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);
  console.log(`Worker ${process.pid} started`);
}

Cluster 模塊允許設(shè)立一個(gè)主進(jìn)程和若干個(gè)子進(jìn)程,使用 child_process.fork() 在內(nèi)部隱式創(chuàng)建子進(jìn)程,由主進(jìn)程監(jiān)控和協(xié)調(diào)子進(jìn)程的運(yùn)行。

子進(jìn)程之間采用進(jìn)程間通信交換消息,Cluster 模塊內(nèi)置一個(gè)負(fù)載均衡器,采用 Round-robin 算法(輪流執(zhí)行)協(xié)調(diào)各個(gè)子進(jìn)程之間的負(fù)載。運(yùn)行時(shí),所有新建立的連接都由主進(jìn)程完成,然后主進(jìn)程再把 TCP 連接分配給指定的子進(jìn)程。

使用集群創(chuàng)建的子進(jìn)程可以使用同一個(gè)端口,Node.js 內(nèi)部對 http/net 內(nèi)置模塊進(jìn)行了特殊支持。Node.js 主進(jìn)程負(fù)責(zé)監(jiān)聽目標(biāo)端口,收到請求后根據(jù)負(fù)載均衡策略將請求分發(fā)給某一個(gè)子進(jìn)程。

3. 使用基于 Cluster 封裝的 PM2 工具全自動(dòng)創(chuàng)建進(jìn)程

PM2 是常用的 node 進(jìn)程管理工具,它可以提供 node.js 應(yīng)用管理能力,如自動(dòng)重載、性能監(jiān)控、負(fù)載均衡等。

其主要用于 獨(dú)立應(yīng)用 的進(jìn)程化管理,在 Node.js 單機(jī)服務(wù)部署方面比較適合??梢杂糜谏a(chǎn)環(huán)境下啟動(dòng)同個(gè)應(yīng)用的多個(gè)實(shí)例提高 CPU 利用率、抗風(fēng)險(xiǎn)、熱加載等能力。

由于是外部庫,需要使用 npm 包管理器安裝:

$: npm install -g pm2

pm2 支持直接運(yùn)行 server.js 啟動(dòng)項(xiàng)目,如下:

$: pm2 start server.js

即可啟動(dòng) Node.js 應(yīng)用,成功后會(huì)看到打印的信息:

┌──────────┬────┬─────────┬──────┬───────┬────────┬─────────┬────────┬─────┬───────────┬───────┬──────────┐
│ App name │ id │ version │ mode │ pid   │ status │ restart │ uptime │ cpu │ mem       │ user  │ watching │
├──────────┼────┼─────────┼──────┼───────┼────────┼─────────┼────────┼─────┼───────────┼───────┼──────────┤
│ server   │ 0  │ 1.0.0   │ fork │ 24776 │ online │ 9       │ 19m    │ 0%  │ 35.4 MB   │ 23101 │ disabled │
└──────────┴────┴─────────┴──────┴───────┴────────┴─────────┴────────┴─────┴───────────┴───────┴──────────┘

pm2 也支持配置文件啟動(dòng),通過配置文件 ecosystem.config.js 可以定制 pm2 的各項(xiàng)參數(shù):

module.exports = {
  apps : [{
    name: 'API', // 應(yīng)用名
    script: 'app.js', // 啟動(dòng)腳本
    args: 'one two', // 命令行參數(shù)
    instances: 1, // 啟動(dòng)實(shí)例數(shù)量
    autorestart: true, // 自動(dòng)重啟
    watch: false, // 文件更改監(jiān)聽器
    max_memory_restart: '1G', // 最大內(nèi)存使用亮
    env: { // development 默認(rèn)環(huán)境變量
      // pm2 start ecosystem.config.js --watch --env development
      NODE_ENV: 'development'
    },
    env_production: { // production 自定義環(huán)境變量
      NODE_ENV: 'production'
    }
  }],

  deploy : {
    production : {
      user : 'node',
      host : '212.83.163.1',
      ref  : 'origin/master',
      repo : 'git@github.com:repo.git',
      path : '/var/www/production',
      'post-deploy' : 'npm install && pm2 reload ecosystem.config.js --env production'
    }
  }
};

pm2 logs 日志功能也十分強(qiáng)大:

$: pm2 logs

II. Node.js 中進(jìn)程池和線程池的適用場景

一般我們使用計(jì)算機(jī)執(zhí)行的任務(wù)包含以下幾種類型的任務(wù):

  • 計(jì)算密集型任務(wù):任務(wù)包含大量計(jì)算,CPU 占用率高。


    const matrix = {};
    for (let i = 0; i < 10000; i++) {
     for (let j = 0; j < 10000; j++) {
       matrix[`${i}${j}`] = i * j;
     }
    }

  • IO 密集型任務(wù):任務(wù)包含頻繁的、持續(xù)的網(wǎng)絡(luò) IO 和磁盤 IO 的調(diào)用。


    const {copyFileSync, constants} = require('fs');
    copyFileSync('big-file.zip', 'destination.zip');

  • 混合型任務(wù):既有計(jì)算也有 IO。

一、進(jìn)程池的適用場景

使用進(jìn)程池的最大意義在于充分利用多核 CPU 資源,同時(shí)減少子進(jìn)程創(chuàng)建和銷毀的資源消耗

進(jìn)程是操作系統(tǒng)分配資源的基本單位,使用多進(jìn)程架構(gòu)能夠更多的獲取 CPU 時(shí)間、內(nèi)存等資源。為了應(yīng)對 CPU-Sensitive 場景,以及充分發(fā)揮 CPU 多核性能,Node 提供了 child_process 模塊用于創(chuàng)建子進(jìn)程。

子進(jìn)程的創(chuàng)建和銷毀需要較大的資源成本,因此池化子進(jìn)程的創(chuàng)建和銷毀過程,利用進(jìn)程池來管理所有子進(jìn)程。

除了這一點(diǎn),Node.js 中子進(jìn)程也是唯一的執(zhí)行二進(jìn)制文件的方式,Node.js 可通過流 (stdin/stdout/stderr) 或 IPC 和子進(jìn)程通信。

通過 Stream 通信

const {spawn} = require('child_process');
const ls = spawn('ls', ['-lh', '/usr']);

ls.stdout.on('data', (data) => {
  console.log(`stdout: ${data}`);
});

ls.stderr.on('data', (data) => {
  console.error(`stderr: ${data}`);
});

ls.on('close', (code) => {
  console.log(`child process exited with code ${code}`);
});

通過 IPC 通信

const cp = require('child_process');
const n = cp.fork(`${__dirname}/sub.js`);

n.on('message', (m) => {
  console.log('PARENT got message:', m);
});

n.send({hello: 'world'});

二、線程池的適用場景

使用線程池的最大意義在于多任務(wù)并行,為主線程降壓,同時(shí)減少線程創(chuàng)建和銷毀的資源消耗。單個(gè) CPU 密集性的計(jì)算任務(wù)使用線程執(zhí)行并不會(huì)更快,甚至線程的創(chuàng)建、銷毀、上下文切換、線程通信、數(shù)據(jù)序列化等操作還會(huì)額外增加資源消耗。

但是如果一個(gè)計(jì)算機(jī)程序中有很多同一類型的阻塞任務(wù)需要執(zhí)行,那么將他們交給線程池可以成倍的減少任務(wù)總的執(zhí)行時(shí)間,因?yàn)樵谕粫r(shí)刻多個(gè)線程在并行進(jìn)行計(jì)算。如果多個(gè)任務(wù)只使用主線程執(zhí)行,那么最終消耗的時(shí)間是線性疊加的,同時(shí)主線程阻塞之后也會(huì)影響其它任務(wù)的處理。

特別是對 Node.js 這種單主線程的語言來講,主線程如果消耗了過多的時(shí)間來執(zhí)行這些耗時(shí)任務(wù),那么對整個(gè) Node.js 單個(gè)進(jìn)程實(shí)例的性能影響將是致命的。這些占用著 CPU 時(shí)間的操作將導(dǎo)致其它任務(wù)獲取的 CPU 時(shí)間不足或 CPU 響應(yīng)不夠及時(shí),被影響的任務(wù)將進(jìn)入 “饑餓” 狀態(tài)。

因此 Node.js 啟動(dòng)后主線程應(yīng)盡量承擔(dān)調(diào)度的角色,批量重型 CPU 占用任務(wù)的執(zhí)行應(yīng)交由額外的工作線程處理,主線程最后拿到工作線程的執(zhí)行結(jié)果再返回給任務(wù)調(diào)用方。另一方面由于 IO 操作 Node.js 內(nèi)部作了優(yōu)化和支持,因此 IO 操作應(yīng)該直接交給主線程,主線程再使用內(nèi)部線程池處理。

Node.js 的異步能不能解決過多占用 CPU 任務(wù)的執(zhí)行問題?

答案是:不能,過多的異步 CPU 占用任務(wù)會(huì)阻塞事件循環(huán)。

Node.js 的異步在 網(wǎng)絡(luò) IO / 磁盤 IO 處理上很有用,宏任務(wù)微任務(wù)系統(tǒng) + 內(nèi)部線程調(diào)用能分擔(dān)主進(jìn)程的執(zhí)行壓力。但是如果單獨(dú)將 CPU 占用任務(wù)放入宏任務(wù)隊(duì)列或微任務(wù)隊(duì)列,對任務(wù)的執(zhí)行速度提升沒有任何幫助,只是一種任務(wù)調(diào)度方式的優(yōu)化而已。

我們只是延遲了任務(wù)的執(zhí)行或是將巨大任務(wù)分散成多個(gè)再分批執(zhí)行,但是任務(wù)最終還是要在主線程被執(zhí)行。如果這類任務(wù)過多,那么任務(wù)分片和延遲的效果將完全消失,一個(gè)任務(wù)可以,那十個(gè)一百個(gè)呢?量變將會(huì)引起質(zhì)變。

以下是 Node.js 官方博客中的原文:

“如果你需要做更復(fù)雜的任務(wù),拆分可能也不是一個(gè)好選項(xiàng)。這是因?yàn)椴鸱种笕蝿?wù)仍然在事件循環(huán)線程中執(zhí)行,并且你無法利用機(jī)器的多核硬件能力。 請記住,事件循環(huán)線程只負(fù)責(zé)協(xié)調(diào)客戶端的請求,而不是獨(dú)自執(zhí)行完所有任務(wù)。 對一個(gè)復(fù)雜的任務(wù),最好把它從事件循環(huán)線程轉(zhuǎn)移到工作線程池上?!?/p>

  • 場景:間歇性讓主進(jìn)程 癱瘓

每一秒鐘,主線程有一半時(shí)間被占用

// this task costs 100ms
function doHeavyTask() { ...}

setInterval(() => {
  doHeavyTask(); // 100ms
  doHeavyTask(); // 200ms
  doHeavyTask(); // 300ms
  doHeavyTask(); // 400ms
  doHeavyTask(); // 500ms
}, 1e3);

  • 場景:高頻性讓主進(jìn)程 半癱瘓

每 200ms,主線程有一半時(shí)間被占用

// this task costs 100ms
function doHeavyTask() { ...}

setInterval(() => {
  doHeavyTask();
}, 1e3);

setInterval(() => {
  doHeavyTask();
}, 1.2e3);

setInterval(() => {
  doHeavyTask();
}, 1.4e3);

setInterval(() => {
  doHeavyTask();
}, 1.6e3);

setInterval(() => {
  doHeavyTask();
}, 1.8e3);

III. 進(jìn)程池

進(jìn)程池是對進(jìn)程的創(chuàng)建、執(zhí)行任務(wù)、銷毀等流程進(jìn)行管控的一個(gè)應(yīng)用或是一套程序邏輯。之所以稱之為池是因?yàn)槠鋬?nèi)部包含多個(gè)進(jìn)程實(shí)例,進(jìn)程實(shí)例隨時(shí)都在進(jìn)程池內(nèi)進(jìn)行著狀態(tài)流轉(zhuǎn),多個(gè)創(chuàng)建的實(shí)例可以被重復(fù)利用,而不是每次執(zhí)行完一系列任務(wù)后就被銷毀。因此,進(jìn)程池的部分存在目的是為了減少進(jìn)程創(chuàng)建的資源消耗。

此外進(jìn)程池最重要的一個(gè)作用就是負(fù)責(zé)將任務(wù)分發(fā)給各個(gè)進(jìn)程執(zhí)行,各個(gè)進(jìn)程的任務(wù)執(zhí)行優(yōu)先級取決于進(jìn)程池上的負(fù)載均衡運(yùn)算,由算法決定應(yīng)該將當(dāng)前任務(wù)派發(fā)給哪個(gè)進(jìn)程,以達(dá)到最高的 CPU 和內(nèi)存利用率。常見的負(fù)載均衡算法有:

  • POLLING - 輪詢:子進(jìn)程輪流處理請求

  • WEIGHTS - 權(quán)重:子進(jìn)程根據(jù)設(shè)置的權(quán)重來處理請求

  • RANDOM - 隨機(jī):子進(jìn)程隨機(jī)處理請求

  • SPECIFY - 指定:子進(jìn)程根據(jù)指定的進(jìn)程 id 處理請求

  • WEIGHTS_POLLING - 權(quán)重輪詢:權(quán)重輪詢策略與輪詢策略類似,但是權(quán)重輪詢策略會(huì)根據(jù)權(quán)重來計(jì)算子進(jìn)程的輪詢次數(shù),從而穩(wěn)定每個(gè)子進(jìn)程的平均處理請求數(shù)量。

  • WEIGHTS_RANDOM - 權(quán)重隨機(jī):權(quán)重隨機(jī)策略與隨機(jī)策略類似,但是權(quán)重隨機(jī)策略會(huì)根據(jù)權(quán)重來計(jì)算子進(jìn)程的隨機(jī)次數(shù),從而穩(wěn)定每個(gè)子進(jìn)程的平均處理請求數(shù)量。

  • MINIMUM_CONNECTION - 最小連接數(shù):選擇子進(jìn)程上具有最小連接活動(dòng)數(shù)量的子進(jìn)程處理請求。

  • WEIGHTS_MINIMUM_CONNECTION - 權(quán)重最小連接數(shù):權(quán)重最小連接數(shù)策略與最小連接數(shù)策略類似,不過各個(gè)子進(jìn)程被選中的概率由連接數(shù)和權(quán)重共同決定。

一、要點(diǎn)

「 對單一任務(wù)的控制不重要,對單個(gè)進(jìn)程宏觀的資源占用更需關(guān)注 」

二、流程設(shè)計(jì)

怎么使用Node實(shí)現(xiàn)輕量化進(jìn)程池和線程池

1. 關(guān)鍵流程
  • 進(jìn)程池創(chuàng)建進(jìn)程時(shí)會(huì)初始化進(jìn)程實(shí)例內(nèi)的 ProcessHost 事務(wù)對象,進(jìn)程實(shí)例向事務(wù)對象注冊多種任務(wù)監(jiān)聽器。

  • 用戶向進(jìn)程池發(fā)起單個(gè)任務(wù)調(diào)用請求,可傳入進(jìn)程綁定的 ID 和指定的任務(wù)名。

  • 判斷用戶是否傳入 ID 參數(shù)指定使用某個(gè)進(jìn)程執(zhí)行任務(wù),如果未指定 ID:

    • 進(jìn)程池判斷當(dāng)前進(jìn)程池進(jìn)程數(shù)量是否已超過最大值,如果未超過則創(chuàng)建新進(jìn)程,用此進(jìn)程處理當(dāng)前任務(wù),并將進(jìn)程放入進(jìn)程池。

    • 如果進(jìn)程池進(jìn)程數(shù)量已達(dá)最大值,則根據(jù)負(fù)載均衡算法選擇一個(gè)進(jìn)程處理當(dāng)前任務(wù)。

  • 指定 ID 時(shí):

    • 通過用戶傳入的 ID 參數(shù)找到對應(yīng)進(jìn)程,將任務(wù)分發(fā)給此進(jìn)程執(zhí)行。

    • 如果未找到 ID 所對應(yīng)的進(jìn)程,則向用戶拋出異常。

  • 任務(wù)由進(jìn)程池派發(fā)給目標(biāo)進(jìn)程后,ProcessHost 事務(wù)對象會(huì)根據(jù)該任務(wù)的任務(wù)名觸發(fā)子進(jìn)程內(nèi)的監(jiān)聽器。

  • 子進(jìn)程內(nèi)的監(jiān)聽器函數(shù)可執(zhí)行同步任務(wù)和異步任務(wù),異步任務(wù)返回 Promise 對象,同步任務(wù)返回值。

  • ProcessHost 事務(wù)對象的監(jiān)聽器函數(shù)執(zhí)行完畢后,會(huì)將任務(wù)結(jié)果返回給進(jìn)程池,進(jìn)程池再將結(jié)果通過異步回調(diào)函數(shù)返回給用戶。

  • 用戶也可向進(jìn)程池所有子進(jìn)程發(fā)起個(gè)任務(wù)調(diào)用請求,最終將會(huì)通過 Promise 的返回所有子進(jìn)程的任務(wù)執(zhí)行結(jié)果。

2. 名詞解釋
  • ProcessHost 事務(wù)中心:運(yùn)行在子進(jìn)程中,用于事件觸發(fā)以及和主進(jìn)程通信。開發(fā)者在子進(jìn)程執(zhí)行文件中向其注冊多個(gè)具有特定任務(wù)名的任務(wù)事件,主進(jìn)程會(huì)向某個(gè)子進(jìn)程發(fā)送任務(wù)請求,并由事務(wù)中心調(diào)用指定的事件監(jiān)聽器處理請求。

  • LoadBalancer 負(fù)載均衡器:用于選擇一個(gè)進(jìn)程處理任務(wù),可根據(jù)不同的負(fù)載均衡算法實(shí)現(xiàn)不同的選擇策略。

  • LifeCycle: 設(shè)計(jì)之初用于管控子進(jìn)程的智能啟停,某個(gè)進(jìn)程在長時(shí)間未被使用時(shí)進(jìn)入休眠狀態(tài),當(dāng)有新任務(wù)到來時(shí)再喚醒進(jìn)程。目前還有些難點(diǎn)需要解決,比如進(jìn)程的喚醒和休眠不好實(shí)現(xiàn),進(jìn)程的使用情況不好統(tǒng)計(jì),該功能暫時(shí)不可用。

三、進(jìn)程池使用方式

1. 創(chuàng)建進(jìn)程池

main.js

const { ChildProcessPool, LoadBalancer } = require('electron-re');

const processPool = new ChildProcessPool({
  path: path.join(__dirname, 'child_process/child.js'),
  max: 4,
  strategy: LoadBalancer.ALGORITHM.POLLING,
);

child.js

const { ProcessHost } = require('electron-re');

ProcessHost
  .registry('test1', (params) => {
    console.log('test1');
    return 1 + 1;
  })
  .registry('test2', (params) => {
    console.log('test2');
    return new Promise((resolve) => resolve(true));
  });

2. 向一個(gè)子進(jìn)程發(fā)送任務(wù)請求

processPool.send('test1', { value: "test1"}).then((result) => {
  console.log(result);
});

3. 向所有子進(jìn)程發(fā)送任務(wù)請求

processPool.sendToAll('test1', { value: "test1"}).then((results) => {
  console.log(results);
});

四、進(jìn)程池實(shí)際使用場景

1. Electron 網(wǎng)頁代理工具中多進(jìn)程的應(yīng)用

1)基本代理原理:

怎么使用Node實(shí)現(xiàn)輕量化進(jìn)程池和線程池

2)單進(jìn)程下客戶端執(zhí)行原理:

  • 通過用戶預(yù)先保存的服務(wù)器配置信息,使用 node.js 子進(jìn)程來啟動(dòng) ss-local 可執(zhí)行文件建立和 ss 服務(wù)器的連接來代理用戶本地電腦的流量,每個(gè)子進(jìn)程占用一個(gè) socket 端口。

  • 其它支持 socks5 代理的 proxy 工具比如:瀏覽器上的 SwitchOmega 插件會(huì)和這個(gè)端口的 tcp 服務(wù)建立連接,將 tcp 流量加密后通過代理服務(wù)器轉(zhuǎn)發(fā)給我們需要訪問的目標(biāo)服務(wù)器。

怎么使用Node實(shí)現(xiàn)輕量化進(jìn)程池和線程池

3)多進(jìn)程下客戶端執(zhí)行原理:

以上描述的是客戶端連接單個(gè)節(jié)點(diǎn)的工作模式,節(jié)點(diǎn)訂閱組中的負(fù)載均衡模式需要同時(shí)啟動(dòng)多個(gè)子進(jìn)程,每個(gè)子進(jìn)程啟動(dòng) ss-local 執(zhí)行文件占用一個(gè)本地端口并連接到遠(yuǎn)端一個(gè)服務(wù)器節(jié)點(diǎn)。

每個(gè)子進(jìn)程啟動(dòng)時(shí)選擇的端口是會(huì)變化的,因?yàn)槟承┒丝诳赡芤呀?jīng)被系統(tǒng)占用,程序需要先選擇未被使用的端口。并且瀏覽器 proxy 工具也不可能同時(shí)連接到我們本地啟動(dòng)的子進(jìn)程上的多個(gè) ss-local 服務(wù)上。因此需要一個(gè)占用固定端口的中間節(jié)點(diǎn)接收 proxy 工具發(fā)出的連接請求,然后按照某種分發(fā)規(guī)則將 tcp 流量轉(zhuǎn)發(fā)到各個(gè)子進(jìn)程的 ss-local 服務(wù)的端口上。

怎么使用Node實(shí)現(xiàn)輕量化進(jìn)程池和線程池

2. 多進(jìn)程文件分片上傳 Electron 客戶端

之前做過一個(gè)支持 SMB 協(xié)議多文件分片上傳的客戶端,Node.js 端的上傳任務(wù)管理、IO 操作等都使用多進(jìn)程實(shí)現(xiàn)過一版本,不過是在 gitlab 實(shí)驗(yàn)分支自己搞得(逃)。

怎么使用Node實(shí)現(xiàn)輕量化進(jìn)程池和線程池

IV. 線程池

為了減小 CPU 密集型任務(wù)計(jì)算的系統(tǒng)開銷,Node.js 引入了新的特性:工作線程 worker_threads,其首次在 v10.5.0 作為實(shí)驗(yàn)性功能出現(xiàn)。通過 worker_threads 可以在進(jìn)程內(nèi)創(chuàng)建多個(gè)線程,主線程與 worker 線程使用 parentPort 通信,worker 線程之間可通過 MessageChannel 直接通信。worker_threads 做為開發(fā)者使用線程的重要特性,在 v12.11.0 穩(wěn)定版已經(jīng)能正常在生產(chǎn)環(huán)境使用了。

但是線程的創(chuàng)建需要額外的 CPU 和內(nèi)存資源,如果要多次使用一個(gè)線程的話,應(yīng)該將其保存起來,當(dāng)該線程完全不使用時(shí)需要及時(shí)關(guān)閉以減少內(nèi)存占用。想象我們在需要使用線程時(shí)直接創(chuàng)建,使用完后立刻銷毀,可能線程自身的創(chuàng)建和銷毀成本已經(jīng)超過了使用線程本身節(jié)省下的資源成本。Node.js 內(nèi)部雖然有使用線程池,但是對于開發(fā)者而言是完全透明不可見的,因此封裝一個(gè)能夠維護(hù)線程生命周期的線程池工具的重要性就體現(xiàn)了。

為了強(qiáng)化多異步任務(wù)的調(diào)度,線程池除了提供維護(hù)線程的能力,也提供維護(hù)任務(wù)隊(duì)列的能力。當(dāng)發(fā)送請求給線程池讓其執(zhí)行一個(gè)異步任務(wù)時(shí),如果線程池內(nèi)沒有空閑線程,那該任務(wù)就會(huì)被直接丟棄了,顯然這不是想要的效果。

因此可以考慮為線程池添加一個(gè)任務(wù)隊(duì)列的調(diào)度邏輯:當(dāng)線程池沒有空閑線程時(shí),將該任務(wù)放入待執(zhí)行任務(wù)隊(duì)列 (FIFO),線程池在某個(gè)時(shí)機(jī)取出任務(wù)交由某個(gè)空閑線程執(zhí)行,執(zhí)行完成后觸發(fā)異步回調(diào)函數(shù),將執(zhí)行結(jié)果返回給請求調(diào)用方。但是線程池的任務(wù)隊(duì)列內(nèi)的任務(wù)數(shù)量應(yīng)該考慮限制到一個(gè)特殊值,防止線程池負(fù)載過大影響 Node.js 應(yīng)用整體運(yùn)行性能。

一、要點(diǎn)

「 對單一任務(wù)的控制重要,對單個(gè)線程的資源占用無需關(guān)注 」

二、詳細(xì)設(shè)計(jì)

怎么使用Node實(shí)現(xiàn)輕量化進(jìn)程池和線程池

任務(wù)流轉(zhuǎn)過程
  • 調(diào)用者可通過 StaticPool/StaticExcutor/DynamicPool/DynamicExcutor 實(shí)例向線程池派發(fā)任務(wù)(以下有關(guān)鍵名詞說明),各種實(shí)例的之間最大的不同點(diǎn)就是參數(shù)動(dòng)態(tài)化能力。

  • 任務(wù)由線程池內(nèi)部生成,生成后任務(wù)做為主要的流轉(zhuǎn)載體,一方面承載用戶傳入的任務(wù)計(jì)算參數(shù),另一方面記錄任務(wù)流轉(zhuǎn)過程中的狀態(tài)變化,比如:任務(wù)狀態(tài)、開始時(shí)間、結(jié)束時(shí)間、任務(wù) ID、任務(wù)重試次數(shù)、任務(wù)是否支持重試、任務(wù)類型等。

  • 任務(wù)生成后,首先判斷當(dāng)前線程池的線程數(shù)是否已達(dá)上限,如果未達(dá)上限,則新建線程并將其放入線程存儲(chǔ)區(qū),然后使用該線程直接執(zhí)行當(dāng)前任務(wù)。

  • 如果線程池線程數(shù)超限,則判斷是否有未執(zhí)行任務(wù)的空閑線程,拿到空閑線程后,使用該線程直接執(zhí)行當(dāng)前任務(wù)。

  • 如果沒有空閑線程,則判斷當(dāng)前等待任務(wù)隊(duì)列是否已滿,任務(wù)隊(duì)列已滿則拋出錯(cuò)誤,第一時(shí)間讓調(diào)用者感知任務(wù)未執(zhí)行成功。

  • 如果任務(wù)隊(duì)列未滿的話,將該任務(wù)放入任務(wù)隊(duì)列,等待任務(wù)循環(huán)系統(tǒng)取出將其執(zhí)行。

  • 以上 4/5/6 步的三種情況下任務(wù)執(zhí)行后,判斷該任務(wù)是否執(zhí)行成功,成功時(shí)觸發(fā)成功的回調(diào)函數(shù),Promise 狀態(tài)為 fullfilled。如果失敗,則判斷是否支持重試,支持重試的情況下,將該任務(wù)重試次數(shù) + 1 后重新放入任務(wù)隊(duì)列尾部。任務(wù)不支持重試的情況下,直接失敗,并觸發(fā)失敗的異步回調(diào)函數(shù),Promise 狀態(tài)為 rejected。

  • 整個(gè)線程池生命周期中,存在一個(gè)任務(wù)循環(huán)系統(tǒng),以一定的周期頻率從任務(wù)隊(duì)列首部獲取任務(wù),并從線程存儲(chǔ)區(qū)域獲取空閑線程后使用該線程執(zhí)行任務(wù),該流程也符合第 7 步的描述。

  • 任務(wù)循環(huán)系統(tǒng)除了取任務(wù)執(zhí)行,如果線程池設(shè)置了任務(wù)超時(shí)時(shí)間的話,也會(huì)判斷正在執(zhí)行中的任務(wù)是否超時(shí),超時(shí)后會(huì)終止該線程的所有運(yùn)行中的代碼。

模塊說明
  • StaticPool

    • 定義:靜態(tài)線程池,可使用固定的 execFunction/execString/execFile 執(zhí)行參數(shù)來啟動(dòng)工作線程,執(zhí)行參數(shù)在進(jìn)程池創(chuàng)建后不能更改。

    • 進(jìn)程池創(chuàng)建之后除了執(zhí)行參數(shù)不可變外,其它參數(shù)比如:任務(wù)超時(shí)時(shí)間、任務(wù)重試次數(shù)、線程池任務(wù)輪詢間隔時(shí)間、最大任務(wù)數(shù)、最大線程數(shù)、是否懶創(chuàng)建線程等都可以通過 API 隨時(shí)更改。

  • StaticExcutor

    • 定義:靜態(tài)線程池的執(zhí)行器實(shí)例,繼承所屬線程池的固定執(zhí)行參數(shù) execFunction/execString/execFile 且不可更改。

    • 執(zhí)行器實(shí)例創(chuàng)建之后除了執(zhí)行參數(shù)不可變外,其它參數(shù)比如:任務(wù)超時(shí)時(shí)間、任務(wù)重試次數(shù)、transferList 等都可以通過 API 隨時(shí)更改。

    • 靜態(tài)線程池的各個(gè)執(zhí)行器實(shí)例的參數(shù)設(shè)置互不影響,參數(shù)默認(rèn)繼承于所屬線程池,參數(shù)在執(zhí)行器上更改后具有比所屬線程池同名參數(shù)更高的優(yōu)先級。

  • DynamicPool

    • 定義:動(dòng)態(tài)線程池,無需使用 execFunction/execString/execFile 執(zhí)行參數(shù)即可創(chuàng)建線程池。執(zhí)行參數(shù)在調(diào)用 exec() 方法時(shí)動(dòng)態(tài)傳入,因此執(zhí)行參數(shù)可能不固定。

    • 線程池創(chuàng)建之后執(zhí)行參數(shù)默認(rèn)為 null,其它參數(shù)比如:任務(wù)超時(shí)時(shí)間、任務(wù)重試次數(shù)、transferList 等都可以通過 API 隨時(shí)更改。

  • DynamicExcutor

    • 定義:動(dòng)態(tài)線程池的執(zhí)行器實(shí)例,繼承所屬線程池的其它參數(shù),執(zhí)行參數(shù)為 null

    • 執(zhí)行器實(shí)例創(chuàng)建之后,其它參數(shù)比如:任務(wù)超時(shí)時(shí)間、任務(wù)重試次數(shù)、transferList 等都可以通過 API 隨時(shí)更改。

    • 動(dòng)態(tài)線程池的各個(gè)執(zhí)行器實(shí)例的參數(shù)設(shè)置互不影響,參數(shù)默認(rèn)繼承于所屬線程池,參數(shù)在執(zhí)行器上更改后具有比所屬線程池同名參數(shù)更高的優(yōu)先級。

    • 動(dòng)態(tài)執(zhí)行器實(shí)例在執(zhí)行任務(wù)之前需要先設(shè)置執(zhí)行參數(shù) execFunction/execString/execFile,執(zhí)行參數(shù)可以隨時(shí)改變。

  • ThreadGenerator

    • 定義:線程創(chuàng)建的工廠方法,會(huì)進(jìn)行參數(shù)校驗(yàn)。

  • Thread

    • 定義:線程實(shí)例,內(nèi)部簡單封裝了 worker_threads API。

  • TaskGenerator

    • 定義:任務(wù)創(chuàng)建的工廠方法,會(huì)進(jìn)行參數(shù)校驗(yàn)。

  • Task

    • 定義:單個(gè)任務(wù),記錄了任務(wù)執(zhí)行狀態(tài)、任務(wù)開始結(jié)束時(shí)間、任務(wù)重試次數(shù)、任務(wù)攜帶參數(shù)等。

  • TaskQueue

    • 定義:任務(wù)隊(duì)列,在數(shù)組中存放任務(wù),以先入先出方式 (FIFO) 向線程池提供任務(wù),使用 Map 來存儲(chǔ) taskId 和 task 之間的映射關(guān)系。

  • Task Loop

    • 任務(wù)循環(huán),每個(gè)循環(huán)的默認(rèn)時(shí)間間隔為 2S,每次循環(huán)中會(huì)處理超時(shí)任務(wù)、將新任務(wù)派發(fā)給空閑線程等。

三、線程池使用方式

更多示例見:線程池 mocha 單元測試

1. 創(chuàng)建靜態(tài)線程池

main.js

const { StaticThreadPool } = require(`electron-re`);
const threadPool = new StaticThreadPool({
  execPath: path.join(__dirname, './worker_threads/worker.js'),
  lazyLoad: true, // 懶加載
  maxThreads: 24, // 最大線程數(shù)
  maxTasks: 48, // 最大任務(wù)數(shù)
  taskRetry: 1, // 任務(wù)重試次數(shù)
  taskLoopTime: 1e3, // 任務(wù)輪詢時(shí)間
});
const executor = threadPool.createExecutor();

登錄后復(fù)制

worker.js

const fibonaccis = (n) => {
  if (n < 2) {
    return n;
  }
  return fibonaccis(n - 1) + fibonaccis(n - 2);
};

module.exports = (value) => {
  return fibonaccis(value);
}

2. 使用靜態(tài)線程池發(fā)送任務(wù)請求

threadPool.exec(15).then((res) => {
  console.log(+res.data === 610)
});

executor
  .setTaskRetry(2) // 不影響 pool 的全局設(shè)置
  .setTaskTimeout(2e3) // 不影響 pool 的全局設(shè)置
  .exec(15).then((res) => {
    console.log(+res.data === 610)
  });

3. 動(dòng)態(tài)線程池和動(dòng)態(tài)執(zhí)行器

const { DynamicThreadPool } = require(`electron-re`);
const threadPool = new DynamicThreadPool({
  maxThreads: 24, // 最大線程數(shù)
  maxTasks: 48, // 最大任務(wù)數(shù)
  taskRetry: 1, // 任務(wù)重試次數(shù)
});
const executor = threadPool.createExecutor({
  execFunction: (value) => { return 'dynamic:' + value; },
});

threadPool.exec('test', {
  execString: `module.exports = (value) => { return 'dynamic:' + value; };`,
});
executor.exec('test');
executor
  .setExecPath('/path/to/exec-file.js')
  .exec('test');


以上就是“怎么使用Node實(shí)現(xiàn)輕量化進(jìn)程池和線程池”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會(huì)為大家更新不同的知識,如果還想學(xué)習(xí)更多的知識,請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI