溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何用nodejs源碼分析線程

發(fā)布時(shí)間:2021-12-13 17:46:06 來源:億速云 閱讀:120 作者:柒染 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)如何用nodejs源碼分析線程,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

我們先看一下一般的使用例子。

const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
  const worker = new Worker(__filename);
  worker.once('message', (message) => {
    ...
  });
  worker.postMessage('Hello, world!');
} else {
  // 做點(diǎn)耗時(shí)的事情
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}
 

我們先分析一下這個(gè)代碼的意思。因?yàn)樯厦娴拇a在主線程和子線程都會(huì)被執(zhí)行一遍。所以首先通過isMainThread判斷當(dāng)前是主線程還是子線程。主線程的話,就創(chuàng)建一個(gè)子線程,然后監(jiān)聽子線程發(fā)過來的消息。子線程的話,首先執(zhí)行業(yè)務(wù)相關(guān)的代碼,還可以監(jiān)聽主線程傳過來的消息。下面我們開始分析源碼。分析完,會(huì)對(duì)上面的代碼有更多的理解。
   首先我們從worker_threads模塊開始分析。這是一個(gè)c++模塊。我們看一下他導(dǎo)出的功能。require("work_threads")的時(shí)候就是引用了InitWorker函數(shù)導(dǎo)出的功能。

void InitWorker(Local<Object> target,
                Local<Value> unused,
                Local<Context> context,
                void* priv) {
  Environment* env = Environment::GetCurrent(context);

  {
    // 執(zhí)行下面的方法時(shí),入?yún)⒍际莣->GetFunction() new出來的對(duì)象
    // 新建一個(gè)函數(shù)模板,Worker::New是對(duì)w->GetFunction()執(zhí)行new的時(shí)候會(huì)執(zhí)行的回調(diào)
    Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
    // 設(shè)置需要拓展的內(nèi)存,因?yàn)閏++對(duì)象的內(nèi)存是固定的
    w->InstanceTemplate()->SetInternalFieldCount(1);
    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
    // 設(shè)置一系列原型方法,就不一一列舉
    env->SetProtoMethod(w, "setEnvVars", Worker::SetEnvVars);
    // 一系列原型方法
    // 導(dǎo)出函數(shù)模塊對(duì)應(yīng)的函數(shù),即我們代碼中const { Worker } = require("worker_threads");中的Worker
    Local<String> workerString =
        FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
    w->SetClassName(workerString);
    target->Set(env->context(),
                workerString,
                w->GetFunction(env->context()).ToLocalChecked()).Check();
  }
  // 導(dǎo)出getEnvMessagePort方法,const { getEnvMessagePort } = require("worker_threads");
  env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
  /*
      線程id,這個(gè)不是操作系統(tǒng)分配的那個(gè),而是nodejs分配的,在新開線程的時(shí)候設(shè)置
      const { threadId } = require("worker_threads");
  */
  target
      ->Set(env->context(),
            env->thread_id_string(),
            Number::New(env->isolate(), static_cast<double>(env->thread_id())))
      .Check();
  /*
      是否是主線程,const { isMainThread } = require("worker_threads");
      這邊變量在nodejs啟動(dòng)的時(shí)候設(shè)置為true,新開子線程的時(shí)候,沒有設(shè)置,所以是false
  */
  target
      ->Set(env->context(),
            FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
            Boolean::New(env->isolate(), env->is_main_thread()))
      .Check();
  /*
      如果不是主線程,導(dǎo)出資源限制的配置,
      即在子線程中調(diào)用const { resourceLimits } = require("worker_threads");
  */
  if (!env->is_main_thread()) {
    target
        ->Set(env->context(),
              FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
              env->worker_context()->GetResourceLimits(env->isolate()))
        .Check();
  }
  // 導(dǎo)出幾個(gè)常量
  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
}
 

翻譯成js大概是

function c++Worker(object) {
    // 關(guān)聯(lián)起來,后續(xù)在js層調(diào)用c++層函數(shù)時(shí),取出來,拿到c++層真正的worker對(duì)象 
    object[0] = this;
    ...
}
function New(object) {
    const worker = new c++Worker(object);
}
function Worker() {
    New(this);
}
Worker.prototype = {
    startThread,StartThread,
    StopThread: StopThread,
    ...
}
module.exports = {
    Worker: Worker,
    getEnvMessagePort: GetEnvMessagePort,
    isMainThread: true | false
    ...
}
 

了解work_threads模塊導(dǎo)出的功能后,我們看new Worker的時(shí)候的邏輯。根據(jù)上面代碼導(dǎo)出的邏輯,我們知道這時(shí)候首先會(huì)新建一個(gè)c++對(duì)象。對(duì)應(yīng)上面的Worker函數(shù)中的this。然后執(zhí)行New回調(diào),并傳入tihs。我們看New函數(shù)的邏輯。我們省略一系列的參數(shù)處理,主要代碼如下。

// args.This()就是我們剛才傳進(jìn)來的this
Worker* worker = new Worker(env, args.This(), 
                            url, per_isolate_opts,
                             std::move(exec_argv_out));
 

我們再看Worker類。

Worker::Worker(Environment* env,
               Local<Object> wrap,...)
    // 在父類構(gòu)造函數(shù)中完成對(duì)象的Worker對(duì)象和args.This()對(duì)象的關(guān)聯(lián)
    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
      ...
      // 分配線程id
      thread_id_(Environment::AllocateThreadId()),
      env_vars_(env->env_vars()) {

  // 新建一個(gè)端口和子線程通信
  parent_port_ = MessagePort::New(env, env->context());
  /*
    關(guān)聯(lián)起來,用于通信
    const parent_port_ = {data: {sibling: null}};
    const child_port_data_  = {sibling: null};
    parent_port_.data.sibling = child_port_data_;
    child_port_data_.sibling = parent_port_.data;
  */
  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
  MessagePort::Entangle(parent_port_, child_port_data_.get());
  // 設(shè)置Worker對(duì)象的messagePort屬性為parent_port_
  object()->Set(env->context(),
                env->message_port_string(),
                parent_port_->object()).Check();
  // 設(shè)置Worker對(duì)象的線程id,即threadId屬性
  object()->Set(env->context(),
                env->thread_id_string(),
                Number::New(env->isolate(), static_cast<double>(thread_id_)))
      .Check();
}
 

新建一個(gè)Worker,結(jié)構(gòu)如下

如何用nodejs源碼分析線程  

了解了new Worker的邏輯后,我們看在js層是如何使用的。我們看js層Worker類的構(gòu)造函數(shù)。  
constructor(filename, options = {}) {
    super();
    // 忽略一系列參數(shù)處理,new Worker就是上面提到的c++層的
    this[kHandle] = new Worker(url, options.execArgv, parseResourceLimits(options.resourceLimits));
    // messagePort就是上面圖中的messagePort,指向_parent_port
    this[kPort] = this[kHandle].messagePort;
    this[kPort].on('message', (data) => this[kOnMessage](data));
    // 開始接收消息,我們這里不深入messagePort,后續(xù)單獨(dú)分析
    this[kPort].start();
    // 申請一個(gè)通信管道,兩個(gè)端口
    const { port1, port2 } = new MessageChannel();
    this[kPublicPort] = port1;
    this[kPublicPort].on('message', (message) => this.emit('message', message));
    // 向另一端發(fā)送消息
    this[kPort].postMessage({
      argv,
      type: messageTypes.LOAD_SCRIPT,
      filename,
      doEval: !!options.eval,
      cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
      workerData: options.workerData,
      publicPort: port2,
      manifestSrc: getOptionValue('--experimental-policy') ?
        require('internal/process/policy').src :
        null,
      hasStdin: !!options.stdin
    }, [port2]);
    // 開啟線程
    this[kHandle].startThread();
  }
 

上面的代碼主要邏輯如下
1 保存messagePort,然后給messagePort的對(duì)端(看上面的圖)發(fā)送消息,但是這時(shí)候還沒有接收者,所以消息會(huì)緩存到MessagePortData,即child_port_data_ 中。
2 申請一個(gè)通信管道,用于主線程和子線程通信。_parent_port和child_port是給nodejs使用的,新申請的管道是給用戶使用的。
3 創(chuàng)建子線程。
我們看創(chuàng)建線程的時(shí)候,做了什么。

