线程是操作系统的最小调度单位,它本质上是进程中的一个执行流,我们知道,进程有代码段,线程其实就是进程代码段中的其中一段代码。线程的一种实现是作为进程来实现的(pthread线程库),通过调用clone,新建一个进程,然后执行父进程代码段里的一个代码片段,其中文件描述符、内存等信息都是共享的。因为内存是共享的,所以线程不能共享栈,否则访问栈的地址的时候,会映射到相同的物理地址,那样就会互相影响,所以每个线程会有自己独立的栈。在调用clone函数的时候会设置栈的范围,比如在堆上分配一块内存用于做线程的栈,并且支持设置子线程和主线程共享哪些资源。具体可以参考clone系统调用。
由于Node.js是单线程的,虽然底层的Libuv实现了一个线程池,但是这个线程池只能执行C、C++层定义的任务。如果我们想自定义一些耗时的操作,那就只能在C++层处理,然后暴露接口给JS层调用,这个成本是非常高的,在早期的Node.js版本里,我们可以用进程去实现这样的需求。但是进程太重了,在新版的Node.js中,Node.js为我们提供了多线程的功能。这一章以Node.js多线程模块为背景,分析Node.js中多线程的原理,但是不分析Libuv的线程实现,它本质是对线程库的简单封装。Node.js中,线程的实现也非常复杂。虽然底层只是对线程库的封装,但是把它和Node.js原本的架构结合起来变得复杂起来。
对于同步文件操作、DNS解析等操作,Node.js使用了内置的线程池支持了异步。但是一些加解密、字符串运算、阻塞型API等操作。我们就不能在主线程里处理了,这时候就不得不使用线程,而且多线程还能利用多核的能力。Node.js的子线程本质上是一个新的事件循环,但是子线程和Node.js主线程共享一个Libuv线程池,所以如果在子线程里有文件、DNS等操作就会和主线程竞争Libuv线程池。如图14-1所示。 图14-1 我们看一下在Node.js中如何使用线程。
const { Worker, isMainThread, parentPort } = require('worker_threads'); if (isMainThread) { const worker = new Worker(__filename); worker.once('message', (message) => { ... }); worker.postMessage('Hello, world!'); } else { // 做点耗时的事情 parentPort.once('message', (message) => { parentPort.postMessage(message); }); }
上面这段代码会被执行两次,一次是在主线程,一次在子线程。所以首先通过isMainThread判断当前是主线程还是子线程。主线程的话,就创建一个子线程,然后监听子线程发过来的消息。子线程的话,首先执行业务相关的代码,还可以监听主线程传过来的消息。我们在子线程中可以做一些耗时或者阻塞性的操作,不会影响主线程的执行。我们也可以把这两个逻辑拆分到两个文件。
主线程
const { Worker, isMainThread, parentPort } = require('worker_threads'); const worker = new Worker(‘子线程文件路径’); worker.once('message', (message) => { ... }); worker.postMessage('Hello, world!');
子线程
const { Worker, isMainThread, parentPort } = require('worker_threads'); parentPort.once('message', (message) => { parentPort.postMessage(message); });
进程间的通信一般需要借助操作系统提供公共的内存来完成。因为进程间的内存是独立的,和进程间通信不一样。多线程的内存是共享的,同个进程的内存,多个线程都可以访问,所以线程间通信可以基于进程内的内存来完成。在Node.js中,线程间通信使用的是MessageChannel实现的,它是全双工的,任意一端都可以随时发送信息。MessageChannel类似socket通信,它包括两个端点。定义一个MessageChannel相当于建立一个TCP连接,它首先申请两个端点(MessagePort),然后把它们关联起来。下面我们看一下线程间通信的实现中,比较重要的几个数据结构。 1 Message代表一个消息。 2 MessagePortData是对操作Message的封装和对消息的承载。 3 MessagePort是代表通信的端点。 4 MessageChannel是代表通信的两端,即两个MessagePort。 下面我们看一下具体的实现。 14.2.1 Message Message类代表的是子线程间通信的一条消息。
class Message : public MemoryRetainer { public: explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>()); // 是否是最后一条消息,空消息代表是最后一条消息 bool IsCloseMessage() const; // 线程间通信的数据需要通过序列化和反序列化处理 v8::MaybeLocal<v8::Value> Deserialize(Environment* env, v8::Local<v8::Context> context); v8::Maybe<bool> Serialize(Environment* env, v8::Local<v8::Context> context, v8::Local<v8::Value> input, const TransferList& transfer_list, v8::Local<v8::Object> source_port = v8::Local<v8::Object>()); // 传递SharedArrayBuffer型变量 void AddSharedArrayBuffer(std::shared_ptr<v8::BackingStore> backing_store); // 传递MessagePort型变量 void AddMessagePort(std::unique_ptr<MessagePortData>&& data); // 消息所属端口,端口是消息到达的地方 const std::vector<std::unique_ptr<MessagePortData>>& message_ports() const { return message_ports_; } private: // 保存消息的内容 MallocedBuffer<char> main_message_buf_; std::vector<std::shared_ptr<v8::BackingStore>> array_buffers_; std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_; std::vector<std::unique_ptr<MessagePortData>> message_ports_; std::vector<v8::CompiledWasmModule> wasm_modules_; };
MessagePortData是管理消息发送和接收的类。
class MessagePortData : public MemoryRetainer { public: explicit MessagePortData(MessagePort* owner); ~MessagePortData() override; // 新增一个消息 void AddToIncomingQueue(Message&& message); // 关联/解关联通信两端的端口 static void Entangle(MessagePortData* a, MessagePortData* b); void Disentangle(); private: // 用于多线程往对端消息队列插入消息时的互斥变量 mutable Mutex mutex_; std::list<Message> incoming_messages_; // 所属端口 MessagePort* owner_ = nullptr; // 用于多线程访问对端sibling_属性时的互斥变量 std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>(); // 指向通信对端的指针 MessagePortData* sibling_ = nullptr; };
我们看一下实现。
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { } MessagePortData::~MessagePortData() { // 析构时解除和对端的关系 Disentangle(); } // 插入一个message void MessagePortData::AddToIncomingQueue(Message&& message) { // 先加锁,保证多线程安全,互斥访问 Mutex::ScopedLock lock(mutex_); // 插入消息队列 incoming_messages_.emplace_back(std::move(message)); // 通知owner if (owner_ != nullptr) { owner_->TriggerAsync(); } } // 关联通信的对端,并保持对端的互斥变量,访问对端时需要使用 void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { a->sibling_ = b; b->sibling_ = a; a->sibling_mutex_ = b->sibling_mutex_; } // 解除关联 void MessagePortData::Disentangle() { // 加锁操作对端的sibling字段 std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_; Mutex::ScopedLock sibling_lock(*sibling_mutex); sibling_mutex_ = std::make_shared<Mutex>(); // 对端 MessagePortData* sibling = sibling_; // 对端非空,则把对端的sibling也指向空,自己也指向空 if (sibling_ != nullptr) { sibling_->sibling_ = nullptr; sibling_ = nullptr; } // 插入一个空的消息通知对端和本端 AddToIncomingQueue(Message()); if (sibling != nullptr) { sibling->AddToIncomingQueue(Message()); } }
MessagePort表示的是通信的一端。
class MessagePort : public HandleWrap { public: MessagePort(Environment* env, v8::Local<v8::Context> context, v8::Local<v8::Object> wrap); ~MessagePort() override; static MessagePort* New(Environment* env, v8::Local<v8::Context> context, std::unique_ptr<MessagePortData> data = nullptr); // 发送消息 v8::Maybe<bool> PostMessage(Environment* env, v8::Local<v8::Value> message, const TransferList& transfer); // 开启/关闭接收消息 void Start(); void Stop(); static void New(const v8::FunctionCallbackInfo<v8::Value>& args); // 提供JS层使用的方法 static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args); static void Start(const v8::FunctionCallbackInfo<v8::Value>& args); static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args); static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args); static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args); // 关联对端 static void Entangle(MessagePort* a, MessagePort* b); static void Entangle(MessagePort* a, MessagePortData* b); // 解除MessagePortData和端口的关系 std::unique_ptr<MessagePortData> Detach(); // 关闭端口 void Close( v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override; inline bool IsDetached() const; private: void OnClose() override; void OnMessage(); void TriggerAsync(); v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context, bool only_if_receiving); // MessagePortData用于管理消息的发送和接收 std::unique_ptr<MessagePortData> data_ = nullptr; // 是否开启接收消息标记 bool receiving_messages_ = false; // 用于收到消息时通知事件循环,事件循环执行回调处理消息 uv_async_t async_; };
我们看一下实现,只列出部分函数。
// 端口是否不接收消息了 bool MessagePort::IsDetached() const { return data_ == nullptr || IsHandleClosing(); } // 有消息到达,通知事件循环执行回调 void MessagePort::TriggerAsync() { if (IsHandleClosing()) return; CHECK_EQ(uv_async_send(&async_), 0); } // 关闭接收消息的端口 void MessagePort::Close(v8::Local<v8::Value> close_callback) { if (data_) { // 持有锁,防止再接收消息 Mutex::ScopedLock sibling_lock(data_->mutex_); HandleWrap::Close(close_callback); } else { HandleWrap::Close(close_callback); } } // 新建一个端口,并且可以挂载一个MessagePortData MessagePort* MessagePort::New( Environment* env, Local<Context> context, std::unique_ptr<MessagePortData> data) { Context::Scope context_scope(context); Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env); Local<Object> instance; // JS层使用的对象 if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance)) return nullptr; // 新建一个消息端口 MessagePort* port = new MessagePort(env, context, instance); // 需要挂载MessagePortData if (data) { port->Detach(); port->data_ = std::move(data); Mutex::ScopedLock lock(port->data_->mutex_); // 修改data的owner为当前消息端口 port->data_->owner_ = port; // data中可能有消息 port->TriggerAsync(); } return port; } // 开始接收消息 void MessagePort::Start() { Debug(this, "Start receiving messages"); receiving_messages_ = true; Mutex::ScopedLock lock(data_->mutex_); // 有缓存的消息,通知上层 if (!data_->incoming_messages_.empty()) TriggerAsync(); } // 停止接收消息 void MessagePort::Stop() { Debug(this, "Stop receiving messages"); receiving_messages_ = false; } // JS层调用 void MessagePort::Start(const FunctionCallbackInfo<Value>& args) { MessagePort* port; ASSIGN_OR_RETURN_UNWRAP(&port, args.This()); if (!port->data_) { return; } port->Start(); } void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) { MessagePort* port; CHECK(args[0]->IsObject()); ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>()); if (!port->data_) { return; } port->Stop(); } // 读取消息 void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) { MessagePort* port; ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>()); port->OnMessage(); } // 获取某个端口的消息 void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) { CHECK(args[0]->IsObject()); // 第一个参数是端口 MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>()); // 调用对象的ReceiverMessage方法 MaybeLocal<Value> payload = port->ReceiveMessage(port->object()->CreationContext(), false); if (!payload.IsEmpty()) args.GetReturnValue().Set(payload.ToLocalChecked()); } // 关联两个端口 void MessagePort::Entangle(MessagePort* a, MessagePort* b) { Entangle(a, b->data_.get()); } void MessagePort::Entangle(MessagePort* a, MessagePortData* b) { MessagePortData::Entangle(a->data_.get(), b); }
MessageChannel表示线程间通信的两个端。
static void MessageChannel(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Local<Context> context = args.This()->CreationContext(); Context::Scope context_scope(context); MessagePort* port1 = MessagePort::New(env, context); MessagePort* port2 = MessagePort::New(env, context); MessagePort::Entangle(port1, port2); // port1->object()拿到JS层使用的对象,它关联了MessagePort对象 args.This()->Set(context, env->port1_string(), port1->object()) .Check(); args.This()->Set(context, env->port2_string(), port2->object()) .Check(); }
MessageChannel的逻辑比较简单,新建两个消息端口,并且关联起来,后续就可以基于这两个端口进行通信了。 Message、MessagePortData、MessagePort和MessageChannel的关系图如图14-2所示。 图14-2 最后我们看一下线程间通信模块导出的一些功能。
static void InitMessaging(Local<Object> target, Local<Value> unused, Local<Context> context, void* priv) { Environment* env = Environment::GetCurrent(context); { // 线程间通信的通道 Local<String> message_channel_string = FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel"); Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel); templ->SetClassName(message_channel_string); target->Set(context, message_channel_string, templ->GetFunction(context).ToLocalChecked()).Check(); } // 新建消息端口的构造函数 target->Set(context, env->message_port_constructor_string(), GetMessagePortConstructorTemplate(env) ->GetFunction(context).ToLocalChecked()).Check(); env->SetMethod(target, "stopMessagePort", MessagePort::Stop); env->SetMethod(target, "drainMessagePort", MessagePort::Drain); env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage); env->SetMethod(target, "moveMessagePortToContext", MessagePort::MoveToContext); }
本节我们从worker_threads模块开始分析多线程的实现。这是一个C++模块。我们看一下它导出的功能。require("work_threads")的时候就是引用了InitWorker函数导出的功能。
void InitWorker(Local<Object> target, Local<Value> unused, Local<Context> context, void* priv) { Environment* env = Environment::GetCurrent(context); { Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New); w->InstanceTemplate()->SetInternalFieldCount(1); w->Inherit(AsyncWrap::GetConstructorTemplate(env)); // 设置一系列原型方法,就不一一列举 env->SetProtoMethod(w, "setEnvVars", Worker::SetEnvVars); // 一系列原型方法 /* 导出函数模块对应的函数,即我们代码中 const { Worker } = require("worker_threads");中的Worker */ Local<String> workerString = FIXED_ONE_BYTE_STRING(env->isolate(), "Worker"); w->SetClassName(workerString); target->Set(env->context(), workerString, w->GetFunction(env->context()).ToLocalChecked()).Check(); /* 导出getEnvMessagePort方法,获取线程接收消息的端口 const {getEnvMessagePort} = require("worker_threads"); */ env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort); /* 线程id,这个不是操作系统分配的那个,而是Node.js分配的, 在创建线程的时候设置 const { threadId } = require("worker_threads"); */ target->Set(env->context(), env->thread_id_string(), Number::New(env->isolate(), static_cast<double>(env->thread_id()))) .Check(); /* 是否是主线程, const { isMainThread } = require("worker_threads"); 这边变量在Node.js启动的时候设置为true,新开子线程的时候,没有设 置,所以是false */ target->Set(env->context(), FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"), Boolean::New(env->isolate(), env->is_main_thread())) .Check(); /* 如果不是主线程,导出资源限制的配置, 即在子线程中调用 const { resourceLimits } = require("worker_threads"); */ if (!env->is_main_thread()) { target->Set(env->context(), FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"), env->worker_context()->GetResourceLimits(env->isolate())).Check(); } // 导出几个常量 NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb); NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb); NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb); NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount); }
了解work_threads模块导出的功能后,我们看在JS层执行new Worker的时候的逻辑。根据上面代码导出的逻辑,我们知道这时候首先会新建一个C++对象。然后执行New回调,并传入新建的C++对象。我们看New函数的逻辑。我们省略一系列的参数处理,主要代码如下。
// args.This()就是我们刚才传进来的this Worker* worker = new Worker(env, args.This(), url, per_isolate_opts, std::move(exec_argv_out));
我们再看Worker类的声明。
class Worker : public AsyncWrap { public: // 函数声明 private: std::shared_ptr<PerIsolateOptions> per_isolate_opts_; std::vector<std::string> exec_argv_; std::vector<std::string> argv_; MultiIsolatePlatform* platform_; v8::Isolate* isolate_ = nullptr; bool start_profiler_idle_notifier_; // 真正的线程id,底层返回的 uv_thread_t tid_; // This mutex protects access to all variables listed below it. mutable Mutex mutex_; bool thread_joined_ = true; const char* custom_error_ = nullptr; int exit_code_ = 0; // 线程id,Node.js分配,不是底层返回的 uint64_t thread_id_ = -1; uintptr_t stack_base_ = 0; // 线程资源限制配置 double resource_limits_[kTotalResourceLimitCount]; void UpdateResourceConstraints(v8::ResourceConstraints* constraints); // 栈信息 static constexpr size_t kStackSize = 4 * 1024 * 1024; static constexpr size_t kStackBufferSize = 192 * 1024; std::unique_ptr<MessagePortData> child_port_data_; std::shared_ptr<KVStore> env_vars_; // 用于线程间通信 MessagePort* child_port_ = nullptr; MessagePort* parent_port_ = nullptr; // 线程状态 bool stopped_ = true; // 是否影响事件循环退出 bool has_ref_ = true; // 子线程执行时的环境变量,基类也定义了 Environment* env_ = nullptr; };
这里只讲一下env_的定义,因为这是一个非常重要的地方。我们看到Worker类继承AsyncWrap,AsyncWrap继承了BaseObject。BaseObject中也定义了env_属性。我们看一下在C++中如果子类父类都定义了一个属性时是怎样的。我们来看一个例子
#include <iostream> using namespace std; class A { public: int value; A() { value=1; } void console() { cout<<value<<endl; } }; class B: public A { public: int value; B():A() { value=2; } }; int main() { B b; // b.value = 3;只会修改子类的,不会修改父类的 b.console(); cout<<b.value<<endl<<"内存大小:"<<sizeof(b)<<endl; return 0; }
以上代码执行时输出 1 2 内存大小:8 由输出结果我们可以知道,b内存大小是8个字节。即两个int。所以b的内存布局中两个a属性都分配了内存。当我们通过b.console输出value时,因为console是在A上定义的,所以输出1,但是我们通过b.value访问时,输出的是2。因为访问的是B中定义的value,同理如果我们在B中定义console,输出也会是2。Worker中定义的env_我们后续会看到它的作用。接着我们看一下Worker类的初始化逻辑。
Worker::Worker(Environment* env, Local<Object> wrap,...) : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER), ... // 分配线程id thread_id_(Environment::AllocateThreadId()), // 继承主线程的环境变量 env_vars_(env->env_vars()) { // 新建一个端口和子线程通信 parent_port_ = MessagePort::New(env, env->context()); /* 关联起来,用于通信 const parent_port_ = {data: {sibling: null}}; const child_port_data_ = {sibling: null}; parent_port_.data.sibling = child_port_data_; child_port_data_.sibling = parent_port_.data; */ child_port_data_ = std::make_unique<MessagePortData>(nullptr); MessagePort::Entangle(parent_port_, child_port_data_.get()); // 设置JS层Worker对象的messagePort属性为parent_port_ object()->Set(env->context(), env->message_port_string(), parent_port_->object()).Check(); // 设置Worker对象的线程id,即threadId属性 object()->Set(env->context(), env->thread_id_string(), Number::New(env->isolate(), static_cast<double>(thread_id_))) .Check(); }
新建一个Worker,结构如图14-3所示。 图14-3
了解了new Worker的逻辑后,我们看在JS层是如何使用的。我们看JS层Worker类的构造函数。
constructor(filename, options = {}) { super(); // 忽略一系列参数处理,new Worker就是上面提到的C++层的 this[kHandle] = new Worker(url, options.execArgv, parseResourceLimits(options.resourceLimits)); // messagePort指向_parent_port this[kPort] = this[kHandle].messagePort; this[kPort].on('message', (data) => this[kOnMessage](data)); // 开始接收消息 this[kPort].start(); // 申请一个通信通道,两个端口 const { port1, port2 } = new MessageChannel(); this[kPublicPort] = port1; this[kPublicPort].on('message', (message) => this.emit('message', message)); // 向另一端发送消息 this[kPort].postMessage({ argv, type: messageTypes.LOAD_SCRIPT, filename, doEval: !!options.eval, cwdCounter: cwdCounter || workerIo.sharedCwdCounter, workerData: options.workerData, publicPort: port2, manifestSrc: getOptionValue('--experimental-policy') ? require('internal/process/policy').src : null, hasStdin: !!options.stdin }, [port2]); // 开启线程 this[kHandle].startThread(); }
上面的代码主要逻辑如下 1 保存messagePort,监听该端口的message事件,然后给messagePort的对端发送消息,但是这时候还没有接收端口,所以消息会缓存到MessagePortData,即child_portdata 中。另外我们看到主线程把通信端口port2发送给了子线程。 2 申请一个通信通道port1和port2,用于主线程和子线程通信。_parent_port和child_port是给Node.js使用的,新申请的端口是给用户使用的。 3 创建子线程。 我们看创建线程的时候,做了什么。
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); Mutex::ScopedLock lock(w->mutex_); // The object now owns the created thread and should not be garbage collected // until that finishes. w->ClearWeak(); // 加入主线程维护的子线程数据结构 w->env()->add_sub_worker_context(w); w->stopped_ = false; w->thread_joined_ = false; // 是否需要阻塞事件循环退出,默认true if (w->has_ref_) w->env()->add_refs(1); // 是否需要栈和栈大小 uv_thread_options_t thread_options; thread_options.flags = UV_THREAD_HAS_STACK_SIZE; thread_options.stack_size = kStackSize; // 创建线程 CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) { Worker* w = static_cast<Worker*>(arg); const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg); w->stack_base_ = stack_top - (kStackSize - kStackBufferSize); // 执行主逻辑 w->Run(); Mutex::ScopedLock lock(w->mutex_); // 给主线程提交一个任务,通知主线程子线程执行完毕,因为主线程不能直接执行join阻塞自己 w->env()->SetImmediateThreadsafe( [w = std::unique_ptr<Worker>(w)](Environment* env) { if (w->has_ref_) env->add_refs(-1); w->JoinThread(); // implicitly delete w }); }, static_cast<void*>(w)), 0); }
StartThread新建了一个子线程,然后在子线程中执行Run,我们继续看Run
void Worker::Run() { // 线程执行所需要的数据结构,比如loop,isolate,和主线程独立 WorkerThreadData data(this); { Locker locker(isolate_); Isolate::Scope isolate_scope(isolate_); SealHandleScope outer_seal(isolate_); // std::unique_ptr<Environment, FreeEnvironment> env_; DeleteFnPtr<Environment, FreeEnvironment> env_; // 线程执行完后执行的清除函数 auto cleanup_env = OnScopeLeave([&]() { // ... }); { HandleScope handle_scope(isolate_); Local<Context> context; // 新建一个context,和主线程独立 context = NewContext(isolate_); Context::Scope context_scope(context); { // 新建一个env并初始化,env中会和新的context关联 env_.reset(new Environment(data.isolate_data_.get(), context, std::move(argv_), std::move(exec_argv_), Environment::kNoFlags, thread_id_)); env_->set_env_vars(std::move(env_vars_)); env_->set_abort_on_uncaught_exception(false); env_->set_worker_context(this); env_->InitializeLibuv(start_profiler_idle_notifier_); } { Mutex::ScopedLock lock(mutex_); // 更新子线程所属的env this->env_ = env_.get(); } { if (!env_->RunBootstrapping().IsEmpty()) { CreateEnvMessagePort(env_.get()); USE(StartExecution(env_.get(), "internal/main/worker_thread")); } } { SealHandleScope seal(isolate_); bool more; // 开始事件循环 do { if (is_stopped()) break; uv_run(&data.loop_, UV_RUN_DEFAULT); if (is_stopped()) break; platform_->DrainTasks(isolate_); more = uv_loop_alive(&data.loop_); if (more && !is_stopped()) continue; EmitBeforeExit(env_.get()); more = uv_loop_alive(&data.loop_); } while (more == true && !is_stopped()); } } }
我们分步骤分析上面的代码 1 新建Isolate、context和Environment,子线程在独立的环境执行。然后初始化Environment。这个在Node.js启动过程章节已经分析过,不再分析。 2 更新子线程的env_。刚才已经分析过,Worker类中定义了env_属性,所以这里通过this.env_更新时,是不会影响基类(BaseObject)中的值的。因为子线程是在新的环境执行的,所以在新环境中使用该Worker实例时,需要使用新的环境变量。而在主线程使用该Worker实例时,是通过BaseObject的env()访问的。从而获取的是主线程的环境。因为Worker实例是在主线程和子线程之间共享的,Node.js在Worker类中重新定义了一个env_属性正是为了解决这个问题。 3 CreateEnvMessagePort
void Worker::CreateEnvMessagePort(Environment* env) { child_port_ = MessagePort::New(env, env->context(), std::move(child_port_data_)); if (child_port_ != nullptr) env->set_message_port(child_port_->object(isolate_)); }
child_port_data_这个变量刚才我们已经看到过,在这里首先申请一个新的端口。并且和child_port_data_互相关联起来。然后在env缓存起来。后续会使用。这时候的关系图如图14-4所示。 图14-4
4 执行internal/main/worker_thread.js
// 设置process对象 patchProcessObject(); // 获取刚才缓存的端口child_port_ onst port = getEnvMessagePort(); port.on('message', (message) => { // 加载脚本 if (message.type === LOAD_SCRIPT) { const { argv, cwdCounter, filename, doEval, workerData, publicPort, manifestSrc, manifestURL, hasStdin } = message; const CJSLoader = require('internal/modules/cjs/loader'); loadPreloadModules(); /* 由主线程申请的MessageChannel中某一端的端口, 主线程传递过来的,保存用于和主线程通信 */ publicWorker.parentPort = publicPort; // 执行时使用的数据 publicWorker.workerData = workerData; // 通知主线程,正在执行脚本 port.postMessage({ type: UP_AND_RUNNING }); // 执行new Worker(filename)时传入的文件 CJSLoader.Module.runMain(filename); }) // 开始接收消息 port.start()
我们看到worker_thread.js中通过runMain完成了子线程的代码执行,然后开始事件循环。 我们看一下当事件循环结束时,Node.js的逻辑。
// 给主线程提交一个任务,通知主线程子线程执行完毕,因为主线程不能直接执行join阻塞自己 w->env()->SetImmediateThreadsafe( [w = std::unique_ptr<Worker>(w)](Environment* env) { if (w->has_ref_) env->add_refs(-1); w->JoinThread(); // implicitly delete w }); }, static_cast<void*>(w)), 0);
通过w->env()获取的是主线程的执行环境。我们看一下SetImmediateThreadsafe。
template <typename Fn> void Environment::SetImmediateThreadsafe(Fn&& cb) { auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>( std::move(cb), false); { Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); native_immediates_threadsafe_.Push(std::move(callback)); } uv_async_send(&task_queues_async_); }
SetImmediateThreadsafe用于通知执行环境所在的事件循环有异步任务完成。并且是线程安全的。因为可能有多个线程会操作native_immediatesthreadsafe。在主线程事件循环的Poll IO阶段就会执行task_queues_async_回调。我们看一下task_queues_async_对应的回调。
uv_async_init( event_loop(), &task_queues_async_, [](uv_async_t* async) { Environment* env = ContainerOf( &Environment::task_queues_async_, async); env->CleanupFinalizationGroups(); env->RunAndClearNativeImmediates(); });
所以在Poll IO阶段执行的回调是RunAndClearNativeImmediates
void Environment::RunAndClearNativeImmediates(bool only_refed) { TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), "RunAndClearNativeImmediates", this); size_t ref_count = 0; if (native_immediates_threadsafe_.size() > 0) { Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_)); } auto drain_list = [&]() { TryCatchScope try_catch(this); DebugSealHandleScope seal_handle_scope(isolate()); while (std::unique_ptr<NativeImmediateCallback> head = native_immediates_.Shift()) { if (head->is_refed()) ref_count++; if (head->is_refed() || !only_refed) // 执行回调 head->Call(this); head.reset(); }; }
RunAndClearNativeImmediates会执行队列里的回调。对应Worker的JoinThread
void Worker::JoinThread() { // 阻塞等待子线程结束,执行到这子线程已经结束了 CHECK_EQ(uv_thread_join(&tid_), 0); thread_joined_ = true; // 从主线程数据结构中删除该线程对应的实例 env()->remove_sub_worker_context(this); { HandleScope handle_scope(env()->isolate()); Context::Scope context_scope(env()->context()); // Reset the parent port as we're closing it now anyway. object()->Set(env()->context(), env()->message_port_string(), Undefined(env()->isolate())).Check(); // 子线程退出码 Local<Value> args[] = { Integer::New(env()->isolate(), exit_code_), custom_error_ != nullptr ? OneByteString(env()->isolate(), custom_error_).As<Value>() : Null(env()->isolate()).As<Value>(), }; // 执行JS层回调,触发exit事件 MakeCallback(env()->onexit_string(), arraysize(args), args); } }
最后我们看一下如果结束正在执行的子线程。在JS中我能可以通过terminate函数终止线程的执行。
terminate(callback) { this[kHandle].stopThread(); } Terminate是对C++模块stopThread的封装。 void Worker::StopThread(const FunctionCallbackInfo<Value>& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); w->Exit(1); } void Worker::Exit(int code) { Mutex::ScopedLock lock(mutex_); // env_是子线程执行的env if (env_ != nullptr) { exit_code_ = code; Stop(env_); } else { stopped_ = true; } } int Stop(Environment* env) { env->ExitEnv(); return 0; } void Environment::ExitEnv() { set_can_call_into_js(false); set_stopping(true); isolate_->TerminateExecution(); SetImmediateThreadsafe([](Environment* env) { uv_stop(env->event_loop()); }); }
我们看到主线程最终通过SetImmediateThreadsafe给子线程所属的env提交了一个任务。子线程在Poll IO阶段会设置停止事件循环的标记,等到下一次事件循环开始的时候,就会跳出事件循环从而结束子线程的执行。
本节我们看一下线程间通信的过程。
我们知道isMainThread在子线程里是false,parentPort就是messageChannel中的一端。用于和主线程通信,所以parentPort.postMessage给对端发送消息,就是给主线程发送消息,我们再看看worker.postMessage('Hello, world!')。
postMessage(...args) { this[kPublicPort].postMessage(...args); }
kPublicPort指向的就是messageChannel的一端。this[kPublicPort].postMessage(...args)即给另一端发送消息。我们看一下postMessage的实现。
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Local<Object> obj = args.This(); Local<Context> context = obj->CreationContext(); TransferList transfer_list; if (args[1]->IsObject()) { // 处理transfer_list } // 拿到JS层使用的对象所关联的MessagePort MessagePort* port = Unwrap<MessagePort>(args.This()); port->PostMessage(env, args[0], transfer_list); }
我们接着看port->PostMessage
Maybe<bool> MessagePort::PostMessage(Environment* env, Local<Value> message_v, const TransferList& transfer_v) { Isolate* isolate = env->isolate(); Local<Object> obj = object(isolate); Local<Context> context = obj->CreationContext(); Message msg; // 序列化 Maybe<bool> serialization_maybe = msg.Serialize(env, context, message_v, transfer_v, obj); // 拿到操作对端sibling的锁 Mutex::ScopedLock lock(*data_->sibling_mutex_); // 把消息插入到对端队列 data_->sibling_->AddToIncomingQueue(std::move(msg)); return Just(true); }
PostMessage通过AddToIncomingQueue把消息插入对端的消息队列我们看一下AddToIncomingQueue
void MessagePortData::AddToIncomingQueue(Message&& message) { // 加锁操作消息队列 Mutex::ScopedLock lock(mutex_); incoming_messages_.emplace_back(std::move(message)); // 通知owner if (owner_ != nullptr) { owner_->TriggerAsync(); } }
插入消息队列后,如果有关联的端口,则会通知Libuv。我们继续看TriggerAsync。
void MessagePort::TriggerAsync() { if (IsHandleClosing()) return; CHECK_EQ(uv_async_send(&async_), 0); }
Libuv在Poll IO阶段就会执行对应的回调。回调是在new MessagePort时设置的。
auto onmessage = [](uv_async_t* handle) { MessagePort* channel = ContainerOf(&MessagePort::async_, handle); channel->OnMessage(); }; // 初始化async结构体,实现异步通信 CHECK_EQ(uv_async_init(env->event_loop(), &async_, onmessage), 0);
我们继续看OnMessage。
void MessagePort::OnMessage() { HandleScope handle_scope(env()->isolate()); Local<Context> context = object(env()->isolate())->CreationContext(); // 接收消息条数的阈值 size_t processing_limit; { // 加锁操作消息队列 Mutex::ScopedLock(data_->mutex_); processing_limit = std::max(data_->incoming_messages_.size(), static_cast<size_t>(1000)); } while (data_) { // 读取的条数达到阈值,通知Libuv下一轮Poll IO阶段继续读 if (processing_limit-- == 0) { // 通知事件循环 TriggerAsync(); return; } HandleScope handle_scope(env()->isolate()); Context::Scope context_scope(context); Local<Value> payload; // 读取消息 if (!ReceiveMessage(context, true).ToLocal(&payload)) break; // 没有了 if (payload == env()->no_message_symbol()) break; Local<Object> event; Local<Value> cb_args[1]; // 新建一个MessageEvent对象,回调onmessage事件 if (!env()->message_event_object_template()->NewInstance(context) .ToLocal(&event) || event->Set(context, env()->data_string(), payload).IsNothing() || event->Set(context, env()->target_string(), object()).IsNothing() || (cb_args[0] = event, false) || MakeCallback(env()->onmessage_string(), arraysize(cb_args), cb_args).IsEmpty()) { // 如果回调失败,通知Libuv下次继续读 if (data_) TriggerAsync(); return; } } }
我们看到这里会不断地调用ReceiveMessage读取数据,然后回调JS层。直到达到阈值或者回调失败。我们看一下ReceiveMessage的逻辑。
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context, bool only_if_receiving) { Message received; { // Get the head of the message queue. // 互斥访问消息队列 Mutex::ScopedLock lock(data_->mutex_); bool wants_message = receiving_messages_ || !only_if_receiving; // 没有消息、不需要接收消息、消息是关闭消息 if (data_->incoming_messages_.empty() || (!wants_message && !data_->incoming_messages_.front().IsCloseMessage())) { return env()->no_message_symbol(); } // 获取队列第一个消息 received = std::move(data_->incoming_messages_.front()); data_->incoming_messages_.pop_front(); } // 是关闭消息则关闭端口 if (received.IsCloseMessage()) { Close(); return env()->no_message_symbol(); } // 反序列化后返回 return received.Deserialize(env(), context); }
ReceiveMessage会消息进行反序列化返回。以上就是线程间通信的整个过程。具体步骤如图14-5所示。 图14-5
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8