溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Nodejs中怎么實現(xiàn)一個線程池

發(fā)布時間:2021-08-09 14:52:49 來源:億速云 閱讀:164 作者:Leah 欄目:開發(fā)技術(shù)

Nodejs中怎么實現(xiàn)一個線程池,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

nodejs雖然提供了線程的能力,但是很多時候,往往不能直接使用線程或者無限制地創(chuàng)建線程,比如我們有一個功能是cpu密集型的,如果一個請求就開一個線程,這很明顯不是最好的實踐,這時候,我們需要使用池化的技術(shù),本文介紹在nodejs線程模塊的基礎上,如何設計和實現(xiàn)一個線程池庫(https://github.com/theanarkh/nodejs-threadpool或npm  i nodejs-threadpool )。下面是線程池的總體架構(gòu)。

Nodejs中怎么實現(xiàn)一個線程池

設計一個線程池,在真正寫代碼之前,有很多設計需要考慮,大概如下:

1任務隊列的設計,一個隊列,多個線程互斥訪問,或者每個線程一個隊列,不需要互斥訪問。

2  線程退出的設計,可以由主線程檢測空閑線程,然后使子線程退出。或者子線程退出,通知主線程??臻e不一定是沒有任務就退出,可以設計空閑時間達到閾值后退出,因為創(chuàng)建線程是有時間開銷的。

3 任務數(shù)的設計,每個線程可以有個任務數(shù),還可以增加一個總?cè)蝿諗?shù),即全部線程任務數(shù)加起來

4 選擇線程的設計,選擇任務數(shù)最少的線程。

5 線程類型的設計,可以區(qū)分核心線程和預備線程,任務少的時候,核心線程處理就行。任務多也創(chuàng)建預備線程幫忙處理。

6 線程池類型的設計,cpu密集型的,線程數(shù)等于核數(shù),否則自定義線程數(shù)就行。

7 支持任務的取消和超時機制,防止一個任務時間過長或者死循環(huán)。

本文介紹的線程池具體設計思想如下(參考java):

1 主線程維護一個隊列,子線程的任務由子線程負責分發(fā),不需要互斥訪問,子線程也不需要維護自己的隊列。

2 線程退出的設計,主線程負責檢查子線程空閑時間是否達到閾值,是則使子線程退出。

3 任務數(shù)的設計,主線程負責管理任務個數(shù)并應有相應的策略。

4 選擇線程的設計,選擇任務數(shù)最少的線程。

5 線程類型的設計,區(qū)分核心線程和預備線程,任務少的時候,核心線程處理就行。任務多也創(chuàng)建預備線程幫忙處理。

6 線程池類型的設計,cpu密集型的,線程數(shù)等于核數(shù),否則自定義線程數(shù)就行。

7  支持任務的取消和超時機制,超時或者取消的時候,主線程判斷任務是待執(zhí)行還是正在執(zhí)行,如果是待執(zhí)行則從任務隊列中刪除,如果是正在執(zhí)行則殺死對應的子線程。下面我們看一下具體的設計。

1 主線程和子線程通信的數(shù)據(jù)結(jié)構(gòu)

// 任務類,一個任務對應一個id class Work {     constructor({workId, filename, options}) {         // 任務id         this.workId = workId;         // 任務邏輯,字符串或者js文件路徑         this.filename = filename;         // 任務返回的結(jié)果         this.data = null;         // 任務返回的錯誤         this.error = null;         // 執(zhí)行任務時傳入的參數(shù),用戶定義         this.options = options;     } }

主線程給子線程分派一個任務的時候,就給子線程發(fā)送一個Work對象。在nodejs中線程間通信需要經(jīng)過序列化和反序列化,所以通信的數(shù)據(jù)結(jié)構(gòu)包括的信息不能過多。

2 子線程處理任務邏輯

const { parentPort } = require('worker_threads'); const vm = require('vm'); const { isFunction, isJSFile } = require('./utils');  // 監(jiān)聽主線程提交過來的任務 parentPort.on('message', async (work) => {     try {         const { filename, options } = work;         let aFunction;         if (isJSFile(filename)) {             aFunction = require(filename);         } else {             aFunction = vm.runInThisContext(`(${filename})`);         }         if (!isFunction(aFunction)) {             throw new Error('work type error: js file or string');         }         work.data = await aFunction(options);         parentPort.postMessage({event: 'done', work});     } catch (error) {         work.error = error.toString();         parentPort.postMessage({event: 'error', work});     } });  process.on('uncaughtException', (...rest) => {     console.error(...rest); });  process.on('unhandledRejection', (...rest) => {     console.error(...rest); });

子線程的邏輯比較簡單,就是監(jiān)聽主線程分派過來的任務,然后執(zhí)行任務,執(zhí)行完之后通知主線程。任務支持js文件和字符串代碼的形式。需要返回一個Promise或者async函數(shù)。用于用于通知主線程任務已經(jīng)完成。

3 線程池和業(yè)務的通信

// 提供給用戶側(cè)的接口 class UserWork extends EventEmitter {     constructor({ workId }) {         super();         // 任務id         this.workId = workId;         // 支持超時取消任務         this.timer = null;         // 任務狀態(tài)         this.state = WORK_STATE.PENDDING;     }     // 超時后取消任務     setTimeout(timeout) {         this.timer = setTimeout(() => {             this.timer && this.cancel() && this.emit('timeout');         }, ~~timeout);     }     // 取消之前設置的定時器     clearTimeout() {         clearTimeout(this.timer);         this.timer = null;     }     // 直接取消任務,如果執(zhí)行完了就不能取消了,this.terminate是動態(tài)設置的     cancel() {         if (this.state === WORK_STATE.END || this.state === WORK_STATE.CANCELED) {            return false;         } else {             this.terminate();             return true;         }     }     // 修改任務狀態(tài)     setState(state) {         this.state = state;     } }

業(yè)務提交一個任務給線程池的時候,線程池會返回一個UserWork類,業(yè)務側(cè)通過UserWork類和線程池通信。

4 管理子線程的數(shù)據(jù)結(jié)構(gòu)

// 管理子線程的數(shù)據(jù)結(jié)構(gòu) class Thread {     constructor({ worker }) {         // nodejs的Worker對象,nodejs的worker_threads模塊的Worker         this.worker = worker;         // 線程狀態(tài)         this.state = THREAD_STATE.IDLE;         // 上次工作的時間         this.lastWorkTime = Date.now();     }     // 修改線程狀態(tài)     setState(state) {         this.state = state;     }     // 修改線程最后工作時間     setLastWorkTime(time) {         this.lastWorkTime = time;     } }

線程池中維護了多個子線程,Thread類用于管理子線程的信息。

5 線程池 線程池的實現(xiàn)是核心,我們分為幾個部分講。

5.1 支持的配置

constructor(options = {}) {         this.options = options;         // 子線程隊列         this.workerQueue = [];         // 核心線程數(shù)         this.coreThreads = ~~options.coreThreads || config.CORE_THREADS;         // 線程池最大線程數(shù),如果不支持動態(tài)擴容則最大線程數(shù)等于核心線程數(shù)         this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads;         // 超過任務隊列長度時的處理策略         this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD;         // 是否預創(chuàng)建子線程         this.preCreate = options.preCreate === true;         // 線程最大空閑時間,達到后自動退出         this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME;         // 是否預創(chuàng)建線程池         this.preCreate && this.preCreateThreads();         // 保存線程池中任務對應的UserWork         this.workPool = {};         // 線程池中當前可用的任務id,每次有新任務時自增1         this.workId = 0;         // 線程池中的任務隊列         this.queue = [];         // 線程池總?cè)蝿諗?shù)         this.totalWork = 0;         // 支持的最大任務數(shù)         this.maxWork = ~~options.maxWork || config.MAX_WORK;         // 處理任務的超時時間,全局配置         this.timeout = ~~options.timeout;         this.pollIdle();     }

上面的代碼列出了線程池所支持的能力。

5.2 創(chuàng)建線程

newThread() {         const worker = new Worker(workerPath);         const thread = new Thread({worker});         this.workerQueue.push(thread);         const threadId = worker.threadId;         worker.on('exit', () => {             // 找到該線程對應的數(shù)據(jù)結(jié)構(gòu),然后刪除該線程的數(shù)據(jù)結(jié)構(gòu)             const position = this.workerQueue.findIndex(({worker}) => {                 return worker.threadId === threadId;             });             const exitedThread = this.workerQueue.splice(position, 1);             // 退出時狀態(tài)是BUSY說明還在處理任務(非正常退出)             this.totalWork -= exitedThread.state === THREAD_STATE.BUSY ? 1 : 0;         });         // 和子線程通信         worker.on('message', (result) => {             const {                 work,                 event,             } = result;             const { data, error, workId } = work;             // 通過workId拿到對應的userWork             const userWork = this.workPool[workId];             // 不存在說明任務被取消了             if (!userWork) {                 return;             }             // 修改線程池數(shù)據(jù)結(jié)構(gòu)             this.endWork(userWork);              // 修改線程數(shù)據(jù)結(jié)構(gòu)             thread.setLastWorkTime(Date.now());              // 還有任務則通知子線程處理,否則修改子線程狀態(tài)為空閑             if (this.queue.length) {                 // 從任務隊列拿到一個任務交給子線程                 this.submitWorkToThread(thread, this.queue.shift());             } else {                 thread.setState(THREAD_STATE.IDLE);             }              switch(event) {                 case 'done':                     // 通知用戶,任務完成                     userWork.emit('done', data);                     break;                 case 'error':                     // 通知用戶,任務出錯                     if (EventEmitter.listenerCount(userWork, 'error')) {                         userWork.emit('error', error);                     }                     break;                 default: break;             }         });         worker.on('error', (...rest) => {             console.error(...rest);         });         return thread;     }

創(chuàng)建線程,并保持線程對應的數(shù)據(jù)結(jié)構(gòu)、退出、通信管理、任務分派。子線程執(zhí)行完任務后,會通知線程池,主線程通知用戶。

5.3 選擇線程

selectThead() {         // 找出空閑的線程,把任務交給他         for (let i = 0; i < this.workerQueue.length; i++) {             if (this.workerQueue[i].state === THREAD_STATE.IDLE) {                 return this.workerQueue[i];             }         }         // 沒有空閑的則隨機選擇一個         return this.workerQueue[~~(Math.random() * this.workerQueue.length)];     }

當用戶給線程池提交一個任務時,線程池會選擇一個空閑的線程處理該任務。如果沒有可用線程則任務插入待處理隊列等待處理。

5.4 提交任務

// 給線程池提交一個任務     submit(filename, options = {}) {         return new Promise(async (resolve, reject) => {             let thread;             // 沒有線程則創(chuàng)建一個             if (this.workerQueue.length) {                 thread = this.selectThead();                 // 該線程還有任務需要處理                 if (thread.state === THREAD_STATE.BUSY) {                     // 子線程個數(shù)還沒有達到核心線程數(shù),則新建線程處理                     if (this.workerQueue.length < this.coreThreads) {                         thread = this.newThread();                     } else if (this.totalWork + 1 > this.maxWork){                         // 總?cè)蝿諗?shù)已達到閾值,還沒有達到線程數(shù)閾值,則創(chuàng)建                         if(this.workerQueue.length < this.maxThreads) {                             thread = this.newThread();                         } else {                             // 處理溢出的任務                             switch(this.discardPolicy) {                                 case DISCARD_POLICY.ABORT:                                      return reject(new Error('queue overflow'));                                 case DISCARD_POLICY.CALLER_RUN:                                     const workId = this.generateWorkId();                                     const userWork =  new UserWork({workId});                                      userWork.setState(WORK_STATE.RUNNING);                                     userWork.terminate = () => {                                         userWork.setState(WORK_STATE.CANCELED);                                     };                                     this.timeout && userWork.setTimeout(this.timeout);                                     resolve(userWork);                                     try {                                         let aFunction;                                         if (isJSFile(filename)) {                                             aFunction = require(filename);                                         } else {                                             aFunction = vm.runInThisContext(`(${filename})`);                                         }                                         if (!isFunction(aFunction)) {                                             throw new Error('work type error: js file or string');                                         }                                         const result = await aFunction(options);                                         // 延遲通知,讓用戶有機會取消或者注冊事件                                         setImmediate(() => {                                             if (userWork.state !== WORK_STATE.CANCELED) {                                                 userWork.setState(WORK_STATE.END);                                                 userWork.emit('done', result);                                             }                                         });                                     } catch (error) {                                         setImmediate(() => {                                             if (userWork.state !== WORK_STATE.CANCELED) {                                                 userWork.setState(WORK_STATE.END);                                                 userWork.emit('error', error.toString());                                             }                                         });                                     }                                     return;                                 case DISCARD_POLICY.OLDEST_DISCARD:                                      const work = this.queue.shift();                                     // maxWork為1時,work會為空                                     if (work && this.workPool[work.workId]) {                                         this.cancelWork(this.workPool[work.workId]);                                     } else {                                         return reject(new Error('no work can be discarded'));                                     }                                     break;                                 case DISCARD_POLICY.DISCARD:                                     return reject(new Error('discard'));                                 case DISCARD_POLICY.NOT_DISCARD:                                     break;                                 default:                                      break;                             }                         }                     }                 }             } else {                 thread = this.newThread();             }             // 生成一個任務id             const workId = this.generateWorkId();              // 新建一個UserWork             const userWork =  new UserWork({workId});              this.timeout && userWork.setTimeout(this.timeout);              // 新建一個work             const work = new Work({ workId, filename, options });              // 修改線程池數(shù)據(jù)結(jié)構(gòu),把UserWork和Work關聯(lián)起來             this.addWork(userWork);              // 選中的線程正在處理任務,則先緩存到任務隊列             if (thread.state === THREAD_STATE.BUSY) {                 this.queue.push(work);                 userWork.terminate = () => {                     this.cancelWork(userWork);                     this.queue = this.queue.filter((node) => {                         return node.workId !== work.workId;                     });                 }             } else {                 this.submitWorkToThread(thread, work);             }              resolve(userWork);         })     }      submitWorkToThread(thread, work) {         const userWork = this.workPool[work.workId];         userWork.setState(WORK_STATE.RUNNING);         // 否則交給線程處理,并修改狀態(tài)和記錄該線程當前處理的任務id         thread.setState(THREAD_STATE.BUSY);         thread.worker.postMessage(work);         userWork.terminate = () => {             this.cancelWork(userWork);             thread.setState(THREAD_STATE.DEAD);             thread.worker.terminate();         }     }      addWork(userWork) {         userWork.setState(WORK_STATE.PENDDING);         this.workPool[userWork.workId] = userWork;         this.totalWork++;     }      endWork(userWork) {         delete this.workPool[userWork.workId];         this.totalWork--;         userWork.setState(WORK_STATE.END);         userWork.clearTimeout();      }      cancelWork(userWork) {         delete this.workPool[userWork.workId];         this.totalWork--;         userWork.setState(WORK_STATE.CANCELED);         userWork.emit('cancel');     }

提交任務是線程池暴露給用戶側(cè)的接口,主要處理的邏輯包括,根據(jù)當前的策略判斷是否需要新建線程、選擇線程處理任務、排隊任務等,如果任務數(shù)達到閾值,則根據(jù)丟棄策略處理該任務。

5.5 空閑處理

pollIdle() {         setTimeout(() => {             for (let i = 0; i < this.workerQueue.length; i++) {                 const node = this.workerQueue[i];                 if (node.state === THREAD_STATE.IDLE && Date.now() - node.lastWorkTime > this.maxIdleTime) {                     node.worker.terminate();                 }             }             this.pollIdle();         }, 1000);     }

當子線程空閑時間達到閾值后,主線程會殺死子線程,避免浪費系統(tǒng)資源。

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI