您好,登錄後才能下訂單哦!
這篇文章將為大家詳細講解有關 Node.js 中多進程模型的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章後可以有所收穫。
Node.js 提供了 Cluster 模塊來解決上述問題(單個 Node.js 進程只能利用一個 CPU 核心):通過該模塊,開發者可以以創建子進程的方式創建一個集群,充分利用機器或容器的資源,同時該模塊允許多個子進程監聽同一個端口。
const cluster = require('cluster');
const http = require('http');
const os = require('os');

// os.cpus() returns an empty array when CPU information is unavailable (e.g. no
// /proc), which would fork zero workers and leave port 8000 unserved. Prefer
// os.availableParallelism() (Node.js >= 18.14), which always returns >= 1.
const numCPUs = typeof os.availableParallelism === 'function'
  ? os.availableParallelism()
  : Math.max(os.cpus().length, 1);

// cluster.isMaster is deprecated since Node.js 16 in favour of cluster.isPrimary;
// fall back to isMaster so the example still works on older releases.
const isPrimary = cluster.isPrimary ?? cluster.isMaster;

if (isPrimary) {
  // Primary: fork one worker per available CPU and log workers that exit.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection.
  // In this case it is an HTTP server.
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);
}
首先從 const cluster = require('cluster') 說起,這行代碼導入了 Node 的 Cluster 模塊,而在 Node 內部,Master 進程與 Worker 進程引入的文件卻不一樣,詳情見如下代碼:
'use strict';

// Worker processes are spawned with NODE_UNIQUE_ID in their environment, so the
// presence of that variable selects the worker-side implementation; every other
// process (including a plain, non-cluster one) gets the master-side one.
const isWorkerProcess = 'NODE_UNIQUE_ID' in process.env;
const implementation = isWorkerProcess ? 'child' : 'master';

