本章介绍Node.js中C++层的一些核心模块的原理和实现,这些模块是Node.js中很多模块都会使用的。理解这些模块的原理,才能更好地理解在Node.js中,JS是如何通过C++层调用Libuv,又是如何从Libuv返回的。
BaseObject是C++层大多数类的基类。
class BaseObject : public MemoryRetainer { public: // … private: v8::Local<v8::Object> WrappedObject() const override; // 指向封装的对象 v8::Global<v8::Object> persistent_handle_; Environment* env_; };
BaseObject的实现很复杂,这里只介绍常用的一些实现。
// 把对象存储到persistent_handle_中,必要的时候通过object()取出来 BaseObject::BaseObject(Environment* env, v8::Local<v8::Object> object) : persistent_handle_(env->isolate(), object), env_(env) { // 把this存到object中 object->SetAlignedPointerInInternalField(0, static_cast<void*>(this)); }
构造函数用于保存对象间的关系(JS使用的对象和与其关系的C++层对象,下图中的对象即我们平时在JS层使用C++模块创建的对象,比如new TCP())。后面我们可以看到用处,关系如图6-1所示。 图6-1
v8::Local<v8::Object> BaseObject::object() const { return PersistentToLocal::Default(env()->isolate(), persistent_handle_); }
// 通过obj取出里面保存的BaseObject对象 BaseObject* BaseObject::FromJSObject(v8::Local<v8::Object> obj) { return static_cast<BaseObject*>(obj->GetAlignedPointerFromInternalField(0)); } template <typename T> T* BaseObject::FromJSObject(v8::Local<v8::Object> object) { return static_cast<T*>(FromJSObject(object)); }
// 从obj中取出对应的BaseObject对象 template <typename T> inline T* Unwrap(v8::Local<v8::Object> obj) { return BaseObject::FromJSObject<T>(obj); } // 从obj中获取对应的BaseObject对象,如果为空则返回第三个参数的值(默认值) #define ASSIGN_OR_RETURN_UNWRAP(ptr, obj, ...) \ do { \ *ptr = static_cast<typename std::remove_reference<decltype(*ptr)>::type>( \ BaseObject::FromJSObject(obj)); \ if (*ptr == nullptr) \ return __VA_ARGS__; \ } while (0)
AsyncWrap实现async_hook的模块,不过这里我们只关注它回调JS的功能。
inline v8::MaybeLocal<v8::Value> AsyncWrap::MakeCallback( const v8::Local<v8::Name> symbol, int argc, v8::Local<v8::Value>* argv) { v8::Local<v8::Value> cb_v; // 根据字符串表示的属性值,从对象中取出该属性对应的值。是个函数 if (!object()->Get(env()->context(), symbol).ToLocal(&cb_v)) return v8::MaybeLocal<v8::Value>(); // 是个函数 if (!cb_v->IsFunction()) { return v8::MaybeLocal<v8::Value>(); } // 回调,见async_wrap.cc return MakeCallback(cb_v.As<v8::Function>(), argc, argv); }
以上只是入口函数,我们看看真正的实现。
MaybeLocal<Value> AsyncWrap::MakeCallback(const Local<Function> cb, int argc, Local<Value>* argv) { MaybeLocal<Value> ret = InternalMakeCallback(env(), object(), cb, argc, argv, context); return ret; }
接着看一下InternalMakeCallback
MaybeLocal<Value> InternalMakeCallback(Environment* env, Local<Object> recv, const Local<Function> callback, int argc, Local<Value> argv[], async_context asyncContext) { // …省略其他代码 // 执行回调 callback->Call(env->context(), recv, argc, argv);}
HandleWrap是对Libuv uv_handle_t的封装,也是很多C++类的基类。
class HandleWrap : public AsyncWrap { public: // 操作和判断handle状态函数,见Libuv static void Close(const v8::FunctionCallbackInfo<v8::Value>& args); static void Ref(const v8::FunctionCallbackInfo<v8::Value>& args); static void Unref(const v8::FunctionCallbackInfo<v8::Value>& args); static void HasRef(const v8::FunctionCallbackInfo<v8::Value>& args); static inline bool IsAlive(const HandleWrap* wrap) { return wrap != nullptr && wrap->state_ != kClosed; } static inline bool HasRef(const HandleWrap* wrap) { return IsAlive(wrap) && uv_has_ref(wrap->GetHandle()); } // 获取封装的handle inline uv_handle_t* GetHandle() const { return handle_; } // 关闭handle,关闭成功后执行回调 virtual void Close( v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()); static v8::Local<v8::FunctionTemplate> GetConstructorTemplate( Environment* env); protected: HandleWrap(Environment* env, v8::Local<v8::Object> object, uv_handle_t* handle, AsyncWrap::ProviderType provider); virtual void OnClose() {} // handle状态 inline bool IsHandleClosing() const { return state_ == kClosing || state_ == kClosed; } private: friend class Environment; friend void GetActiveHandles(const v8::FunctionCallbackInfo<v8::Value>&); static void OnClose(uv_handle_t* handle); // handle队列 ListNode<HandleWrap> handle_wrap_queue_; // handle的状态 enum { kInitialized, kClosing, kClosed } state_; // 所有handle的基类 uv_handle_t* const handle_; };
Local<FunctionTemplate> HandleWrap::GetConstructorTemplate(Environment* env) { Local<FunctionTemplate> tmpl = env->handle_wrap_ctor_template(); if (tmpl.IsEmpty()) { tmpl = env->NewFunctionTemplate(nullptr); tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "HandleWrap")); tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env)); env->SetProtoMethod(tmpl, "close", HandleWrap::Close); env->SetProtoMethodNoSideEffect(tmpl, "hasRef", HandleWrap::HasRef); env->SetProtoMethod(tmpl, "ref", HandleWrap::Ref); env->SetProtoMethod(tmpl, "unref", HandleWrap::Unref); env->set_handle_wrap_ctor_template(tmpl); } return tmpl; } /* object为C++层为JS层提供的对象 handle为子类具体的handle类型,不同模块不一样 */ HandleWrap::HandleWrap(Environment* env, Local<Object> object, uv_handle_t* handle, AsyncWrap::ProviderType provider) : AsyncWrap(env, object, provider), state_(kInitialized), handle_(handle) { // 保存Libuv handle和C++对象的关系 handle_->data = this; HandleScope scope(env->isolate()); CHECK(env->has_run_bootstrapping_code()); // 插入handle队列 env->handle_wrap_queue()->PushBack(this); }
HandleWrap继承BaseObject类,初始化后关系图如图6-2所示。 图6-2
// 修改handle为活跃状态 void HandleWrap::Ref(const FunctionCallbackInfo<Value>& args) { HandleWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); if (IsAlive(wrap)) uv_ref(wrap->GetHandle()); } // 修改hande为不活跃状态 void HandleWrap::Unref(const FunctionCallbackInfo<Value>& args) { HandleWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); if (IsAlive(wrap)) uv_unref(wrap->GetHandle()); } // 判断handle是否处于活跃状态 void HandleWrap::HasRef(const FunctionCallbackInfo<Value>& args) { HandleWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); args.GetReturnValue().Set(HasRef(wrap)); }
// 关闭handle(JS层调用),成功后执行回调 void HandleWrap::Close(const FunctionCallbackInfo<Value>& args) { HandleWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); // 传入回调 wrap->Close(args[0]); } // 真正关闭handle的函数 void HandleWrap::Close(Local<Value> close_callback) { // 正在关闭或已经关闭 if (state_ != kInitialized) return; // 调用Libuv函数 uv_close(handle_, OnClose); // 关闭中 state_ = kClosing; // 传了回调则保存起来 if (!close_callback.IsEmpty() && close_callback->IsFunction() && !persistent().IsEmpty()) { object()->Set(env()->context(), env()->handle_onclose_symbol(), close_callback).Check(); } } // 关闭handle成功后回调 void HandleWrap::OnClose(uv_handle_t* handle) { BaseObjectPtr<HandleWrap> wrap { static_cast<HandleWrap*>(handle->data) }; wrap->Detach(); Environment* env = wrap->env(); HandleScope scope(env->isolate()); Context::Scope context_scope(env->context()); wrap->state_ = kClosed; wrap->OnClose(); wrap->handle_wrap_queue_.Remove(); // 有onclose回调则执行 if (!wrap->persistent().IsEmpty() && wrap->object()->Has(env->context(), env->handle_onclose_symbol()) .FromMaybe(false)) { wrap->MakeCallback(env->handle_onclose_symbol(), 0, nullptr); } }
ReqWrap表示通过Libuv对handle的一次请求。
class ReqWrapBase { public: explicit inline ReqWrapBase(Environment* env); virtual ~ReqWrapBase() = default; virtual void Cancel() = 0; virtual AsyncWrap* GetAsyncWrap() = 0; private: // 一个带前后指针的节点 ListNode<ReqWrapBase> req_wrap_queue_; };
ReqWrapBase主要是定义接口的协议。我们看一下ReqWrapBase的实现
ReqWrapBase::ReqWrapBase(Environment* env) { env->req_wrap_queue()->PushBack(this); }
ReqWrapBase初始化的时候,会把自己加到env对象的req队列中。
template <typename T> class ReqWrap : public AsyncWrap, public ReqWrapBase { public: inline ReqWrap(Environment* env, v8::Local<v8::Object> object, AsyncWrap::ProviderType provider); inline ~ReqWrap() override; inline void Dispatched(); inline void Reset(); T* req() { return &req_; } inline void Cancel() final; inline AsyncWrap* GetAsyncWrap() override; static ReqWrap* from_req(T* req); template <typename LibuvFunction, typename... Args> // 调用Libuv inline int Dispatch(LibuvFunction fn, Args... args); public: typedef void (*callback_t)(); callback_t original_callback_ = nullptr; protected: T req_; }; }
我们看一下实现
template <typename T> ReqWrap<T>::ReqWrap(Environment* env, v8::Local<v8::Object> object, AsyncWrap::ProviderType provider) : AsyncWrap(env, object, provider), ReqWrapBase(env) { // 初始化状态 Reset(); } // 保存libuv数据结构和ReqWrap实例的关系 template <typename T> void ReqWrap<T>::Dispatched() { req_.data = this; } // 重置字段 template <typename T> void ReqWrap<T>::Reset() { original_callback_ = nullptr; req_.data = nullptr; } // 通过req成员找所属对象的地址 template <typename T> ReqWrap<T>* ReqWrap<T>::from_req(T* req) { return ContainerOf(&ReqWrap<T>::req_, req); } // 取消线程池中的请求 template <typename T> void ReqWrap<T>::Cancel() { if (req_.data == this) uv_cancel(reinterpret_cast<uv_req_t*>(&req_)); } template <typename T> AsyncWrap* ReqWrap<T>::GetAsyncWrap() { return this; } // 调用Libuv函数 template <typename T> template <typename LibuvFunction, typename... Args> int ReqWrap<T>::Dispatch(LibuvFunction fn, Args... args) { Dispatched(); int err = CallLibuvFunction<T, LibuvFunction>::Call( // Libuv函数 fn, env()->event_loop(), req(), MakeLibuvRequestCallback<T, Args>::For(this, args)...); if (err >= 0) env()->IncreaseWaitingRequestCounter(); return err; }
我们看到ReqWrap抽象了请求Libuv的过程,具体设计的数据结构由子类实现。我们看一下某个子类的实现。
// 请求Libuv时,数据结构是uv_connect_t,表示一次连接请求 class ConnectWrap : public ReqWrap<uv_connect_t> { public: ConnectWrap(Environment* env, v8::Local<v8::Object> req_wrap_obj, AsyncWrap::ProviderType provider); };
JS调用C++模块是V8提供的能力,Node.js是使用了这个能力。这样我们只需要面对JS,剩下的事情交给Node.js就行。本文首先讲一下利用V8如何实现JS调用C++,然后再讲一下Node.js是怎么做的。
1 JS调用C++ 首先介绍一下V8中两个非常核心的类FunctionTemplate和ObjectTemplate。顾名思义,这两个类是定义模板的,好比建房子时的设计图一样,通过设计图,我们就可以造出对应的房子。V8也是,定义某种模板,就可以通过这个模板创建出对应的实例。下面介绍一下这些概念(为了方便,下面都是伪代码)。
1.1 定义一个函数模板
Local<FunctionTemplate> functionTemplate = v8::FunctionTemplate::New(isolate(), New); // 定义函数的名字 functionTemplate->SetClassName(‘TCP’)
首先定义一个FunctionTemplate对象。我们看到FunctionTemplate的第二个入参是一个函数,当我们执行由FunctionTemplate创建的函数时,v8就会执行New函数。当然我们也可以不传。 1.2 定义函数模板的prototype内容 prototype就是JS里的function.prototype。如果你理解JS里的知识,就很容易理解C++的代码。
v8::Local<v8::FunctionTemplate> t = v8::FunctionTemplate::New(isolate(), callback); t->SetClassName('test'); // 在prototype上定义一个属性 t->PrototypeTemplate()->Set('hello', 'world');
1.3 定义函数模板对应的实例模板的内容 实例模板就是一个ObjectTemplate对象。它定义了,当以new的方式执行由函数模板创建出来的函数时,返回值所具有的属性。
function A() { this.a = 1; this.b = 2; } new A();
实例模板类似上面代码中A函数里面的代码。我们看看在V8里怎么定义。
t->InstanceTemplate()->Set(key, val); t->InstanceTemplate()->SetInternalFieldCount(1);
InstanceTemplate返回的是一个ObjectTemplate对象。SetInternalFieldCount这个函数比较特殊,也是比较重要的一个地方,我们知道对象就是一块内存,对象有它自己的内存布局,我们知道在C++里,我们定义一个类,也就定义了对象的布局。比如我们有以下定义。
class demo { private: int a; int b; };
在内存中布局如图6-3所示。 图6-3 上面这种方式有个问题,就是类定义之后,内存布局就固定了。而V8是自己去控制对象的内存布局的。当我们在V8中定义一个类的时候,是没有任何属性的。我们看一下V8中HeapObject类的定义。
class HeapObject: public Object { static const int kMapOffset = Object::kSize; // Object::kSize是0 static const int kSize = kMapOffset + kPointerSize; };
这时候的内存布局如下。 然后我们再看一下HeapObject子类HeapNumber的定义。
class HeapNumber: public HeapObject { // kSize之前的空间存储map对象的指针 static const int kValueOffset = HeapObject::kSize; // kValueOffset - kSize之间存储数字的值 static const int kSize = kValueOffset + kDoubleSize; };
内存布局如图6-4所示。 图6-4
我们发现这些类只有几个类变量,类变量是不保存在对象内存空间的。这些类变量就是定义了对象每个域所占内存空间的信息,当我们定义一个HeapObject对象的时候,V8首先申请一块内存,然后把这块内存首地址强行转成对应对象的指针。然后通过类变量对属性的内存进行存取。我们看看在V8里如何申请一个HeapNumber对象
Object* Heap::AllocateHeapNumber(double value, PretenureFlag pretenure) { // 在哪个空间分配内存,比如新生代,老生代 AllocationSpace space = (pretenure == TENURED) ? CODE_SPACE : NEW_SPACE; // 在space上分配一个HeapNumber对象大小的内存 Object* result = AllocateRaw(HeapNumber::kSize, space); /* 转成HeapObect,设置map属性,map属性是表示对象类型、大小等信息的 */ HeapObject::cast(result)->set_map(heap_number_map()); // 转成HeapNumber对象 HeapNumber::cast(result)->set_value(value); return result; }
回到对象模板的问题。我们看一下对象模板的定义。
class TemplateInfo: public Struct { static const int kTagOffset = HeapObject::kSize; static const int kPropertyListOffset = kTagOffset + kPointerSize; static const int kHeaderSize = kPropertyListOffset + kPointerSize; }; class ObjectTemplateInfo: public TemplateInfo { static const int kConstructorOffset = TemplateInfo::kHeaderSize; static const int kInternalFieldCountOffset = kConstructorOffset + kPointerSize; static const int kSize = kInternalFieldCountOffset + kHeaderSize; };
内存布局如图6-5所示。 图6-5
回到对象模板的问题,我们看看Set(key, val)做了什么。
void Template::Set(v8::Handle<String> name, v8::Handle<Data> value, v8::PropertyAttribute attribute) { // ... i::Handle<i::Object> list(Utils::OpenHandle(this)->property_list()); NeanderArray array(list); array.add(Utils::OpenHandle(*name)); array.add(Utils::OpenHandle(*value)); array.add(Utils::OpenHandle(*v8::Integer::New(attribute))); }
上面的代码大致就是给一个list后面追加一些内容。我们看看这个list是怎么来的,即property_list函数的实现。
// 读取对象中某个属性的值 #define READ_FIELD(p, offset) (*reinterpret_cast<Object**>(FIELD_ADDR(p, offset)) static Object* cast(Object* value) { return value; } Object* TemplateInfo::property_list() { return Object::cast(READ_FIELD(this, kPropertyListOffset)); }
从上面代码中我们知道,内部布局如图6-6所示。 图6-6
根据内存布局,我们知道property_list的值是list指向的值。所以Set(key, val)操作的内存并不是对象本身的内存,对象利用一个指针指向一块内存保存Set(key, val)的值。SetInternalFieldCount函数就不一样了,它会影响(扩张)对象本身的内存。我们来看一下它的实现。
void ObjectTemplate::SetInternalFieldCount(int value) { // 修改的是kInternalFieldCountOffset对应的内存的值 Utils::OpenHandle(this)->set_internal_field_count(i::Smi::FromInt(value)); }
我们看到SetInternalFieldCount函数的实现很简单,就是在对象本身的内存中保存一个数字。接下来我们看看这个字段的使用。后面会详细介绍它的用处。
Handle<JSFunction> Factory::CreateApiFunction( Handle<FunctionTemplateInfo> obj, bool is_global) { int internal_field_count = 0; if (!obj->instance_template()->IsUndefined()) { // 获取函数模板的实例模板 Handle<ObjectTemplateInfo> instance_template = Handle<ObjectTemplateInfo>(ObjectTemplateInfo::cast(obj->instance_template())); // 获取实例模板的internal_field_count字段的值(通过SetInternalFieldCount设置的那个值) internal_field_count = Smi::cast(instance_template->internal_field_count())->value(); } // 计算新建对象需要的空间,如果 int instance_size = kPointerSize * internal_field_count; if (is_global) { instance_size += JSGlobalObject::kSize; } else { instance_size += JSObject::kHeaderSize; } InstanceType type = is_global ? JS_GLOBAL_OBJECT_TYPE : JS_OBJECT_TYPE; // 新建一个函数对象 Handle<JSFunction> result = Factory::NewFunction(Factory::empty_symbol(), type, instance_size, code, true); }
我们看到internal_field_count的值的意义是,会扩张对象的内存,比如一个对象本身只有n字节,如果定义internal_field_count的值是1,对象的内存就会变成n+internal_field_count * 一个指针的字节数。内存布局如图6-7所示。 图6-7 1.4 通过函数模板创建一个函数 Local functionTemplate = v8::FunctionTemplate::New(isolate(), New); global->Set('demo', functionTemplate ->GetFunction()); 这样我们就可以在JS里直接调用demo这个变量,然后对应的函数就会被执行。这就是JS调用C++的原理。
2 Node.js是如何处理JS调用C++问题的 我们以TCP模块为例。
const { TCP } = process.binding('tcp_wrap'); new TCP(...);
Node.js通过定义一个全局变量process统一处理C++模块的调用,具体参考模块加载章节的内容。在Node.js中,C++模块(类)一般只会定义对应的Libuv结构体和一系列类函数,然后创建一个函数模版,并传入一个回调,接着把这些类函数挂载到函数模板中,最后通过函数模板返回一个函数F给JS层使用,翻译成JS大致如下
// Libuv function uv_tcp_connect(uv_tcp_t, addr,cb) { cb(); } // C++ class TCPWrap { uv_tcp_t = {}; static Connect(cb) { const tcpWrap = this[0]; uv_tcp_connect( tcpWrap.uv_tcp_t, {ip: '127.0.0.1', port: 80}, () => { cb(); } ); } } function FunctionTemplate(cb) { function Tmp() { Object.assign(this, map); cb(this); } const map = {}; return { PrototypeTemplate: function() { return { set: function(k, v) { Tmp.prototype[k] = v; } } }, InstanceTemplate: function() { return { set: function(k, v) { map[k] = v; } } }, GetFunction() { return Tmp; } } } const TCPFunctionTemplate = FunctionTemplate((target) => { target[0] = new TCPWrap(); }) TCPFunctionTemplate.PrototypeTemplate().set('connect', TCPWrap.Connect); TCPFunctionTemplate.InstanceTemplate().set('name', 'hi'); const TCP = TCPFunctionTemplate.GetFunction(); // js const tcp = new TCP(); tcp.connect(() => { console.log('连接成功'); }); tcp.name;
我们从C++的层面分析执行new TCP()的逻辑,然后再分析connect的逻辑,这两个逻辑涉及的机制是其它C++模块也会使用到的。因为TCP对应的函数是Initialize函数里的t->GetFunction()对应的值。所以new TCP()的时候,V8首先会创建一个C++对象,然后执行New函数。
void TCPWrap::New(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); int type_value = args[0].As<Int32>()->Value(); TCPWrap::SocketType type = static_cast<TCPWrap::SocketType>(type_value); ProviderType provider; switch (type) { case SOCKET: provider = PROVIDER_TCPWRAP; break; case SERVER: provider = PROVIDER_TCPSERVERWRAP; break; default: UNREACHABLE(); } /* args.This()为v8提供的一个C++对象(由Initialize函数定义的模块创建的) 调用该C++对象的SetAlignedPointerInInternalField(0,this)关联this(new TCPWrap()), 见HandleWrap */ new TCPWrap(env, args.This(), provider); }
我们沿着TCPWrap的继承关系,一直到HandleWrap
HandleWrap::HandleWrap(Environment* env, Local<Object> object, uv_handle_t* handle, AsyncWrap::ProviderType provider) : AsyncWrap(env, object, provider), state_(kInitialized), handle_(handle) { // 保存Libuv handle和C++对象的关系 handle_->data = this; HandleScope scope(env->isolate()); // 插入handle队列 env->handle_wrap_queue()->PushBack(this); }
HandleWrap首先保存了Libuv结构体和C++对象的关系。然后我们继续沿着AsyncWrap分析,AsyncWrap继承BaseObject,我们直接看BaseObject。
// 把对象存储到persistent_handle_中,必要的时候通过object()取出来 BaseObject::BaseObject(Environment* env, v8::Local<v8::Object> object) : persistent_handle_(env->isolate(), object), env_(env) { // 把this存到object中 object->SetAlignedPointerInInternalField(0, static_cast<void*>(this)); env->AddCleanupHook(DeleteMe, static_cast<void*>(this)); env->modify_base_object_count(1); }
我们看SetAlignedPointerInInternalField。
void v8::Object::SetAlignedPointerInInternalField(int index, void* value) { i::Handle<i::JSReceiver> obj = Utils::OpenHandle(this); i::Handle<i::JSObject>::cast(obj)->SetEmbedderField( index, EncodeAlignedAsSmi(value, location)); } void JSObject::SetEmbedderField(int index, Smi* value) { // GetHeaderSize为对象固定布局的大小,kPointerSize * index为拓展的内存大小,根据索引找到对应位置 int offset = GetHeaderSize() + (kPointerSize * index); // 写对应位置的内存,即保存对应的内容到内存 WRITE_FIELD(this, offset, value); }
SetAlignedPointerInInternalField函数展开后,做的事情就是把一个值保存到V8 C++对象的内存里。那保存的这个值是啥呢?BaseObject的入参object是由函数模板创建的对象,this是一个TCPWrap对象。所以SetAlignedPointerInInternalField函数做的事情就是把一个TCPWrap对象保存到一个函数模板创建的对象里,如图6-8所示。 图6-8
这有啥用呢?我们继续分析。这时候new TCP就执行完毕了。我们看看这时候执行tcp.connect()函数的逻辑。
template <typename T> void TCPWrap::Connect(const FunctionCallbackInfo<Value>& args, std::function<int(const char* ip_address, T* addr)> uv_ip_addr) { Environment* env = Environment::GetCurrent(args); TCPWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder(), args.GetReturnValue().Set(UV_EBADF)); // 省略部分不相关代码 args.GetReturnValue().Set(err); }
我们只需看一下ASSIGN_OR_RETURN_UNWRAP宏的逻辑。其中args.Holder()表示Connect函数的属主,根据前面的分析我们知道属主是Initialize函数定义的函数模板创建出来的对象。这个对象保存了一个TCPWrap对象。ASSIGN_OR_RETURN_UNWRAP主要的逻辑是把在C++对象中保存的那个TCPWrap对象取出来。然后就可以使用TCPWrap对象的handle去请求Libuv了。
刚才我们分析了JS调用C++层时是如何串起来的,接着我们看一下C++调用Libuv和Libuv回调C++层又是如何串起来的。我们通过TCP模块的connect函数继续分析该过程。
template <typename T> void TCPWrap::Connect(const FunctionCallbackInfo<Value>& args, std::function<int(const char* ip_address, T* addr)> uv_ip_addr) { Environment* env = Environment::GetCurrent(args); TCPWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder(), args.GetReturnValue().Set(UV_EBADF)); // 第一个参数是TCPConnectWrap对象,见net模块 Local<Object> req_wrap_obj = args[0].As<Object>(); // 第二个是ip地址 node::Utf8Value ip_address(env->isolate(), args[1]); T addr; // 把端口,IP设置到addr上,端口信息在uv_ip_addr上下文里了 int err = uv_ip_addr(*ip_address, &addr); if (err == 0) { ConnectWrap* req_wrap = new ConnectWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_TCPCONNECTWRAP); err = req_wrap->Dispatch(uv_tcp_connect, &wrap->handle_, reinterpret_cast<const sockaddr*>(&addr), AfterConnect); if (err) delete req_wrap; } args.GetReturnValue().Set(err); }
我们首先看一下ConnectWrap。我们知道ConnectWrap是ReqWrap的子类。req_wrap_obj是JS层使用的对象。New ConnectWrap后结构如图6-9所示。 图6-9
接着我们看一下Dispatch。
// 调用Libuv函数 template <typename T> template <typename LibuvFunction, typename... Args> int ReqWrap<T>::Dispatch(LibuvFunction fn, Args... args) { // 保存Libuv结构体和C++层对象ConnectWrap的关系 req_.data = this; int err = CallLibuvFunction<T, LibuvFunction>::Call( fn, env()->event_loop(), req(), MakeLibuvRequestCallback<T, Args>::For(this, args)...); if (err >= 0) env()->IncreaseWaitingRequestCounter(); return err; }
调用Libuv之前的结构如图6-10所示。 图6-10
接下来我们分析调用Libuv的具体过程。我们看到Dispatch函数是一个函数模板。 首先看一下CallLibuvFunction的实现。
template <typename ReqT, typename T> struct CallLibuvFunction; // Detect `int uv_foo(uv_loop_t* loop, uv_req_t* request, ...);`. template <typename ReqT, typename... Args> struct CallLibuvFunction<ReqT, int(*)(uv_loop_t*, ReqT*, Args...)> { using T = int(*)(uv_loop_t*, ReqT*, Args...); template <typename... PassedArgs> static int Call(T fn, uv_loop_t* loop, ReqT* req, PassedArgs... args) { return fn(loop, req, args...); } }; // Detect `int uv_foo(uv_req_t* request, ...);`. template <typename ReqT, typename... Args> struct CallLibuvFunction<ReqT, int(*)(ReqT*, Args...)> { using T = int(*)(ReqT*, Args...); template <typename... PassedArgs> static int Call(T fn, uv_loop_t* loop, ReqT* req, PassedArgs... args) { return fn(req, args...); } }; // Detect `void uv_foo(uv_req_t* request, ...);`. template <typename ReqT, typename... Args> struct CallLibuvFunction<ReqT, void(*)(ReqT*, Args...)> { using T = void(*)(ReqT*, Args...); template <typename... PassedArgs> static int Call(T fn, uv_loop_t* loop, ReqT* req, PassedArgs... args) { fn(req, args...); return 0; } };
CallLibuvFunction的实现看起来非常复杂,那是因为用了大量的模板参数,CallLibuvFunction本质上是一个struct,在C++里和类作用类似,里面只有一个类函数Call,Node.js为了适配Libuv层各种类型函数的调用,所以实现了三种类型的CallLibuvFunction,并且使用了大量的模板参数。我们只需要分析一种就可以了。我们根据TCP的connect函数开始分析。我们首先具体下Dispatch函数的模板参数。
template <typename T> template <typename LibuvFunction, typename... Args>
T对应ReqWrap的类型,LibuvFunction对应Libuv的函数类型,这里是int uv_tcp_connect(uv_connect_t* req, ...),所以是对应LibuvFunction的第二种情况,Args是执行Dispatch时除了第一个实参外的剩余参数。下面我们具体化Dispatch。
int ReqWrap<uv_connect_t>::Dispatch(int(*)(uv_connect_t*, Args...), Args... args) { req_.data = this; int err = CallLibuvFunction<uv_connect_t, int(*)(uv_connect_t*, Args...)>::Call( fn, env()->event_loop(), req(), MakeLibuvRequestCallback<T, Args>::For(this, args)...); return err; }
接着我们看一下MakeLibuvRequestCallback的实现。
// 透传参数给Libuv template <typename ReqT, typename T> struct MakeLibuvRequestCallback { static T For(ReqWrap<ReqT>* req_wrap, T v) { static_assert(!is_callable<T>::value, "MakeLibuvRequestCallback missed a callback"); return v; } }; template <typename ReqT, typename... Args> struct MakeLibuvRequestCallback<ReqT, void(*)(ReqT*, Args...)> { using F = void(*)(ReqT* req, Args... args); // Libuv回调 static void Wrapper(ReqT* req, Args... args) { // 通过Libuv结构体拿到对应的C++对象 ReqWrap<ReqT>* req_wrap = ReqWrap<ReqT>::from_req(req); req_wrap->env()->DecreaseWaitingRequestCounter(); // 拿到原始的回调执行 F original_callback = reinterpret_cast<F>(req_wrap->original_callback_); original_callback(req, args...); } static F For(ReqWrap<ReqT>* req_wrap, F v) { // 保存原来的函数 CHECK_NULL(req_wrap->original_callback_); req_wrap->original_callback_ = reinterpret_cast<typename ReqWrap<ReqT>::callback_t>(v); // 返回包裹函数 return Wrapper; } };
MakeLibuvRequestCallback的实现有两种情况,模版参数的第一个一般是ReqWrap子类,第二个一般是handle,初始化ReqWrap类的时候,env中会记录ReqWrap实例的个数,从而知道有多少个请求正在被Libuv处理,模板参数的第二个如果是函数则说明没有使用ReqWrap请求Libuv,则使用第二种实现,劫持回调从而记录正在被Libuv处理的请求数(如GetAddrInfo的实现)。所以我们这里是适配第一种实现。透传C++层参数给Libuv。我们再来看一下 Dispatch
int ReqWrap<uv_connect_t>::Dispatch(int(*)(uv_connect_t*, Args...), Args... args) { req_.data = this; int err = CallLibuvFunction<uv_connect_t, int(*)(uv_connect_t*, Args...)>::Call( fn, env()->event_loop(), req(), args...); return err; }
再进一步展开。
static int Call(int(*fn)(uv_connect_t*, Args...), uv_loop_t* loop, uv_connect_t* req, PassedArgs... args) { return fn(req, args...); }
最后展开
static int Call(int(*fn)(uv_connect_t*, Args...), uv_loop_t* loop, uv_connect_t* req, PassedArgs... args) { return fn(req, args...); } Call( uv_tcp_connect, env()->event_loop(), req(), &wrap->handle_, AfterConnec ) uv_tcp_connect( env()->event_loop(), req(), &wrap->handle_, AfterConnect );
接着我们看看uv_tcp_connect做了什么。
int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle, const struct sockaddr* addr, uv_connect_cb cb) { // ... return uv__tcp_connect(req, handle, addr, addrlen, cb); } int uv__tcp_connect(uv_connect_t* req, uv_tcp_t* handle, const struct sockaddr* addr, unsigned int addrlen, uv_connect_cb cb) { int err; int r; // 关联起来 req->handle = (uv_stream_t*) handle; // ... }
Libuv中把req和handle做了关联,如图6-11所示。 图6-11
分析完C++调用Libuv后,我们看看Libuv回调C++和C++回调JS的过程。当Libuv处理完请求后会执行AfterConnect 。
template <typename WrapType, typename UVType> void ConnectionWrap<WrapType, UVType>::AfterConnect(uv_connect_t* req, int status) { // 从Libuv结构体拿到C++的请求对象 std::unique_ptr<ConnectWrap> req_wrap (static_cast<ConnectWrap*>(req->data)); // 从C++层请求对象拿到对应的handle结构体(Libuv里关联起来的),再通过handle拿到对应的C++层handle对象(HandleWrap关联的) WrapType* wrap = static_cast<WrapType*>(req->handle->data); Environment* env = wrap->env(); ... Local<Value> argv[5] = { Integer::New(env->isolate(), status), wrap->object(), req_wrap->object(), Boolean::New(env->isolate(), readable), Boolean::New(env->isolate(), writable) }; // 回调JS层oncomplete req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); }
Node.js在C++层对流进行了非常多的封装,很多模块都依赖C++层流的机制,流机制的设计中,主要有三个概念 1 资源,这是流机制的核心(StreamResource), 2 对流进行操作(StreamReq) 3 流事件的监听者,当对流进行操作或流本身有事件触发时,会把事件和相关的上下文传递给监听者,监听者处理完后,再通知流(StreamListener)。 通过继承的模式,基类定义接口,子类实现接口的方式。对流的操作进行了抽象和封装。三者的类关系如图6-12所示。 图6-12
我们看一下读一个流的数据的过程,如图6-13所示。 图6-13
再看一下写的过程,如图6-14所示。 图6-14
StreamResource定义操作流的通用逻辑和操作结束后触发的回调。但是StreamResource不定义流的类型,流的类型由子类定义,我们可以在StreamResource上注册listener,表示对流感兴趣,当流上有数据可读或者事件发生时,就会通知listener。
class StreamResource { public: virtual ~StreamResource(); // 注册/注销等待流可读事件 virtual int ReadStart() = 0; virtual int ReadStop() = 0; // 关闭流 virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; // 写入流 virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); virtual int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) = 0; // ...忽略一些 // 给流增加或删除监听者 void PushStreamListener(StreamListener* listener); void RemoveStreamListener(StreamListener* listener); protected: uv_buf_t EmitAlloc(size_t suggested_size); void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0)); // 流的监听者,即数据消费者 StreamListener* listener_ = nullptr; uint64_t bytes_read_ = 0; uint64_t bytes_written_ = 0; friend class StreamListener; };
StreamResource是一个基类,其中有一个成员是StreamListener类的实例,我们后面分析。我们看一下StreamResource的实现。 1增加一个listener
// 增加一个listener inline void StreamResource::PushStreamListener(StreamListener* listener) { // 头插法 listener->previous_listener_ = listener_; listener->stream_ = this; listener_ = listener; }
我们可以在一个流上注册多个listener,流的listener_字段维护了流上所有的listener队列。关系图如图6-15所示。 图6-15 2删除listener
inline void StreamResource::RemoveStreamListener(StreamListener* listener) { StreamListener* previous; StreamListener* current; // 遍历单链表 for (current = listener_, previous = nullptr; /* No loop condition because we want a crash if listener is not found */ ; previous = current, current = current->previous_listener_) { if (current == listener) { // 非空说明需要删除的不是第一个节点 if (previous != nullptr) previous->previous_listener_ = current->previous_listener_; else // 删除的是第一个节点,更新头指针就行 listener_ = listener->previous_listener_; break; } } // 重置被删除listener的字段 listener->stream_ = nullptr; listener->previous_listener_ = nullptr; }
3 申请存储数据
// 申请一块内存 inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) { DebugSealHandleScope handle_scope(v8::Isolate::GetCurrent()); return listener_->OnStreamAlloc(suggested_size); }
StreamResource只是定义了操作流的通用逻辑,数据存储和消费由listener定义。 4 数据可读
inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) { if (nread > 0) // 记录从流中读取的数据的字节大小 bytes_read_ += static_cast<uint64_t>(nread); listener_->OnStreamRead(nread, buf); }
5 写回调
inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) { DebugSealHandleScope handle_scope(v8::Isolate::GetCurrent()); listener_->OnStreamAfterWrite(w, status); }
6 关闭流回调
inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) { DebugSealHandleScope handle_scope(v8::Isolate::GetCurrent()); listener_->OnStreamAfterShutdown(w, status); }
7 流销毁回调
inline StreamResource::~StreamResource() { while (listener_ != nullptr) { StreamListener* listener = listener_; listener->OnStreamDestroy(); if (listener == listener_) RemoveStreamListener(listener_); } }
流销毁后需要通知listener,并且解除关系。
StreamBase是StreamResource的子类,拓展了StreamResource的功能。
class StreamBase : public StreamResource { public: static constexpr int kStreamBaseField = 1; static constexpr int kOnReadFunctionField = 2; static constexpr int kStreamBaseFieldCount = 3; // 定义一些统一的逻辑 static void AddMethods(Environment* env, v8::Local<v8::FunctionTemplate> target); virtual bool IsAlive() = 0; virtual bool IsClosing() = 0; virtual bool IsIPCPipe(); virtual int GetFD(); // 执行JS回调 v8::MaybeLocal<v8::Value> CallJSOnreadMethod( ssize_t nread, v8::Local<v8::ArrayBuffer> ab, size_t offset = 0, StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS); Environment* stream_env() const; // 关闭流 int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>()); // 写入流 StreamWriteResult Write( uv_buf_t* bufs, size_t count, uv_stream_t* send_handle = nullptr, v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>()); // 创建一个关闭请求 virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object); // 创建一个写请求 virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object); virtual AsyncWrap* GetAsyncWrap() = 0; virtual v8::Local<v8::Object> GetObject(); static StreamBase* FromObject(v8::Local<v8::Object> obj); protected: explicit StreamBase(Environment* env); // JS Methods int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args); // 省略系列方法 void AttachToObject(v8::Local<v8::Object> obj); template <int (StreamBase::*Method)( const v8::FunctionCallbackInfo<v8::Value>& args)> static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args); private: Environment* env_; EmitToJSStreamListener default_listener_; void SetWriteResult(const StreamWriteResult& res); static void AddMethod(Environment* env, v8::Local<v8::Signature> sig, enum v8::PropertyAttribute attributes, v8::Local<v8::FunctionTemplate> t, JSMethodFunction* stream_method, v8::Local<v8::String> str); };
1 初始化
inline StreamBase::StreamBase(Environment* env) : env_(env) { PushStreamListener(&default_listener_); }
StreamBase初始化的时候会默认设置一个listener。 2 关闭流
// 关闭一个流,req_wrap_obj是JS层传进来的对象 inline int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) { Environment* env = stream_env(); HandleScope handle_scope(env->isolate()); AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); // 创建一个用于请求Libuv的数据结构 ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj); // 子类实现,不同流关闭的逻辑不一样 int err = DoShutdown(req_wrap); // 执行出错则销毁JS层对象 if (err != 0 && req_wrap != nullptr) { req_wrap->Dispose(); } const char* msg = Error(); if (msg != nullptr) { req_wrap_obj->Set( env->context(), env->error_string(), OneByteString(env->isolate(), msg)).Check(); ClearError(); } return err; }
3 写
// 写Buffer,支持发送文件描述符 int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); Local<Object> req_wrap_obj = args[0].As<Object>(); uv_buf_t buf; // 数据内容和长度 buf.base = Buffer::Data(args[1]); buf.len = Buffer::Length(args[1]); uv_stream_t* send_handle = nullptr; // 是对象并且流支持发送文件描述符 if (args[2]->IsObject() && IsIPCPipe()) { Local<Object> send_handle_obj = args[2].As<Object>(); HandleWrap* wrap; // 从返回js的对象中获取internalField中指向的C++层对象 ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); // 拿到Libuv层的handle send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle()); // Reference LibuvStreamWrap instance to prevent it from being garbage // collected before `AfterWrite` is called. // 设置到JS层请求对象中 req_wrap_obj->Set(env->context(), env->handle_string(), send_handle_obj).Check(); } StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj); SetWriteResult(res); return res.err; }
inline StreamWriteResult StreamBase::Write( uv_buf_t* bufs, size_t count, uv_stream_t* send_handle, v8::Local<v8::Object> req_wrap_obj) { Environment* env = stream_env(); int err; size_t total_bytes = 0; // 计算需要写入的数据大小 for (size_t i = 0; i < count; ++i) total_bytes += bufs[i].len; // 同上 bytes_written_ += total_bytes; // 是否需要发送文件描述符,不需要则直接写 if (send_handle == nullptr) { err = DoTryWrite(&bufs, &count); if (err != 0 || count == 0) { return StreamWriteResult { false, err, nullptr, total_bytes }; } } HandleScope handle_scope(env->isolate()); AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); // 创建一个用于请求Libuv的写请求对象 WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj); // 执行写,子类实现,不同流写操作不一样 err = DoWrite(req_wrap, bufs, count, send_handle); const char* msg = Error(); if (msg != nullptr) { req_wrap_obj->Set(env->context(), env->error_string(), OneByteString(env->isolate(), msg)).Check(); ClearError(); } return StreamWriteResult { async, err, req_wrap, total_bytes }; }
4 读
// 操作流,启动读取 int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) { return ReadStart(); } // 操作流,停止读取 int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) { return ReadStop(); } // 触发流事件,有数据可读 MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread, Local<ArrayBuffer> ab, size_t offset, StreamBaseJSChecks checks) { Environment* env = env_; env->stream_base_state()[kReadBytesOrError] = nread; env->stream_base_state()[kArrayBufferOffset] = offset; Local<Value> argv[] = { ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>() }; // GetAsyncWrap在StreamBase子类实现,拿到StreamBase类对象 AsyncWrap* wrap = GetAsyncWrap(); // 获取回调执行 Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField); return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv); }
4 流通用方法
void StreamBase::AddMethod(Environment* env, Local<Signature> signature, enum PropertyAttribute attributes, Local<FunctionTemplate> t, JSMethodFunction* stream_method, Local<String> string) { // 新建一个函数模板 Local<FunctionTemplate> templ = env->NewFunctionTemplate(stream_method, signature, v8::ConstructorBehavior::kThrow, v8::SideEffectType::kHasNoSideEffect); // 设置原型属性 t->PrototypeTemplate()->SetAccessorProperty( string, templ, Local<FunctionTemplate>(), attributes); } void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) { HandleScope scope(env->isolate()); enum PropertyAttribute attributes = static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum); Local<Signature> sig = Signature::New(env->isolate(), t); // 设置原型属性 AddMethod(env, sig, attributes, t, GetFD, env->fd_string()); // 忽略部分 env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>); env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>); env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>); env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>); env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>); env->SetProtoMethod( t, "writeAsciiString", JSMethod<&StreamBase::WriteString<ASCII>>); env->SetProtoMethod( t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>); t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "isStreamBase"), True(env->isolate())); // 设置访问器 t->PrototypeTemplate()->SetAccessor( // 键名 FIXED_ONE_BYTE_STRING(env->isolate(), "onread"), // getter BaseObject::InternalFieldGet<kOnReadFunctionField>, // setter,Value::IsFunction是set之前的校验函数,见InternalFieldSet(模板函数)定义 BaseObject::InternalFieldSet<kOnReadFunctionField, &Value::IsFunction>); }
5 其它函数
// 默认false,子类重写 bool StreamBase::IsIPCPipe() { return false; } // 子类重写 int StreamBase::GetFD() { return -1; } Local<Object> StreamBase::GetObject() { return GetAsyncWrap()->object(); } // 工具函数和实例this无关,和入参有关 void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) { // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD(). // 从JS层对象获取它关联的C++对象,不一定是this StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>()); if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL); if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL); args.GetReturnValue().Set(wrap->GetFD()); } void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) { StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>()); if (wrap == nullptr) return args.GetReturnValue().Set(0); // uint64_t -> double. 53bits is enough for all real cases. args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_)); }
LibuvStreamWrap是StreamBase的子类。实现了父类的接口,也拓展了流的能力。
class LibuvStreamWrap : public HandleWrap, public StreamBase { public: static void Initialize(v8::Local<v8::Object> target, v8::Local<v8::Value> unused, v8::Local<v8::Context> context, void* priv); int GetFD() override; bool IsAlive() override; bool IsClosing() override; bool IsIPCPipe() override; // JavaScript functions int ReadStart() override; int ReadStop() override; // Resource implementation int DoShutdown(ShutdownWrap* req_wrap) override; int DoTryWrite(uv_buf_t** bufs, size_t* count) override; int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) override; inline uv_stream_t* stream() const { return stream_; } // 是否是Unix域或命名管道 inline bool is_named_pipe() const { return stream()->type == UV_NAMED_PIPE; } // 是否是Unix域并且支持传递文件描述符 inline bool is_named_pipe_ipc() const { return is_named_pipe() && reinterpret_cast<const uv_pipe_t*>(stream())->ipc != 0; } inline bool is_tcp() const { return stream()->type == UV_TCP; } // 创建请求Libuv的对象 ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object) override; WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object) override; // 从JS层对象获取对于的C++对象 static LibuvStreamWrap* From(Environment* env, v8::Local<v8::Object> object); protected: LibuvStreamWrap(Environment* env, v8::Local<v8::Object> object, uv_stream_t* stream, AsyncWrap::ProviderType provider); AsyncWrap* GetAsyncWrap() override; static v8::Local<v8::FunctionTemplate> GetConstructorTemplate( Environment* env); private: static void GetWriteQueueSize( const v8::FunctionCallbackInfo<v8::Value>& info); static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args); // Callbacks for libuv void OnUvAlloc(size_t suggested_size, uv_buf_t* buf); void OnUvRead(ssize_t nread, const uv_buf_t* buf); static void AfterUvWrite(uv_write_t* req, int status); static void AfterUvShutdown(uv_shutdown_t* req, int status); uv_stream_t* const stream_; };
LibuvStreamWrap::LibuvStreamWrap(Environment* env, Local<Object> object, uv_stream_t* stream, AsyncWrap::ProviderType provider) : HandleWrap(env, object, reinterpret_cast<uv_handle_t*>(stream), provider), StreamBase(env), stream_(stream) { StreamBase::AttachToObject(object); }
LibuvStreamWrap初始化的时候,会把JS层使用的对象的内部指针指向自己,见HandleWrap。 2 写操作
// 工具函数,获取待写数据字节的大小 void LibuvStreamWrap::GetWriteQueueSize( const FunctionCallbackInfo<Value>& info) { LibuvStreamWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This()); uint32_t write_queue_size = wrap->stream()->write_queue_size; info.GetReturnValue().Set(write_queue_size); } // 设置非阻塞 void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) { LibuvStreamWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); bool enable = args[0]->IsTrue(); args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable)); } // 定义一个关闭的请求 typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>> LibuvShutdownWrap; // 定义一个写请求 typedef SimpleWriteWrap<ReqWrap<uv_write_t>> LibuvWriteWrap; ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) { return new LibuvShutdownWrap(this, object); } WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) { return new LibuvWriteWrap(this, object); } // 发起关闭请求,由父类调用,req_wrap是C++层创建的对象 int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) { LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_); return req_wrap->Dispatch(uv_shutdown, stream(), AfterUvShutdown); } // 关闭请求结束后执行请求的通用回调Done void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) { LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>( LibuvShutdownWrap::from_req(req)); HandleScope scope(req_wrap->env()->isolate()); Context::Scope context_scope(req_wrap->env()->context()); req_wrap->Done(status); } int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) { int err; size_t written; uv_buf_t* vbufs = *bufs; size_t vcount = *count; err = uv_try_write(stream(), vbufs, vcount); if (err == UV_ENOSYS || err == UV_EAGAIN) return 0; if (err < 0) return err; // 写成功的字节数,更新数据 written = err; for (; vcount > 0; vbufs++, vcount--) { // Slice if (vbufs[0].len > written) { vbufs[0].base += written; vbufs[0].len -= written; written = 0; break; // Discard } else { written -= vbufs[0].len; } } *bufs = vbufs; *count = vcount; return 0; } int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap, uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) { LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap); return w->Dispatch(uv_write2, stream(), bufs, count, send_handle, AfterUvWrite); } void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) { LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>( LibuvWriteWrap::from_req(req)); HandleScope scope(req_wrap->env()->isolate()); Context::Scope context_scope(req_wrap->env()->context()); req_wrap->Done(status); }
3 读操作
// 调用Libuv实现启动读逻辑 int LibuvStreamWrap::ReadStart() { return uv_read_start(stream(), [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf); }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf); }); } // 实现停止读逻辑 int LibuvStreamWrap::ReadStop() { return uv_read_stop(stream()); } // 需要分配内存时的回调,由Libuv回调,具体分配内存逻辑由listener实现 void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) { HandleScope scope(env()->isolate()); Context::Scope context_scope(env()->context()); *buf = EmitAlloc(suggested_size); } // 处理传递的文件描述符 template <class WrapType> static MaybeLocal<Object> AcceptHandle(Environment* env, LibuvStreamWrap* parent) { EscapableHandleScope scope(env->isolate()); Local<Object> wrap_obj; // 根据类型创建一个表示客户端的对象,然后把文件描述符保存其中 if (!WrapType::Instantiate(env, parent, WrapType::SOCKET).ToLocal(&wrap_obj)) return Local<Object>(); // 解出C++层对象 HandleWrap* wrap = Unwrap<HandleWrap>(wrap_obj); CHECK_NOT_NULL(wrap); // 拿到C++对象中封装的handle uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(wrap->GetHandle()); // 从服务器流中摘下一个fd保存到steam if (uv_accept(parent->stream(), stream)) ABORT(); return scope.Escape(wrap_obj); } // 实现OnUvRead,流中有数据或读到结尾时由Libuv回调 void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) { HandleScope scope(env()->isolate()); Context::Scope context_scope(env()->context()); uv_handle_type type = UV_UNKNOWN_HANDLE; // 是否支持传递文件描述符并且有待处理的文件描述符,则判断文件描述符类型 if (is_named_pipe_ipc() && uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) { type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream())); } // 读取成功 if (nread > 0) { MaybeLocal<Object> pending_obj; // 根据类型创建一个新的C++对象表示客户端,并且从服务器中摘下一个fd保存到客户端 if (type == UV_TCP) { pending_obj = AcceptHandle<TCPWrap>(env(), this); } else if (type == UV_NAMED_PIPE) { pending_obj = AcceptHandle<PipeWrap>(env(), this); } else if (type == UV_UDP) { pending_obj = AcceptHandle<UDPWrap>(env(), this); } else { CHECK_EQ(type, UV_UNKNOWN_HANDLE); } // 有需要处理的文件描述符则设置到JS层对象中,JS层使用 if (!pending_obj.IsEmpty()) { object() ->Set(env()->context(), env()->pending_handle_string(), pending_obj.ToLocalChecked()) .Check(); } } // 触发读事件,listener实现 EmitRead(nread, *buf); }
读操作不仅支持读取一般的数据,还可以读取文件描述符,C++层会新建一个流对象表示该文件描述符。在JS层可以使用。
ConnectionWrap是LibuvStreamWrap子类,拓展了连接的接口。适用于带有连接属性的流,比如Unix域和TCP。
// WrapType是C++层的类,UVType是Libuv的类型 template <typename WrapType, typename UVType> class ConnectionWrap : public LibuvStreamWrap { public: static void OnConnection(uv_stream_t* handle, int status); static void AfterConnect(uv_connect_t* req, int status); protected: ConnectionWrap(Environment* env, v8::Local<v8::Object> object, ProviderType provider); UVType handle_; };
1 发起连接后的回调
template <typename WrapType, typename UVType> void ConnectionWrap<WrapType, UVType>::AfterConnect(uv_connect_t* req, int status) { // 通过Libuv结构体拿到对应的C++对象 std::unique_ptr<ConnectWrap> req_wrap = (static_cast<ConnectWrap*>(req->data)); WrapType* wrap = static_cast<WrapType*>(req->handle->data); Environment* env = wrap->env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); bool readable, writable; // 连接结果 if (status) { readable = writable = false; } else { readable = uv_is_readable(req->handle) != 0; writable = uv_is_writable(req->handle) != 0; } Local<Value> argv[5] = { Integer::New(env->isolate(), status), wrap->object(), req_wrap->object(), Boolean::New(env->isolate(), readable), Boolean::New(env->isolate(), writable) }; // 回调js req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); }
2 连接到来时回调
// 有连接时触发的回调 template <typename WrapType, typename UVType> void ConnectionWrap<WrapType, UVType>::OnConnection(uv_stream_t* handle, int status) { // 拿到Libuv结构体对应的C++层对象 WrapType* wrap_data = static_cast<WrapType*>(handle->data); Environment* env = wrap_data->env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); // 和客户端通信的对象 Local<Value> client_handle; if (status == 0) { // Instantiate the client javascript object and handle. // 新建一个JS层使用对象 Local<Object> client_obj; if (!WrapType::Instantiate(env, wrap_data, WrapType::SOCKET) .ToLocal(&client_obj)) return; // Unwrap the client javascript object. WrapType* wrap; // 把JS层使用的对象client_obj所对应的C++层对象存到wrap中 ASSIGN_OR_RETURN_UNWRAP(&wrap, client_obj); // 拿到对应的handle uv_stream_t* client = reinterpret_cast<uv_stream_t*>(&wrap->handle_); // 从handleaccpet到的fd中拿一个保存到client,client就可以和客户端通信了 if (uv_accept(handle, client)) return; client_handle = client_obj; } else { client_handle = Undefined(env->isolate()); } // 回调JS,client_handle相当于在JS层执行new TCP Local<Value> argv[] = { Integer::New(env->isolate(), status), client_handle }; wrap_data->MakeCallback(env->onconnection_string(), arraysize(argv), argv); }
我们看一下TCP的Instantiate。
MaybeLocal<Object> TCPWrap::Instantiate(Environment* env, AsyncWrap* parent, TCPWrap::SocketType type) { EscapableHandleScope handle_scope(env->isolate()); AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(parent); // 拿到导出到JS层的TCP构造函数,缓存在env中 Local<Function> constructor = env->tcp_constructor_template() ->GetFunction(env->context()) .ToLocalChecked(); Local<Value> type_value = Int32::New(env->isolate(), type); // 相当于我们在JS层调用new TCP()时拿到的对象 return handle_scope.EscapeMaybe( constructor->NewInstance(env->context(), 1, &type_value)); }
StreamReq表示操作流的一次请求。主要保存了请求上下文和操作结束后的通用逻辑。
// 请求Libuv的基类 class StreamReq { public: // JS层传进来的对象的internalField[1]保存了StreamReq类对象 static constexpr int kStreamReqField = 1; // stream为所操作的流,req_wrap_obj为JS层传进来的对象 explicit StreamReq(StreamBase* stream, v8::Local<v8::Object> req_wrap_obj) : stream_(stream) { // JS层对象指向当前StreamReq对象 AttachToObject(req_wrap_obj); } // 子类定义 virtual AsyncWrap* GetAsyncWrap() = 0; // 获取相关联的原始js对象 v8::Local<v8::Object> object(); // 请求结束后的回调,会执行子类的onDone,onDone由子类实现 void Done(int status, const char* error_str = nullptr); // JS层对象不再执行StreamReq实例 void Dispose(); // 获取所操作的流 inline StreamBase* stream() const { return stream_; } // 从JS层对象获取StreamReq对象 static StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj); // 请求JS层对象的internalField所有指向 static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj); protected: // 请求结束后回调 virtual void OnDone(int status) = 0; void AttachToObject(v8::Local<v8::Object> req_wrap_obj); private: StreamBase* const stream_; };
StreamReq有一个成员为stream_,表示StreamReq请求中操作的流。下面我们看一下实现。 1 JS层请求上下文和StreamReq的关系管理。
inline void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) { req_wrap_obj->SetAlignedPointerInInternalField(kStreamReqField, this); } inline StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) { return static_cast<StreamReq*>( req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField)); } inline void StreamReq::Dispose() { object()->SetAlignedPointerInInternalField(kStreamReqField, nullptr); delete this; } inline void StreamReq::ResetObject(v8::Local<v8::Object> obj) { obj->SetAlignedPointerInInternalField(0, nullptr); // BaseObject field. obj->SetAlignedPointerInInternalField(StreamReq::kStreamReqField, nullptr); }
2 获取原始JS层请求对象
// 获取和该请求相关联的原始js对象 inline v8::Local<v8::Object> StreamReq::object() { return GetAsyncWrap()->object(); }
3 请求结束回调
inline void StreamReq::Done(int status, const char* error_str) { AsyncWrap* async_wrap = GetAsyncWrap(); Environment* env = async_wrap->env(); if (error_str != nullptr) { async_wrap->object()->Set(env->context(), env->error_string(), OneByteString(env->isolate(), error_str)) .Check(); } // 执行子类的OnDone OnDone(status); }
流操作请求结束后会统一执行Done,Done会执行子类实现的OnDone函数。
ShutdownWrap是StreamReq的子类,表示一次关闭流请求。
class ShutdownWrap : public StreamReq { public: ShutdownWrap(StreamBase* stream, v8::Local<v8::Object> req_wrap_obj) : StreamReq(stream, req_wrap_obj) { } void OnDone(int status) override; };
ShutdownWrap实现了OnDone接口,在关闭流结束后被基类执行。
/* 关闭结束时回调,由请求类(ShutdownWrap)调用Libuv, 所以Libuv操作完成后,首先执行请求类的回调,请求类通知流,流触发 对应的事件,进一步通知listener */ inline void ShutdownWrap::OnDone(int status) { stream()->EmitAfterShutdown(this, status); Dispose(); }
SimpleShutdownWrap是ShutdownWrap的子类。实现了GetAsyncWrap接口。OtherBase可以是ReqWrap或者AsyncWrap。
template <typename OtherBase> class SimpleShutdownWrap : public ShutdownWrap, public OtherBase { public: SimpleShutdownWrap(StreamBase* stream, v8::Local<v8::Object> req_wrap_obj); AsyncWrap* GetAsyncWrap() override { return this; } };
WriteWrap是StreamReq的子类,表示一次往流写入数据的请求。
class WriteWrap : public StreamReq { public: void SetAllocatedStorage(AllocatedBuffer&& storage); WriteWrap(StreamBase* stream, v8::Local<v8::Object> req_wrap_obj) : StreamReq(stream, req_wrap_obj) { } void OnDone(int status) override; private: AllocatedBuffer storage_; };
WriteWrap实现了OnDone接口,在写结束时被基类执行。
inline void WriteWrap::OnDone(int status) { stream()->EmitAfterWrite(this, status); Dispose(); }
请求结束后调用流的接口通知流写结束了,流会通知listener,listener会调用流的接口通知JS层。
SimpleWriteWrap是WriteWrap的子类。实现了GetAsyncWrap接口。和SimpleShutdownWrap类型。
template <typename OtherBase> class SimpleWriteWrap : public WriteWrap, public OtherBase { public: SimpleWriteWrap(StreamBase* stream, v8::Local<v8::Object> req_wrap_obj); AsyncWrap* GetAsyncWrap() override { return this; } };
class StreamListener { public: virtual ~StreamListener(); // 分配存储数据的内存 virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0; // 有数据可读时回调,消费数据的函数 virtual void OnStreamRead(ssize_t nread, const uv_buf_t& buf) = 0; // 流销毁时回调 virtual void OnStreamDestroy() {} // 监听者所属流 inline StreamResource* stream() { return stream_; } protected: // 流是监听者是一条链表,该函数把结构传递给下一个节点 void PassReadErrorToPreviousListener(ssize_t nread); // 监听者所属流 StreamResource* stream_ = nullptr; // 下一个节点,形成链表 StreamListener* previous_listener_ = nullptr; friend class StreamResource; };
StreamListener是类似一个订阅者,它会对流的状态感兴趣,比如数据可读、可写、流关闭等。一个流可以注册多个listener,多个listener形成一个链表。
// 从listen所属的流的listener队列中删除自己 inline StreamListener::~StreamListener() { if (stream_ != nullptr) stream_->RemoveStreamListener(this); } // 读出错,把信息传递给前一个listener inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) { CHECK_NOT_NULL(previous_listener_); previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); } // 实现流关闭时的处理逻辑 inline void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) { previous_listener_->OnStreamAfterShutdown(w, status); } // 实现写结束时的处理逻辑 inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) { previous_listener_->OnStreamAfterWrite(w, status); }
StreamListener的逻辑不多,具体的实现在子类。
ReportWritesToJSStreamListener是StreamListener的子类。覆盖了部分接口和拓展了一些功能。
class ReportWritesToJSStreamListener : public StreamListener { public: // 实现父类的这两个接口 void OnStreamAfterWrite(WriteWrap* w, int status) override; void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; private: void OnStreamAfterReqFinished(StreamReq* req_wrap, int status); };
1 OnStreamAfterReqFinished OnStreamAfterReqFinished是请求操作流结束后的统一的回调。
void ReportWritesToJSStreamListener::OnStreamAfterWrite( WriteWrap* req_wrap, int status) { OnStreamAfterReqFinished(req_wrap, status); } void ReportWritesToJSStreamListener::OnStreamAfterShutdown( ShutdownWrap* req_wrap, int status) { OnStreamAfterReqFinished(req_wrap, status); }
我们看一下具体实现
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( StreamReq* req_wrap, int status) { // 请求所操作的流 StreamBase* stream = static_cast<StreamBase*>(stream_); Environment* env = stream->stream_env(); AsyncWrap* async_wrap = req_wrap->GetAsyncWrap(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); // 获取原始的JS层对象 Local<Object> req_wrap_obj = async_wrap->object(); Local<Value> argv[] = { Integer::New(env->isolate(), status), stream->GetObject(), Undefined(env->isolate()) }; const char* msg = stream->Error(); if (msg != nullptr) { argv[2] = OneByteString(env->isolate(), msg); stream->ClearError(); } // 回调JS层 if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); }
OnStreamAfterReqFinished会回调JS层。 6.8.12 EmitToJSStreamListener EmitToJSStreamListener是ReportWritesToJSStreamListener的子类
class EmitToJSStreamListener : public ReportWritesToJSStreamListener { public: uv_buf_t OnStreamAlloc(size_t suggested_size) override; void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; };
// 分配一块内存 uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) { Environment* env = static_cast<StreamBase*>(stream_)->stream_env(); return env->AllocateManaged(suggested_size).release(); } // 读取数据结束后回调 void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { StreamBase* stream = static_cast<StreamBase*>(stream_); Environment* env = stream->stream_env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); AllocatedBuffer buf(env, buf_); // 读取失败 if (nread <= 0) { if (nread < 0) stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>()); return; } buf.Resize(nread); // 读取成功回调JS层 stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer()); }
我们看到listener处理完数据后又会回调流的接口,具体的逻辑由子类实现。我们来看一个子类的实现(流默认的listener)。
EmitToJSStreamListener会实现OnStreamRead等方法,接着我们看一下创建一个C++层的TCP对象是怎样的。下面是TCPWrap的继承关系。
class TCPWrap : public ConnectionWrap<TCPWrap, uv_tcp_t>{} // ConnectionWrap拓展了建立TCP连接时的逻辑 class ConnectionWrap : public LibuvStreamWrap{} class LibuvStreamWrap : public HandleWrap, public StreamBase{} class StreamBase : public StreamResource {}
我们看到TCP流是继承于StreamResource的。新建一个TCP的C++的对象时(tcp_wrap.cc),会不断往上调用父类的构造函数,其中在StreamBase中有一个关键的操作。
inline StreamBase::StreamBase(Environment* env) : env_(env) { PushStreamListener(&default_listener_); } EmitToJSStreamListener default_listener_;
StreamBase会默认给流注册一个listener。我们看下EmitToJSStreamListener 具体的定义。
class ReportWritesToJSStreamListener : public StreamListener { public: void OnStreamAfterWrite(WriteWrap* w, int status) override; void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; private: void OnStreamAfterReqFinished(StreamReq* req_wrap, int status); }; class EmitToJSStreamListener : public ReportWritesToJSStreamListener { public: uv_buf_t OnStreamAlloc(size_t suggested_size) override; void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; };
EmitToJSStreamListener继承StreamListener ,定义了分配内存和读取接收数据的函数。接着我们看一下PushStreamListener做了什么事情。
inline void StreamResource::PushStreamListener(StreamListener* listener) { // 头插法 listener->previous_listener_ = listener_; listener->stream_ = this; listener_ = listener; }
PushStreamListener就是构造出一个listener链表结构。然后我们看一下对于流来说,读取数据的整个链路。首先是JS层调用readStart
function tryReadStart(socket) { socket._handle.reading = true; const err = socket._handle.readStart(); if (err) socket.destroy(errnoException(err, 'read')); } // 注册等待读事件 Socket.prototype._read = function(n) { tryReadStart(this); };
我们看看readStart
int LibuvStreamWrap::ReadStart() { return uv_read_start(stream(), [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf); }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf); }); }
ReadStart调用Libuv的uv_read_start注册等待可读事件,并且注册了两个回调函数OnUvAlloc和OnUvRead。
void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) { EmitRead(nread, *buf); } inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) { // bytes_read_表示已读的字节数 if (nread > 0) bytes_read_ += static_cast<uint64_t>(nread); listener_->OnStreamRead(nread, buf); }
通过层层调用最后会调用listener_的OnStreamRead。我们看看TCP的OnStreamRead
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { StreamBase* stream = static_cast<StreamBase*>(stream_); Environment* env = stream->stream_env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); AllocatedBuffer buf(env, buf_); stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer()); }
继续回调CallJSOnreadMethod
MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread, Local<ArrayBuffer> ab, size_t offset, StreamBaseJSChecks checks) { Environment* env = env_; // ... AsyncWrap* wrap = GetAsyncWrap(); CHECK_NOT_NULL(wrap); Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField); CHECK(onread->IsFunction()); return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv); }
CallJSOnreadMethod会回调JS层的onread回调函数。onread会把数据push到流中,然后触发data事件。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8