void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
  Worker* w;
  // 解包出對(duì)應(yīng)的Worker對(duì)象
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
  // 新建一個(gè)子線程,然后執(zhí)行Run函數(shù),從此在子線程里執(zhí)行
  uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
    w->Run();
  }, static_cast<void*>(w))
}
 

我們繼續(xù)看Run

void Worker::Run() {
    {
        // 新建一個(gè)env
        env_.reset(new Environment(data.isolate_data_.get(),
                                   context,
                                   std::move(argv_),
                                   std::move(exec_argv_),
                                   Environment::kNoFlags,
                                   thread_id_));
        // 初始化libuv,往libuv注冊
        env_->InitializeLibuv(start_profiler_idle_notifier_);
        // 創(chuàng)建一個(gè)MessagePort
        CreateEnvMessagePort(env_.get());
        // 執(zhí)行internal/main/worker_thread.js
        StartExecution(env_.get(), "internal/main/worker_thread");
        // 開始事件循環(huán)
        do {
          uv_run(&data.loop_, UV_RUN_DEFAULT);
          platform_->DrainTasks(isolate_);
          more = uv_loop_alive(&data.loop_);
          if (more && !is_stopped()) continue;
          more = uv_loop_alive(&data.loop_);
        } while (more == true && !is_stopped());
     }
}
 

我們分步驟分析上面的代碼
1 CreateEnvMessagePort

void Worker::CreateEnvMessagePort(Environment* env) {
  child_port_ = MessagePort::New(env,
                                 env->context(),
                                 std::move(child_port_data_));

  if (child_port_ != nullptr)
    env->set_message_port(child_port_->object(isolate_));
}
 

child_port_data_這個(gè)變量我們應(yīng)該很熟悉,在這里首先申請一個(gè)新的端口。負(fù)責(zé)端口中數(shù)據(jù)管理的對(duì)象是child_port_data_。然后在env緩存起來。一會(huì)要用。

如何用nodejs源碼分析線程  

2 執(zhí)行internal/main/worker_thread.js  
// 設(shè)置process對(duì)象
patchProcessObject();
// 獲取剛才緩存的端口
onst port = getEnvMessagePort();
port.on('message', (message) => {
  // 加載腳本
  if (message.type === LOAD_SCRIPT) {
    const {
      argv,
      cwdCounter,
      filename,
      doEval,
      workerData,
      publicPort,
      manifestSrc,
      manifestURL,
      hasStdin
    } = message;

    const CJSLoader = require('internal/modules/cjs/loader');
    loadPreloadModules();
    /*
        由主線程申請的MessageChannel管道中,某一端的端口,
        設(shè)置publicWorker的parentPort字段,publicWorker就是worker_threads導(dǎo)出的對(duì)象,后面需要用
    */
    publicWorker.parentPort = publicPort;
    // 執(zhí)行時(shí)使用的數(shù)據(jù)
    publicWorker.workerData = workerData;
    // 通知主線程,正在執(zhí)行腳本
    port.postMessage({ type: UP_AND_RUNNING });
    // 執(zhí)行new Worker(filename)時(shí)傳入的文件
    CJSLoader.Module.runMain(filename);
})
// 開始接收消息
port.start()
 

這時(shí)候我們再回頭看一下,我們調(diào)用new Worker(filename),然后在子線程里執(zhí)行我們的filename時(shí)的場景。我們再次回顧前面的代碼。

const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
  const worker = new Worker(__filename);
  worker.once('message', (message) => {
    ...
  });
  worker.postMessage('Hello, world!');
} else {
  // 做點(diǎn)耗時(shí)的事情
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}
 

我們知道isMainThread在子線程里是false,parentPort 就是就是messageChannel中的一端。所以parentPort.postMessage給對(duì)端發(fā)送消息,就是給主線程發(fā)送消息,我們再看看worker.postMessage('Hello, world!')。

 postMessage(...args) {
    this[kPublicPort].postMessage(...args);
 }
 

kPublicPort指向的就是messageChannel的另一端。即給子線程發(fā)送消息。那么on('message')就是接收對(duì)端發(fā)過來的消息。

關(guān)于如何用nodejs源碼分析線程就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI