溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

nodejs流基類怎么實現(xiàn)

發(fā)布時間:2021-12-17 09:29:35 來源:億速云 閱讀:110 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“nodejs流基類怎么實現(xiàn)”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“nodejs流基類怎么實現(xiàn)”吧!

流是對數(shù)據(jù)生產(chǎn),消費的一種抽象,今天先分析一下流基類的實現(xiàn)

const EE = require('events');
const util = require('util');
// 流的基類
function Stream() {
  EE.call(this);
}
// 繼承事件訂閱分發(fā)的能力
util.inherits(Stream, EE);
 

流的基類只提供了一個函數(shù)就是pipe。用于實現(xiàn)管道化。這個方法代碼比較多,分開說。

 

1 處理數(shù)據(jù)事件

 function ondata(chunk) {
    // 源流有數(shù)據(jù)到達,并且目的流可寫
    if (dest.writable) {
      // 目的流過載并且源流實現(xiàn)了pause方法,那就暫停可讀流的讀取操作,等待目的流觸發(fā)drain事件
      if (false === dest.write(chunk) && source.pause) {
        source.pause();
      }
    }
 }
  // 監(jiān)聽data事件,可讀流有數(shù)據(jù)的時候,會觸發(fā)data事件
  source.on('data', ondata);

  function ondrain() {
    // 目的流可寫了,并且可讀流可讀,切換成自動讀取模式
    if (source.readable && source.resume) {
      source.resume();
    }
  }
  // 監(jiān)聽drain事件,目的流可以消費數(shù)據(jù)了就會觸發(fā)該事件
  dest.on('drain', ondrain);
 

這是管道化時流控實現(xiàn)的地方,主要是利用了write返回值和drain事件。

 

流關(guān)閉/結(jié)束處理

 // 目的流不是標準輸出或標準錯誤,并且end不等于false
  if (!dest._isStdio && (!options || options.end !== false)) {
    // 源流沒有數(shù)據(jù)可讀了,執(zhí)行end回調(diào),告訴目的流,沒有數(shù)據(jù)可讀了
    source.on('end', onend);
    // 源流關(guān)閉了,執(zhí)行close回調(diào)
    source.on('close', onclose);
  }
  // 兩個函數(shù)只會執(zhí)行一次,也只會執(zhí)行一個
  var didOnEnd = false;
  function onend() {
    if (didOnEnd) return;
    didOnEnd = true;
    // 執(zhí)行目的流的end函數(shù),說明寫數(shù)據(jù)完畢
    dest.end();
  }

  function onclose() {
    if (didOnEnd) return;
    didOnEnd = true;
    // 銷毀目的流
    if (typeof dest.destroy === 'function') dest.destroy();
  }
 

這里是處理源流結(jié)束和關(guān)閉后,通知目的流的邏輯。

 

錯誤處理和事件清除

  // remove all the event listeners that were added.
  function cleanup() {
    source.removeListener('data', ondata);
    dest.removeListener('drain', ondrain);

    source.removeListener('end', onend);
    source.removeListener('close', onclose);

    source.removeListener('error', onerror);
    dest.removeListener('error', onerror);

    source.removeListener('end', cleanup);
    source.removeListener('close', cleanup);

    dest.removeListener('close', cleanup);
  }

  function onerror(er) {
    // 出錯了,清除注冊的事件,包括正在執(zhí)行的onerror函數(shù)
    cleanup();
    // 如果用戶沒有監(jiān)聽流的error事件,則拋出錯誤,所以我們業(yè)務(wù)代碼需要監(jiān)聽error事件
    if (EE.listenerCount(this, 'error') === 0) {
      throw er; // Unhandled stream error in pipe.
    }
  }
  // 監(jiān)聽流的error事件
  source.on('error', onerror);
  dest.on('error', onerror);
  // 源流關(guān)閉或者沒有數(shù)據(jù)可讀時,清除注冊的事件
  source.on('end', cleanup);
  source.on('close', cleanup);
  // 目的流關(guān)閉了也清除他注冊的事件
  dest.on('close', cleanup);
 

這里主要是處理了error事件和流關(guān)閉/結(jié)束/出錯時清除訂閱的事件。這就是流基類的所有邏輯。

感謝各位的閱讀,以上就是“nodejs流基類怎么實現(xiàn)”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對nodejs流基類怎么實現(xiàn)這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI