您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)如何用nodejs源碼分析線程,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
我們先看一下一般的使用例子。
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
worker.once('message', (message) => {
...
});
worker.postMessage('Hello, world!');
} else {
// 做點(diǎn)耗時(shí)的事情
parentPort.once('message', (message) => {
parentPort.postMessage(message);
});
}
我們先分析一下這個(gè)代碼的意思。因?yàn)樯厦娴拇a在主線程和子線程都會(huì)被執(zhí)行一遍。所以首先通過isMainThread判斷當(dāng)前是主線程還是子線程。主線程的話,就創(chuàng)建一個(gè)子線程,然后監(jiān)聽子線程發(fā)過來的消息。子線程的話,首先執(zhí)行業(yè)務(wù)相關(guān)的代碼,還可以監(jiān)聽主線程傳過來的消息。下面我們開始分析源碼。分析完,會(huì)對(duì)上面的代碼有更多的理解。
首先我們從worker_threads模塊開始分析。這是一個(gè)c++模塊。我們看一下他導(dǎo)出的功能。require("work_threads")的時(shí)候就是引用了InitWorker函數(shù)導(dǎo)出的功能。
void InitWorker(Local<Object> target,
Local<Value> unused,
Local<Context> context,
void* priv) {
Environment* env = Environment::GetCurrent(context);
{
// 執(zhí)行下面的方法時(shí),入?yún)⒍际莣->GetFunction() new出來的對(duì)象
// 新建一個(gè)函數(shù)模板,Worker::New是對(duì)w->GetFunction()執(zhí)行new的時(shí)候會(huì)執(zhí)行的回調(diào)
Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
// 設(shè)置需要拓展的內(nèi)存,因?yàn)閏++對(duì)象的內(nèi)存是固定的
w->InstanceTemplate()->SetInternalFieldCount(1);
w->Inherit(AsyncWrap::GetConstructorTemplate(env));
// 設(shè)置一系列原型方法,就不一一列舉
env->SetProtoMethod(w, "setEnvVars", Worker::SetEnvVars);
// 一系列原型方法
// 導(dǎo)出函數(shù)模塊對(duì)應(yīng)的函數(shù),即我們代碼中const { Worker } = require("worker_threads");中的Worker
Local<String> workerString =
FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
w->SetClassName(workerString);
target->Set(env->context(),
workerString,
w->GetFunction(env->context()).ToLocalChecked()).Check();
}
// 導(dǎo)出getEnvMessagePort方法,const { getEnvMessagePort } = require("worker_threads");
env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
/*
線程id,這個(gè)不是操作系統(tǒng)分配的那個(gè),而是nodejs分配的,在新開線程的時(shí)候設(shè)置
const { threadId } = require("worker_threads");
*/
target
->Set(env->context(),
env->thread_id_string(),
Number::New(env->isolate(), static_cast<double>(env->thread_id())))
.Check();
/*
是否是主線程,const { isMainThread } = require("worker_threads");
這邊變量在nodejs啟動(dòng)的時(shí)候設(shè)置為true,新開子線程的時(shí)候,沒有設(shè)置,所以是false
*/
target
->Set(env->context(),
FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
Boolean::New(env->isolate(), env->is_main_thread()))
.Check();
/*
如果不是主線程,導(dǎo)出資源限制的配置,
即在子線程中調(diào)用const { resourceLimits } = require("worker_threads");
*/
if (!env->is_main_thread()) {
target
->Set(env->context(),
FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
env->worker_context()->GetResourceLimits(env->isolate()))
.Check();
}
// 導(dǎo)出幾個(gè)常量
NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
}
翻譯成js大概是
function c++Worker(object) {
// 關(guān)聯(lián)起來,后續(xù)在js層調(diào)用c++層函數(shù)時(shí),取出來,拿到c++層真正的worker對(duì)象
object[0] = this;
...
}
function New(object) {
const worker = new c++Worker(object);
}
function Worker() {
New(this);
}
Worker.prototype = {
startThread,StartThread,
StopThread: StopThread,
...
}
module.exports = {
Worker: Worker,
getEnvMessagePort: GetEnvMessagePort,
isMainThread: true | false
...
}
了解work_threads模塊導(dǎo)出的功能后,我們看new Worker的時(shí)候的邏輯。根據(jù)上面代碼導(dǎo)出的邏輯,我們知道這時(shí)候首先會(huì)新建一個(gè)c++對(duì)象。對(duì)應(yīng)上面的Worker函數(shù)中的this。然后執(zhí)行New回調(diào),并傳入tihs。我們看New函數(shù)的邏輯。我們省略一系列的參數(shù)處理,主要代碼如下。
// args.This()就是我們剛才傳進(jìn)來的this
Worker* worker = new Worker(env, args.This(),
url, per_isolate_opts,
std::move(exec_argv_out));
我們再看Worker類。
Worker::Worker(Environment* env,
Local<Object> wrap,...)
// 在父類構(gòu)造函數(shù)中完成對(duì)象的Worker對(duì)象和args.This()對(duì)象的關(guān)聯(lián)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
...
// 分配線程id
thread_id_(Environment::AllocateThreadId()),
env_vars_(env->env_vars()) {
// 新建一個(gè)端口和子線程通信
parent_port_ = MessagePort::New(env, env->context());
/*
關(guān)聯(lián)起來,用于通信
const parent_port_ = {data: {sibling: null}};
const child_port_data_ = {sibling: null};
parent_port_.data.sibling = child_port_data_;
child_port_data_.sibling = parent_port_.data;
*/
child_port_data_ = std::make_unique<MessagePortData>(nullptr);
MessagePort::Entangle(parent_port_, child_port_data_.get());
// 設(shè)置Worker對(duì)象的messagePort屬性為parent_port_
object()->Set(env->context(),
env->message_port_string(),
parent_port_->object()).Check();
// 設(shè)置Worker對(duì)象的線程id,即threadId屬性
object()->Set(env->context(),
env->thread_id_string(),
Number::New(env->isolate(), static_cast<double>(thread_id_)))
.Check();
}
新建一個(gè)Worker,結(jié)構(gòu)如下
constructor(filename, options = {}) {
super();
// 忽略一系列參數(shù)處理,new Worker就是上面提到的c++層的
this[kHandle] = new Worker(url, options.execArgv, parseResourceLimits(options.resourceLimits));
// messagePort就是上面圖中的messagePort,指向_parent_port
this[kPort] = this[kHandle].messagePort;
this[kPort].on('message', (data) => this[kOnMessage](data));
// 開始接收消息,我們這里不深入messagePort,后續(xù)單獨(dú)分析
this[kPort].start();
// 申請一個(gè)通信管道,兩個(gè)端口
const { port1, port2 } = new MessageChannel();
this[kPublicPort] = port1;
this[kPublicPort].on('message', (message) => this.emit('message', message));
// 向另一端發(fā)送消息
this[kPort].postMessage({
argv,
type: messageTypes.LOAD_SCRIPT,
filename,
doEval: !!options.eval,
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
workerData: options.workerData,
publicPort: port2,
manifestSrc: getOptionValue('--experimental-policy') ?
require('internal/process/policy').src :
null,
hasStdin: !!options.stdin
}, [port2]);
// 開啟線程
this[kHandle].startThread();
}
上面的代碼主要邏輯如下
1 保存messagePort,然后給messagePort的對(duì)端(看上面的圖)發(fā)送消息,但是這時(shí)候還沒有接收者,所以消息會(huì)緩存到MessagePortData,即child_port_data_ 中。
2 申請一個(gè)通信管道,用于主線程和子線程通信。_parent_port和child_port是給nodejs使用的,新申請的管道是給用戶使用的。
3 創(chuàng)建子線程。
我們看創(chuàng)建線程的時(shí)候,做了什么。
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
Worker* w;
// 解包出對(duì)應(yīng)的Worker對(duì)象
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
// 新建一個(gè)子線程,然后執(zhí)行Run函數(shù),從此在子線程里執(zhí)行
uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
w->Run();
}, static_cast<void*>(w))
}
我們繼續(xù)看Run
void Worker::Run() {
{
// 新建一個(gè)env
env_.reset(new Environment(data.isolate_data_.get(),
context,
std::move(argv_),
std::move(exec_argv_),
Environment::kNoFlags,
thread_id_));
// 初始化libuv,往libuv注冊
env_->InitializeLibuv(start_profiler_idle_notifier_);
// 創(chuàng)建一個(gè)MessagePort
CreateEnvMessagePort(env_.get());
// 執(zhí)行internal/main/worker_thread.js
StartExecution(env_.get(), "internal/main/worker_thread");
// 開始事件循環(huán)
do {
uv_run(&data.loop_, UV_RUN_DEFAULT);
platform_->DrainTasks(isolate_);
more = uv_loop_alive(&data.loop_);
if (more && !is_stopped()) continue;
more = uv_loop_alive(&data.loop_);
} while (more == true && !is_stopped());
}
}
我們分步驟分析上面的代碼
1 CreateEnvMessagePort
void Worker::CreateEnvMessagePort(Environment* env) {
child_port_ = MessagePort::New(env,
env->context(),
std::move(child_port_data_));
if (child_port_ != nullptr)
env->set_message_port(child_port_->object(isolate_));
}
child_port_data_這個(gè)變量我們應(yīng)該很熟悉,在這里首先申請一個(gè)新的端口。負(fù)責(zé)端口中數(shù)據(jù)管理的對(duì)象是child_port_data_。然后在env緩存起來。一會(huì)要用。
// 設(shè)置process對(duì)象
patchProcessObject();
// 獲取剛才緩存的端口
onst port = getEnvMessagePort();
port.on('message', (message) => {
// 加載腳本
if (message.type === LOAD_SCRIPT) {
const {
argv,
cwdCounter,
filename,
doEval,
workerData,
publicPort,
manifestSrc,
manifestURL,
hasStdin
} = message;
const CJSLoader = require('internal/modules/cjs/loader');
loadPreloadModules();
/*
由主線程申請的MessageChannel管道中,某一端的端口,
設(shè)置publicWorker的parentPort字段,publicWorker就是worker_threads導(dǎo)出的對(duì)象,后面需要用
*/
publicWorker.parentPort = publicPort;
// 執(zhí)行時(shí)使用的數(shù)據(jù)
publicWorker.workerData = workerData;
// 通知主線程,正在執(zhí)行腳本
port.postMessage({ type: UP_AND_RUNNING });
// 執(zhí)行new Worker(filename)時(shí)傳入的文件
CJSLoader.Module.runMain(filename);
})
// 開始接收消息
port.start()
這時(shí)候我們再回頭看一下,我們調(diào)用new Worker(filename),然后在子線程里執(zhí)行我們的filename時(shí)的場景。我們再次回顧前面的代碼。
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
worker.once('message', (message) => {
...
});
worker.postMessage('Hello, world!');
} else {
// 做點(diǎn)耗時(shí)的事情
parentPort.once('message', (message) => {
parentPort.postMessage(message);
});
}
我們知道isMainThread在子線程里是false,parentPort 就是就是messageChannel中的一端。所以parentPort.postMessage給對(duì)端發(fā)送消息,就是給主線程發(fā)送消息,我們再看看worker.postMessage('Hello, world!')。
postMessage(...args) {
this[kPublicPort].postMessage(...args);
}
kPublicPort指向的就是messageChannel的另一端。即給子線程發(fā)送消息。那么on('message')就是接收對(duì)端發(fā)過來的消息。
關(guān)于如何用nodejs源碼分析線程就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。