module.exports = require(`internal/cluster/${implementation}`);
不同的文件意味著兩種進程在執行中的表現也不一樣,例如:
// internal/cluster/master.js cluster.isWorker = false; cluster.isMaster = true; // internal/cluster/child.js cluster.isWorker = true; cluster.isMaster = false;
這也是 Cluster 模塊導出的變量能夠區分不同類型進程的原因。接下來讓我們分別從主、子進程兩個方向去了解具體的過程。
在上述代碼裡,Master 進程並沒有做太多事情,只是根據 CPU 數量去 fork 子進程。下面我們深入到源代碼裡大致看一下,相關描述均在代碼的注釋內:
// lib/internal/cluster/master.js // 初始化cluster const cluster = new EventEmitter(); // 創(chuàng)建監(jiān)聽地址與server對應(yīng)的map const handles = new SafeMap(); // 初始化 cluster.isWorker = false; cluster.isMaster = true; cluster.workers = {}; cluster.settings = {}; cluster.SCHED_NONE = SCHED_NONE; // Leave it to the operating system. cluster.SCHED_RR = SCHED_RR; // Master distributes connections. // 自增的子進程id let ids = 0; // 向cluster添加fork方法 cluster.fork = function(env) { // 初始化cluster.settings cluster.setupMaster(); // 為當前fork的子進程生成當前cluster內(nèi)的唯一id const id = ++ids; // 創(chuàng)建子進程 const workerProcess = createWorkerProcess(id, env); // 創(chuàng)建對應(yīng)的worker實例 const worker = new Worker({ id: id, process: workerProcess }); // 省略一些worker的事件監(jiān)聽.... // 監(jiān)聽內(nèi)部消息事件,并交由onmessage處理 worker.process.on('internalMessage', internal(worker, onmessage)); // cluster發(fā)出fork事件 process.nextTick(emitForkNT, worker); // 將worker實例放在cluster.workers中維護 cluster.workers[worker.id] = worker; // 返回worker return worker; }; // 創(chuàng)建子進程函數(shù) function createWorkerProcess(id, env) { // 將主進程的env、調(diào)用cluster.fork時傳入的env以及NODE_UNIQUE_ID env構(gòu)建成一個env對象 const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` }; // 執(zhí)行參數(shù) const execArgv = [...cluster.settings.execArgv]; // 省略debug模式相關(guān)邏輯... 
// 調(diào)用child_process模塊的fork函數(shù)創(chuàng)建子進程并返回,至此子進程實例創(chuàng)建完成 return fork(cluster.settings.exec, cluster.settings.args, { cwd: cluster.settings.cwd, env: workerEnv, serialization: cluster.settings.serialization, silent: cluster.settings.silent, windowsHide: cluster.settings.windowsHide, execArgv: execArgv, stdio: cluster.settings.stdio, gid: cluster.settings.gid, uid: cluster.settings.uid }); } // 內(nèi)部消息事件處理函數(shù) function onmessage(message, handle) { const worker = this; if (message.act === 'online') online(worker); // 當子進程向主進程發(fā)出queryServer消息后,執(zhí)行queryServer函數(shù),創(chuàng)建server else if (message.act === 'queryServer') queryServer(worker, message); else if (message.act === 'listening') listening(worker, message); else if (message.act === 'exitedAfterDisconnect') exitedAfterDisconnect(worker, message); else if (message.act === 'close') close(worker, message); } // 獲取server function queryServer(worker, message) { // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return; // 創(chuàng)建當前子進程監(jiān)聽地址信息的key const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}`; // 在handles map中查詢是否有已經(jīng)創(chuàng)建好的該監(jiān)聽地址的server let handle = handles.get(key); // 沒有對應(yīng)的server則進行創(chuàng)建 if (handle === undefined) { let address = message.address; // Find shortest path for unix sockets because of the ~100 byte limit if (message.port < 0 && typeof address === 'string' && process.platform !== 'win32') { address = path.relative(process.cwd(), address); if (message.address.length < address.length) address = message.address; } // 主、子進程處理連接的方式,默認為輪詢 let constructor = RoundRobinHandle; // UDP is exempt from round-robin connection balancing for what should // be obvious reasons: it's connectionless. There is nothing to send to // the workers except raw datagrams and that's pointless. 
if (schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6') { constructor = SharedHandle; } // 將監(jiān)聽地址信息傳入構(gòu)造函數(shù)創(chuàng)建監(jiān)聽實例 handle = new constructor(key, address, message); // 緩存監(jiān)聽實例 handles.set(key, handle); } // 向server添加自定義信息,用于server發(fā)出listening事件后透傳到worker if (!handle.data) handle.data = message.data; // 添加server發(fā)出listening事件后的回調(diào)函數(shù)通知子進程 handle.add(worker, (errno, reply, handle) => { const { data } = handles.get(key); if (errno) handles.delete(key); // Gives other workers a chance to retry. send(worker, { errno, key, ack: message.seq, data, ...reply }, handle); }); }
// lib/internal/cluster/round_robin_handle.js

// Master-side handle for round-robin scheduling: the master owns the listening
// socket and hands each accepted connection to one worker at a time.
// `key` is the listen-target key from master.js, `address` the host (TCP) or
// socket path, and the third argument carries port/fd/flags from the request.
function RoundRobinHandle(key, address, { port, fd, flags }) {
  this.key = key;
  // Registered workers, by worker id.
  this.all = new SafeMap();
  // Workers currently waiting for a connection, by worker id.
  this.free = new SafeMap();
  // Accepted connections waiting for a free worker.
  this.handles = [];
  this.handle = null;
  // The connection listener must never fire: once listening, connections are
  // intercepted via handle.onconnection below.
  this.server = net.createServer(assert.fail);

  // Listen on an inherited file descriptor (not covered in this article).
  if (fd >= 0)
    this.server.listen({ fd });
  // Listen on host:port.
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  // Listen on a UNIX socket (not covered in this article).
  } else
    this.server.listen(address);  // UNIX socket path.

  // Once bound, take over the raw handle: route new connections to
  // distribute() and detach the net.Server (this.server becomes null).
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

// Register a worker. `send(errno, reply, handle)` is the reply callback passed
// in from master.js queryServer(); it is invoked once the server is listening
// (or with an errno if binding fails).
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  // Reply with the bound address (TCP) or nothing (UNIX socket), then give
  // the worker any connection already queued.
  const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  // this.server is nulled by the 'listening' callback, so null means we are
  // already bound and can reply immediately.
  if (this.server === null)
    return done();

  // Still busy binding.
  this.server.once('listening', done);
  this.server.once('error', (err) => {
    send(err.errno, null);
  });
};

// Unregister a worker so it gets no further connections. Returns true only
// when the last worker is removed, in which case queued connections and the
// listening handle are closed.
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  for (const handle of this.handles) {
    handle.close();
  }
  this.handles = [];

  this.handle.close();
  this.handle = null;
  return true;
};

// Called for every accepted connection: queue it, then hand it to the first
// free worker (in SafeMap insertion order) if there is one.
RoundRobinHandle.prototype.distribute = function(err, handle) {
  ArrayPrototypePush(this.handles, handle);
  const [ workerEntry ] = this.free; // this.free is a SafeMap

  if (ArrayIsArray(workerEntry)) {
    const { 0: workerId, 1: worker } = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};

// Send the next queued connection to `worker`, or mark the worker free if the
// queue is empty.
RoundRobinHandle.prototype.handoff = function(worker) {
  if (!this.all.has(worker.id)) {
    return;  // Worker is closing (or has closed) the server.
  }

  const handle = ArrayPrototypeShift(this.handles);

  if (handle === undefined) {
    this.free.set(worker.id, worker);  // Add to ready queue again.
    return;
  }

  // Send the connection to the worker as a 'newconn' message. When the worker
  // replies, close the master's copy if it accepted, otherwise re-queue it;
  // then try to give this worker the next queued connection.
  const message = { act: 'newconn', key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};
在每個子進程中,我們都創建了一個 HTTP Server,然後執行 listen 函數監聽 8000 端口。HTTP Server 實例是通過原型鏈繼承 Net Server 得到的,因此 listen 函數即為 Net Server 原型上的 listen 函數,具體如下:
// lib/_http_server.js function Server(options, requestListener) { .... } ObjectSetPrototypeOf(Server.prototype, net.Server.prototype); ObjectSetPrototypeOf(Server, net.Server);
// lib/net.js
// (Excerpt; some parts are omitted.)

Server.prototype.listen = function(...args) {
  // Argument normalization and the other listen variants are omitted for
  // brevity, so `options`, `backlog`, `backlogFromArgs` and `flags` come from
  // that elided code. For a port-based listen, the path below eventually
  // reaches listenInCluster(), which does the actual binding.
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    validatePort(options.port, 'options.port');
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      // With a host, resolution happens first in lookupAndListen (not shown).
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }
  // (Remaining cases omitted.)
};

// Decide whether `server` binds its own handle or borrows one from the master.
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;

  // Load the cluster module lazily on first use.
  if (cluster === undefined) cluster = require('cluster');

  // In the master (which includes a plain non-cluster process, where
  // cluster.isMaster is true), or when `exclusive` is set, bind a real handle
  // directly and return.
  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  // In a worker: pass the listen target to cluster._getServer, which asks the
  // master and calls back with a handle (a faux handle in round-robin mode).
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the master's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  // Callback for _getServer: on success, install the received handle on the
  // server and run _listen2 (i.e. setupListenHandle) on it.
  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse master's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

// Set up the listening handle for a server (`this`). A handle is created and
// bound only if the server does not already have one.
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd);

  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    debug('setupListenHandle: create a handle');

    let rval = null;

    // (Handle-creation code omitted for brevity.)

    this._handle = rval;
  }

  // Route new connections on the handle (real, or the faux handle installed
  // by listenOnMasterHandle in a worker) to net's onconnection.
  this._handle.onconnection = onconnection;
}
同時,在開始解析的時候我們說過,在引入 Cluster 模塊的時候,會根據當前進程的 env 中是否包含 NODE_UNIQUE_ID 去判斷是否為子進程,若為子進程,則執行 child.js 文件。
Tips:IPC 通信中發送的消息,如果其 message.cmd 的值以 NODE_ 為前綴,它將觸發一個內部事件 internalMessage,而不是普通的 message 事件。
// lib/internal/cluster/child.js
// (Excerpt; some parts are omitted.)

// The cluster object seen by a worker process is an EventEmitter.
const cluster = new EventEmitter();
// Faux handles created in this worker, keyed by the master's handle key.
const handles = new SafeMap();
// Per-listen-target counters (indexesKey -> index), see _getServer.
const indexes = new SafeMap();

cluster.isWorker = true;
cluster.isMaster = false;
cluster.worker = null;
cluster.Worker = Worker;

// Initialize this process as a worker. It is called at worker startup from
// initializeClusterIPC in lib/internal/bootstrap/pre_execution.js, which also
// removes NODE_UNIQUE_ID from env afterwards.
// NOTE(review): that file is not shown here.
cluster._setupWorker = function() {
  // The worker id is the NODE_UNIQUE_ID the master put in this process's env.
  const worker = new Worker({
    id: +process.env.NODE_UNIQUE_ID | 0,
    process: process,
    state: 'online'
  });

  cluster.worker = worker;

  // Forward the IPC disconnect; exit at once if it was not a planned disconnect.
  process.once('disconnect', () => {
    worker.emit('disconnect');

    if (!worker.exitedAfterDisconnect) {
      // Unexpected disconnect, master exited, or some such nastiness, so
      // worker exits immediately.
      process.exit(0);
    }
  });

  // Listen for internal IPC messages from the master, then announce 'online'.
  process.on('internalMessage', internal(worker, onmessage));
  send({ act: 'online' });

  function onmessage(message, handle) {
    // 'newconn': a connection handed over by the master; pass it to the
    // matching faux handle (and from there to the worker's HTTP server).
    if (message.act === 'newconn')
      onconnection(message, handle);
    else if (message.act === 'disconnect')
      ReflectApply(_disconnect, worker, [true]);
  }
};

// Called by net when a server in this worker listens: ask the master for a
// server handle and call `cb(err, handle)` with the result.
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === 'string' &&
      process.platform !== 'win32')
    address = path.resolve(address);

  // Key for this listen target within the worker.
  const indexesKey = ArrayPrototypeJoin(
    [
      address,
      options.port,
      options.addressType,
      options.fd,
    ], ':');

  // Count how many times this worker has listened on the same target: the
  // first listen gets index 0, later ones 1, 2, ... The index is sent to the
  // master so that each listen maps to its own master-side handle.
  let index = indexes.get(indexesKey);

  if (index === undefined)
    index = 0;
  else
    index++;

  indexes.set(indexesKey, index);

  // Build the queryServer request: address info plus the index.
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  // Send queryServer to the master. The reply carries a real handle in shared
  // mode and no handle in round-robin mode (RoundRobinHandle.prototype.add
  // replies with a null handle); the matching helper then calls cb.
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

  // Once the server is listening, mark the worker as listening and notify the
  // master, reporting the actual bound port when available.
  obj.once('listening', () => {
    cluster.worker.state = 'listening';
    const address = obj.address();
    message.act = 'listening';
    message.port = (address && address.port) || options.port;
    send(message);
  });
};

// Round-robin. Master distributes handles across workers.
// Build a faux handle for the master-side handle `message.key`, register it in
// `handles`, and pass it to cb (or pass the error if the master reported one).
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  let key = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;

    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  // Report the address the master bound (from the queryServer reply) until
  // the faux handle is closed.
  function getsockname(out) {
    if (key)
      ObjectAssign(out, message.sockname);

    return 0;
  }

  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  // Remember the faux handle so later 'newconn' messages can find it.
  handles.set(key, handle);
  // Hand it to the callback net passed into cluster._getServer.
  cb(0, handle);
}

// Round-robin connection.
// Handle a 'newconn' message: tell the master whether this worker still has
// the faux handle for `message.key`, and if so feed the connection to it.
function onconnection(message, handle) {
  // Look up the faux handle by the master's key.
  const key = message.key;
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  // Invoke the onconnection that net's setupListenHandle installed on the
  // faux handle, which processes the request.
  if (accepted)
    server.onconnection(0, handle);
}
至此,所有的內容都聯繫起來了。
通過之前的代碼分析我們可以知道:Cluster 集群會在 Master 進程中創建 Net Server;Worker 進程在創建 HTTP Server 並調用 listen 的時候,會將監聽地址的信息傳入 cluster._getServer 函數,創建一個 faux handle(偽句柄)並設置到子進程的 Net Server 上;Worker 進程初始化的時候會註冊 IPC 通信回調函數,當 Master 進程以 newconn 消息分發新連接時,回調函數內會調用子進程中 Net Server 初始化後為該 faux handle 設置的 {faux handle}.onconnection 函數,並將傳過來的連接 handle 傳入,從而完成請求響應。
我們可以在 Master 進程中監聽 Worker 進程的 error、disconnect、exit 事件,在這些事件中做對應的處理,例如清理退出的進程並重新 fork;或者使用已經封裝好的 npm 包,例如 cfork。
在 Egg.js 的多進程模型中,多了另外一個進程類型,即 Agent 進程。該進程主要用於處理多進程不好處理的一些事情,以及減少長連接的數量,具體關係如下:
+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |      agent ready    |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |     worker ready    |                    |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |      Egg ready      |                    |
     +----------------------------------------->|
在 egg-cluster 包內,使用了 cfork 包去保證 Worker 進程掛掉後自動重啟。
在我們的一個 Egg 應用內,日誌系統並沒有使用 Egg 原生的日誌,而是使用了一個內部基於 log4js 包的日誌庫。在使用的時候,將需要用到的 Logger 擴展至 Application 對象上,這樣每個 Worker 進程在初始化的時候都會創建新的 Logger,也就是說會存在多個進程同時寫同一個日誌文件的情況,但實際上並沒有出現多進程寫日誌的錯誤。
在追蹤源碼的過程中發現,log4js 雖然提供了 Cluster 模式,但是在上層封裝中並沒有開啟 log4js 的 Cluster 模式,所以每個 Logger 的 appender 都使用 flag a 打開一個寫入流。到這裡並沒有得到答案。
後來在 CNode 中找到了答案:在 unix 下使用 flag a 打開的可寫流,在 libuv 中對應的標誌是 UV_FS_O_APPEND,即 O_APPEND。man 手冊中對 O_APPEND 的定義是:每次 write 之前都會把文件偏移量移到文件末尾,並且修改偏移量與寫入操作作為一個原子步驟完成。因此內核保證了對這個可寫流的並發追加寫是安全的,不需要在應用層額外加鎖(NFS 類的文件系統除外,在其上並發寫會造成文件信息丟失或者損壞)。NFS 類的網絡掛載文件系統主要是靠模擬底層的 api 來實現類本地操作,顯然無法在競爭條件下完美還原這類原子操作 api。所以如果你的日誌要寫到類似 oss 雲盤掛載到本地的這類文件系統上,就不能這麼做,多進程寫的話必須在應用層自己手動加鎖。
關於“Node.js中多進程模型的示例分析”這篇文章就分享到這裡了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發佈的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com 進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。