Unix域一种进程间通信的方式,Unix域不仅支持没有继承关系的进程间进行通信,而且支持进程间传递文件描述符。Unix域是Node.js中核心的功能,它是进程间通信的底层基础,child_process和cluster模块都依赖Unix域的能力。从实现和使用上来看,Unix域类似TCP,但是因为它是基于同主机进程的,不像TCP需要面临复杂的网络的问题,所以实现也没有TCP那么复杂。Unix域和传统的socket通信一样,遵循网络编程的那一套流程,由于在同主机内,就不必要使用IP和端口的方式。Node.js中,Unix域采用的是一个文件作为标记。大致原理如下。 1 服务器首先拿到一个socket。 2 服务器bind一个文件,类似bind一个IP和端口一样,对于操作系统来说,就是新建一个文件(不一定是在硬盘中创建,可以设置抽象路径名),然后把文件路径信息存在socket中。 3 调用listen修改socket状态为监听状态。 4 客户端通过同样的文件路径调用connect去连接服务器。这时候用于表示客户端的结构体插入服务器的连接队列,等待处理。 5 服务器调用accept摘取队列的节点,然后新建一个通信socket和客户端进行通信。 Unix域通信本质还是基于内存之间的通信,客户端和服务器都维护一块内存,这块内存分为读缓冲区和写缓冲区。从而实现全双工通信,而Unix域的文件路径,只不过是为了让客户端进程可以找到服务端进程,后续就可以互相往对方维护的内存里写数据,从而实现进程间通信。
接下来我们看一下在Libuv中关于Unix域的实现和使用。
Unix域使用uv_pipe_t结构体表示,使用之前首先需要初始化uv_pipe_t。下面看一下它的实现逻辑。
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); handle->shutdown_req = NULL; handle->connect_req = NULL; handle->pipe_fname = NULL; handle->ipc = ipc; return 0; }
uv_pipe_init逻辑很简单,就是初始化uv_pipe_t结构体的一些字段。uv_pipe_t继承于stream,uv__stream_init就是初始化stream(父类)的字段。uv_pipe_t中有一个字段ipc,该字段标记了是否允许在该Unix域通信中传递文件描述符。
开头说过,Unix域的实现类似TCP的实现。遵循网络socket编程那一套流程。服务端使用bind,listen等函数启动服务。
// name是unix路径名称 int uv_pipe_bind(uv_pipe_t* handle, const char* name) { struct sockaddr_un saddr; const char* pipe_fname; int sockfd; int err; pipe_fname = NULL; pipe_fname = uv__strdup(name); name = NULL; // 流式Unix域套接字 sockfd = uv__socket(AF_UNIX, SOCK_STREAM, 0); memset(&saddr, 0, sizeof saddr); strncpy(saddr.sun_path, pipe_fname, sizeof(saddr.sun_path) - 1); saddr.sun_path[sizeof(saddr.sun_path) - 1] = '\0'; saddr.sun_family = AF_UNIX; // 绑定到路径,TCP是绑定到IP和端口 if (bind(sockfd, (struct sockaddr*)&saddr, sizeof saddr)) { // ... } // 设置绑定成功标记 handle->flags |= UV_HANDLE_BOUND; // Unix域的路径 handle->pipe_fname = pipe_fname; // 保存socket对应的fd handle->io_watcher.fd = sockfd; return 0; }
uv_pipe_bind函数首先申请一个socket,然后调用操作系统的bind函数把Unix域路径保存到socket中。最后标记已经绑定标记,并且保存Unix域的路径和socket对应的fd到handle中,后续需要使用。我们看到Node.js中Unix域的类型是SOCK_STREAM。Unix域支持两种数据模式。 1 流式( SOCK_STREAM),类似TCP,数据为字节流,需要应用层处理粘包问题。 2 数据报模式( SOCK_DGRAM ),类似UDP,不需要处理粘包问题。 通过Unix域虽然可以实现进程间的通信,但是我们拿到的数据可能是"乱的",这是为什么呢?一般情况下,客户端给服务器发送1个字节,然后服务器处理,如果是基于这种场景,那么数据就不会是乱的。因为每次就是一个需要处理的数据单位。但是如果客户端给服务器发送1个字节,服务器还没来得及处理,客户端又发送了一个字节,那么这时候服务器再处理的时候,就会有问题。因为两个字节混一起了。就好比在一个TCP连接上先后发送两个HTTP请求一样,如果服务器没有办法判断两个请求的数据边界,那么处理就会有问题。所以这时候,我们需要定义一个应用层协议,并且实现封包解包的逻辑,才能真正完成进程间通信。
绑定了路径后,就可以调用listen函数使得socket处于监听状态。
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { // uv__stream_fd(handle)得到bind函数中获取的socket if (listen(uv__stream_fd(handle), backlog)) return UV__ERR(errno); // 保存回调,有进程调用connect的时候时触发,由uv__server_io函数触发 handle->connection_cb = cb; // IO观察者的回调 handle->io_watcher.cb = uv__server_io; // 注册IO观察者到Libuv,等待连接,即读事件到来 uv__io_start(handle->loop, &handle->io_watcher, POLLIN); return 0; }
uv_pipe_listen执行操作系统的listen函数使得socket成为监听型的套接字。然后把socket对应的文件描述符和回调封装成IO观察者。注册到Libuv中。等到有读事件到来(有连接到来)。就会执行uv__server_io函数,摘下对应的客户端节点。最后执行connection_cb回调。
这时候,我们已经成功启动了一个Unix域服务。接下来就是看客户端的逻辑。
void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, const char* name, uv_connect_cb cb) { struct sockaddr_un saddr; int new_sock; int err; int r; // 判断是否已经有socket了,没有的话需要申请一个,见下面 new_sock = (uv__stream_fd(handle) == -1); // 客户端还没有对应的socket fd if (new_sock) { handle->io_watcher.fd= uv__socket(AF_UNIX, SOCK_STREAM, 0); } // 需要连接的服务器信息。主要是Unix域路径信息 memset(&saddr, 0, sizeof saddr); strncpy(saddr.sun_path, name, sizeof(saddr.sun_path) - 1); saddr.sun_path[sizeof(saddr.sun_path) - 1] = '\0'; saddr.sun_family = AF_UNIX; // 非阻塞式连接服务器,Unix域路径是name do { r = connect(uv__stream_fd(handle), (struct sockaddr*)&saddr, sizeof saddr); } while (r == -1 && errno == EINTR); // 忽略错误处理逻辑 err = 0; // 设置socket的可读写属性 if (new_sock) { err = uv__stream_open((uv_stream_t*)handle, uv__stream_fd(handle), UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); } // 把IO观察者注册到Libuv,等到连接成功或者可以发送请求 if (err == 0) uv__io_start(handle->loop, &handle->io_watcher, POLLIN | POLLOUT); out: // 记录错误码,如果有的话 handle->delayed_error = err; // 保存调用者信息 handle->connect_req = req; uv__req_init(handle->loop, req, UV_CONNECT); req->handle = (uv_stream_t*)handle; req->cb = cb; QUEUE_INIT(&req->queue); /* 如果连接出错,在pending阶段会执行uv__stream_io, 从而执行req对应的回调。错误码是delayed_error */ if (err) uv__io_feed(handle->loop, &handle->io_watcher); }
uv_pipe_connect函数首先以非阻塞的方式调用操作系统的connect函数,调用connect后操作系统把客户端对应的socket直接插入服务器socket的待处理socket队列中,等待服务器处理。这时候socket是处于连接中的状态,当服务器调用accept函数处理连接时,会修改连接状态为已连接(这和TCP不一样,TCP是完成三次握手后就会修改为连接状态,而不是accept的时候),并且会触发客户端socket的可写事件。事件驱动模块就会执行相应的回调(uv__stream_io),从而执行C++和JS的回调。
我们可以通过uv_close关闭一个Unix域handle。uv_close中会调用uv__pipe_close。
void uv__pipe_close(uv_pipe_t* handle) { // 如果是Unix域服务器则需要删除Unix域路径并删除指向的堆内存 if (handle->pipe_fname) { unlink(handle->pipe_fname); uv__free((void*)handle->pipe_fname); handle->pipe_fname = NULL; } // 关闭流相关的内容 uv__stream_close((uv_stream_t*)handle); }
关闭Unix域handle时,Libuv会自动删除Unix域路径对应的文件。但是如果进程异常退出时,该文件可能不会被删除,这样会导致下次监听的时候报错listen EADDRINUSE,所以安全起见,我们可以在进程退出或者监听之前判断该文件是否存在,存在的话则删除。另外还有一个问题是,如果两个不相关的进程使用了同一个文件则会导致误删,所以Unix域对应的文件,我们需要小心处理,最好能保证唯一性。
Unix域大致的流程和网络编程一样。分为服务端和客户端两面。Libuv在操作系统提供的API的基础上。和Libuv的异步非阻塞结合。在Libuv中为进程间提供了一种通信方式。下面看一下在Node.js中是如何使用Libuv提供的功能的。
在Node.js中,我们可以通过以下代码创建一个Unix域服务器
const server = net.createServer((client) => { // 处理client }); server.listen('/tmp/test.sock', () => { console.log(`bind uinx domain success`); });
我们从listen函数开始分析这个过程。
Server.prototype.listen = function(...args) { const normalized = normalizeArgs(args); let options = normalized[0]; const cb = normalized[1]; // 调用底层的listen函数成功后执行的回调 if (cb !== null) { this.once('listening', cb); } if (options.path && isPipeName(options.path)) { const pipeName = this._pipeName = options.path; backlog = options.backlog || backlogFromArgs; listenIncluster(this, pipeName, -1, -1, backlog, undefined, options.exclusive); /* Unix域使用文件实现的,客户端需要访问该文件的权限才能通信, 这里做权限控制 */ let mode = 0; if (options.readableAll === true) mode |= PipeConstants.UV_READABLE; if (options.writableAll === true) mode |= PipeConstants.UV_WRITABLE; if (mode !== 0) { // 修改文件的访问属性 const err = this._handle.fchmod(mode); if (err) { this._handle.close(); this._handle = null; throw errnoException(err, 'uv_pipe_chmod'); } } return this; } }
这段代码中最主要的是listenIncluster函数。我们看一下该函数的逻辑。
function listenIncluster(server, address, port, addressType, backlog, fd, exclusive, flags) { exclusive = !!exclusive; if (cluster === undefined) cluster = require('cluster'); if (cluster.isMaster || exclusive) { server._listen2(address, port, addressType, backlog, fd, flags); return; } }
直接调用_listen2(isMaster只有在cluster.fork创建的进程中才是false,其余情况都是true,包括child_process模块创建的子进程)。我们继续看listen函数。
Server.prototype._listen2 = setupListenHandle; function setupListenHandle(address, port, addressType, backlog, fd, flags) { this._handle = createServerHandle(address, port, addressType, fd, flags); // 有完成连接完成时触发 this._handle.onconnection = onconnection; const err = this._handle.listen(backlog || 511); if (err) { // 触发error事件 } // 下一个tick触发listen回调 defaultTriggerAsyncIdScope(this[async_id_symbol], process.nextTick, emitListeningNT, this); }
首先调用createServerHandle创建一个handle,然后执行listen函数。我们首先看一下createServerHandle。
function createServerHandle(address, port, addressType, fd, flags) { let handle = new Pipe(PipeConstants.SERVER); handle.bind(address, port); return handle; }
创建了一个Pipe对象,然后调用它的bind和listen函数,我们看new Pipe的逻辑,从pipe_wrap.cc的导出逻辑,我们知道,这时候会新建一个C++对象,然后执行New函数,并且把新建的C++对象等信息作为入参。
void PipeWrap::New(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); // 类型 int type_value = args[0].As<Int32>()->Value(); PipeWrap::SocketType type = static_cast<PipeWrap::SocketType>(type_value); // 是否是用于IPC bool ipc; ProviderType provider; switch (type) { case SOCKET: provider = PROVIDER_PIPEWRAP; ipc = false; break; case SERVER: provider = PROVIDER_PIPESERVERWRAP; ipc = false; break; case IPC: provider = PROVIDER_PIPEWRAP; ipc = true; break; default: UNREACHABLE(); } new PipeWrap(env, args.This(), provider, ipc); }
New函数处理了参数,然后执行了new PipeWrap创建一个对象。
PipeWrap::PipeWrap(Environment* env, Local<Object> object, ProviderType provider, bool ipc) : ConnectionWrap(env, object, provider) { int r = uv_pipe_init(env->event_loop(), &handle_, ipc); }
new Pipe执行完后,就会通过该C++对象调用Libuv的bind和listen完成服务器的启动,就不再展开分析。
接着我们看一下Unix域作为客户端使用时的过程。
Socket.prototype.connect = function(...args) { const path = options.path; // Unix域路径 var pipe = !!path; if (!this._handle) { // 创建一个C++层handle,即pipe_wrap.cc导出的Pipe类 this._handle = pipe ? new Pipe(PipeConstants.SOCKET) : new TCP(TCPConstants.SOCKET); // 挂载onread方法到this中 initSocketHandle(this); } if (cb !== null) { this.once('connect', cb); } // 执行internalConnect defaultTriggerAsyncIdScope( this[async_id_symbol], internalConnect, this, path ); return this; };
首先新建一个handle,值是new Pipe。接着执行了internalConnect,internalConnect函数的主要逻辑如下
const req = new PipeConnectWrap(); // address为Unix域路径 req.address = address; req.oncomplete = afterConnect; // 调用C++层connect err = self._handle.connect(req, address, afterConnect);
我们看C++层的connect函数,
void PipeWrap::Connect(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); PipeWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); // PipeConnectWrap对象 Local<Object> req_wrap_obj = args[0].As<Object>(); // Unix域路径 node::Utf8Value name(env->isolate(), args[1]); /* 新建一个ConnectWrap对象,ConnectWrap是对handle进行一次连接请求 的封装,内部维护一个uv_connect_t结构体, req_wrap_obj的一个字段 指向ConnectWrap对象,用于保存对应的请求上下文 */ ConnectWrap* req_wrap = new ConnectWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_PIPECONNECTWRAP); // 调用Libuv的connect函数 uv_pipe_connect(req_wrap->req(), &wrap->handle_, *name, AfterConnect); // req_wrap->req_.data = req_wrap;关联起来 req_wrap->Dispatched(); // uv_pipe_connect() doesn't return errors. args.GetReturnValue().Set(0); }
uv_pipe_connect函数,第一个参数是uv_connect_t结构体(request),第二个是一个uv_pipe_t结构体(handle),handle是对Unix域客户端的封装,request是请求的封装,它表示基于handle发起一次连接请求。连接成功后会执行AfterConnect。由前面分析我们知道,当连接成功时,首先会执行回调Libuv的uv__stream_io,然后执行C++层的AfterConnect。
// 主动发起连接,成功/失败后的回调 template <typename WrapType,typename UVType> = PipeWrap, uv_pipe_t void ConnectionWrap<WrapType, UVType>::AfterConnect(uv_connect_t* req ,int status) { // 在Connect函数里关联起来的 ConnectWrap* req_wrap = static_cast<ConnectWrap*>(req->data); // 在uv_pipe_connect中完成关联的 WrapType* wrap = static_cast<WrapType*>(req->handle->data); Environment* env = wrap->env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); bool readable, writable; // 是否连接成功 if (status) { readable = writable = 0; } else { readable = uv_is_readable(req->handle) != 0; writable = uv_is_writable(req->handle) != 0; } Local<Value> argv[5] = { Integer::New(env->isolate(), status), wrap->object(), req_wrap->object(), Boolean::New(env->isolate(), readable), Boolean::New(env->isolate(), writable) }; // 执行JS层的oncomplete回调 req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); delete req_wrap; }
我们再回到JS层的afterConnect
function afterConnect(status, handle, req, readable, writable) { var self = handle.owner; handle = self._handle; if (status === 0) { self.readable = readable; self.writable = writable; self._unrefTimer(); // 触发connect事件 self.emit('connect'); // 可读并且没有处于暂停模式,则注册等待可读事件 if (readable && !self.isPaused()) self.read(0); } }
至此,作为客户端对服务器的连接就完成了。后续就可以进行通信。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8