流的实现在Libuv里占了很大的篇幅,是非常核心的逻辑。流的本质是封装了对文件描述符的操作,例如读、写,连接、监听。我们首先看看数据结构,流在Libuv里用uv_stream_s表示,继承于uv_handle_s。
struct uv_stream_s { // uv_handle_s的字段 void* data; // 所属事件循环 uv_loop_t* loop; // handle类型 uv_handle_type type; // 关闭handle时的回调 uv_close_cb close_cb; // 用于插入事件循环的handle队列 void* handle_queue[2]; union { int fd; void* reserved[4]; } u; // 用于插入事件循环的closing阶段 uv_handle_t* next_closing; // 各种标记 unsigned int flags; // 流拓展的字段 /* 户写入流的字节大小,流缓存用户的输入, 然后等到可写的时候才执行真正的写 */ size_t write_queue_size; // 分配内存的函数,内存由用户定义,用来保存读取的数据 uv_alloc_cb alloc_cb; // 读回调 uv_read_cb read_cb; // 连接请求对应的结构体 uv_connect_t *connect_req; /* 关闭写端的时候,发送完缓存的数据, 执行shutdown_req的回调(shutdown_req在uv_shutdown的时候赋值) */ uv_shutdown_t *shutdown_req; /* 流对应的IO观察者 */ uv__io_t io_watcher; // 缓存待写的数据,该字段用于插入队列 void* write_queue[2]; // 已经完成了数据写入的队列,该字段用于插入队列 void* write_completed_queue[2]; // 有连接到来并且完成三次握手后,执行的回调 uv_connection_cb connection_cb; // 操作流时出错码 int delayed_error; // accept返回的通信socket对应的文件描述 int accepted_fd; // 同上,用于IPC时,缓存多个传递的文件描述符 void* queued_fds; }
流的实现中,最核心的字段是IO观察者,其余的字段是和流的性质相关的。IO观察者封装了流对应的文件描述符和文件描述符事件触发时的回调。比如读一个流、写一个流、关闭一个流、连接一个流、监听一个流,在uv_stream_s中都有对应的字段去支持。但是本质上是靠IO观察者去驱动的。
1 读一个流,就是IO观察者中的文件描述符的可读事件触发时,执行用户的读回调。 2 写一个流,先把数据写到流中,等到IO观察者中的文件描述符可写事件触发时,执行真正的写入,并执行用户的写结束回调。 3 关闭一个流,就是IO观察者中的文件描述符可写事件触发时,就会执行关闭流的写端。如果流中还有数据没有写完,则先写完(比如发送)后再执行关闭操作,接着执行用户的回调。 4 连接流,比如作为客户端去连接服务器。就是IO观察者中的文件描述符可读事件触发时(比如建立三次握手成功),执行用户的回调。 5 监听流,就是IO观察者中的文件描述符可读事件触发时(比如有完成三次握手的连接),执行用户的回调。
下面我们看一下流的具体实现
在使用uv_stream_t之前需要首先初始化,我们看一下如何初始化一个流。
void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream, uv_handle_type type) { int err; // 记录handle的类型 uv__handle_init(loop, (uv_handle_t*)stream, type); stream->read_cb = NULL; stream->alloc_cb = NULL; stream->close_cb = NULL; stream->connection_cb = NULL; stream->connect_req = NULL; stream->shutdown_req = NULL; stream->accepted_fd = -1; stream->queued_fds = NULL; stream->delayed_error = 0; QUEUE_INIT(&stream->write_queue); QUEUE_INIT(&stream->write_completed_queue); stream->write_queue_size = 0; /* 初始化IO观察者,把文件描述符(这里还没有,所以是-1)和 回调uv__stream_io记录在io_watcher上,fd的事件触发时,统一 由uv__stream_io函数处理,但也会有特殊情况(下面会讲到) */ uv__io_init(&stream->io_watcher, uv__stream_io, -1); }
初始化一个流的逻辑很简单明了,就是初始化相关的字段,需要注意的是初始化IO观察者时,设置的处理函数是uv__stream_io,后面我们会分析这个函数的具体逻辑。
int uv__stream_open(uv_stream_t* stream, int fd, int flags) { // 还没有设置fd或者设置的同一个fd则继续,否则返回UV_EBUSY if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd)) return UV_EBUSY; // 设置流的标记 stream->flags |= flags; // 是TCP流则可以设置下面的属性 if (stream->type == UV_TCP) { // 关闭nagle算法 if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1)) return UV__ERR(errno); /* 开启keepalive机制 */ if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1, 60)) { return UV__ERR(errno); } } /* 保存socket对应的文件描述符到IO观察者中,Libuv会在 Poll IO阶段监听该文件描述符 */ stream->io_watcher.fd = fd; return 0; }
打开一个流,本质上就是给这个流关联一个文件描述符,后续的操作的时候都是基于这个文件描述符的,另外还有一些属性的设置。
我们在一个流上执行uv_read_start后,流的数据(如果有的话)就会通过read_cb回调源源不断地流向调用方。
int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { // 流已经关闭,不能读 if (stream->flags & UV_HANDLE_CLOSING) return UV_EINVAL; // 流不可读,说明可能是只写流 if (!(stream->flags & UV_HANDLE_READABLE)) return -ENOTCONN; // 标记正在读 stream->flags |= UV_HANDLE_READING; // 记录读回调,有数据的时候会执行这个回调 stream->read_cb = read_cb; // 分配内存函数,用于存储读取的数据 stream->alloc_cb = alloc_cb; // 注册等待读事件 uv__io_start(stream->loop, &stream->io_watcher, POLLIN); // 激活handle,有激活的handle,事件循环不会退出 uv__handle_start(stream); return 0; }
执行uv_read_start本质上是给流对应的文件描述符在epoll中注册了一个等待可读事件,并记录相应的上下文,比如读回调函数,分配内存的函数。接着打上正在做读取操作的标记。当可读事件触发的时候,读回调就会被执行,除了读取数据,还有一个读操作就是停止读取。对应的函数是uv_read_stop。
int uv_read_stop(uv_stream_t* stream) { // 是否正在执行读取操作,如果不是,则没有必要停止 if (!(stream->flags & UV_HANDLE_READING)) return 0; // 清除正在读取的标记 stream->flags &= ~UV_HANDLE_READING; // 撤销等待读事件 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); // 对写事件也不感兴趣,停掉handle。允许事件循环退出 if (!uv__io_active(&stream->io_watcher, POLLOUT)) uv__handle_stop(stream); stream->read_cb = NULL; stream->alloc_cb = NULL; return 0; }
另外还有一个辅助函数,判断流是否设置了可读属性。
int uv_is_readable(const uv_stream_t* stream) { return !!(stream->flags & UV_HANDLE_READABLE); }
上面的函数只是注册和注销读事件,如果可读事件触发的时候,我们还需要自己去读取数据,我们看一下真正的读逻辑
static void uv__read(uv_stream_t* stream) { uv_buf_t buf; ssize_t nread; struct msghdr msg; char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)]; int count; int err; int is_ipc; // 清除读取部分标记 stream->flags &= ~UV_STREAM_READ_PARTIAL; count = 32; /* 流是Unix域类型并且用于IPC,Unix域不一定用于IPC, 用作IPC可以支持传递文件描述符 */ is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc; // 设置了读回调,正在读,count大于0 while (stream->read_cb && (stream->flags & UV_STREAM_READING) && (count-- > 0)) { buf = uv_buf_init(NULL, 0); // 调用调用方提供的分配内存函数,分配内存承载数据 stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf); /* 不是IPC则直接读取数据到buf,否则用recvmsg读取数据 和传递的文件描述符(如果有的话) */ if (!is_ipc) { do { nread = read(uv__stream_fd(stream), buf.base, buf.len); } while (nread < 0 && errno == EINTR); } else { /* ipc uses recvmsg */ msg.msg_flags = 0; msg.msg_iov = (struct iovec*) &buf; msg.msg_iovlen = 1; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_controllen = sizeof(cmsg_space); msg.msg_control = cmsg_space; do { nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0); } while (nread < 0 && errno == EINTR); } // 读失败 if (nread < 0) { // 读繁忙 if (errno == EAGAIN || errno == EWOULDBLOCK) { // 执行读回调 stream->read_cb(stream, 0, &buf); } else { /* Error. User should call uv_close(). */ // 读失败 stream->read_cb(stream, -errno, &buf); } return; } else if (nread == 0) { // 读到结尾了 uv__stream_eof(stream, &buf); return; } else { // 读成功,读取数据的长度 ssize_t buflen = buf.len; /* 是IPC则解析读取的数据,把文件描述符解析出来, 放到stream的accepted_fd和queued_fds字段 */ if (is_ipc) { err = uv__stream_recv_cmsg(stream, &msg); if (err != 0) { stream->read_cb(stream, err, &buf); return; } } // 执行读回调 stream->read_cb(stream, nread, &buf); } } }
uv_read除了可以读取一般的数据外,还支持读取传递的文件描述符。我们看一下描述符传递的原理。我们知道,父进程fork出子进程的时候,子进程是继承父进程的文件描述符列表的。我们看一下进程和文件描述符的关系。 fork之前如图5-1所示。
我们再看一下fork之后的结构如图5-2所示。
如果父进程或者子进程在fork之后创建了新的文件描述符,那父子进程间就不能共享了,假设父进程要把一个文件描述符传给子进程,那怎么办呢?根据进程和文件描述符的关系。传递文件描述符要做的事情,不仅仅是在子进程中新建一个fd,还要建立起fd->file->inode的关联,不过我们不需要关注这些,因为操作系统都帮我们处理了,我们只需要通过sendmsg把想传递的文件描述符发送给Unix域的另一端。Unix域另一端就可以通过recvmsg把文件描述符从数据中读取出来。接着使用uv__stream_recv_cmsg函数保存数据里解析出来的文件描述符。
static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) { struct cmsghdr* cmsg; // 遍历msg for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) { char* start; char* end; int err; void* pv; int* pi; unsigned int i; unsigned int count; pv = CMSG_DATA(cmsg); pi = pv; start = (char*) cmsg; end = (char*) cmsg + cmsg->cmsg_len; count = 0; while (start + CMSG_LEN(count * sizeof(*pi)) < end) count++; for (i = 0; i < count; i++) { /* accepted_fd代表当前待处理的文件描述符, 如果已经有值则剩余描述符就通过uv__stream_queue_fd排队 如果还没有值则先赋值 */ if (stream->accepted_fd != -1) { err = uv__stream_queue_fd(stream, pi[i]); } else { stream->accepted_fd = pi[i]; } } } return 0; }
uv__stream_recv_cmsg会从数据中解析出一个个文件描述符存到stream中,第一个文件描述符保存在accepted_fd,剩下的使用uv__stream_queue_fd处理。
struct uv__stream_queued_fds_s { unsigned int size; unsigned int offset; int fds[1]; }; static int uv__stream_queue_fd(uv_stream_t* stream, int fd) { uv__stream_queued_fds_t* queued_fds; unsigned int queue_size; // 原来的内存 queued_fds = stream->queued_fds; // 没有内存,则分配 if (queued_fds == NULL) { // 默认8个 queue_size = 8; /* 一个元数据内存+多个fd的内存 (前面加*代表解引用后的值的类型所占的内存大小, 减一是因为uv__stream_queued_fds_t 结构体本身有一个空间) */ queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) + sizeof(*queued_fds)); if (queued_fds == NULL) return UV_ENOMEM; // 容量 queued_fds->size = queue_size; // 已使用个数 queued_fds->offset = 0; // 指向可用的内存 stream->queued_fds = queued_fds; // 之前的内存用完了,扩容 } else if (queued_fds->size == queued_fds->offset) { // 每次加8个 queue_size = queued_fds->size + 8; queued_fds = uv__realloc(queued_fds, (queue_size - 1) * sizeof(*queued_fds->fds) + sizeof(*queued_fds)); if (queued_fds == NULL) return UV_ENOMEM; // 更新容量大小 queued_fds->size = queue_size; // 保存新的内存 stream->queued_fds = queued_fds; } /* Put fd in a queue */ // 保存fd queued_fds->fds[queued_fds->offset++] = fd; return 0; }
内存结构如图5-3所示。
最后我们看一下读结束后的处理,
static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) { // 打上读结束标记 stream->flags |= UV_STREAM_READ_EOF; // 注销等待可读事件 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); // 没有注册等待可写事件则停掉handle,否则会影响事件循环的退出 if (!uv__io_active(&stream->io_watcher, POLLOUT)) uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); // 执行读回调 stream->read_cb(stream, UV_EOF, buf); // 清除正在读标记 stream->flags &= ~UV_STREAM_READING; }
我们看到,流结束的时候,首先注销等待可读事件,然后通过回调通知上层。
我们在流上执行uv_write就可以往流中写入数据。
int uv_write( /* 一个写请求,记录了需要写入的数据和信息。 数据来自下面的const uv_buf_t bufs[] */ uv_write_t* req, // 往哪个流写 uv_stream_t* handle, // 需要写入的数据 const uv_buf_t bufs[], // uv_buf_t个数 unsigned int nbufs, // 写完后执行的回调 uv_write_cb cb ) { return uv_write2(req, handle, bufs, nbufs, NULL, cb); }
uv_write是直接调用uv_write2。第四个参数是NULL。代表是一般的写数据,不传递文件描述符。
int uv_write2(uv_write_t* req, uv_stream_t* stream, const uv_buf_t bufs[], unsigned int nbufs, uv_stream_t* send_handle, uv_write_cb cb) { int empty_queue; // 待发送队列是否为空 empty_queue = (stream->write_queue_size == 0); // 构造一个写请求 uv__req_init(stream->loop, req, UV_WRITE); // 写请求对应的回调 req->cb = cb; // 写请求对应的流 req->handle = stream; req->error = 0; // 需要发送的文件描述符,也可以是NULL说明不需要发送文件描述符 req->send_handle = send_handle; QUEUE_INIT(&req->queue); // bufs指向待写的数据 req->bufs = req->bufsml; // 复制调用方的数据过来 memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0])); // buf个数 req->nbufs = nbufs; // 当前写成功的buf索引,针对bufs数组 req->write_index = 0; // 待写的数据大小 = 之前的大小 + 本次大小 stream->write_queue_size += uv__count_bufs(bufs, nbufs); // 插入待写队列 QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue); // 非空说明正在连接,还不能写,比如TCP流 if (stream->connect_req) { /* Still connecting, do nothing. */ } else if (empty_queue) { // 当前待写队列为空,直接写 uv__write(stream); } else { // 还有数据没有写完,注册等待写事件 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); uv__stream_osx_interrupt_select(stream); } return 0; }
uv_write2的主要逻辑就是封装一个写请求,插入到流的待写队列。然后根据当前流的情况。看是直接写入还是等待会再写入。关系图大致如图5-4所示。
uv_write2只是对写请求进行一些预处理,真正执行写的函数是uv__write
static void uv__write(uv_stream_t* stream) { struct iovec* iov; QUEUE* q; uv_write_t* req; int iovmax; int iovcnt; ssize_t n; int err; start: // 没有数据需要写 if (QUEUE_EMPTY(&stream->write_queue)) return; q = QUEUE_HEAD(&stream->write_queue); req = QUEUE_DATA(q, uv_write_t, queue); // 从哪里开始写 iov = (struct iovec*) &(req->bufs[req->write_index]); // 还有多少没写 iovcnt = req->nbufs - req->write_index; // 最多可以写多少 iovmax = uv__getiovmax(); // 取最小值 if (iovcnt > iovmax) iovcnt = iovmax; // 需要传递文件描述符 if (req->send_handle) { int fd_to_send; struct msghdr msg; struct cmsghdr *cmsg; union { char data[64]; struct cmsghdr alias; } scratch; if (uv__is_closing(req->send_handle)) { err = -EBADF; goto error; } // 待发送的文件描述符 fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle); memset(&scratch, 0, sizeof(scratch)); msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = iovcnt; msg.msg_flags = 0; msg.msg_control = &scratch.alias; msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send)); cmsg = CMSG_FIRSTHDR(&msg); cmsg->cmsg_level = SOL_SOCKET; cmsg->cmsg_type = SCM_RIGHTS; cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send)); { void* pv = CMSG_DATA(cmsg); int* pi = pv; *pi = fd_to_send; } do { // 使用sendmsg函数发送文件描述符 n = sendmsg(uv__stream_fd(stream), &msg, 0); } while (n == -1 && errno == EINTR); } else { do { // 写一个或者写批量写 if (iovcnt == 1) { n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len); } else { n = writev(uv__stream_fd(stream), iov, iovcnt); } } while (n == -1 && errno == EINTR); } // 写失败 if (n < 0) { /* 不是写繁忙,则报错, 否则如果设置了同步写标记,则继续尝试写 */ if (errno != EAGAIN && errno != EWOULDBLOCK && errno != ENOBUFS) { err = -errno; goto error; } else if (stream->flags & UV_STREAM_BLOCKING) { /* If this is a blocking stream, try again. */ goto start; } } else { // 写成功 while (n >= 0) { // 当前buf首地址 uv_buf_t* buf = &(req->bufs[req->write_index]); // 当前buf的数据长度 size_t len = buf->len; // 小于说明当前buf还没有写完(还没有被消费完) if ((size_t)n < len) { // 更新待写的首地址 buf->base += n; // 更新待写的数据长度 buf->len -= n; /* 更新待写队列的长度,这个队列是待写数据的 总长度,等于多个buf的和 */ stream->write_queue_size -= n; n = 0; /* 还没写完,设置了同步写,则继续尝试写, 否则退出,注册待写事件 */ if (stream->flags & UV_STREAM_BLOCKING) { goto start; } else { break; } } else { /* 当前buf的数据都写完了,则更新待写数据的的首 地址,即下一个buf,因为当前buf写完了 */ req->write_index++; // 更新n,用于下一个循环的计算 n -= len; // 更新待写队列的长度 stream->write_queue_size -= len; /* 等于最后一个buf了,说明待写队列的数据 都写完了 */ if (req->write_index == req->nbufs) { /* 释放buf对应的内存,并把请求插入写完成 队列,然后准备触发写完成回调 */ uv__write_req_finish(req); return; } } } } /* 写成功,但是还没有写完,注册待写事件, 等待可写的时候继续写 */ uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); uv__stream_osx_interrupt_select(stream); return; // 写出错 error: // 记录错误 req->error = err; /* 释放内存,丢弃数据,插入写完成队列, 把IO观察者插入pending队列,等待pending阶段执行回调 */ uv__write_req_finish(req); // 注销待写事件 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); // 如果也没有注册等待可读事件,则把handle关闭 if (!uv__io_active(&stream->io_watcher, POLLIN)) uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); }
我们看一下一个写请求结束后(成功或者失败),Libuv如何处理的。逻辑在uv__write_req_finish函数。
static void uv__write_req_finish(uv_write_t* req) { uv_stream_t* stream = req->handle; // 从待写队列中移除 QUEUE_REMOVE(&req->queue); // 写成功,并且分配了额外的堆内存,则需要释放,见uv__write if (req->error == 0) { if (req->bufs != req->bufsml) uv__free(req->bufs); req->bufs = NULL; } // 插入写完成队列 QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue); /* 把IO观察者插入pending队列,Libuv在处理pending阶段时, 会触发IO观察者的写事件 */ uv__io_feed(stream->loop, &stream->io_watcher); }
uv__write_req_finish的逻辑比较简单
1把节点从待写队列中移除 2 req->bufs != req->bufsml不相等说明分配了堆内存,需要自己释放 3并把请求插入写完成队列,把IO观察者插入pending队列,等待pending阶段执行回调,在pending节点会执行IO观察者的回调(uv__stream_io)。
我们看一下uv__stream_io如何处理的,下面是具体的处理逻辑。
// 可写事件触发 if (events & (POLLOUT | POLLERR | POLLHUP)) { // 继续执行写 uv__write(stream); // 处理写成功回调 uv__write_callbacks(stream); // 待写队列空,注销等待可写事件,即不需要写了 if (QUEUE_EMPTY(&stream->write_queue)) uv__drain(stream); }
我们只关注uv__write_callbacks。
static void uv__write_callbacks(uv_stream_t* stream) { uv_write_t* req; QUEUE* q; // 写完成队列非空 while (!QUEUE_EMPTY(&stream->write_completed_queue)) { q = QUEUE_HEAD(&stream->write_completed_queue); req = QUEUE_DATA(q, uv_write_t, queue); QUEUE_REMOVE(q); uv__req_unregister(stream->loop, req); // bufs的内存还没有被释放 if (req->bufs != NULL) { // 更新待写队列的大小,即减去req对应的所有数据的大小 stream->write_queue_size -= uv__write_req_size(req); /* bufs默认指向bufsml,超过默认大小时, bufs指向新申请的堆内存,所以需要释放 */ if (req->bufs != req->bufsml) uv__free(req->bufs); req->bufs = NULL; } // 执行回调 if (req->cb) req->cb(req, req->error); } }
uv__write_callbacks负责更新流的待写队列大小、释放额外申请的堆内存、执行每个写请求的回调。
// 关闭流的写端 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { // 初始化一个关闭请求,关联的handle是stream uv__req_init(stream->loop, req, UV_SHUTDOWN); req->handle = stream; // 关闭后执行的回调 req->cb = cb; stream->shutdown_req = req; // 设置正在关闭的标记 stream->flags |= UV_HANDLE_SHUTTING; // 注册等待可写事件 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); return 0; }
关闭流的写端就是相当于给流发送一个关闭请求,把请求挂载到流中,然后注册等待可写事件,在可写事件触发的时候就会执行关闭操作。在分析写流的章节中我们提到,可写事件触发的时候,会执行uvdrain注销等待可写事件,除此之外,uvdrain还做了一个事情,就是关闭流的写端。我们看看具体的逻辑。
static void uv__drain(uv_stream_t* stream) { uv_shutdown_t* req; int err; // 撤销等待可写事件,因为没有数据需要写入了 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); uv__stream_osx_interrupt_select(stream); // 设置了关闭写端,但是还没有关闭,则执行关闭写端 if ((stream->flags & UV_HANDLE_SHUTTING) && !(stream->flags & UV_HANDLE_CLOSING) && !(stream->flags & UV_HANDLE_SHUT)) { req = stream->shutdown_req; stream->shutdown_req = NULL; // 清除标记 stream->flags &= ~UV_HANDLE_SHUTTING; uv__req_unregister(stream->loop, req); err = 0; // 关闭写端 if (shutdown(uv__stream_fd(stream), SHUT_WR)) err = UV__ERR(errno); // 标记已关闭写端 if (err == 0) stream->flags |= UV_HANDLE_SHUT; // 执行回调 if (req->cb != NULL) req->cb(req, err); } }
通过调用shutdown关闭流的写端,比如TCP流发送完数据后可以关闭写端。但是仍然可以读。
void uv__stream_close(uv_stream_t* handle) { unsigned int i; uv__stream_queued_fds_t* queued_fds; // 从事件循环中删除IO观察者,移出pending队列 uv__io_close(handle->loop, &handle->io_watcher); // 停止读 uv_read_stop(handle); // 停掉handle uv__handle_stop(handle); // 不可读、写 handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); // 关闭非标准流的文件描述符 if (handle->io_watcher.fd != -1) { /* Don't close stdio file descriptors. Nothing good comes from it. */ if (handle->io_watcher.fd > STDERR_FILENO) uv__close(handle->io_watcher.fd); handle->io_watcher.fd = -1; } // 关闭通信socket对应的文件描述符 if (handle->accepted_fd != -1) { uv__close(handle->accepted_fd); handle->accepted_fd = -1; } // 同上,这是在排队等待处理的文件描述符 if (handle->queued_fds != NULL) { queued_fds = handle->queued_fds; for (i = 0; i < queued_fds->offset; i++) uv__close(queued_fds->fds[i]); uv__free(handle->queued_fds); handle->queued_fds = NULL; } }
关闭流就是把流注册在epoll的事件注销,关闭所持有的文件描述符。
连接流是针对TCP和Unix域的,所以我们首先介绍一下一些网络编程相关的内容,首先我们先要有一个socket。我们看Libuv中如何新建一个socket。
int uv__socket(int domain, int type, int protocol) { int sockfd; int err; // 新建一个socket,并设置非阻塞和LOEXEC标记 sockfd = socket(domain, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol); // 不触发SIGPIPE信号,比如对端已经关闭,本端又执行写 #if defined(SO_NOSIGPIPE) { int on = 1; setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)); } #endif return sockfd; }
在Libuv中,socket的模式都是非阻塞的,uv__socket是Libuv中申请socket的函数,不过Libuv不直接调用该函数,而是封装了一下。
/* 1 获取一个新的socket fd 2 把fd保存到handle里,并根据flag进行相关设置 3 绑定到本机随意的地址(如果设置了该标记的话) */ static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) { struct sockaddr_storage saddr; socklen_t slen; int sockfd; // 获取一个socket sockfd = uv__socket(domain, SOCK_STREAM, 0); // 设置选项和保存socket的文件描述符到IO观察者中 uv__stream_open((uv_stream_t*) handle, sockfd, flags); // 设置了需要绑定标记UV_HANDLE_BOUND if (flags & UV_HANDLE_BOUND) { slen = sizeof(saddr); memset(&saddr, 0, sizeof(saddr)); // 获取fd对应的socket信息,比如IP,端口,可能没有 getsockname(uv__stream_fd(handle), (struct sockaddr*) &saddr, &slen); // 绑定到socket中,如果没有则绑定到系统随机选择的地址 bind(uv__stream_fd(handle),(struct sockaddr*) &saddr, slen); } return 0; }
上面的代码就是在Libuv申请一个socket的逻辑,另外它还支持新建的socket,可以绑定到一个用户设置的,或者操作系统随机选择的地址。不过Libuv并不直接使用这个函数。而是又封装了一层。
// 如果流还没有对应的fd,则申请一个新的,如果有则修改流的配置 static int maybe_new_socket(uv_tcp_t* handle, int domain, unsigned long flags) { struct sockaddr_storage saddr; socklen_t slen; // 已经有fd了 if (uv__stream_fd(handle) != -1) { // 该流需要绑定到一个地址 if (flags & UV_HANDLE_BOUND) { /* 流是否已经绑定到一个地址了。handle的flag是在 new_socket里设置的,如果有这个标记说明已经执行过绑定了, 直接更新flags就行。 */ if (handle->flags & UV_HANDLE_BOUND) { handle->flags |= flags; return 0; } // 有fd,但是可能还没绑定到一个地址 slen = sizeof(saddr); memset(&saddr, 0, sizeof(saddr)); // 获取socket绑定到的地址 if (getsockname(uv__stream_fd(handle), (struct sockaddr*) &saddr, &slen)) return UV__ERR(errno); // 绑定过了socket地址,则更新flags就行 if ((saddr.ss_family == AF_INET6 && ((struct sockaddr_in6*) &saddr)->sin6_port != 0) || (saddr.ss_family == AF_INET && ((struct sockaddr_in*) &saddr)->sin_port != 0)) { handle->flags |= flags; return 0; } // 没绑定则绑定到随机地址,bind中实现 if (bind(uv__stream_fd(handle), (struct sockaddr*) &saddr, slen)) return UV__ERR(errno); } handle->flags |= flags; return 0; } // 申请一个新的fd关联到流 return new_socket(handle, domain, flags); }
maybe_new_socket函数的逻辑分支很多,主要如下 1 如果流还没有关联到fd,则申请一个新的fd关联到流上 2 如果流已经关联了一个fd。 如果流设置了绑定地址的标记,但是已经通过Libuv绑定了一个地址(Libuv会设置UV_HANDLE_BOUND标记,用户也可能是直接调bind函数绑定了)。则不需要再次绑定,更新flags就行。 如果流设置了绑定地址的标记,但是还没有通过Libuv绑定一个地址,这时候通过getsocketname判断用户是否自己通过bind函数绑定了一个地址,是的话则不需要再次执行绑定操作。否则随机绑定到一个地址。
以上两个函数的逻辑主要是申请一个socket和给socket绑定一个地址。下面我们开看一下连接流的实现。
int uv__tcp_connect(uv_connect_t* req, uv_tcp_t* handle, const struct sockaddr* addr, unsigned int addrlen, uv_connect_cb cb) { int err; int r; // 已经发起了connect了 if (handle->connect_req != NULL) return UV_EALREADY; // 申请一个socket和绑定一个地址,如果还没有的话 err = maybe_new_socket(handle, addr->sa_family, UV_HANDLE_READABLE | UV_HANDLE_WRITABLE if (err) return err; handle->delayed_error = 0; do { // 清除全局错误变量的值 errno = 0; // 非阻塞发起三次握手 r = connect(uv__stream_fd(handle), addr, addrlen); } while (r == -1 && errno == EINTR); if (r == -1 && errno != 0) { // 三次握手还没有完成 if (errno == EINPROGRESS) ; /* not an error */ else if (errno == ECONNREFUSED) // 对方拒绝建立连接,延迟报错 handle->delayed_error = UV__ERR(errno); else // 直接报错 return UV__ERR(errno); } // 初始化一个连接型request,并设置某些字段 uv__req_init(handle->loop, req, UV_CONNECT); req->cb = cb; req->handle = (uv_stream_t*) handle; QUEUE_INIT(&req->queue); // 连接请求 handle->connect_req = req; // 注册到Libuv观察者队列 uv__io_start(handle->loop, &handle->io_watcher, POLLOUT); // 连接出错,插入pending队尾 if (handle->delayed_error) uv__io_feed(handle->loop, &handle->io_watcher); return 0; }
连接流的逻辑,大致如下 1 申请一个socket,绑定一个地址。 2 根据给定的服务器地址,发起三次握手,非阻塞的,会直接返回继续执行,不会等到三次握手完成。 3 往流上挂载一个connect型的请求。 4 设置IO观察者感兴趣的事件为可写。然后把IO观察者插入事件循环的IO观察者队列。等待可写的时候时候(完成三次握手),就会执行cb回调。
可写事件触发时,会执行uv__stream_io,我们看一下具体的逻辑。
if (stream->connect_req) { uv__stream_connect(stream); return; }
我们继续看uv__stream_connect。
static void uv__stream_connect(uv_stream_t* stream) { int error; uv_connect_t* req = stream->connect_req; socklen_t errorsize = sizeof(int); // 连接出错 if (stream->delayed_error) { error = stream->delayed_error; stream->delayed_error = 0; } else { // 还是需要判断一下是不是出错了 getsockopt(uv__stream_fd(stream), SOL_SOCKET, SO_ERROR, &error, &errorsize); error = UV__ERR(error); } // 还没连接成功,先返回,等待下次可写事件的触发 if (error == UV__ERR(EINPROGRESS)) return; // 清空 stream->connect_req = NULL; uv__req_unregister(stream->loop, req); /* 连接出错则注销之前注册的等待可写队列, 连接成功如果待写队列为空,也注销事件,有数据需要写的时候再注册 */ if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) { uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); } // 执行回调,通知上层连接结果 if (req->cb) req->cb(req, error); if (uv__stream_fd(stream) == -1) return; // 连接失败,清空待写的数据和执行写请求的回调(如果有的话) if (error < 0) { uv__stream_flush_write_queue(stream, UV_ECANCELED); uv__write_callbacks(stream); } }
连接流的逻辑是 1发起非阻塞式连接 2 注册等待可写事件 3 可写事件触发时,把连接结果告诉调用方 4 连接成功则发送写队列的数据,连接失败则清除写队列的数据并执行每个写请求的回调(有的话)。
监听流是针对TCP或Unix域的,主要是把一个socket变成listen状态。并且设置一些属性。
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) { static int single_accept = -1; unsigned long flags; int err; if (tcp->delayed_error) return tcp->delayed_error; // 是否设置了不连续accept。默认是连续accept。 if (single_accept == -1) { const char* val = getenv("UV_TCP_SINGLE_ACCEPT"); single_accept = (val != NULL && atoi(val) != 0); } // 设置不连续accept if (single_accept) tcp->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT; flags = 0; /* 可能还没有用于listen的fd,socket地址等。 这里申请一个socket和绑定到一个地址 (如果调listen之前没有调bind则绑定到随机地址) */ err = maybe_new_socket(tcp, AF_INET, flags); if (err) return err; // 设置fd为listen状态 if (listen(tcp->io_watcher.fd, backlog)) return UV__ERR(errno); // 建立连接后的业务回调 tcp->connection_cb = cb; tcp->flags |= UV_HANDLE_BOUND; // 设置io观察者的回调,由epoll监听到连接到来时执行 tcp->io_watcher.cb = uv__server_io; /* 插入观察者队列,这时候还没有增加到epoll, Poll IO阶段再遍历观察者队列进行处理(epoll_ctl) */ uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN); return 0; }
监听流的逻辑看起来很多,但是主要的逻辑是把流对的fd改成listen状态,这样流就可以接收连接请求了。接着设置连接到来时执行的回调。最后注册IO观察者到事件循环。等待连接到来。就会执行uvserver_io。uvserver_io再执行connection_cb。监听流和其它流有一个区别是,当IO观察者的事件触发时,监听流执行的回调是uvserver_io函数。而其它流是在uvstream_io里统一处理。我们看一下连接到来或者Unix域上有数据到来时的处理逻辑。
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { uv_stream_t* stream; int err; stream = container_of(w, uv_stream_t, io_watcher); // 注册等待可读事件 uv__io_start(stream->loop, &stream->io_watcher, POLLIN); while (uv__stream_fd(stream) != -1) { /* 通过accept拿到和客户端通信的fd,我们看到这个 fd和服务器的fd是不一样的 */ err = uv__accept(uv__stream_fd(stream)); // 错误处理 if (err < 0) { /* uv__stream_fd(stream)对应的fd是非阻塞的, 返回这个错说明没有连接可用accept了,直接返回 */ if (err == -EAGAIN || err == -EWOULDBLOCK) return; /* Not an error. */ if (err == -ECONNABORTED) continue; // 进程的打开的文件描述符个数达到阈值,看是否有备用的 if (err == -EMFILE || err == -ENFILE) { err = uv__emfile_trick(loop, uv__stream_fd(stream)); if (err == -EAGAIN || err == -EWOULDBLOCK) break; } // 发生错误,执行回调 stream->connection_cb(stream, err); continue; } // 记录拿到的通信socket对应的fd stream->accepted_fd = err; // 执行上传回调 stream->connection_cb(stream, 0); /* stream->accepted_fd为-1说明在回调connection_cb里已经消费 了 accepted_fd,否则先注销服务器在epoll中的fd的读事件,等 待消费后再注册,即不再处理请求了 */ if (stream->accepted_fd != -1) { /* The user hasn't yet accepted called uv_accept() */ uv__io_stop(loop, &stream->io_watcher, POLLIN); return; } /* 是TCP类型的流并且设置每次只accpet一个连接,则定时阻塞, 被唤醒后再accept,否则一直accept(如果用户在connect回 调里消费了accept_fd的话),定时阻塞用于多进程竞争处理连接 */ if (stream->type == UV_TCP && (stream->flags & UV_TCP_SINGLE_ACCEPT)) { struct timespec timeout = { 0, 1 }; nanosleep(&timeout, NULL); } } }
我们看到连接到来时,Libuv会从已完成连接的队列中摘下一个节点,然后执行connection_cb回调。在connection_cb回调里,需要uv_accept消费accpet_fd。
int uv_accept(uv_stream_t* server, uv_stream_t* client) { int err; switch (client->type) { case UV_NAMED_PIPE: case UV_TCP: // 把文件描述符保存到client err = uv__stream_open(client, server->accepted_fd, UV_STREAM_READABLE | UV_STREAM_WRITABLE); if (err) { uv__close(server->accepted_fd); goto done; } break; case UV_UDP: err = uv_udp_open((uv_udp_t*) client, server->accepted_fd); if (err) { uv__close(server->accepted_fd); goto done; } break; default: return -EINVAL; } client->flags |= UV_HANDLE_BOUND; done: // 非空则继续放一个到accpet_fd中等待accept,用于文件描述符传递 if (server->queued_fds != NULL) { uv__stream_queued_fds_t* queued_fds; queued_fds = server->queued_fds; // 把第一个赋值到accept_fd server->accepted_fd = queued_fds->fds[0]; /* offset减去一个单位,如果没有了,则释放内存, 否则需要把后面的往前挪,offset执行最后一个 */ if (--queued_fds->offset == 0) { uv__free(queued_fds); server->queued_fds = NULL; } else { memmove(queued_fds->fds, queued_fds->fds + 1, queued_fds->offset * sizeof(*queued_fds->fds)); } } else { // 没有排队的fd了,则注册等待可读事件,等待accept新的fd server->accepted_fd = -1; if (err == 0) uv__io_start(server->loop, &server->io_watcher, POLLIN); } return err; }
client是用于和客户端进行通信的流,accept就是把accept_fd保存到client中,client就可以通过fd和对端进行通信了。消费完accept_fd后,如果还有待处理的fd的话,需要补充一个到accept_fd(针对Unix域),其它的继续排队等待处理,如果没有待处理的fd则注册等待可读事件,继续处理新的连接。
当我们不再需要一个流的时候,我们会首先调用uv_close关闭这个流,关闭流只是注销了事件和释放了文件描述符,调用uv_close之后,流对应的结构体就会被加入到closing队列,在closing阶段的时候,才会执行销毁流的操作,比如丢弃还没有写完成的数据,执行对应流的回调,我们看看销毁流的函数uv__stream_destroy。
void uv__stream_destroy(uv_stream_t* stream) { // 正在连接,则执行回调 if (stream->connect_req) { uv__req_unregister(stream->loop, stream->connect_req); stream->connect_req->cb(stream->connect_req, -ECANCELED); stream->connect_req = NULL; } // 丢弃待写的数据,如果有的话 uv__stream_flush_write_queue(stream, -ECANCELED); // 处理写完成队列,这里是处理被丢弃的数据 uv__write_callbacks(stream); // 正在关闭流,直接回调 if (stream->shutdown_req) { uv__req_unregister(stream->loop, stream->shutdown_req); stream->shutdown_req->cb(stream->shutdown_req, -ECANCELED); stream->shutdown_req = NULL; } }
我们看到,销毁流的时候,如果流中还有待写的数据,则会丢弃。我们看一下uv__stream_flush_write_queue和uv__write_callbacks。
void uv__stream_flush_write_queue(uv_stream_t* stream, int error) { uv_write_t* req; QUEUE* q; while (!QUEUE_EMPTY(&stream->write_queue)) { q = QUEUE_HEAD(&stream->write_queue); QUEUE_REMOVE(q); req = QUEUE_DATA(q, uv_write_t, queue); // 把错误写到每个请求中 req->error = error; QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue); } }
uv__stream_flush_write_queue丢弃待写队列中的请求,并直接插入写完成队列中。uv__write_callbacks是写完或者写出错时执行的函数,它逐个处理写完成队列中的节点,每个节点是一个写请求,执行它的回调,如何分配了堆内存,则释放内存。在写流章节已经分析,不再具体展开。
在流的实现中,读写等操作都只是注册事件到epoll,事件触发的时候,会执行统一的回调函数uv__stream_io。下面列一下该函数的代码,具体实现在其它章节已经分析。
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { uv_stream_t* stream; stream = container_of(w, uv_stream_t, io_watcher); // 是连接流,则执行连接处理函数 if (stream->connect_req) { uv__stream_connect(stream); return; } /* Ignore POLLHUP here. Even it it's set, there may still be data to read. */ // 可读是触发,则执行读处理 if (events & (POLLIN | POLLERR | POLLHUP)) uv__read(stream); // 读回调关闭了流 if (uv__stream_fd(stream) == -1) return; /* read_cb closed stream. */ /* ¬¬ POLLHUP说明对端关闭了,即不会发生数据过来了。 如果流的模式是持续读, 1 如果只读取了部分(设置UV_STREAM_READ_PARTIAL), 并且没有读到结尾(没有设置UV_STREAM_READ_EOF), 则直接作读结束处理, 2 如果只读取了部分,上面的读回调执行了读结束操作, 则这里就不需要处理了 3 如果没有设置只读了部分,还没有执行读结束操作, 则不能作读结束操作,因为对端虽然关闭了,但是之 前的传过来的数据可能还没有被消费完 4 如果没有设置只读了部分,执行了读结束操作,那这 里也不需要处理 */ if ((events & POLLHUP) && (stream->flags & UV_STREAM_READING) && (stream->flags & UV_STREAM_READ_PARTIAL) && !(stream->flags & UV_STREAM_READ_EOF)) { uv_buf_t buf = { NULL, 0 }; uv__stream_eof(stream, &buf); } if (uv__stream_fd(stream) == -1) return; /* read_cb closed stream. */ // 可写事件触发 if (events & (POLLOUT | POLLERR | POLLHUP)) { // 写数据 uv__write(stream); // 写完后做后置处理,释放内存,执行回调等 uv__write_callbacks(stream); // 待写队列为空,则注销等待写事件 if (QUEUE_EMPTY(&stream->write_queue)) uv__drain(stream); } }
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8