溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Node.js中怎么實現(xiàn)多線程

發(fā)布時間:2021-07-21 09:27:52 來源:億速云 閱讀:459 作者:Leah 欄目:web開發(fā)

Node.js中怎么實現(xiàn)多線程,針對這個問題,這篇文章詳細介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

Node.js 是如何工作的

Node.js 使用兩種線程:event loop 處理的主線程和 worker pool 中的幾個輔助線程。

事件循環(huán)是一種機制,它采用回調(diào)(函數(shù))并注冊它們,準備在將來的某個時刻執(zhí)行。它與相關(guān)的 JavaScript 代碼在同一個線程中運行。當 JavaScript 操作阻塞線程時,事件循環(huán)也會被阻止。

工作池是一種執(zhí)行模型,它產(chǎn)生并處理單獨的線程,然后同步執(zhí)行任務(wù),并將結(jié)果返回到事件循環(huán)。事件循環(huán)使用返回的結(jié)果執(zhí)行提供的回調(diào)。

簡而言之,它負責異步 I/O操作 —— 主要是與系統(tǒng)磁盤和網(wǎng)絡(luò)的交互。它主要由諸如 fs(I/O 密集)或 crypto(CPU 密集)等模塊使用。工作池用 libuv 實現(xiàn),當 Node 需要在 JavaScript 和 C++ 之間進行內(nèi)部通信時,會導致輕微的延遲,但這幾乎不可察覺。

基于這兩種機制,我們可以編寫如下代碼:

fs.readFile(path.join(__dirname, './package.json'), (err, content) => {   if (err) {     return null;   }   console.log(content.toString());  });

前面提到的 fs 模塊告訴工作池使用其中一個線程來讀取文件的內(nèi)容,并在完成后通知事件循環(huán)。然后事件循環(huán)獲取提供的回調(diào)函數(shù),并用文件的內(nèi)容執(zhí)行它。

以上是非阻塞代碼的示例,我們不必同步等待某事的發(fā)生。只需告訴工作池去讀取文件,并用結(jié)果去調(diào)用提供的函數(shù)即可。由于工作池有自己的線程,因此事件循環(huán)可以在讀取文件時繼續(xù)正常執(zhí)行。

在不需要同步執(zhí)行某些復(fù)雜操作時,這一切都相安無事:任何運行時間太長的函數(shù)都會阻塞線程。如果應(yīng)用程序中有大量這類功能,就可能會明顯降低服務(wù)器的吞吐量,甚至完全凍結(jié)它。在這種情況下,無法繼續(xù)將工作委派給工作池。

在需要對數(shù)據(jù)進行復(fù)雜的計算時(如AI、機器學習或大數(shù)據(jù))無法真正有效地使用 Node.js,因為操作阻塞了主(且唯一)線程,使服務(wù)器無響應(yīng)。在 Node.js v10.5.0 發(fā)布之前就是這種情況,在這一版本增加了對多線程的支持。

簡介:worker_threads

worker_threads 模塊允許我們創(chuàng)建功能齊全的多線程 Node.js 程序。

thread worker 是在單獨的線程中生成的一段代碼(通常從文件中取出)。

注意,術(shù)語 thread worker,worker 和 thread 經(jīng)?;Q使用,他們都指的是同一件事。

要想使用 thread worker,必須導入 worker_threads 模塊。讓我們先寫一個函數(shù)來幫助我們生成這些thread worker,然后再討論它們的屬性。

type WorkerCallback = (err: any, result?: any) => any;  export function runWorker(path: string, cb: WorkerCallback, workerData: object | nullnull = null) {   const worker = new Worker(path, { workerData });   worker.on('message', cb.bind(null, null));   worker.on('error', cb);   worker.on('exit', (exitCode) => {     if (exitCode === 0) {       return null;     }     return cb(new Error(`Worker has stopped with code ${exitCode}`));   });   return worker;  }

要創(chuàng)建一個 worker,首先必須創(chuàng)建一個 Worker 類的實例。它的***個參數(shù)提供了包含 worker 的代碼的文件的路徑;第二個參數(shù)提供了一個名為 workerData 的包含一個屬性的對象。這是我們希望線程在開始運行時可以訪問的數(shù)據(jù)。

請注意:不管你是用的是 JavaScript, 還是最終要轉(zhuǎn)換為 JavaScript 的語言(例如,TypeScript),路徑應(yīng)該始終引用帶有 .js 或 .mjs 擴展名的文件。

我還想指出為什么使用回調(diào)方法,而不是返回在觸發(fā) message 事件時將解決的 promise。這是因為 worker 可以發(fā)送許多 message 事件,而不是一個。

正如你在上面的例子中所看到的,線程間的通信是基于事件的,這意味著我們設(shè)置了 worker 在發(fā)送給定事件后調(diào)用的偵聽器。

以下是最常見的事件:

worker.on('error', (error) => {});

只要 worker 中有未捕獲的異常,就會發(fā)出 error 事件。然后終止 worker,錯誤可以作為提供的回調(diào)中的***個參數(shù)。

worker.on('exit', (exitCode) => {});

在 worker 退出時會發(fā)出 exit 事件。如果在worker中調(diào)用了 process.exit(),那么 exitCode 將被提供給回調(diào)。如果 worker 以 worker.terminate() 終止,則代碼為1。

worker.on('online', () => {});

只要 worker 停止解析 JavaScript 代碼并開始執(zhí)行,就會發(fā)出 online 事件。它不常用,但在特定情況下可以提供信息。

worker.on('message', (data) => {});

只要 worker 將數(shù)據(jù)發(fā)送到父線程,就會發(fā)出 message 事件。

現(xiàn)在讓我們來看看如何在線程之間共享數(shù)據(jù)。

在線程之間交換數(shù)據(jù)

要將數(shù)據(jù)發(fā)送到另一個線程,可以用 port.postMessage() 方法。它的原型如下:

port.postMessage(data[, transferList])

port 對象可以是 parentPort,也可以是 MessagePort 的實例 —— 稍后會詳細講解。

數(shù)據(jù)參數(shù)

***個參數(shù) —— 這里被稱為 data —— 是一個被復(fù)制到另一個線程的對象。它可以是復(fù)制算法所支持的任何內(nèi)容。

數(shù)據(jù)由結(jié)構(gòu)化克隆算法進行復(fù)制。引用自 Mozilla:

它通過遞歸輸入對象來進行克隆,同時保持之前訪問過的引用的映射,以避免***遍歷循環(huán)。

該算法不復(fù)制函數(shù)、錯誤、屬性描述符或原型鏈。還需要注意的是,以這種方式復(fù)制對象與使用 JSON 不同,因為它可以包含循環(huán)引用和類型化數(shù)組,而 JSON 不能。

由于能夠復(fù)制類型化數(shù)組,該算法可以在線程之間共享內(nèi)存。

在線程之間共享內(nèi)存

人們可能會說像 cluster 或 child_process 這樣的模塊在很久以前就開始使用線程了。這話對,也不對。

cluster 模塊可以創(chuàng)建多個節(jié)點實例,其中一個主進程在它們之間對請求進行路由。集群能夠有效地增加服務(wù)器的吞吐量;但是我們不能用 cluster 模塊生成一個單獨的線程。

人們傾向于用 PM2 這樣的工具來集中管理他們的程序,而不是在自己的代碼中手動執(zhí)行,如果你有興趣,可以研究一下如何使用 cluster 模塊。

child_process 模塊可以生成任何可執(zhí)行文件,無論它是否是用 JavaScript 寫的。它和 worker_threads 非常相似,但缺少后者的幾個重要功能。

具體來說 thread workers 更輕量,并且與其父線程共享相同的進程 ID。它們還可以與父線程共享內(nèi)存,這樣可以避免對大的數(shù)據(jù)負載進行序列化,從而更有效地來回傳遞數(shù)據(jù)。

現(xiàn)在讓我們看一下如何在線程之間共享內(nèi)存。為了共享內(nèi)存,必須將 ArrayBuffer 或 SharedArrayBuffer 的實例作為數(shù)據(jù)參數(shù)發(fā)送到另一個線程。

這是一個與其父線程共享內(nèi)存的 worker:

import { parentPort } from 'worker_threads';  parentPort.on('message', () => {   const numberOfElements = 100;   const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements);   const arr = new Int32Array(sharedBuffer);   for (let i = 0; i < numberOfElements; i += 1) {     arr[i] = Math.round(Math.random() * 30);   }   parentPort.postMessage({ arr });  });

首先,我們創(chuàng)建一個 SharedArrayBuffer,其內(nèi)存需要包含100個32位整數(shù)。接下來創(chuàng)建一個 Int32Array 實例,它將用緩沖區(qū)來保存其結(jié)構(gòu),然后用一些隨機數(shù)填充數(shù)組并將其發(fā)送到父線程。

在父線程中:

import path from 'path';  import { runWorker } from '../run-worker';  const worker = runWorker(path.join(__dirname, 'worker.js'), (err, { arr }) => {   if (err) {     return null;   }   arr[0] = 5;  });  worker.postMessage({});

把 arr [0] 的值改為5,實際上會在兩個線程中修改它。

當然,通過共享內(nèi)存,我們冒險在一個線程中修改一個值,同時也在另一個線程中進行了修改。但是我們在這個過程中也得到了一個好處:該值不需要進行序列化就可以另一個線程中使用,這極大地提高了效率。只需記住管理數(shù)據(jù)正確的引用,以便在完成數(shù)據(jù)處理后對其進行垃圾回收。

共享一個整數(shù)數(shù)組固然很好,但我們真正感興趣的是共享對象 &mdash;&mdash; 這是存儲信息的默認方式。不幸的是,沒有 SharedObjectBuffer 或類似的東西,但我們可以自己創(chuàng)建一個類似的結(jié)構(gòu)。

transferList參數(shù)

transferList 中只能包含 ArrayBuffer 和 MessagePort。一旦它們被傳送到另一個線程,就不能再次被傳送了;因為內(nèi)存里的內(nèi)容已經(jīng)被移動到了另一個線程。

目前,還不能通過 transferList(可以使用 child_process 模塊)來傳輸網(wǎng)絡(luò)套接字。

創(chuàng)建通信渠道

線程之間的通信是通過 port 進行的,port 是 MessagePort 類的實例,并啟用基于事件的通信。

使用 port 在線程之間進行通信的方法有兩種。***個是默認值,這個方法比較容易。在 worker 的代碼中,我們從worker_threads 模塊導入一個名為 parentPort 的對象,并使用對象的 .postMessage() 方法將消息發(fā)送到父線程。

這是一個例子:

import { parentPort } from 'worker_threads';  const data = {   // ...  };  parentPort.postMessage(data);

parentPort 是 Node.js 在幕后創(chuàng)建的 MessagePort 實例,用于與父線程進行通信。這樣就可以用 parentPort 和 worker 對象在線程之間進行通信。

線程間的第二種通信方式是創(chuàng)建一個 MessageChannel 并將其發(fā)送給 worker。以下代碼是如何創(chuàng)建一個新的 MessagePort 并與我們的 worker 共享它:

import path from 'path';  import { Worker, MessageChannel } from 'worker_threads';  const worker = new Worker(path.join(__dirname, 'worker.js'));  const { port1, port2 } = new MessageChannel();  port1.on('message', (message) => {   console.log('message from worker:', message);  });  worker.postMessage({ port: port2 }, [port2]);

在創(chuàng)建 port1 和 port2 之后,我們在 port1 上設(shè)置事件監(jiān)聽器并將 port2 發(fā)送給 worker。我們必須將它包含在 transferList 中,以便將其傳輸給 worker 。

在 worker 內(nèi)部:

import { parentPort, MessagePort } from 'worker_threads';  parentPort.on('message', (data) => {   const { port }: { port: MessagePort } = data;   port.postMessage('heres your message!');  });

這樣,我們就能使用父線程發(fā)送的 port 了。

使用 parentPort 不一定是錯誤的方法,但***用 MessageChannel 的實例創(chuàng)建一個新的 MessagePort,然后與生成的 worker 共享它。

請注意,在后面的例子中,為了簡便起見,我用了 parentPort。

使用 worker 的兩種方式

可以通過兩種方式使用 worker。***種是生成一個 worker,然后執(zhí)行它的代碼,并將結(jié)果發(fā)送到父線程。通過這種方法,每當出現(xiàn)新任務(wù)時,都必須重新創(chuàng)建一個工作者。

第二種方法是生成一個 worker 并為 message 事件設(shè)置監(jiān)聽器。每次觸發(fā) message 時,它都會完成工作并將結(jié)果發(fā)送回父線程,這會使 worker 保持活動狀態(tài)以供以后使用。

Node.js 文檔推薦第二種方法,因為在創(chuàng)建 thread worker 時需要創(chuàng)建虛擬機并解析和執(zhí)行代碼,這會產(chǎn)生比較大的開銷。所以這種方法比不斷產(chǎn)生新 worker 的效率更高。

這種方法被稱為工作池,因為我們創(chuàng)建了一個工作池并讓它們等待,在需要時調(diào)度 message 事件來完成工作。

以下是一個產(chǎn)生、執(zhí)行然后關(guān)閉 worker 例子:

import { parentPort } from 'worker_threads';  const collection = [];  for (let i = 0; i < 10; i += 1) {   collection[i] = i;  }  parentPort.postMessage(collection);

將 collection 發(fā)送到父線程后,它就會退出。

下面是一個 worker 的例子,它可以在給定任務(wù)之前等待很長一段時間:

import { parentPort } from 'worker_threads';  parentPort.on('message', (data: any) => {   const result = doSomething(data);   parentPort.postMessage(result);  });

worker_threads 模塊中可用的重要屬性

worker_threads 模塊中有一些可用的屬性:

isMainThread

當不在工作線程內(nèi)操作時,該屬性為 true 。如果你覺得有必要,可以在 worker 文件的開頭包含一個簡單的 if 語句,以確保它只作為 worker 運行。

import { isMainThread } from 'worker_threads';  if (isMainThread) {   throw new Error('Its not a worker');  }

workerData

產(chǎn)生線程時包含在 worker 的構(gòu)造函數(shù)中的數(shù)據(jù)。

const worker = new Worker(path, { workerData });

在工作線程中:

import { workerData } from 'worker_threads';  console.log(workerData.property);

parentPort

前面提到的 MessagePort 實例,用于與父線程通信。

threadId

分配給 worker 的唯一標識符。

現(xiàn)在我們知道了技術(shù)細節(jié),接下來實現(xiàn)一些東西并在實踐中檢驗學到的知識。

實現(xiàn) setTimeout

setTimeout 是一個***循環(huán),顧名思義,用來檢測程序運行時間是否超時。它在循環(huán)中檢查起始時間與給定毫秒數(shù)之和是否小于實際日期。

import { parentPort, workerData } from 'worker_threads';  const time = Date.now();  while (true) {      if (time + workerData.time <= Date.now()) {          parentPort.postMessage({});          break;      }  }

這個特定的實現(xiàn)產(chǎn)生一個線程,然后執(zhí)行它的代碼,***在完成后退出。

接下來實現(xiàn)使用這個 worker 的代碼。首先創(chuàng)建一個狀態(tài),用它來跟蹤生成的 worker:

const timeoutState: { [key: string]: Worker } = {};

然后時負責創(chuàng)建 worker 并將其保存到狀態(tài)的函數(shù):

export function setTimeout(callback: (err: any) => any, time: number) {   const id = uuidv4();   const worker = runWorker(     path.join(__dirname, './timeout-worker.js'),     (err) => {       if (!timeoutState[id]) {         return null;       }       timeoutState[id] = null;       if (err) {         return callback(err);       }       callback(null);     },     {       time,     },   );   timeoutState[id] = worker;   return id;  }

首先,我們使用 UUID 包為 worker 創(chuàng)建一個唯一的標識符,然后用先前定義的函數(shù) runWorker 來獲取 worker。我們還向 worker 傳入一個回調(diào)函數(shù),一旦 worker 發(fā)送了數(shù)據(jù)就會被觸發(fā)。***,把 worker 保存在狀態(tài)中并返回 id。

在回調(diào)函數(shù)中,我們必須檢查該 worker 是否仍然存在于該狀態(tài)中,因為有可能會 cancelTimeout(),這將會把它刪除。如果確實存在,就把它從狀態(tài)中刪除,并調(diào)用傳給 setTimeout 函數(shù)的 callback。

cancelTimeout 函數(shù)使用 .terminate() 方法強制 worker 退出,并從該狀態(tài)中刪除該這個worker:

export function cancelTimeout(id: string) {   if (timeoutState[id]) {     timeoutState[id].terminate();     timeoutState[id] = undefined;     return true;   }   return false;  }

如果你有興趣,我也實現(xiàn)了 setInterval,代碼在這里,但因為它對線程什么都沒做(我們重用setTimeout的代碼),所以我決定不在這里進行解釋。

我已經(jīng)創(chuàng)建了一個短小的測試代碼,目的是檢查這種方法與原生方法的不同之處。你可以在這里找到代碼。這些是結(jié)果:

native setTimeout { ms: 7004, averageCPUCost: 0.1416 }  worker setTimeout { ms: 7046, averageCPUCost: 0.308 }

我們可以看到 setTimeout 有一點延遲 - 大約40ms - 這時 worker 被創(chuàng)建時的消耗。平均 CPU 成本也略高,但沒什么難以忍受的(CPU 成本是整個過程持續(xù)時間內(nèi) CPU 使用率的平均值)。

如果我們可以重用 worker,就能夠降低延遲和 CPU 使用率,這就是要實現(xiàn)工作池的原因。

實現(xiàn)工作池

如上所述,工作池是給定數(shù)量的被事先創(chuàng)建的 worker,他們保持空閑并監(jiān)聽 message 事件。一旦 message 事件被觸發(fā),他們就會開始工作并發(fā)回結(jié)果。

為了更好地描述我們將要做的事情,下面我們來創(chuàng)建一個由八個 thread worker 組成的工作池:

const pool = new WorkerPool(path.join(__dirname, './test-worker.js'), 8);

如果你熟悉限制并發(fā)操作,那么你在這里看到的邏輯幾乎相同,只是一個不同的用例。

如上面的代碼片段所示,我們把指向 worker 的路徑和要生成的 worker 數(shù)量傳給了 WorkerPool 的構(gòu)造函數(shù)。

export class WorkerPool<T, N> {   private queue: QueueItem<T, N>[] = [];   private workersById: { [key: number]: Worker } = {};   private activeWorkersById: { [key: number]: boolean } = {};   public constructor(public workerPath: string, public numberOfThreads: number) {     this.init();   }  }

這里還有其他一些屬性,如 workersById 和 activeWorkersById,我們可以分別保存現(xiàn)有的 worker 和當前正在運行的 worker 的 ID。還有 queue,我們可以使用以下結(jié)構(gòu)來保存對象:

type QueueCallback<N> = (err: any, result?: N) => void;  interface QueueItem<T, N> {   callback: QueueCallback<N>;   getData: () => T;  }

callback 只是默認的節(jié)點回調(diào),***個參數(shù)是錯誤,第二個參數(shù)是可能的結(jié)果。 getData 是傳遞給工作池 .run() 方法的函數(shù)(如下所述),一旦項目開始處理就會被調(diào)用。 getData 函數(shù)返回的數(shù)據(jù)將傳給工作線程。

在 .init() 方法中,我們創(chuàng)建了 worker 并將它們保存在以下狀態(tài)中:

private init() {    if (this.numberOfThreads < 1) {      return null;    }    for (let i = 0; i < this.numberOfThreads; i += 1) {      const worker = new Worker(this.workerPath);      this.workersById[i] = worker;      this.activeWorkersById[i] = false;    }  }

為避免***循環(huán),我們首先要確保線程數(shù) > 1。然后創(chuàng)建有效的 worker 數(shù),并將它們的索引保存在 workersById 狀態(tài)。我們在 activeWorkersById 狀態(tài)中保存了它們當前是否正在運行的信息,默認情況下該狀態(tài)始終為false。

現(xiàn)在我們必須實現(xiàn)前面提到的 .run() 方法來設(shè)置一個 worker 可用的任務(wù)。

public run(getData: () => T) {    return new Promise<N>((resolve, reject) => {      const availableWorkerId = this.getInactiveWorkerId();      const queueItem: QueueItem<T, N> = {        getData,        callback: (error, result) => {          if (error) {            return reject(error);          }  return resolve(result);        },      };      if (availableWorkerId === -1) {        this.queue.push(queueItem);        return null;      }      this.runWorker(availableWorkerId, queueItem);    });  }

在 promise 函數(shù)里,我們首先通過調(diào)用 .getInactiveWorkerId() 來檢查是否存在空閑的 worker 可以來處理數(shù)據(jù):

private getInactiveWorkerId(): number {    for (let i = 0; i < this.numberOfThreads; i += 1) {      if (!this.activeWorkersById[i]) {        return i;      }    }    return -1;  }

接下來,我們創(chuàng)建一個 queueItem,在其中保存?zhèn)鬟f給 .run() 方法的 getData 函數(shù)以及回調(diào)。在回調(diào)中,我們要么 resolve 或者 reject promise,這取決于 worker 是否將錯誤傳遞給回調(diào)。

如果 availableWorkerId 的值是 -1,意味著當前沒有可用的 worker,我們將 queueItem 添加到 queue。如果有可用的 worker,則調(diào)用 .runWorker() 方法來執(zhí)行 worker。

在 .runWorker() 方法中,我們必須把當前 worker 的 activeWorkersById 設(shè)置為使用狀態(tài);為 message 和 error 事件設(shè)置事件監(jiān)聽器(并在之后清理它們);***將數(shù)據(jù)發(fā)送給 worker。

private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {   const worker = this.workersById[workerId];   this.activeWorkersById[workerId] = true;   const messageCallback = (result: N) => {     queueItem.callback(null, result);     cleanUp();   };   const errorCallback = (error: any) => {     queueItem.callback(error);     cleanUp();   };   const cleanUp = () => {     worker.removeAllListeners('message');     worker.removeAllListeners('error');     this.activeWorkersById[workerId] = false;     if (!this.queue.length) {       return null;     }     this.runWorker(workerId, this.queue.shift());   };   worker.once('message', messageCallback);   worker.once('error', errorCallback);   worker.postMessage(await queueItem.getData());  }

首先,通過使用傳遞的 workerId,我們從 workersById 中獲得 worker 引用。然后,在 activeWorkersById 中,將 [workerId] 屬性設(shè)置為true,這樣我們就能知道在 worker 在忙,不要運行其他任務(wù)。

接下來,分別創(chuàng)建 messageCallback 和 errorCallback 用來在消息和錯誤事件上調(diào)用,然后注冊所述函數(shù)來監(jiān)聽事件并將數(shù)據(jù)發(fā)送給 worker。

在回調(diào)中,我們調(diào)用 queueItem 的回調(diào),然后調(diào)用 cleanUp 函數(shù)。在 cleanUp 函數(shù)中,要刪除事件偵聽器,因為我們會多次重用同一個 worker。如果沒有刪除監(jiān)聽器的話就會發(fā)生內(nèi)存泄漏,內(nèi)存會被慢慢耗盡。

在 activeWorkersById 狀態(tài)中,我們將 [workerId] 屬性設(shè)置為 false,并檢查隊列是否為空。如果不是,就從 queue 中刪除***個項目,并用另一個 queueItem 再次調(diào)用 worker。

接著創(chuàng)建一個在收到 message 事件中的數(shù)據(jù)后進行一些計算的 worker:

import { isMainThread, parentPort } from 'worker_threads';  if (isMainThread) {   throw new Error('Its not a worker');  }  const doCalcs = (data: any) => {   const collection = [];   for (let i = 0; i < 1000000; i += 1) {     collection[i] = Math.round(Math.random() * 100000);   }   return collection.sort((a, b) => {     if (a > b) {       return 1;     }     return -1;   });  };  parentPort.on('message', (data: any) => {   const result = doCalcs(data);   parentPort.postMessage(result);  });

worker 創(chuàng)建了一個包含 100 萬個隨機數(shù)的數(shù)組,然后對它們進行排序。只要能夠多花費一些時間才能完成,做些什么事情并不重要。

以下是工作池簡單用法的示例:

const pool = new WorkerPool<{ i: number }, number>(path.join(__dirname, './test-worker.js'), 8);  const items = [...new Array(100)].fill(null);  Promise.all(   items.map(async (_, i) => {     await pool.run(() => ({ i }));     console.log('finished', i);   }),  ).then(() => {   console.log('finished all');  });

首先創(chuàng)建一個由八個 worker 組成的工作池。然后創(chuàng)建一個包含 100 個元素的數(shù)組,對于每個元素,我們在工作池中運行一個任務(wù)。開始運行后將立即執(zhí)行八個任務(wù),其余任務(wù)被放入隊列并逐個執(zhí)行。通過使用工作池,我們不必每次都創(chuàng)建一個 worker,從而大大提高了效率。

關(guān)于Node.js中怎么實現(xiàn)多線程問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI