溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

理解 Node.js 事件驅(qū)動(dòng)機(jī)制的原理

發(fā)布時(shí)間:2020-08-24 19:56:53 來(lái)源:腳本之家 閱讀:189 作者:痕跡絕陌路 欄目:web開(kāi)發(fā)

學(xué)習(xí) Node.js 一定要理解的內(nèi)容之一,文中主要涉及到了 EventEmitter 的使用和一些異步情況的處理,比較偏基礎(chǔ),值得一讀。

大多數(shù) Node.js 對(duì)象都依賴了 EventEmitter 模塊來(lái)監(jiān)聽(tīng)和響應(yīng)事件,比如我們常用的 HTTP requests, responses, 以及 streams。

const EventEmitter = require('events');

事件驅(qū)動(dòng)機(jī)制的最簡(jiǎn)單形式,是在 Node.js 中十分流行的回調(diào)函數(shù),例如 fs.readFile。 在回調(diào)函數(shù)這種形式中,事件每被觸發(fā)一次,回調(diào)就會(huì)被觸發(fā)一次。

我們先來(lái)探索下這個(gè)最基本的方式。

你準(zhǔn)備好了就叫我哈,Node!

很久很久以前,在 js 里還沒(méi)有原生支持 Promise,async/await 還只是一個(gè)遙遠(yuǎn)的夢(mèng)想,回調(diào)函數(shù)是處理異步問(wèn)題的最原始的方式。

回調(diào)從本質(zhì)上講是傳遞給其他函數(shù)的函數(shù),在 JavaScript 中函數(shù)是第一類對(duì)象,這也讓回調(diào)的存在成為可能。

一定要搞清楚的是,回調(diào)在代碼中的并不表示異步調(diào)用。 回調(diào)既可以是同步調(diào)用的,也可以是異步調(diào)用的。

舉個(gè)例子,這里有一個(gè)宿主函數(shù) fileSize,它接受一個(gè)回調(diào)函數(shù) cb,并且可以通過(guò)條件判斷來(lái)同步或者異步地調(diào)用該回調(diào)函數(shù):

function fileSize (fileName, cb) {
 if (typeof fileName !== 'string') {
  // Sync
  return cb(new TypeError('argument should be string')); 
 } 
 fs.stat(fileName, (err, stats) => {
  if (err) {  
   // Async
   return cb(err); 
   } 
   // Async
  cb(null, stats.size);
 });
}

這其實(shí)也是個(gè)反例,這樣寫經(jīng)常會(huì)引起一些意外的錯(cuò)誤,在設(shè)計(jì)宿主函數(shù)的時(shí)候,應(yīng)當(dāng)盡可能的使用同一種風(fēng)格,要么始終都是同步的使用回調(diào),要么始終都是異步的。

我們來(lái)研究下一個(gè)典型的異步 Node 函數(shù)的簡(jiǎn)單示例,它用回調(diào)樣式編寫:

const readFileAsArray = function(file, cb) {
 fs.readFile(file, function(err, data) {
  if (err) {
   return cb(err);
  }
  const lines = data.toString().trim().split('\n');
  cb(null, lines);
 });
};

readFileAsArray 函數(shù)接受兩個(gè)參數(shù):一個(gè)文件路徑和一個(gè)回調(diào)函數(shù)。它讀取文件內(nèi)容,將其拆分成行數(shù)組,并將該數(shù)組作為回調(diào)函數(shù)的參數(shù)傳入,調(diào)用回調(diào)函數(shù)。

現(xiàn)在設(shè)計(jì)一個(gè)用例,假設(shè)我們?cè)谕荒夸浿械奈募?numbers.txt 包含如下內(nèi)容:

10
11
12
13
14
15

如果我們有一個(gè)需求,要求統(tǒng)計(jì)該文件中的奇數(shù)數(shù)量,我們可以使用 readFileAsArray 來(lái)簡(jiǎn)化代碼:

readFileAsArray('./numbers.txt', (err, lines) => {
 if (err) throw err;
 const numbers = lines.map(Number);
 const oddNumbers = numbers.filter(n => n%2 === 1);
 console.log('Odd numbers count:', oddNumbers.length);
});

這段代碼將文件內(nèi)容讀入字符串?dāng)?shù)組中,回調(diào)函數(shù)將其解析為數(shù)字,并計(jì)算奇數(shù)的個(gè)數(shù)。

這才是最純粹的 Node 回調(diào)風(fēng)格。回調(diào)的第一個(gè)參數(shù)要遵循錯(cuò)誤優(yōu)先的原則,err 可以為空,我們要將回調(diào)作為宿主函數(shù)的最后一個(gè)參數(shù)傳遞。你應(yīng)該一直用這種方式這樣設(shè)計(jì)你的函數(shù),因?yàn)橛脩艨赡軙?huì)假設(shè)。讓宿主函數(shù)把回調(diào)當(dāng)做其最后一個(gè)參數(shù),并讓回調(diào)函數(shù)以一個(gè)可能為空的錯(cuò)誤對(duì)象作為其第一個(gè)參數(shù)。

回調(diào)在現(xiàn)代 JavaScript 中的替代品

在現(xiàn)代 JavaScript 中,我們有 Promise,Promise 可以用來(lái)替代異步 API 的回調(diào)?;卣{(diào)函數(shù)需要作為宿主函數(shù)的一個(gè)參數(shù)進(jìn)行傳遞(多個(gè)宿主回調(diào)進(jìn)行嵌套就形成了回調(diào)地獄),而且錯(cuò)誤和成功都只能在其中進(jìn)行處理。而 Promise 對(duì)象可以讓我們分開(kāi)處理成功和錯(cuò)誤,還允許我們鏈?zhǔn)秸{(diào)用多個(gè)異步事件。

如果 readFileAsArray 函數(shù)支持 Promise,我們可以這樣使用它,如下所示:

readFileAsArray('./numbers.txt')
 .then(lines => {
  const numbers = lines.map(Number);
  const oddNumbers = numbers.filter(n => n%2 === 1);
  console.log('Odd numbers count:', oddNumbers.length);
 })
 .catch(console.error);

我們?cè)谒拗骱瘮?shù)的返回值上調(diào)用了一個(gè)函數(shù)來(lái)處理我們的需求,這個(gè) .then 函數(shù)會(huì)把剛剛在回調(diào)版本中的那個(gè)行數(shù)組傳遞給這里的匿名函數(shù)。為了處理錯(cuò)誤,我們?cè)诮Y(jié)果上添加一個(gè) .catch 調(diào)用,當(dāng)發(fā)生錯(cuò)誤時(shí),它會(huì)捕捉到錯(cuò)誤并讓我們?cè)L問(wèn)到這個(gè)錯(cuò)誤。

在現(xiàn)代 JavaScript 中已經(jīng)支持了 Promise 對(duì)象,因此我們可以很容易的將其使用在宿主函數(shù)之中。下面是支持 Promise 版本的 readFileAsArray 函數(shù)(同時(shí)支持舊有的回調(diào)函數(shù)方式):

const readFileAsArray = function(file, cb = () => {}) {
 return new Promise((resolve, reject) => {
  fs.readFile(file, function(err, data) {
   if (err) {
    reject(err);
    return cb(err);
   }   
   const lines = data.toString().trim().split('\n');
   resolve(lines);
   cb(null, lines);
  });
 });
};

我們使該函數(shù)返回一個(gè) Promise 對(duì)象,該對(duì)象包裹了 fs.readFile 的異步調(diào)用。Promise 對(duì)象暴露了兩個(gè)參數(shù),一個(gè) resolve 函數(shù)和一個(gè) reject 函數(shù)。

當(dāng)有異常拋出時(shí),我們可以通過(guò)向回調(diào)函數(shù)傳遞 error 來(lái)處理錯(cuò)誤,也同樣可以使用 Promise 的 reject 函數(shù)。每當(dāng)我們將數(shù)據(jù)交給回調(diào)函數(shù)處理時(shí),我們同樣也可以用 Promise 的 resolve 函數(shù)。

在這種同時(shí)可以使用回調(diào)和 Promise 的情況下,我們需要做的唯一一件事情就是為這個(gè)回調(diào)參數(shù)設(shè)置默認(rèn)值,防止在沒(méi)有傳遞回調(diào)函數(shù)參數(shù)時(shí),其被執(zhí)行然后報(bào)錯(cuò)的情況。 在這個(gè)例子中使用了一個(gè)簡(jiǎn)單的默認(rèn)空函數(shù):()=> {}。

通過(guò) async/await 使用 Promise

當(dāng)需要連續(xù)調(diào)用異步函數(shù)時(shí),使用 Promise 會(huì)讓你的代碼更容易編寫。不斷的使用回調(diào)會(huì)讓事情變得越來(lái)越復(fù)雜,最終陷入回調(diào)地獄。

Promise 的出現(xiàn)改善了一點(diǎn),Generator 的出現(xiàn)又改善了一點(diǎn)。 處理異步問(wèn)題的最新解決方式是使用 async 函數(shù),它允許我們將異步代碼視為同步代碼,使其整體上更加可讀。

以下是使用 async/await 版本的調(diào)用 readFileAsArray 的例子:

async function countOdd () {
 try {
  const lines = await readFileAsArray('./numbers');
  const numbers = lines.map(Number);
  const oddCount = numbers.filter(n => n%2 === 1).length;
  console.log('Odd numbers count:', oddCount);
 } catch(err) {
  console.error(err);
 }
}
countOdd();

首先,我們創(chuàng)建了一個(gè) async 函數(shù) —— 就是一個(gè)普通的函數(shù)聲明之前,加了個(gè) async 關(guān)鍵字。在 async 函數(shù)內(nèi)部,我們調(diào)用了 readFileAsArray 函數(shù),就像把它的返回值賦值給變量 lines 一樣,為了真的拿到 readFileAsArray 處理生成的行數(shù)組,我們使用關(guān)鍵字 await。之后,我們繼續(xù)執(zhí)行代碼,就好像 readFileAsArray 的調(diào)用是同步的一樣。

要讓代碼運(yùn)行,我們可以直接調(diào)用 async 函數(shù)。這讓我們的代碼變得更加簡(jiǎn)單和易讀。為了處理異常,我們需要將異步調(diào)用包裝在一個(gè) try/catch 語(yǔ)句中。

有了 async/await 這個(gè)特性,我們不必使用任何特殊的API(如 .then 和 .catch )。我們只是把這種函數(shù)標(biāo)記出來(lái),然后使用純粹的 JavaScript 寫代碼。

我們可以把 async/await 這個(gè)特性用在支持使用 Promise 處理后續(xù)邏輯的函數(shù)上。但是,它無(wú)法用在只支持回調(diào)的異步函數(shù)上(例如setTimeout)。

EventEmitter 模塊

EventEmitter 是一個(gè)處理 Node 中各個(gè)對(duì)象之間通信的模塊。 EventEmitter 是 Node 異步事件驅(qū)動(dòng)架構(gòu)的核心。 Node 的許多內(nèi)置模塊都繼承自 EventEmitter。

它的概念其實(shí)很簡(jiǎn)單:emitter 對(duì)象會(huì)發(fā)出被定義過(guò)的事件,導(dǎo)致之前注冊(cè)的所有監(jiān)聽(tīng)該事件的函數(shù)被調(diào)用。所以,emitter 對(duì)象基本上有兩個(gè)主要特征:

  • 觸發(fā)定義過(guò)的事件
  • 注冊(cè)或者取消注冊(cè)監(jiān)聽(tīng)函數(shù)

為了使用 EventEmitter,我們需要?jiǎng)?chuàng)建一個(gè)繼承自 EventEmitter 的類。

class MyEmitter extends EventEmitter {
}

我們從 EventEmitter 的子類實(shí)例化的對(duì)象,就是 emitter 對(duì)象:

const myEmitter = new MyEmitter();

在這些 emitter 對(duì)象的生命周期里,我們可以調(diào)用 emit 函數(shù)來(lái)觸發(fā)我們想要的觸發(fā)的任何被命名過(guò)的事件。

myEmitter.emit('something-happened');
emit 函數(shù)的使用表示發(fā)生某種情況發(fā)生了,讓大家去做該做的事情。 這種情況通常是某些狀態(tài)變化引起的。

我們可以使用 on 方法添加監(jiān)聽(tīng)器函數(shù),并且每次 emitter 對(duì)象觸發(fā)其關(guān)聯(lián)的事件時(shí),將執(zhí)行這些監(jiān)聽(tīng)器函數(shù)。

事件 !== 異步

先看看這個(gè)例子:

const EventEmitter = require('events');

class WithLog extends EventEmitter {
 execute(taskFunc) {
  console.log('Before executing');
  this.emit('begin');
  taskFunc();
  this.emit('end');
  console.log('After executing');
 }
}

const withLog = new WithLog();

withLog.on('begin', () => console.log('About to execute'));
withLog.on('end', () => console.log('Done with execute'));

withLog.execute(() => console.log('*** Executing task ***'));

WithLog 是一個(gè)事件觸發(fā)器,它有一個(gè)方法 —— execute,該方法接受一個(gè)參數(shù),即具體要處理的任務(wù)函數(shù),并在其前后包裹 log 以輸出其執(zhí)行日志。

為了看到這里會(huì)以什么順序執(zhí)行,我們?cè)趦蓚€(gè)命名的事件上都注冊(cè)了監(jiān)聽(tīng)器,最后執(zhí)行一個(gè)簡(jiǎn)單的任務(wù)來(lái)觸發(fā)事件。

下面是上面程序的輸出結(jié)果:

Before executing
About to execute
*** Executing task ***
Done with execute
After executing

這里我想證實(shí)的是以上的輸出都是同步發(fā)生的,這段代碼里沒(méi)有什么異步的成分。

  • 第一行輸出了 "Before executing"
  • begin 事件被觸發(fā),輸出 "About to execute"
  • 真正應(yīng)該被執(zhí)行的任務(wù)函數(shù)被調(diào)用,輸出 " Executing task "
  • end 事件被觸發(fā),輸出 "Done with execute"
  • 最后輸出 "After executing"

就像普通的回調(diào)一樣,不要以為事件意味著同步或異步代碼。

跟之前的回調(diào)一樣,不要一提到事件就認(rèn)為它是異步的或者同步的,還要具體分析。

如果我們傳遞 taskFunc 是一個(gè)異步函數(shù),會(huì)發(fā)生什么呢?

// ...

withLog.execute(() => {
 setImmediate(() => {
  console.log('*** Executing task ***')
 });
});

輸出結(jié)果變成了這樣:

Before executing
About to execute
Done with execute
After executing
*** Executing task ***

這樣就有問(wèn)題了,異步函數(shù)的調(diào)用導(dǎo)致 "Done with execute" 和 "After executing" 的輸出并不準(zhǔn)確。

要在異步函數(shù)完成后發(fā)出事件,我們需要將回調(diào)(或 Promise)與基于事件的通信相結(jié)合。 下面的例子說(shuō)明了這一點(diǎn)。

使用事件而不是常規(guī)回調(diào)的一個(gè)好處是,我們可以通過(guò)定義多個(gè)監(jiān)聽(tīng)器對(duì)相同的信號(hào)做出多個(gè)不同的反應(yīng)。如果使用回調(diào)來(lái)完成這件事,我們要在單個(gè)回調(diào)中寫更多的處理邏輯。事件是應(yīng)用程序允許多個(gè)外部插件在應(yīng)用程序核心之上構(gòu)建功能的好辦法。你可以把它們當(dāng)成鉤子來(lái)掛一些由于狀態(tài)變化而引發(fā)執(zhí)行的程序。

異步事件

我們把剛剛那些同步代碼的示例改成異步的:

const fs = require('fs');
const EventEmitter = require('events');

class WithTime extends EventEmitter {
 execute(asyncFunc, ...args) {
  this.emit('begin');
  console.time('execute');
  asyncFunc(...args, (err, data) => {
   if (err) {
    return this.emit('error', err);
   }

   this.emit('data', data);
   console.timeEnd('execute');
   this.emit('end');
  });
 }
}

const withTime = new WithTime();

withTime.on('begin', () => console.log('About to execute'));
withTime.on('end', () => console.log('Done with execute'));

withTime.execute(fs.readFile, __filename);

用 WithTime 類執(zhí)行 asyncFunc 函數(shù),并通過(guò)調(diào)用 console.time 和 console.timeEnd 報(bào)告該asyncFunc 所花費(fèi)的時(shí)間。它在執(zhí)行之前和之后都將以正確的順序觸發(fā)相應(yīng)的事件,并且還會(huì)發(fā)出 error/data 事件作為處理異步調(diào)用的信號(hào)。

我們傳遞一個(gè)異步的 fs.readFile 函數(shù)來(lái)測(cè)試一下 withTime emitter。 我們現(xiàn)在可以直接通過(guò)監(jiān)聽(tīng) data 事件來(lái)處理讀取到的文件數(shù)據(jù),而不用把這套處理邏輯寫到 fs.readFile 的回調(diào)函數(shù)中。

執(zhí)行這段代碼,我們以預(yù)期的順序執(zhí)行了一系列事件,并且得到異步函數(shù)的執(zhí)行時(shí)間,這些是十分重要的。

About to execute
execute: 4.507ms
Done with execute

請(qǐng)注意,我們是將回調(diào)與事件觸發(fā)器 emitter 相結(jié)合實(shí)現(xiàn)的這部分功能。 如果 asynFunc 支持Promise,我們可以使用 async/await 函數(shù)來(lái)做同樣的事情:

class WithTime extends EventEmitter {
 async execute(asyncFunc, ...args) {
  this.emit('begin');
  try {
   console.time('execute');
   const data = await asyncFunc(...args);
   this.emit('data', data);
   console.timeEnd('execute');
   this.emit('end');
  } catch(err) {
   this.emit('error', err);
  }
 }
}

我認(rèn)為這段代碼比之前的回調(diào)風(fēng)格的代碼以及使用 .then/.catch 風(fēng)格的代碼更具可讀性。async/await 讓我們更加接近 JavaScript 語(yǔ)言本身(不必再使用 .then/.catch 這些 api)。

事件參數(shù)和錯(cuò)誤

在之前的例子中,有兩個(gè)事件被發(fā)出時(shí)還攜帶了別的參數(shù)。

error 事件被觸發(fā)時(shí)會(huì)攜帶一個(gè) error 對(duì)象。

this.emit('error', err);

data 事件被觸發(fā)時(shí)會(huì)攜帶一個(gè) data 對(duì)象。

this.emit('data', data);

我們可以在 emit 函數(shù)中不斷的添加參數(shù),當(dāng)然第一個(gè)參數(shù)一定是事件的名稱,除去第一個(gè)參數(shù)之外的所有參數(shù)都可以在該事件注冊(cè)的監(jiān)聽(tīng)器中使用。

例如,要處理 data 事件,我們注冊(cè)的監(jiān)聽(tīng)器函數(shù)將訪問(wèn)傳遞給 emit 函數(shù)的 data 參數(shù),而這個(gè) data 也正是由 asyncFunc 返回的數(shù)據(jù)。

withTime.on('data', (data) => {
 // do something with data
});

error 事件比較特殊。在我們基于回調(diào)的那個(gè)示例中,如果不使用監(jiān)聽(tīng)器處理 error 事件,node 進(jìn)程將會(huì)退出。

舉個(gè)由于錯(cuò)誤使用參數(shù)而造成程序崩潰的例子:

class WithTime extends EventEmitter {
 execute(asyncFunc, ...args) {
  console.time('execute');
  asyncFunc(...args, (err, data) => {
   if (err) {
    return this.emit('error', err); // Not Handled
   }

   console.timeEnd('execute');
  });
 }
}

const withTime = new WithTime();

withTime.execute(fs.readFile, ''); // BAD CALL
withTime.execute(fs.readFile, __filename);

第一次調(diào)用 execute 將會(huì)觸發(fā) error 事件,由于沒(méi)有處理 error ,Node 程序隨之崩潰:

events.js:163
   throw er; // Unhandled 'error' event
   ^
Error: ENOENT: no such file or directory, open ''

第二次執(zhí)行調(diào)用將受到此崩潰的影響,并且可能根本不會(huì)被執(zhí)行。

如果我們?yōu)檫@個(gè) error 事件注冊(cè)一個(gè)監(jiān)聽(tīng)器函數(shù)來(lái)處理 error,結(jié)果將大不相同:

withTime.on('error', (err) => {
 // do something with err, for example log it somewhere
 console.log(err)
});

如果我們執(zhí)行上述操作,將會(huì)報(bào)告第一次執(zhí)行 execute 時(shí)發(fā)送的錯(cuò)誤,但是這次 node 進(jìn)程不會(huì)崩潰退出,其他程序的調(diào)用也都能正常完成:

{ Error: ENOENT: no such file or directory, open '' errno: -2, code: 'ENOENT', syscall: 'open', path: '' }
execute: 4.276ms

需要注意的是,基于 Promise 的函數(shù)有些不同,它們暫時(shí)只是輸出一個(gè)警告:

UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): Error: ENOENT: no such file or directory, open ''

DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

另一種處理異常的方式是在監(jiān)聽(tīng)全局的 uncaughtException 進(jìn)程事件。 然而,使用該事件全局捕獲錯(cuò)誤并不是一個(gè)好辦法。

關(guān)于 uncaughtException,一般都會(huì)建議你避免使用它,但是如果必須用它,你應(yīng)該讓進(jìn)程退出:

process.on('uncaughtException', (err) => {
 // something went unhandled.
 // Do any cleanup and exit anyway!

 console.error(err); // don't do just that.

 // FORCE exit the process too.
 process.exit(1);
});

但是,假設(shè)在同一時(shí)間發(fā)生多個(gè)錯(cuò)誤事件,這意味著上面的 uncaughtException 監(jiān)聽(tīng)器將被多次觸發(fā),這可能會(huì)引起一些問(wèn)題。

EventEmitter 模塊暴露了 once 方法,這個(gè)方法發(fā)出的信號(hào)只會(huì)調(diào)用一次監(jiān)聽(tīng)器。所以,這個(gè)方法常與 uncaughtException 一起使用。

監(jiān)聽(tīng)器的順序

如果針對(duì)一個(gè)事件注冊(cè)多個(gè)監(jiān)聽(tīng)器函數(shù),當(dāng)事件被觸發(fā)時(shí),這些監(jiān)聽(tīng)器函數(shù)將按其注冊(cè)的順序被觸發(fā)。

// first
withTime.on('data', (data) => {
 console.log(`Length: ${data.length}`);
});

// second
withTime.on('data', (data) => {
 console.log(`Characters: ${data.toString().length}`);
});

withTime.execute(fs.readFile, __filename);

上述代碼會(huì)先輸出 Length 信息,再輸出 Characters 信息,執(zhí)行的順序與注冊(cè)的順序保持一致。

如果你想定義一個(gè)新的監(jiān)聽(tīng)函數(shù),但是希望它能夠第一個(gè)被執(zhí)行,你還可以使用 prependListener 方法:

withTime.on('data', (data) => {
 console.log(`Length: ${data.length}`);
});

withTime.prependListener('data', (data) => {
 console.log(`Characters: ${data.toString().length}`);
});

withTime.execute(fs.readFile, __filename);

上述代碼中,Charaters 信息將首先被輸出。

最后,你可以用 removeListener 函數(shù)來(lái)刪除某個(gè)監(jiān)聽(tīng)器函數(shù)。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI