一文聊透 Netty IO 事件的编排利器 pipeline | 详解所有 IO 事件的触发时机以及传播路径

565次阅读  |  发布于2年以前

本系列Netty源码解析文章基于 4.1.56.Final版本

1 . 前文回顾

在前边的系列文章中,笔者为大家详细剖析了 Reactor 模型在 netty 中的[创建] ,[运行] ,[接收连接] ,[接收数据] ,[发送数据] 的完整流程,在详细剖析整个 Reactor 模型如何在 netty 中实现的过程里,我们或多或少的见到了 pipeline 的身影。

Reactor启动后的结构.png

比如在 Reactor 启动的过程中首先需要创建 NioServerSocketChannel ,在创建的过程中会为 NioServerSocketChannel 创建分配一个 pipeline ,用于对 OP_ACCEPT 事件的编排。

当 NioServerSocketChannel 向 main reactor 注册成功后,会在 pipeline 中触发 ChannelRegistered 事件的传播。

当 NioServerSocketChannel 绑定端口成功后,会在 pipeline 中触发 ChannelActive 事件的传播。

主从Reactor组完整结构.png

又比如在 Reactor 接收连接的过程中,当客户端发起一个连接并完成三次握手之后,连接对应的 Socket 会存放在内核中的全连接队列中,随后 JDK Selector 会通知 main reactor 此时 NioServerSocketChannel 上有 OP_ACCEPT 事件活跃,最后 main reactor 开始执行 NioServerSocketChannel 的底层操作类 NioMessageUnsafe#read 方法在 NioServerSocketChannel 中的 pipeline 中传播 ChannelRead 事件。

传播ChannelRead事件.png

最终会在 NioServerSocketChannel 的 pipeline 中的 ServerBootstrapAcceptor 中响应 ChannelRead 事件并创建初始化 NioSocketChannel ,随后会为每一个新创建的 NioSocetChannel 创建分配一个独立的 pipeline ,用于各自 NioSocketChannel 上的 IO 事件的编排。并向 sub reactor 注册 NioSocketChannel ,随后在 NioSocketChannel 的 pipeline 中传播 ChannelRegistered 事件,最后传播 ChannelActive 事件。

传播ChannelRegister事件.png

还有在[《Netty如何高效接收网络数据》] 一文中,我们也提过当 sub reactor 读取 NioSocketChannel 中来自客户端的请求数据时,会在 NioSocketChannel 的 pipeline 中传播 ChannelRead 事件,在一个完整的 read loop 读取完毕后会传播 ChannelReadComplete 事件。

在[《一文搞懂Netty发送数据全流程》] 一文中,我们讲到了在用户经过业务处理后,通过 write 方法和 flush 方法分别在 NioSocketChannel 的 pipeline 中传播 write 事件和 flush 事件的过程。

笔者带大家又回顾了一下在前边系列文章中关于 pipeline 的使用场景,但是在这些系列文章中并未对 pipeline 相关的细节进行完整全面地描述,那么本文笔者将为大家详细的剖析下 pipeline 在 IO 事件的编排和传播场景下的完整实现原理。

内容概要.png

2 . pipeline的创建

主从Reactor组完整结构.png

Netty 会为每一个 Channel 分配一个独立的 pipeline ,pipeline 伴随着 channel 的创建而创建。

前边介绍到 NioServerSocketChannel 是在 netty 服务端启动的过程中创建的。而 NioSocketChannel 的创建是在当 NioServerSocketChannel 上的 OP_ACCEPT 事件活跃时,由 main reactor 线程在 NioServerSocketChannel 中创建,并在 NioServerSocketChannel 的 pipeline 中对 OP_ACCEPT 事件进行编排时(图中的 ServerBootstrapAcceptor 中)初始化的。

无论是创建 NioServerSocketChannel 里的 pipeline 还是创建 NioSocketChannel 里的 pipeline , 最终都会委托给它们的父类 AbstractChannel 。

image.png

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //channel全局唯一ID machineId+processId+sequence+timestamp+random
        id = newId();
        //unsafe用于底层socket的相关操作
        unsafe = newUnsafe();
        //为channel分配独立的pipeline用于IO事件编排
        pipeline = newChannelPipeline();
    }

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

}
public class DefaultChannelPipeline implements ChannelPipeline {

      ....................

    //pipeline中的头结点
    final AbstractChannelHandlerContext head;
    //pipeline中的尾结点
    final AbstractChannelHandlerContext tail;

    //pipeline中持有对应channel的引用
    private final Channel channel;

       ....................

    protected DefaultChannelPipeline(Channel channel) {
        //pipeline中持有对应channel的引用
        this.channel = ObjectUtil.checkNotNull(channel, "channel");

        ............省略.......

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

       ....................
}

在前边的系列文章中笔者多次提到过,pipeline 的结构是由 ChannelHandlerContext 类型的节点构成的双向链表。其中头结点为 HeadContext ,尾结点为 TailContext 。其初始结构如下:

pipeline的初始结构.png

2.1 HeadContext

    private static final String HEAD_NAME = generateName0(HeadContext.class);

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
       //headContext中持有对channel unsafe操作类的引用 用于执行channel底层操作
        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, HeadContext.class);
            //持有channel unsafe操作类的引用,后续用于执行channel底层操作
            unsafe = pipeline.channel().unsafe();
            //设置channelHandler的状态为ADD_COMPLETE
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            return this;
        }

        .......................
    }

我们知道双向链表结构的 pipeline 中的节点元素为 ChannelHandlerContext ,既然 HeadContext 作为 pipeline 的头结点,那么它一定是 ChannelHandlerContext 类型的,所以它需要继承实现 AbstractChannelHandlerContext ,相当于一个哨兵的作用,因为用户可以以任意顺序向 pipeline 中添加 ChannelHandler ,需要用 HeadContext 来固定指向第一个 ChannelHandlerContext 。

在[《一文搞懂Netty发送数据全流程》] 一文中的《1. ChannelHandlerContext》小节中,笔者曾为大家详细介绍过 ChannelHandlerContext 在 pipeline 中的作用,忘记的同学可以在回看下。

于此同时 HeadContext 又实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口,说明 HeadContext 即是一个 ChannelHandlerContext 又是一个 ChannelHandler ,它可以同时处理 Inbound 事件和 Outbound 事件。

我们也注意到 HeadContext 中持有了对应 channel 的底层操作类 unsafe ,这也说明 IO 事件在 pipeline 中的传播最终会落在 HeadContext 中进行最后的 IO 处理。它是 Inbound 事件的处理起点,也是 Outbound 事件的处理终点。这里也可以看出 HeadContext 除了起到哨兵的作用,它还承担了对 channel 底层相关的操作。

比如我们在[《Reactor在Netty中的实现(启动篇)》] 中介绍的 NioServerSocketChannel 在向 main reactor 注册完成后会触发 ChannelRegistered 事件从 HeadContext 开始依次在 pipeline 中向后传播。

      @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            //此时firstRegistration已经变为false,在pipeline.invokeHandlerAddedIfNeeded中已被调用过
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
        }

以及 NioServerSocketChannel 在与端口绑定成功后会触发 ChannelActive 事件从 HeadContext 开始依次在 pipeline 中向后传播,并在 HeadContext 中通过 unsafe.beginRead() 注册 OP_ACCEPT 事件到 main reactor 中。

     @Override
        public void read(ChannelHandlerContext ctx) {
            //触发注册OP_ACCEPT或者OP_READ事件
            unsafe.beginRead();
        }

同理在 NioSocketChannel 在向 sub reactor 注册成功后。会先后触发 ChannelRegistered 事件和 ChannelActive 事件从 HeadContext 开始在 pipeline 中向后传播。并在 HeadContext 中通过 unsafe.beginRead() 注册 OP_READ 事件到 sub reactor 中。

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            //pipeline中继续向后传播channelActive事件
            ctx.fireChannelActive();
            //如果是autoRead 则自动触发read事件传播
            //在read回调函数中 触发OP_ACCEPT或者OP_READ事件注册
            readIfIsAutoRead();
        }

在[《一文搞懂Netty发送数据全流程》] 中介绍的 write 事件和 flush 事件最终会在 pipeline 中从后向前一直传播到 HeadContext ,并在 HeadContext 中相应事件回调函数中调用 unsafe 类操作底层 channel 发送数据。

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            //到headContext这里 msg的类型必须是ByteBuffer,也就是说必须经过编码器将业务层写入的实体编码为ByteBuffer
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) {
            unsafe.flush();
        }

从本小节的内容介绍中,我们可以看出在 Netty 中对于 Channel 的相关底层操作调用均是在 HeadContext 中触发的。

2.2 TailContext

    private static final String TAIL_NAME = generateName0(TailContext.class);

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, TailContext.class);
            //设置channelHandler的状态为ADD_COMPLETE
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            return this;
        }

        ......................
}

同样 TailContext 作为双向链表结构的 pipeline 中的尾结点,也需要继承实现 AbstractChannelHandlerContext 。但它同时又实现了 ChannelInboundHandler 。

这说明 TailContext 除了是一个 ChannelHandlerContext 同时也是一个 ChannelInboundHandler 。

2.2.1 TailContext 作为一个 ChannelHandlerContext 的作用

TailContext 作为一个 ChannelHandlerContext 的作用是负责将 outbound 事件从 pipeline 的末尾一直向前传播直到 HeadContext 。当然前提是用户需要调用 channel 的相关 outbound 方法。

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

    ChannelFuture write(Object msg);

    ChannelFuture write(Object msg, ChannelPromise promise);

    ChannelOutboundInvoker flush();

    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    ChannelFuture writeAndFlush(Object msg);

}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

   @Override
    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }

    @Override
    public Channel flush() {
        pipeline.flush();
        return this;
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return pipeline.writeAndFlush(msg);
    }
}
public class DefaultChannelPipeline implements ChannelPipeline {

   @Override
    public final ChannelFuture write(Object msg) {
        return tail.write(msg);
    }

    @Override
    public final ChannelPipeline flush() {
        tail.flush();
        return this;
    }

   @Override
    public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
    }

}

这里我们可以看到,当我们在自定义 ChannelHandler 中调用 ctx.channel().write(msg) 时,会在 AbstractChannel 中触发 pipeline.write(msg) ,最终在 DefaultChannelPipeline 中调用 tail.write(msg) 。使得 write 事件可以从 pipeline 的末尾开始向前传播,其他 outbound 事件的传播也是一样的道理。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        ctx.channel().write(msg);
    }

}

而我们自定义的 ChannelHandler 会被封装在一个 ChannelHandlerContext 中从而加入到 pipeline 中,而这个用于装载自定义 ChannelHandler 的 ChannelHandlerContext 与 TailContext 一样本质也都是 ChannelHandlerContext ,只不过在 pipeline 中的位置不同罢了。

客户端channel pipeline结构.png

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    ChannelFuture write(Object msg);

    ChannelFuture write(Object msg, ChannelPromise promise);

    ChannelOutboundInvoker flush();

    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    ChannelFuture writeAndFlush(Object msg);

}

我们看到 ChannelHandlerContext 接口本身也会继承 ChannelInboundInvoker 和 ChannelOutboundInvoker 接口,所以说 ContextHandlerContext 也可以触发 inbound 事件和 outbound 事件,只不过表达的语义是在 pipeline 中从当前 ChannelHandler 开始向前或者向后传播 outbound 事件或者 inbound 事件。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        ctx.write(msg);
    }

}

这里表示 write 事件从当前 EchoServerHandler 开始在 pipeline 中向前传播直到 HeadContext 。

客户端channel pipeline结构.png

2.2.2 TailContext 作为一个 ChannelInboundHandler 的作用

最后 TailContext 作为一个 ChannelInboundHandler 的作用就是为 inbound 事件在 pipeline 中的传播做一个兜底的处理。

这里提到的兜底处理是什么意思呢?

比如我们前边介绍到的,在 NioSocketChannel 向 sub reactor 注册成功后之后触发的 ChannelRegistered 事件和 ChannelActive 事件。或者在 reactor 线程读取 NioSocketChannel 中的请求数据时所触发的 channelRead 事件和 ChannelReadComplete 事件。

这些 inbound 事件都会首先从 HeadContext 开始在 pipeline 中一个一个的向后传递。

极端的情况是如果 pipeline 中所有 ChannelInboundHandler 中相应的 inbound 事件回调方法均不对事件作出处理,并继续向后传播。如下示例代码所示:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.fireChannelReadComplete();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }
}

最终这些 inbound 事件在 pipeline 中得不到处理,最后会传播到 TailContext 中。

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            onUnhandledInboundMessage(ctx, msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            onUnhandledInboundChannelReadComplete();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            onUnhandledInboundChannelActive();
        }

}

而在 TailContext 中需要对这些得不到任何处理的 inbound 事件做出最终处理。比如丢弃该 msg,并释放所占用的 directByteBuffer,以免发生内存泄露。

    protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
        onUnhandledInboundMessage(msg);
        if (logger.isDebugEnabled()) {
            logger.debug("Discarded message pipeline : {}. Channel : {}.",
                         ctx.pipeline().names(), ctx.channel());
        }
    }

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

3 . pipeline中的事件分类

在前边的系列文章中,笔者多次介绍过,Netty 中的 IO 事件一共分为两大类:inbound 类事件和 outbound 类事件。其实如果严格来分的话应该分为三类。第三种事件类型为 exceptionCaught 异常事件类型。

而 exceptionCaught 事件在事件传播角度上来说和 inbound 类事件一样,都是从 pipeline 的 HeadContext 开始一直向后传递或者从当前 ChannelHandler 开始一直向后传递直到 TailContext 。所以一般也会将 exceptionCaught 事件统一归为 inbound 类事件。

而根据事件类型的分类,相应负责处理事件回调的 ChannelHandler 也会被分为两类:

那么我们常说的 inbound 类事件和 outbound 类事件具体都包含哪些事件呢?

3.1 inbound类事件

final class ChannelHandlerMask {

    // inbound事件集合
    static final int MASK_ONLY_INBOUND =  MASK_CHANNEL_REGISTERED |
            MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
            MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;

    private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;

    // inbound 类事件相关掩码
    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_INACTIVE = 1 << 4;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
    static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
    static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;

}

netty 会将其支持的所有异步事件用掩码来表示,定义在 ChannelHandlerMask 类中, netty 框架通过这些事件掩码可以很方便的知道用户自定义的 ChannelHandler 是属于什么类型的(ChannelInboundHandler or ChannelOutboundHandler )。

除此之外,inbound 类事件如此之多,用户也并不是对所有的 inbound 类事件感兴趣,用户可以在自定义的 ChannelInboundHandler 中覆盖自己感兴趣的 inbound 事件回调,从而达到针对特定 inbound 事件的监听。

这些用户感兴趣的 inbound 事件集合同样也会用掩码的形式保存在自定义 ChannelHandler 对应的 ChannelHandlerContext 中,这样当特定 inbound 事件在 pipeline 中开始传播的时候,netty 可以根据对应 ChannelHandlerContext 中保存的 inbound 事件集合掩码来判断,用户自定义的 ChannelHandler 是否对该 inbound 事件感兴趣,从而决定是否执行用户自定义 ChannelHandler 中的相应回调方法或者跳过对该 inbound 事件不感兴趣的 ChannelHandler 继续向后传播。

从以上描述中,我们也可以窥探出,Netty 引入 ChannelHandlerContext 来封装 ChannelHandler 的原因,在代码设计上还是遵循单一职责的原则, ChannelHandler 是用户接触最频繁的一个 netty 组件,netty 希望用户能够把全部注意力放在最核心的 IO 处理上,用户只需要关心自己对哪些异步事件感兴趣并考虑相应的处理逻辑即可,而并不需要关心异步事件在 pipeline 中如何传递,如何选择具有执行条件的 ChannelHandler 去执行或者跳过。这些切面性质的逻辑,netty 将它们作为上下文信息全部封装在 ChannelHandlerContext 中由netty框架本身负责处理。

以上这些内容,笔者还会在事件传播相关小节做详细的介绍,之所以这里引出,还是为了让大家感受下利用掩码进行集合操作的便利性,netty 中类似这样的设计还有很多,比如前边系列文章中多次提到过的,channel 再向 reactor 注册 IO 事件时,netty 也是将 channel 感兴趣的 IO 事件用掩码的形式存储于 SelectionKey 中的 int interestOps 中。

接下来笔者就为大家介绍下这些 inbound 事件,并梳理出这些 inbound 事件的触发时机。方便大家根据各自业务需求灵活地进行监听。

3.1.1 ExceptionCaught 事件

在本小节介绍的这些 inbound 类事件在 pipeline 中传播的过程中,如果在相应事件回调函数执行的过程中发生异常,那么就会触发对应 ChannelHandler 中的 exceptionCaught 事件回调。

    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                if (logger.isDebugEnabled()) {
                    logger.debug(
                        "An exception {}" +
                        "was thrown by a user handler's exceptionCaught() " +
                        "method while handling the following exception:",
                        ThrowableUtil.stackTraceToString(error), cause);
                } else if (logger.isWarnEnabled()) {
                    logger.warn(
                        "An exception '{}' [enable DEBUG level for full stacktrace] " +
                        "was thrown by a user handler's exceptionCaught() " +
                        "method while handling the following exception:", error, cause);
                }
            }
        } else {
            fireExceptionCaught(cause);
        }
    }

当然用户可以选择在 exceptionCaught 事件回调中是否执行 ctx.fireExceptionCaught(cause) 从而决定是否将 exceptionCaught 事件继续向后传播。

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ..........
        ctx.fireExceptionCaught(cause);
    }

当 netty 内核处理连接的接收,以及数据的读取过程中如果发生异常,会在整个 pipeline 中触发 exceptionCaught 事件的传播。

这里笔者为什么要单独强调在 inbound 事件传播的过程中发生异常,才会回调 exceptionCaught 呢 ?

因为 inbound 事件一般都是由 netty 内核触发传播的,而 outbound 事件一般都是由用户选择触发的,比如用户在处理完业务逻辑触发的 write 事件或者 flush 事件。

而在用户触发 outbound 事件后,一般都会得到一个 ChannelPromise 。用户可以向 ChannelPromise 添加各种 listener 。当 outbound 事件在传播的过程中发生异常时,netty 会通知用户持有的这个 ChannelPromise ,但不会触发 exceptionCaught 的回调

比如我们在[《一文搞懂Netty发送数据全流程》] 一文中介绍到的在 write 事件传播的过程中就不会触发 exceptionCaught 事件回调。只是去通知用户的 ChannelPromise 。

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            //调用当前ChannelHandler中的write方法
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

    private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
        PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
    }

而 outbound 事件中只有 flush 事件的传播是个例外,当 flush 事件在 pipeline 传播的过程中发生异常时,会触发对应异常 ChannelHandler 的 exceptionCaught 事件回调。因为 flush 方法的签名中不会给用户返回 ChannelPromise 。

    @Override
    ChannelHandlerContext flush();
    private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    }

3.1.2 ChannelRegistered 事件

当 main reactor 在启动的时候,NioServerSocketChannel 会被创建并初始化,随后就会向main reactor注册,当注册成功后就会在 NioServerSocketChannel 中的 pipeline 中传播 ChannelRegistered 事件。

当 main reactor 接收客户端发起的连接后,NioSocketChannel 会被创建并初始化,随后会向 sub reactor 注册,当注册成功后会在 NioSocketChannel 中的 pipeline 传播 ChannelRegistered 事件。

传播ChannelRegister事件.png

private void register0(ChannelPromise promise) {

        ................
        //执行真正的注册操作
        doRegister();

        ...........

        //触发channelRegister事件
        pipeline.fireChannelRegistered();

        .......
}

注意:此时对应的 channel 还没有注册 IO 事件到相应的 reactor 中。

3.1.3 ChannelActive 事件

当 NioServerSocketChannel 再向 main reactor 注册成功并触发 ChannelRegistered 事件传播之后,随后就会在 pipeline 中触发 bind 事件,而 bind 事件是一个 outbound 事件,会从 pipeline 中的尾结点 TailContext 一直向前传播最终在 HeadContext 中执行真正的绑定操作。

     @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            //触发AbstractChannel->bind方法 执行JDK NIO SelectableChannel 执行底层绑定操作
            unsafe.bind(localAddress, promise);
        }
       @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
             ..............

            doBind(localAddress);

            ...............

            //绑定成功后 channel激活 触发channelActive事件传播
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        //HeadContext->channelActive回调方法 执行注册OP_ACCEPT事件
                        pipeline.fireChannelActive();
                    }
                });
            }

            ...............
        }

当 netty 服务端 NioServerSocketChannel 绑定端口成功之后,才算是真正的 Active ,随后触发 ChannelActive 事件在 pipeline 中的传播。

之前我们也提到过判断 NioServerSocketChannel 是否 Active 的标准就是 : 底层 JDK Nio ServerSocketChannel 是否 open 并且 ServerSocket 是否已经完成绑定。

    @Override
    public boolean isActive() {
        return isOpen() && javaChannel().socket().isBound();
    }

而客户端 NioSocketChannel 中触发 ChannelActive 事件就会比较简单,当 NioSocketChannel 再向 sub reactor 注册成功并触发 ChannelRegistered 之后,紧接着就会触发 ChannelActive 事件在 pipeline 中传播。

传播ChannelActive事件.png

private void register0(ChannelPromise promise) {

        ................
        //执行真正的注册操作
        doRegister();

        ...........

        //触发channelRegister事件
        pipeline.fireChannelRegistered();

        .......

        if (isActive()) {

                    if (firstRegistration) {
                        //触发channelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
          }
}

而客户端 NioSocketChannel 是否 Active 的标识是:底层 JDK NIO SocketChannel 是否 open 并且底层 socket 是否连接。毫无疑问,这里的 socket 一定是 connected 。所以直接触发 ChannelActive 事件。

    @Override
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }

注意:此时 channel 才会到相应的 reactor 中去注册感兴趣的 IO 事件。当用户自定义的 ChannelHandler 接收到 ChannelActive 事件时,表明 IO 事件已经注册到 reactor 中了。

3.1.4 ChannelRead 和 ChannelReadComplete 事件

接收客户端连接.png

当客户端有新连接请求的时候,服务端的 NioServerSocketChannel 上的 OP_ACCEPT 事件会活跃,随后 main reactor 会在一个 read loop 中不断的调用 serverSocketChannel.accept() 接收新的连接直到全部接收完毕或者达到 read loop 最大次数 16 次。

在 NioServerSocketChannel 中,每 accept 一个新的连接,就会在 pipeline 中触发 ChannelRead 事件。一个完整的 read loop 结束之后,会触发 ChannelReadComplete 事件。

    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        @Override
        public void read() {
            ......................


                try {
                    do {
                        //底层调用NioServerSocketChannel->doReadMessages 创建客户端SocketChannel
                        int localRead = doReadMessages(readBuf);
                        .................
                    } while (allocHandle.continueReading());

                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {            
                    pipeline.fireChannelRead(readBuf.get(i));
                }

                pipeline.fireChannelReadComplete();

                     .................
        }
    }

当客户端 NioSocketChannel 上有请求数据到来时,NioSocketChannel 上的 OP_READ 事件活跃,随后 sub reactor 也会在一个 read loop 中对 NioSocketChannel 中的请求数据进行读取直到读取完毕或者达到 read loop 的最大次数 16 次。

在 read loop 的读取过程中,每读取一次就会在 pipeline 中触发 ChannelRead 事件。当一个完整的 read loop 结束之后,会在 pipeline 中触发 ChannelReadComplete 事件。

Netty接收网络数据流程.png

这里需要注意的是当 ChannelReadComplete 事件触发时,此时并不代表 NioSocketChannel 中的请求数据已经读取完毕,可能的情况是发送的请求数据太多,在一个 read loop 中读取不完达到了最大限制次数 16 次,还没全部读取完毕就退出了 read loop 。一旦退出 read loop 就会触发 ChannelReadComplete 事件。详细内容可以查看笔者的这篇文章[《Netty如何高效接收网络数据》] 。

3.1.5 ChannelWritabilityChanged 事件

当我们处理完业务逻辑得到业务处理结果后,会调用 ctx.write(msg) 触发 write 事件在 pipeline 中的传播。

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
         ctx.write(msg);
    }

最终 netty 会将发送数据 msg 写入 NioSocketChannel 中的待发送缓冲队列 ChannelOutboundBuffer 中。并等待用户调用 flush 操作从 ChannelOutboundBuffer 中将待发送数据 msg ,写入到底层 Socket 的发送缓冲区中。

ChannelOutboundBuffer中缓存待发送数据.png

当对端的接收处理速度非常慢或者网络状况极度拥塞时,使得 TCP 滑动窗口不断的缩小,这就导致发送端的发送速度也变得越来越小,而此时用户还在不断的调用 ctx.write(msg) ,这就会导致 ChannelOutboundBuffer 会急剧增大,从而可能导致 OOM 。netty 引入了高低水位线来控制 ChannelOutboundBuffer 的内存占用。

public final class WriteBufferWaterMark {

    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
}

当 ChanneOutboundBuffer 中的内存占用量超过高水位线时,netty 就会将对应的 channel 置为不可写状态,并在 pipeline 中触发 ChannelWritabilityChanged 事件。

    private void setUnwritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue | 1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue == 0) {
                    //触发fireChannelWritabilityChanged事件 表示当前channel变为不可写
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

当 ChannelOutboundBuffer 中的内存占用量低于低水位线时,netty 又会将对应的 NioSocketChannel 设置为可写状态,并再次触发 ChannelWritabilityChanged 事件。

响应channelWritabilityChanged事件.png

    private void setWritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue & ~1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue != 0 && newValue == 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

用户可在自定义 ChannelHandler 中通过 ctx.channel().isWritable() 判断当前 channel 是否可写。

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {

        if (ctx.channel().isWritable()) {
            ...........当前channel可写.........
        } else {
            ...........当前channel不可写.........
        }
    }

3.1.6 UserEventTriggered 事件

netty 提供了一种事件扩展机制可以允许用户自定义异步事件,这样可以使得用户能够灵活的定义各种复杂场景的处理机制。

下面我们来看下如何在 Netty 中自定义异步事件。

1 . 定义异步事件。

public final class OurOwnDefinedEvent {

    public static final OurOwnDefinedEvent INSTANCE = new OurOwnDefinedEvent();

    private OurOwnDefinedEvent() { }
}

2 . 触发自定义事件的传播

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
            ......省略.......
            //事件在pipeline中从当前ChannelHandlerContext开始向后传播
            ctx.fireUserEventTriggered(OurOwnDefinedEvent.INSTANCE);
            //事件从pipeline的头结点headContext开始向后传播
            ctx.channel().pipeline().fireUserEventTriggered(OurOwnDefinedEvent.INSTANCE);

    }
}

3 . 自定义事件的响应和处理。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (OurOwnDefinedEvent.INSTANCE == evt) {
              .....自定义事件处理......
        }
    }

}

后续随着我们源码解读的深入,我们还会看到 Netty 自己本身也定义了许多 UserEvent 事件,我们后面还会在介绍,大家这里只是稍微了解一下相关的用法即可。

3.1.7 ChannelInactive和ChannelUnregistered事件

当 Channel 被关闭之后会在 pipeline 中先触发 ChannelInactive 事件的传播然后在触发 ChannelUnregistered 事件的传播。

我们可以在 Inbound 类型的 ChannelHandler 中响应 ChannelInactive 和 ChannelUnregistered 事件。

   @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        ......响应inActive事件...

        //继续向后传播inActive事件
        super.channelInactive(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

          ......响应Unregistered事件...

        //继续向后传播Unregistered事件
        super.channelUnregistered(ctx);
    }

这里和连接建立之后的事件触发顺序正好相反,连接建立之后是先触发 ChannelRegistered 事件然后在触发 ChannelActive 事件。

3.2 Outbound 类事件

final class ChannelHandlerMask {

    // outbound 事件的集合
    static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
            MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;

    private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;

    // outbound 事件掩码
    static final int MASK_BIND = 1 << 9;
    static final int MASK_CONNECT = 1 << 10;
    static final int MASK_DISCONNECT = 1 << 11;
    static final int MASK_CLOSE = 1 << 12;
    static final int MASK_DEREGISTER = 1 << 13;
    static final int MASK_READ = 1 << 14;
    static final int MASK_WRITE = 1 << 15;
    static final int MASK_FLUSH = 1 << 16;
}

和 Inbound 类事件一样,Outbound 类事件也有对应的掩码表示。下面我们来看下 Outbound类事件的触发时机:

3.2.1 read 事件

大家这里需要注意区分 read 事件和 ChannelRead 事件的不同

ChannelRead 事件前边我们已经介绍了,当 NioServerSocketChannel 接收到新连接时,会触发 ChannelRead 事件在其 pipeline 上传播。

当 NioSocketChannel 上有请求数据时,在 read loop 中读取请求数据时会触发 ChannelRead 事件在其 pipeline 上传播。

而 read 事件则和 ChannelRead 事件完全不同,read 事件特指使 Channel 具备感知 IO 事件的能力。NioServerSocketChannel 对应的 OP_ACCEPT 事件的感知能力,NioSocketChannel 对应的是 OP_READ 事件的感知能力。

read 事件的触发是在当 channel 需要向其对应的 reactor 注册读类型事件时(比如 OP_ACCEPT 事件 和 OP_READ 事件)才会触发。read 事件的响应就是将 channel 感兴趣的 IO 事件注册到对应的 reactor 上。

比如 NioServerSocketChannel 感兴趣的是 OP_ACCEPT 事件, NioSocketChannel 感兴趣的是 OP_READ 事件。

在前边介绍 ChannelActive 事件时我们提到,当 channel 处于 active 状态后会在 pipeline 中传播 ChannelActive 事件。而在 HeadContext 中的 ChannelActive 事件回调中会触发 Read 事件的传播。

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.fireChannelActive();  
            readIfIsAutoRead();
        }

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                //如果是autoRead 则触发read事件传播
                channel.read();
            }
        }

        @Override
        public void read(ChannelHandlerContext ctx) {
            //触发注册OP_ACCEPT或者OP_READ事件
            unsafe.beginRead();
        }
 }

而在 HeadContext 中的 read 事件回调中会调用 Channel 的底层操作类 unsafe 的 beginRead 方法,在该方法中会向 reactor 注册 channel 感兴趣的 IO 事件。对于 NioServerSocketChannel 来说这里注册的就是 OP_ACCEPT 事件,对于 NioSocketChannel 来说这里注册的则是 OP_READ 事件。

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();

        if ((interestOps & readInterestOp) == 0) {
            //注册监听OP_ACCEPT或者OP_READ事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

细心的同学可能注意到了 channel 对应的配置类中包含了一个 autoRead 属性,那么这个 autoRead 到底是干什么的呢?

其实这是 netty 为大家提供的一种背压机制,用来防止 OOM ,想象一下当对端发送数据非常多并且发送速度非常快,而服务端处理速度非常慢,一时间消费不过来。而对端又在不停的大量发送数据,服务端的 reactor 线程不得不在 read loop 中不停的读取,并且为读取到的数据分配 ByteBuffer 。而服务端业务线程又处理不过来,这就导致了大量来不及处理的数据占用了大量的内存空间,从而导致 OOM 。

面对这种情况,我们可以通过 channelHandlerContext.channel().config().setAutoRead(false) 将 autoRead 属性设置为 false 。随后 netty 就会将 channel 中感兴趣的读类型事件从 reactor 中注销,从此 reactor 不会再对相应事件进行监听。这样 channel 就不会在读取数据了。

这里 NioServerSocketChannel 对应的是 OP_ACCEPT 事件, NioSocketChannel 对应的是 OP_READ 事件。

       protected final void removeReadOp() {
            SelectionKey key = selectionKey();
            if (!key.isValid()) {
                return;
            }
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) != 0) {        
                key.interestOps(interestOps & ~readInterestOp);
            }
        }

而当服务端的处理速度恢复正常,我们又可以通过 channelHandlerContext.channel().config().setAutoRead(true) 将 autoRead 属性设置为 true 。这样 netty 会在 pipeline 中触发 read 事件,最终在 HeadContext 中的 read 事件回调方法中通过调用 unsafe#beginRead 方法将 channel 感兴趣的读类型事件重新注册到对应的 reactor 中。

    @Override
    public ChannelConfig setAutoRead(boolean autoRead) {
        boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
        if (autoRead && !oldAutoRead) {
            //autoRead从false变为true
            channel.read();
        } else if (!autoRead && oldAutoRead) {
            //autoRead从true变为false
            autoReadCleared();
        }
        return this;
    }

read 事件可以理解为使 channel 拥有读的能力,当有了读的能力后, channelRead 就可以读取具体的数据了。

3.2.2 write 和 flush 事件

write 事件和 flush 事件我们在[《一文搞懂Netty发送数据全流程》] 一文中已经非常详尽的介绍过了,这里笔者在带大家简单回顾一下。

write 事件和 flush 事件均由用户在处理完业务请求得到业务结果后在业务线程中主动触发。

用户既可以通过 ChannelHandlerContext 触发也可以通过 Channel 来触发。

不同之处在于如果通过 ChannelHandlerContext 触发,那么 write 事件或者 flush 事件就会在 pipeline 中从当前 ChannelHandler 开始一直向前传播直到 HeadContext 。

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
       ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

如果通过 Channel 触发,那么 write 事件和 flush 事件就会从 pipeline 的尾部节点 TailContext 开始一直向前传播直到 HeadContext 。

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
       ctx.channel().write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.channel().flush();
    }

当然还有一个 writeAndFlush 方法,也会分为 ChannelHandlerContext 触发和 Channel 的触发。触发 writeAndFlush 后,write 事件首先会在 pipeline 中传播,最后 flush 事件在 pipeline 中传播。

netty 对 write 事件的处理最终会将发送数据写入 Channel 对应的写缓冲队列 ChannelOutboundBuffer 中。此时数据并没有发送出去而是在写缓冲队列中缓存,这也是 netty 实现异步写的核心设计。

最终通过 flush 操作从 Channel 中的写缓冲队列 ChannelOutboundBuffer 中获取到待发送数据,并写入到 Socket 的发送缓冲区中。

3.2.3 close 事件

当用户在 ChannelHandler 中调用如下方法对 Channel 进行关闭时,会触发 Close 事件在 pipeline 中从后向前传播。

//close事件从当前ChannelHandlerContext开始在pipeline中向前传播
ctx.close();
//close事件从pipeline的尾结点tailContext开始向前传播
ctx.channel().close();

我们可以在Outbound类型的ChannelHandler中响应close事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {

        .....客户端channel关闭之前的处理回调.....

        //继续向前传播close事件
        super.close(ctx, promise);
    }
}

最终 close 事件会在 pipeline 中一直向前传播直到头结点 HeadConnect 中,并在 HeadContext 中完成连接关闭的操作,当连接完成关闭之后,会在 pipeline中先后触发 ChannelInactive 事件和 ChannelUnregistered 事件。

3.2.4 deRegister 事件

用户可调用如下代码将当前 Channel 从 Reactor 中注销掉。

//deregister事件从当前ChannelHandlerContext开始在pipeline中向前传播
ctx.deregister();
//deregister事件从pipeline的尾结点tailContext开始向前传播
ctx.channel().deregister();

我们可以在 Outbound 类型的 ChannelHandler 中响应 deregister 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {


        .....客户端channel取消注册之前的处理回调.....

        //继续向前传播connect事件
        super.deregister(ctx, promise);
    }
}

最终 deRegister 事件会传播至 pipeline 中的头结点 HeadContext 中,并在 HeadContext 中完成底层 channel 取消注册的操作。当 Channel 从 Reactor 上注销之后,从此 Reactor 将不会在监听 Channel 上的 IO 事件,并触发 ChannelUnregistered 事件在 pipeline 中传播。

3.2.5 connect 事件

在 Netty 的客户端中我们可以利用 NioSocketChannel 的 connect 方法触发 connect 事件在 pipeline 中传播。

//connect事件从当前ChannelHandlerContext开始在pipeline中向前传播
ctx.connect(remoteAddress);
//connect事件从pipeline的尾结点tailContext开始向前传播
ctx.channel().connect(remoteAddress);

我们可以在 Outbound 类型的 ChannelHandler 中响应 connect 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
                        ChannelPromise promise) throws Exception {


        .....客户端channel连接成功之前的处理回调.....

        //继续向前传播connect事件
        super.connect(ctx, remoteAddress, localAddress, promise);
    }
}

最终 connect 事件会在 pipeline 中的头结点 headContext 中触发底层的连接建立请求。当客户端成功连接到服务端之后,会在客户端 NioSocketChannel 的 pipeline 中传播 channelActive 事件。

3.2.6 disConnect 事件

在 Netty 的客户端中我们也可以调用 NioSocketChannel 的 disconnect 方法在 pipeline 中触发 disconnect 事件,这会导致 NioSocketChannel 的关闭。

//disconnect事件从当前ChannelHandlerContext开始在pipeline中向前传播
ctx.disconnect();
//disconnect事件从pipeline的尾结点tailContext开始向前传播
ctx.channel().disconnect();

我们可以在 Outbound 类型的 ChannelHandler 中响应 disconnect 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {


    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {

        .....客户端channel即将关闭前的处理回调.....

        //继续向前传播disconnect事件
        super.disconnect(ctx, promise);
    }
}

最终 disconnect 事件会传播到 HeadContext 中,并在 HeadContext 中完成底层的断开连接操作,当客户端断开连接成功关闭之后,会在 pipeline 中先后触发 ChannelInactive 事件和 ChannelUnregistered 事件。

4 . 向pipeline添加channelHandler

在我们详细介绍了全部的 inbound 类事件和 outbound 类事件的掩码表示以及事件的触发和传播路径后,相信大家现在可以通过 ChannelInboundHandler 和 ChannelOutboundHandler 来根据具体的业务场景选择合适的 ChannelHandler 类型以及监听合适的事件来完成业务需求了。

本小节就该介绍一下自定义的 ChannelHandler 是如何添加到 pipeline 中的,netty 在这个过程中帮我们作了哪些工作?

           final EchoServerHandler serverHandler = new EchoServerHandler();

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)

             .............

             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();          
                     p.addLast(serverHandler);

                     ......可添加多个channelHandler......
                 }
             });

以上是笔者简化的一个 netty 服务端配置 ServerBootstrap 启动类的一段示例代码。我们可以看到再向 channel 对应的 pipeline 中添加 ChannelHandler 是通过 ChannelPipeline#addLast 方法将指定 ChannelHandler 添加到 pipeline 的末尾处。

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    //向pipeline的末尾处批量添加多个channelHandler
    ChannelPipeline addLast(ChannelHandler... handlers);

    //指定channelHandler的executor,由指定的executor执行channelHandler中的回调方法
    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

     //为channelHandler指定名称
    ChannelPipeline addLast(String name, ChannelHandler handler);

    //为channelHandler指定executor和name
    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
}
public class DefaultChannelPipeline implements ChannelPipeline {

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

    @Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }
}

最终 addLast 的这些重载方法都会调用到 DefaultChannelPipeline#addLast(EventExecutorGroup, String, ChannelHandler) 这个方法从而完成 ChannelHandler 的添加。

   @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //检查同一个channelHandler实例是否允许被重复添加
            checkMultiplicity(handler);

            //创建channelHandlerContext包裹channelHandler并封装执行传播事件相关的上下文信息
            newCtx = newContext(group, filterName(name, handler), handler);

            //将channelHandelrContext插入到pipeline中的末尾处。双向链表操作
            //此时channelHandler的状态还是ADD_PENDING,只有当channelHandler的handlerAdded方法被回调后,状态才会为ADD_COMPLETE
            addLast0(newCtx);

            //如果当前channel还没有向reactor注册,则将handlerAdded方法的回调添加进pipeline的任务队列中
            if (!registered) {
                //这里主要是用来处理ChannelInitializer的情况
                //设置channelHandler的状态为ADD_PENDING 即等待添加,当状态变为ADD_COMPLETE时 channelHandler中的handlerAdded会被回调
                newCtx.setAddPending();
                //向pipeline中添加PendingHandlerAddedTask任务,在任务中回调handlerAdded
                //当channel注册到reactor后,pipeline中的pendingHandlerCallbackHead任务链表会被挨个执行
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            //如果当前channel已经向reactor注册成功,那么就直接回调channelHandler中的handlerAddded方法
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                //这里需要确保channelHandler中handlerAdded方法的回调是在channel指定的executor中
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        //回调channelHandler中的handlerAddded方法
        callHandlerAdded0(newCtx);
        return this;
    }

这个方法的逻辑还是比较复杂的,涉及到很多细节,为了清晰地为大家讲述,笔者这里还是采用总分总的结构,先描述该方法的总体逻辑,然后在针对核心细节要点展开细节分析。

因为向 pipeline 中添加 channelHandler 的操作可能会在多个线程中进行,所以为了确保添加操作的线程安全性,这里采用一个 synchronized 语句块将整个添加逻辑包裹起来。

  1. 通过 checkMultiplicity 检查被添加的 ChannelHandler 是否是共享的(标注 @Sharable 注解),如果不是共享的那么则不会允许该 ChannelHandler 的同一实例被添加进多个 pipeline 中。如果是共享的,则允许该 ChannelHandler 的同一个实例被多次添加进多个 pipeline 中。
    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            //只有标注@Sharable注解的channelHandler,才被允许同一个实例被添加进多个pipeline中
            //注意:标注@Sharable之后,一个channelHandler的实例可以被添加到多个channel对应的pipeline中
            //可能被多线程执行,需要确保线程安全
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

这里大家需要注意的是,如果一个 ChannelHandler 被标注了 @Sharable 注解,这就意味着它的一个实例可以被多次添加进多个 pipeline 中(每个 channel 对应一个 pipeline 实例),而这多个不同的 pipeline 可能会被不同的 reactor 线程执行,所以在使用共享 ChannelHandler 的时候需要确保其线程安全性。

比如下面的实例代码:

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
            .............需要确保线程安全.......
}
  final EchoServerHandler serverHandler = new EchoServerHandler();

  ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
               ..................
            .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(serverHandler);
                 }
             });

EchoServerHandler 为我们自定义的 ChannelHandler ,它被 @Sharable 注解标注,全局只有一个实例,被添加进多个 Channel 的 pipeline 中。从而会被多个 reactor 线程执行到。

共享channelHandler.png

2 . 为 ChannelHandler 创建其 ChannelHandlerContext ,用于封装 ChannelHandler 的名称,状态信息,执行上下文信息,以及用于感知 ChannelHandler 在 pipeline 中的位置信息。newContext 方法涉及的细节较多,后面我们单独介绍。

3 . 通过 addLast0 将新创建出来的 ChannelHandlerContext 插入到 pipeline 中末尾处。方法的逻辑很简单其实就是一个普通的双向链表插入操作。

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

但是这里大家需要注意的点是:虽然此时 ChannelHandlerContext 被物理的插入到了 pipeline 中,但是此时 channelHandler 的状态依然为 INIT 状态,从逻辑上来说并未算是真正的插入到 pipeline 中,需要等到 ChannelHandler 的 handlerAdded 方法被回调时,状态才变为 ADD_COMPLETE ,而只有 ADD_COMPLETE 状态的 ChannelHandler 才能响应 pipeline 中传播的事件。

channelhandelr的状态.png

在上篇文章[《一文搞懂Netty发送数据全流程》] 中的《3.1.5 触发nextChannelHandler的write方法回调》小节中我们也提过,在每次 write 事件或者 flush 事件传播的时候,都需要通过 invokeHandler 方法来判断 channelHandler 的状态是否为 ADD_COMPLETE ,否则当前 channelHandler 则不能响应正在 pipeline 中传播的事件。必须要等到对应的 handlerAdded 方法被回调才可以,因为 handlerAdded 方法中可能包含一些 ChannelHandler 初始化的重要逻辑。

    private boolean invokeHandler() {
        // 这里是一个优化点,netty 用一个局部变量保存 handlerState
        // 目的是减少 volatile 变量 handlerState 的读取次数
        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }

    void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            // 当前channelHandler虽然添加到pipeline中,但是并没有调用handlerAdded
            // 所以不能调用当前channelHandler中的回调方法,只能继续向前传递write事件
            write(msg, promise);
        }
    }

    private void invokeFlush() {
        if (invokeHandler()) {
            invokeFlush0();
        } else {
            //如果该ChannelHandler虽然加入到pipeline中但handlerAdded方法并未被回调,则继续向前传递flush事件
            flush();
        }
    }

事实上不仅仅是 write 事件和 flush 事件在传播的时候需要判断 ChannelHandler 的状态,所有的 inbound 类事件和 outbound 类事件在传播的时候都需要通过 invokeHandler 方法来判断当前 ChannelHandler 的状态是否为 ADD_COMPLETE ,需要确保在 ChannelHandler 响应事件之前,它的 handlerAdded 方法被回调。

4 . 如果向 pipeline 中添加 ChannelHandler 的时候, channel 还没来得及注册到 reactor中,那么需要将当前 ChannelHandler 的状态先设置为 ADD_PENDING ,并将回调该 ChannelHandler 的 handlerAdded 方法封装成 PendingHandlerAddedTask 任务添加进 pipeline 中的任务列表中,等到 channel 向 reactor 注册之后,reactor 线程会挨个执行 pipeline 中任务列表中的任务。

这段逻辑主要用来处理 ChannelInitializer 的添加场景,因为目前只有 ChannelInitializer 这个特殊的 channelHandler 会在 channel 没有注册之前被添加进 pipeline 中

        if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
         }

向 pipeline 的任务列表 pendingHandlerCallbackHead 中添加 PendingHandlerAddedTask 任务:

public class DefaultChannelPipeline implements ChannelPipeline {

    // pipeline中的任务列表
    private PendingHandlerCallback pendingHandlerCallbackHead;

    // 向任务列表尾部添加PendingHandlerAddedTask
    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;

        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
}

PendingHandlerAddedTask 任务负责回调 ChannelHandler 中的 handlerAdded 方法。

private final class PendingHandlerAddedTask extends PendingHandlerCallback {
        ...............

        @Override
        public void run() {
            callHandlerAdded0(ctx);
        }

       ...............
}    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
       try {
            ctx.callHandlerAdded();
        } catch (Throwable t) {
           ...............
        }
    }
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
       try {
            ctx.callHandlerAdded();
        } catch (Throwable t) {
           ...............
        }
    }

pipeline任务列表.png

5 . 除了 ChannelInitializer 这个特殊的 ChannelHandler 的添加是在 channel 向 reactor 注册之前外,剩下的这些用户自定义的 ChannelHandler 的添加,均是在 channel 向 reactor 注册之后被添加进 pipeline 的。这种场景下的处理就会变得比较简单,在 ChannelHandler 被插入到 pipeline 中之后,就会立即回调该 ChannelHandler 的 handlerAdded 方法。但是需要确保 handlerAdded 方法的回调在 channel 指定的 executor 中进行。

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }  

            callHandlerAdded0(newCtx);

如果当前执行线程并不是 ChannelHandler 指定的 executor ( !executor.inEventLoop() ),那么就需要确保 handlerAdded 方法的回调在 channel 指定的 executor 中进行。

    private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
        newCtx.setAddPending();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                callHandlerAdded0(newCtx);
            }
        });
    }

这里需要注意的是需要在回调 handlerAdded 方法之前将 ChannelHandler 的状态提前设置为 ADD_COMPLETE 。 因为用户可能在 ChannelHandler 中的 handerAdded 回调中触发一些事件,而如果此时 ChannelHandler 的状态不是 ADD_COMPLETE 的话,就会停止对事件的响应,从而错过事件的处理。

这种属于一种用户极端的使用情况。

   final void callHandlerAdded() throws Exception {
        if (setAddComplete()) {
            handler().handlerAdded(this);
        }
    }

5 . ChanneHandlerContext 的创建

在介绍完 ChannelHandler 向 pipeline 添加的整个逻辑过程后,本小节我们来看下如何为 ChannelHandler 创建对应的 ChannelHandlerContext ,以及 ChannelHandlerContext 中具体包含了哪些上下文信息。

public class DefaultChannelPipeline implements ChannelPipeline {
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {

            ................

            //创建channelHandlerContext包裹channelHandler并封装执行传播相关的上下文信息
            newCtx = newContext(group, filterName(name, handler), handler);

             ................
        }

    }

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

}

在创建 ChannelHandlerContext 之前,需要做两个重要的前置操作:

5.1 filterName

    private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            // 如果没有指定name,则会为handler默认生成一个name,该方法可确保默认生成的name在pipeline中不会重复
            return generateName(handler);
        }

        // 如果指定了name,需要确保name在pipeline中是唯一的
        checkDuplicateName(name);
        return name;
    }

如果用户再向 pipeline 添加 ChannelHandler 的时候,为其指定了具体的名称,那么这里需要确保用户指定的名称在 pipeline 中是唯一的。

   private void checkDuplicateName(String name) {
        if (context0(name) != null) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
    }

    /**
     * 通过指定名称在pipeline中查找对应的channelHandler 没有返回null
     * */
    private AbstractChannelHandlerContext context0(String name) {
        AbstractChannelHandlerContext context = head.next;
        while (context != tail) {
            if (context.name().equals(name)) {
                return context;
            }
            context = context.next;
        }
        return null;
    }

如果用户没有为 ChannelHandler 指定名称,那么就需要为 ChannelHandler 在 pipeline 中默认生成一个唯一的名称。

    // pipeline中channelHandler对应的name缓存
    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
            new FastThreadLocal<Map<Class<?>, String>>() {
        @Override
        protected Map<Class<?>, String> initialValue() {
            return new WeakHashMap<Class<?>, String>();
        }
    };

    private String generateName(ChannelHandler handler) {
        // 获取pipeline中channelHandler对应的name缓存
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();
        String name = cache.get(handlerType);
        if (name == null) {
            // 当前handler还没对应的name缓存,则默认生成:simpleClassName + #0
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }

        if (context0(name) != null) {
            // 不断重试名称后缀#n + 1 直到没有重复
            String baseName = name.substring(0, name.length() - 1); 
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }

    private static String generateName0(Class<?> handlerType) {
        return StringUtil.simpleClassName(handlerType) + "#0";
    }

pipeline 中使用了一个 FastThreadLocal 类型的 nameCaches 来缓存各种类型 ChannelHandler 的基础名称。后面会根据这个基础名称不断的重试生成一个没有冲突的正式名称。缓存 nameCaches 中的 key 表示特定的 ChannelHandler 类型,value 表示该特定类型的 ChannelHandler 的基础名称 simpleClassName + #0

自动为 ChannelHandler 生成默认名称的逻辑是:

虽然用户不大可能将同一类型的 channelHandler 重复添加到 pipeline 中,但是 netty 为了防止这种反复添加同一类型 ChannelHandler 的行为导致的名称冲突,从而利用 nameCaches 来缓存同一类型 ChannelHandler 的基础名称 simpleClassName + #0,然后通过不断的重试递增名称后缀,来生成一个在pipeline中唯一的名称。

5.2 childExecutor

通过前边的介绍我们了解到,当我们向 pipeline 添加 ChannelHandler 的时候,netty 允许我们为 ChannelHandler 指定特定的 executor 去执行 ChannelHandler 中的各种事件回调方法。

通常我们会为 ChannelHandler 指定一个EventExecutorGroup,在创建ChannelHandlerContext 的时候,会通过 childExecutor 方法从 EventExecutorGroup 中选取一个 EventExecutor 来与该 ChannelHandler 绑定。

EventExecutorGroup 是 netty 自定义的一个线程池模型,其中包含多个 EventExecutor ,而 EventExecutor 在 netty 中是一个线程的执行模型。相关的具体实现和用法笔者已经在[《Reactor在Netty中的实现(创建篇)》] 一文中给出了详尽的介绍,忘记的同学可以在回顾下。

在介绍 executor 的绑定逻辑之前,这里笔者需要先为大家介绍一个相关的重要参数:SINGLE_EVENTEXECUTOR_PER_GROUP ,默认为 true 。


ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
  .........
.childOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP,true)

我们知道在 netty 中,每一个 channel 都会对应一个独立的 pipeline ,如果我们开启了 SINGLE_EVENTEXECUTOR_PER_GROUP 参数,表示在一个 channel 对应的 pipeline 中,如果我们为多个 ChannelHandler 指定了同一个 EventExecutorGroup ,那么这多个 channelHandler 只能绑定到 EventExecutorGroup 中的同一个 EventExecutor 上。

什么意思呢??比如我们有下面一段初始化pipeline的代码:

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                 ........................
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline pipeline = ch.pipeline();
                     pipeline.addLast(eventExecutorGroup,channelHandler1)
                     pipeline.addLast(eventExecutorGroup,channelHandler2)
                     pipeline.addLast(eventExecutorGroup,channelHandler3)
                 }
             });

eventExecutorGroup 中包含 EventExecutor1,EventExecutor2 , EventExecutor3 三个执行线程。

假设此时第一个连接进来,在创建 channel1 后初始化 pipeline1 的时候,如果在开启 SINGLE_EVENTEXECUTOR_PER_GROUP 参数的情况下,那么在 channel1 对应的 pipeline1 中 channelHandler1,channelHandler2 , channelHandler3 绑定的 EventExecutor 均为 EventExecutorGroup 中的 EventExecutor1 。

第二个连接 channel2 对应的 pipeline2 中 channelHandler1 , channelHandler2 ,channelHandler3 绑定的 EventExecutor 均为 EventExecutorGroup 中的 EventExecutor2 。

第三个连接 channel3 对应的 pipeline3 中 channelHandler1 , channelHandler2 ,channelHandler3 绑定的 EventExecutor 均为 EventExecutorGroup 中的 EventExecutor3 。

以此类推........

SINGLE_EVENTEXECUTOR_PER_GROUP绑定.png

如果在关闭 SINGLE_EVENTEXECUTOR_PER_GROUP 参数的情况下, channel1 对应的 pipeline1 中 channelHandler1 会绑定到 EventExecutorGroup 中的 EventExecutor1 ,channelHandler2 会绑定到 EventExecutor2 ,channelHandler3 会绑定到 EventExecutor3 。

同理其他 channel 对应的 pipeline 中的 channelHandler 绑定逻辑同 channel1 。它们均会绑定到 EventExecutorGroup 中的不同 EventExecutor 中。

SINGLE_EVENTEXECUTOR_PER_GROUP关闭.png

当我们了解了 SINGLE_EVENTEXECUTOR_PER_GROUP 参数的作用之后,再来看下面这段绑定逻辑就很容易理解了。

     // 在每个pipeline中都会保存EventExecutorGroup中绑定的线程
    private Map<EventExecutorGroup, EventExecutor> childExecutors;

    private EventExecutor childExecutor(EventExecutorGroup group) {
        if (group == null) {
            return null;
        }

        Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
        if (pinEventExecutor != null && !pinEventExecutor) {
            //如果没有开启SINGLE_EVENTEXECUTOR_PER_GROUP,则按顺序从指定的EventExecutorGroup中为channelHandler分配EventExecutor
            return group.next();
        }

        //获取pipeline绑定到EventExecutorGroup的线程(在一个pipeline中会为每个指定的EventExecutorGroup绑定一个固定的线程)
        Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
        if (childExecutors == null) {
            childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
        }

        //获取该pipeline绑定在指定EventExecutorGroup中的线程
        EventExecutor childExecutor = childExecutors.get(group);
        if (childExecutor == null) {
            childExecutor = group.next();
            childExecutors.put(group, childExecutor);
        }
        return childExecutor;
    }

如果我们并未特殊指定 ChannelHandler 的 executor ,那么默认会是对应 channel 绑定的 reactor 线程负责执行该 ChannelHandler 。

如果我们未开启 SINGLE_EVENTEXECUTOR_PER_GROUP ,netty 就会从我们指定的 EventExecutorGroup 中按照 round-robin 的方式为 ChannelHandler 绑定其中一个 eventExecutor 。

SINGLE_EVENTEXECUTOR_PER_GROUP关闭.png

如果我们开启了 SINGLE_EVENTEXECUTOR_PER_GROUP相同的 EventExecutorGroup 在同一个 pipeline 实例中的绑定关系是固定的。在 pipeline 中如果多个 channelHandler 指定了同一个 EventExecutorGroup ,那么这些 channelHandler 的 executor 均会绑定到一个固定的 eventExecutor 上。

SINGLE_EVENTEXECUTOR_PER_GROUP绑定.png

这种固定的绑定关系缓存于每个 pipeline 中的 Map<EventExecutorGroup, EventExecutor> childExecutors 字段中,key 是用户为 channelHandler 指定的 EventExecutorGroup ,value 为该 EventExecutorGroup 在 pipeline 实例中的绑定 eventExecutor 。

接下来就是从 childExecutors 中获取指定 EventExecutorGroup 在该 pipeline 实例中的绑定 eventExecutor,如果绑定关系还未建立,则通过 round-robin 的方式从 EventExecutorGroup 中选取一个 eventExecutor 进行绑定,并在 childExecutor 中缓存绑定关系。

如果绑定关系已经建立,则直接为 ChannelHandler 指定绑定好的 eventExecutor。

5.3 ChanneHandlerContext

在介绍完创建 ChannelHandlerContext 的两个前置操作后,我们回头来看下 ChannelHandlerContext 中包含了哪些具体的上下文信息。

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    // ChannelHandlerContext包裹的channelHandler
    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        //包裹的channelHandler
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    //对应channelHandler的名称
    private final String name;

    //ChannelHandlerContext中持有pipeline的引用
    private final DefaultChannelPipeline pipeline;

    // channelHandler对应的executor 默认为reactor
    final EventExecutor executor;

    //channelHandlerContext中保存channelHandler的执行条件掩码(是什么类型的ChannelHandler,对什么事件感兴趣)
    private final int executionMask;

    //false表示 当channelHandler的状态为ADD_PENDING的时候,也可以响应pipeline中的事件
    //true表示只有在channelHandler的状态为ADD_COMPLETE的时候才能响应pipeline中的事件
    private final boolean ordered;

    //channelHandelr的状态,初始化为INIT
    private volatile int handlerState = INIT;

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                  String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        //channelHandlerContext中保存channelHandler的执行条件掩码(是什么类型的ChannelHandler,对什么事件感兴趣)
        this.executionMask = mask(handlerClass);
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

}

ChannelHandlerContext.png

这里笔者重点介绍 orderd 属性和 executionMask 属性,其他的属性大家很容易理解。

  ordered = executor == null || executor instanceof OrderedEventExecutor;

当我们不指定 channelHandler 的 executor 时或者指定的 executor 类型为 OrderedEventExecutor 时,ordered = true。

那么这个 ordered 属性对于 ChannelHandler 响应 pipeline 中的事件有什么影响呢?

我们之前介绍过在 ChannelHandler 响应 pipeline 中的事件之前都会调用 invokeHandler() 方法来判断是否回调 ChannelHandler 的事件回调方法还是跳过。

   private boolean invokeHandler() {
        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }

另一个重要的属性 executionMask 保存的是当前 ChannelHandler 的一些执行条件信息掩码,比如:

    private static final FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>> MASKS =
            new FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>>() {
                @Override
                protected Map<Class<? extends ChannelHandler>, Integer> initialValue() {
                    return new WeakHashMap<Class<? extends ChannelHandler>, Integer>(32);
                }
            };

    static int mask(Class<? extends ChannelHandler> clazz) {
        // 因为每建立一个channel就会初始化一个pipeline,这里需要将ChannelHandler对应的mask缓存
        Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
        Integer mask = cache.get(clazz);
        if (mask == null) {
            // 计算ChannelHandler对应的mask(什么类型的ChannelHandler,对什么事件感兴趣)
            mask = mask0(clazz);
            cache.put(clazz, mask);
        }
        return mask;
    }

这里需要一个 FastThreadLocal 类型的 MASKS 字段来缓存 ChannelHandler 对应的执行掩码。因为 ChannelHandler 类一旦被定义出来它的执行掩码就固定了,而 netty 需要接收大量的连接,创建大量的 channel ,并为这些 channel 初始化对应的 pipeline ,需要频繁的记录 channelHandler 的执行掩码到 context 类中,所以这里需要将掩码缓存起来。

    private static int mask0(Class<? extends ChannelHandler> handlerType) {
        int mask = MASK_EXCEPTION_CAUGHT;
        try {
            if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
                //如果该ChannelHandler是Inbound类型的,则先将inbound事件全部设置进掩码中
                mask |= MASK_ALL_INBOUND;

                //最后在对不感兴趣的事件一一排除(handler中的事件回调方法如果标注了@Skip注解,则认为handler对该事件不感兴趣)
                if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_REGISTERED;
                }
                if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_UNREGISTERED;
                }
                if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_ACTIVE;
                }
                if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_INACTIVE;
                }
                if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                    mask &= ~MASK_CHANNEL_READ;
                }
                if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_READ_COMPLETE;
                }
                if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
                }
                if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
                    mask &= ~MASK_USER_EVENT_TRIGGERED;
                }
            }

            if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
                //如果handler为Outbound类型的,则先将全部outbound事件设置进掩码中
                mask |= MASK_ALL_OUTBOUND;

                //最后对handler不感兴趣的事件从掩码中一一排除
                if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_BIND;
                }
                if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_CONNECT;
                }
                if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_DISCONNECT;
                }
                if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_CLOSE;
                }
                if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_DEREGISTER;
                }
                if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
                    mask &= ~MASK_READ;
                }
                if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
                        Object.class, ChannelPromise.class)) {
                    mask &= ~MASK_WRITE;
                }
                if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
                    mask &= ~MASK_FLUSH;
                }
            }

            if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
                mask &= ~MASK_EXCEPTION_CAUGHT;
            }
        } catch (Exception e) {
            // Should never reach here.
            PlatformDependent.throwException(e);
        }

        //计算出的掩码需要缓存,因为每次向pipeline中添加该类型的handler的时候都需要获取掩码(创建一个channel 就需要为其初始化pipeline)
        return mask;
    }

计算 ChannelHandler 的执行掩码 mask0 方法虽然比较长,但是逻辑却十分简单。在本文的第三小节《3. pipeline中的事件分类》中,笔者为大家详细介绍了各种事件类型的掩码表示,这里我来看下如何利用这些基本事件掩码来计算出 ChannelHandler 的执行掩码的。

如果 ChannelHandler 是 ChannelInboundHandler 类型的,那么首先会将所有 Inbound 事件掩码设置进执行掩码 mask 中。

最后挨个遍历所有 Inbound 事件,从掩码集合 mask 中排除该 ChannelHandler 不感兴趣的事件。这样一轮下来,就得到了 ChannelHandler 的执行掩码。

从这个过程中我们可以看到,ChannelHandler 的执行掩码包含的是该 ChannelHandler 感兴趣的事件掩码集合。当事件在 pipeline 中传播的时候,在 ChannelHandlerContext 中可以利用这个执行掩码来判断,当前 ChannelHandler 是否符合响应该事件的资格。

同理我们也可以计算出 ChannelOutboundHandler 类型的 ChannelHandler 对应的执行掩码。

那么 netty 框架是如何判断出我们自定义的 ChannelHandler 对哪些事件感兴趣,对哪些事件不感兴趣的呢?

这里我们以 ChannelInboundHandler 类型举例说明,在本文第三小节中,笔者对所有 Inbound 类型的事件作了一个全面的介绍,但是在实际开发中,我们可能并不需要监听所有的 Inbound 事件,可能只是需要监听其中的一到两个事件。

对于我们不感兴趣的事件,我们只需要在其对应的回调方法上标注 @Skip 注解即可,netty 就会认为该 ChannelHandler 对标注 @Skip 注解的事件不感兴趣,当不感兴趣的事件在 pipeline 传播的时候,该 ChannelHandler 就不需要执行响应。

    private static boolean isSkippable(
            final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws Exception {
                Method m;
                try {
                    // 首先查看类中是否覆盖实现了对应的事件回调方法
                    m = handlerType.getMethod(methodName, paramTypes);
                } catch (NoSuchMethodException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                            "Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
                    }
                    return false;
                }
                return m != null && m.isAnnotationPresent(Skip.class);
            }
        });
    }

那我们在编写自定义 ChannelHandler 的时候是不是要在 ChannelInboundHandler 或者 ChannelOutboundHandler 接口提供的所有事件回调方法上,对我们不感兴趣的事件繁琐地一一标注 @Skip 注解呢?

其实是不需要的,netty 为我们提供了 ChannelInboundHandlerAdapter 类和 ChannelOutboundHandlerAdapter 类,netty 事先已经在这些 Adapter 类中的事件回调方法上全部标注了 @Skip 注解,我们在自定义实现 ChannelHandler 的时候只需要继承这些 Adapter 类并覆盖我们感兴趣的事件回调方法即可。

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    @Skip
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Skip
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

6 . 从 pipeline 删除 channelHandler

从上个小节的内容中我们可以看到向 pipeline 中添加 ChannelHandler 的逻辑还是比较复杂的,涉及到的细节比较多。

那么在了解了向 pipeline 中添加 ChannelHandler 的过程之后,从 pipeline 中删除 ChannelHandler 的逻辑就变得很好理解了。

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    //从pipeline中删除指定的channelHandler
    ChannelPipeline remove(ChannelHandler handler);
    //从pipeline中删除指定名称的channelHandler
    ChannelHandler remove(String name);
    //从pipeline中删除特定类型的channelHandler
    <T extends ChannelHandler> T remove(Class<T> handlerType);
}

netty 提供了以上三种方式从 pipeline 中删除指定 ChannelHandler ,下面我们以第一种方式为例来介绍 ChannelHandler 的删除过程。

public class DefaultChannelPipeline implements ChannelPipeline {

    @Override
    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));
        return this;
    }

}

6.1 getContextOrDie

首先需要通过 getContextOrDie 方法在 pipeline 中查找到指定的 ChannelHandler 对应的 ChannelHandelrContext 。以便确认要删除的 ChannelHandler 确实是存在于 pipeline 中。

context 方法是通过遍历 pipeline 中的双向链表来查找要删除的 ChannelHandlerContext 。

    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }

    @Override
    public final ChannelHandlerContext context(ChannelHandler handler) {
        ObjectUtil.checkNotNull(handler, "handler");
        // 获取 pipeline 双向链表结构的头结点
        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {

            if (ctx == null) {
                return null;
            }

            if (ctx.handler() == handler) {
                return ctx;
            }

            ctx = ctx.next;
        }
    }

6.2 remove

remove 方法的整体代码结构和 addLast0 方法的代码结构一样,整体逻辑也是先从 pipeline 中的双向链表结构中将指定的 ChanneHandlerContext 删除,然后在处理被删除的 ChannelHandler 中 handlerRemoved 方法的回调。

   private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        assert ctx != head && ctx != tail;

        synchronized (this) {
            //从pipeline的双向列表中删除指定channelHandler对应的context
            atomicRemoveFromHandlerList(ctx);

            if (!registered) {
                //如果此时channel还未向reactor注册,则通过向pipeline中添加PendingHandlerRemovedTask任务
                //在注册之后回调channelHandelr中的handlerRemoved方法
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            //channelHandelr从pipeline中删除后,需要回调其handlerRemoved方法
            //需要确保handlerRemoved方法在channelHandelr指定的executor中进行
            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }

1 . 从 pipeline 中删除指定 ChannelHandler 对应的 ChannelHandlerContext 。逻辑比较简单,就是普通双向链表的删除操作。

    private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

2 . 如果此时 channel 并未向对应的 reactor 进行注册,则需要向 pipeline 的任务列表中添加 PendingHandlerRemovedTask 任务,再该任务中会执行 ChannelHandler 的 handlerRemoved 回调,当 channel 向 reactor 注册成功后,reactor 会执行 pipeline 中任务列表中的任务,从而回调被删除 ChannelHandler 的 handlerRemoved 方法。

pipeline任务.png

    private final class PendingHandlerRemovedTask extends PendingHandlerCallback {

        PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        public void run() {
            callHandlerRemoved0(ctx);
        }
    }

在执行 ChannelHandler 中 handlerRemoved 回调的时候,需要对 ChannelHandler 的状态进行判断:只有当 handlerState 为 ADD_COMPLETE 的时候才能回调 handlerRemoved 方法。

这里表达的语义是只有当 ChannelHanler 的 handlerAdded 方法被回调之后,那么在 ChannelHanler 被从 pipeline 中删除的时候它的 handlerRemoved 方法才可以被回调。

在 ChannelHandler 的 handlerRemove 方法被回调之后,将 ChannelHandler 的状态设置为 REMOVE_COMPLETE 。

   private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {

        try {
            // 在这里回调 handlerRemoved 方法
            ctx.callHandlerRemoved();
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }

    final void callHandlerRemoved() throws Exception {
        try {
            if (handlerState == ADD_COMPLETE) {
                handler().handlerRemoved(this);
            }
        } finally {
            // Mark the handler as removed in any case.
            setRemoved();
        }
    }

   final void setRemoved() {
        handlerState = REMOVE_COMPLETE;
    }

3 . 如果 channel 已经在 reactor 中注册成功,那么当 channelHandler 从 pipeline 中删除之后,需要立即回调其 handlerRemoved 方法。但是需要确保 handlerRemoved 方法在 channelHandler 指定的 executor 中进行。

7 . pipeline 的初始化

其实关于 pipeline 初始化的相关内容我们在[《详细图解 Netty Reactor 启动全流程》] 中已经简要介绍了 NioServerSocketChannel 中的 pipeline 的初始化时机以及过程。

在[《Netty 如何高效接收网络连接》] 中笔者也简要介绍了 NioSocketChannel 中 pipeline 的初始化时机以及过程。

本小节笔者将结合这两种类型的 Channel 来完整全面的介绍 pipeline 的整个初始化过程。

7.1 NioServerSocketChannel 中 pipeline 的初始化

从前边提到的这两篇文章以及本文前边的相关内容我们知道,Netty 提供了一个特殊的 ChannelInboundHandler 叫做 ChannelInitializer ,用户可以利用这个特殊的 ChannelHandler 对 Channel 中的 pipeline 进行自定义的初始化逻辑。

如果用户只希望在 pipeline 中添加一个固定的 ChannelHandler 可以通过如下代码直接添加。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
                  ...........
             .handler(new LoggingHandler(LogLevel.INFO))

如果希望添加多个 ChannelHandler ,则可以通过 ChannelInitializer 来自定义添加逻辑。

由于使用 ChannelInitializer 初始化 NioServerSocketChannel 中 pipeline 的逻辑会稍微复杂一点,下面我们均以这个复杂的案例来讲述 pipeline 的初始化过程。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
                  ...........
               .handler(new ChannelInitializer<NioServerSocketChannel>() {
                 @Override
                 protected void initChannel(NioServerSocketChannel ch) throws Exception {
                              ....自定义pipeline初始化逻辑....
                               ChannelPipeline p = ch.pipeline();
                               p.addLast(channelHandler1);
                               p.addLast(channelHandler2);
                               p.addLast(channelHandler3);
                                    ........
                 }
             })

以上这些由用户自定义的用于初始化 pipeline 的 ChannelInitializer ,被保存至 ServerBootstrap 启动类中的 handler 字段中。用于后续的初始化调用

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable
   private volatile ChannelHandler handler;
}

在服务端启动的时候,会伴随着 NioServeSocketChannel 的创建以及初始化,在初始化 NioServerSokcetChannel 的时候会将一个新的 ChannelInitializer 添加进 pipeline 中,在新的 ChannelInitializer 中才会将用户自定义的 ChannelInitializer 添加进 pipeline 中,随后才执行初始化过程。

Netty 这里之所以引入一个新的 ChannelInitializer 来初始化 NioServerSocketChannel 中的 pipeline 的原因是需要兼容前边介绍的这两种初始化 pipeline 的方式。

忘记 netty 启动过程的同学可以在回看下笔者的[《详细图解 Netty Reactor 启动全流程》] 这篇文章。

   @Override
    void init(Channel channel) {
       .........

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap中用户指定的channelHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                .........
            }
        });
   }

注意此时 NioServerSocketChannel 并未开始向 Main Reactor 注册,根据本文第四小节《4. 向 pipeline 添加 channelHandler 》中的介绍,此时向 pipeline 中添加这个新的 ChannelInitializer 之后,netty 会向 pipeline 的任务列表中添加 PendingHandlerAddedTask 。当 NioServerSocketChannel 向 Main Reactor 注册成功之后,紧接着 Main Reactor 线程会调用这个 PendingHandlerAddedTask ,在任务中会执行这个新的 ChannelInitializer 的 handlerAdded 回调。在这个回调方法中会执行上边 initChannel 方法里的代码。

server channel pipeline 注册前结构.png

当 NioServerSocketChannel 在向 Main Reactor 注册成功之后,就挨个执行 pipeline 中的任务列表中的任务。

       private void register0(ChannelPromise promise) {
                     .........
                boolean firstRegistration = neverRegistered;
                //执行真正的注册操作
                doRegister();
                //修改注册状态
                neverRegistered = false;
                registered = true;
                //调用pipeline中的任务链表,执行PendingHandlerAddedTask
                pipeline.invokeHandlerAddedIfNeeded();
                .........
   final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            // 执行 pipeline 任务列表中的 PendingHandlerAddedTask 任务。
            callHandlerAddedForAllHandlers();
        }
    }

执行 pipeline 任务列表中的 PendingHandlerAddedTask 任务:

    private void callHandlerAddedForAllHandlers() {
        // pipeline 任务列表中的头结点
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;
            // This Channel itself was registered.
            registered = true;
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GC'ed.
            this.pendingHandlerCallbackHead = null;
        }

        PendingHandlerCallback task = pendingHandlerCallbackHead;
        // 挨个执行任务列表中的任务
        while (task != null) {
            //触发 ChannelInitializer 的 handlerAdded 回调
            task.execute();
            task = task.next;
        }
    }

最终在 PendingHandlerAddedTask 中执行 pipeline 中 ChannelInitializer 的 handlerAdded 回调。

这个 ChannelInitializer 就是在初始化 NioServerSocketChannel 的 init 方法中向 pipeline 添加的 ChannelInitializer。

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {

            if (initChannel(ctx)) {
                //初始化工作完成后,需要将自身从pipeline中移除
                removeState(ctx);
            }
        }
    }

}

在 handelrAdded 回调中执行 ChannelInitializer 匿名类中 initChannel 方法,注意此时执行的 ChannelInitializer 类为在本小节开头 init 方法中由 Netty 框架添加的 ChannelInitializer ,并不是用户自定义的 ChannelInitializer 。


    @Override
    void init(Channel channel) {
       .........

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap中用户指定的ChannelInitializer
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                .........
            }
        });
   }

执行完 ChannelInitializer 匿名类中 initChannel 方法后,需将 ChannelInitializer 从 pipeline 中删除。并回调 ChannelInitializer 的 handlerRemoved 方法。删除过程笔者已经在第六小节《6. 从 pipeline 删除 channelHandler》详细介绍过了。

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                //执行ChannelInitializer匿名类中的initChannel方法
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    //初始化完毕后,从pipeline中移除自身
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

当执行完 initChannel 方法后此时 pipeline 的结构如下图所示:

serverSocketChannel注册之后pipeline结构.png

当用户的自定义 ChannelInitializer 被添加进 pipeline 之后,根据第四小节所讲的添加逻辑,此时 NioServerSocketChannel 已经向 main reactor 成功注册完毕,不再需要向 pipeine 的任务列表中添加 PendingHandlerAddedTask 任务,而是直接调用自定义 ChannelInitializer 中的 handlerAdded 回调,和上面的逻辑一样。不同的是这里最终回调至用户自定义的初始化逻辑实现 initChannel 方法中。执行完用户自定义的初始化逻辑之后,从 pipeline 删除用户自定义的 ChannelInitializer 。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
                  ...........
               .handler(new ChannelInitializer<NioServerSocketChannel>() {
                 @Override
                 protected void initChannel(NioServerSocketChannel ch) throws Exception {
                              ....自定义pipeline初始化逻辑....
                               ChannelPipeline p = ch.pipeline();
                               p.addLast(channelHandler1);
                               p.addLast(channelHandler2);
                               p.addLast(channelHandler3);
                                    ........
                 }
             })

随后 netty 会以异步任务的形式向 pipeline 的末尾添加 ServerBootstrapAcceptor ,至此 NioServerSocketChannel 中 pipeline 的初始化工作就全部完成了。

7.2 NioSocketChannel 中 pipeline 的初始化

在 7.1 小节中笔者举的这个 pipeline 初始化的例子相对来说比较复杂,当我们把这个复杂例子的初始化逻辑搞清楚之后,NioSocketChannel 中 pipeline 的初始化过程就变的很简单了。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
                  ...........
               .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                              ....自定义pipeline初始化逻辑....
                               ChannelPipeline p = ch.pipeline();
                               p.addLast(channelHandler1);
                               p.addLast(channelHandler2);
                               p.addLast(channelHandler3);
                                    ........
                 }
             })
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    //保存用户自定义ChannelInitializer
    private volatile ChannelHandler childHandler;
}

在[《Netty 如何高效接收网络连接》] 一文中我们介绍过,当客户端发起连接,完成三次握手之后,NioServerSocketChannel 上的 OP_ACCEPT 事件活跃,随后会在 NioServerSocketChannel 的 pipeline 中触发 channelRead 事件。并最终在 ServerBootstrapAcceptor 中初始化客户端 NioSocketChannel 。

主从Reactor组完整结构.png

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

       @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            child.pipeline().addLast(childHandler);
                    ...........
       }
}

在这里会将用户自定义的 ChannelInitializer 添加进 NioSocketChannel 中的 pipeline 中,由于此时 NioSocketChannel 还没有向 sub reactor 开始注册。所以在向 pipeline 中添加 ChannelInitializer 的同时会伴随着 PendingHandlerAddedTask 被添加进 pipeline 的任务列表中。

niosocketchannel中pipeline的初始化.png

后面的流程大家应该很熟悉了,和我们在7.1小节中介绍的一模一样,当 NioSocketChannel 再向 sub reactor 注册成功之后,会执行 pipeline 中的任务列表中的 PendingHandlerAddedTask 任务,在 PendingHandlerAddedTask 任务中会回调用户自定义 ChannelInitializer 的 handelrAdded 方法,在该方法中执行 initChannel 方法,用户自定义的初始化逻辑就封装在这里面。在初始化完 pipeline 后,将 ChannelInitializer 从 pipeline 中删除,并回调其 handlerRemoved 方法。

至此客户端 NioSocketChannel 中 pipeline 初始化工作就全部完成了。

8 . 事件传播

在本文第三小节《3. pipeline中的事件分类》中我们介绍了 Netty 事件类型共分为三大类,分别是 Inbound类事件,Outbound类事件,ExceptionCaught事件。并详细介绍了这三类事件的掩码表示,和触发时机,以及事件传播的方向。

本小节我们就来按照 Netty 中异步事件的分类从源码角度分析下事件是如何在 pipeline 中进行传播的。

8.1 Inbound事件的传播

在第三小节中我们介绍了所有的 Inbound 类事件,这些事件在 pipeline 中的传播逻辑和传播方向都是一样的,唯一的区别就是执行的回调方法不同。

本小节我们就以 ChannelRead 事件的传播为例,来说明 Inbound 类事件是如何在 pipeline 中进行传播的。

第三小节中我们提到过,在 NioSocketChannel 中,ChannelRead 事件的触发时机是在每一次 read loop 读取数据之后在 pipeline 中触发的。

              do {
                          ............               
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));

                          ............

                    // 在客户端NioSocketChannel的pipeline中触发ChannelRead事件
                    pipeline.fireChannelRead(byteBuf);

                } while (allocHandle.continueReading());

从这里可以看到,任何 Inbound 类事件在 pipeline 中的传播起点都是从 HeadContext 头结点开始的。

public class DefaultChannelPipeline implements ChannelPipeline {

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

                    .........
}

ChannelRead 事件从 HeadContext 开始在 pipeline 中传播,首先就会回调 HeadContext 中的 channelRead 方法。

在执行 ChannelHandler 中的相应事件回调方法时,需要确保回调方法的执行在指定的 executor 中进行。

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        //需要保证channelRead事件回调在channelHandler指定的executor中进行
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

在执行 HeadContext 的 channelRead 方法发生异常时,就会回调 HeadContext 的 exceptionCaught 方法。并在相应的事件回调方法中决定是否将事件继续在 pipeline 中传播。

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.fireChannelRead(msg);
        }

       @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.fireExceptionCaught(cause);
        }
    }

在 HeadContext 中通过 ctx.fireChannelRead(msg) 继续将 ChannelRead 事件在 pipeline 中向后传播。

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
        return this;
    }

}

这里的 findContextInbound 方法是整个 inbound 类事件在 pipeline 中传播的核心所在。

因为我们现在需要继续将 ChannelRead 事件在 pipeline 中传播,所以我们目前的核心问题就是通过 findContextInbound 方法在 pipeline 中找到下一个对 ChannelRead 事件感兴趣的 ChannelInboundHandler 。然后执行该 ChannelInboundHandler 的 ChannelRead 事件回调。

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        //需要保证channelRead事件回调在channelHandler指定的executor中进行
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

ChannelRead 事件就这样循环往复的一直在 pipeline 中传播,在传播的过程中只有对 ChannelRead 事件感兴趣的 ChannelInboundHandler 才可以响应。其他类型的 ChannelHandler 则直接跳过。

如果 ChannelRead 事件在 pipeline 中传播的过程中,没有得到其他 ChannelInboundHandler 的有效处理,最终会被传播到 pipeline 的末尾 TailContext 中。而在本文第二小节中,我们也提到过 TailContext 对于 inbound 事件存在的意义就是做一个兜底的处理。比如:打印日志,释放 bytebuffer 。

 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
            onUnhandledInboundMessage(ctx, msg);
    }

    protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
        onUnhandledInboundMessage(msg);
        if (logger.isDebugEnabled()) {
            logger.debug("Discarded message pipeline : {}. Channel : {}.",
                         ctx.pipeline().names(), ctx.channel());
        }
    }

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            // 释放DirectByteBuffer
            ReferenceCountUtil.release(msg);
        }
    }

}

8.2 findContextInbound

本小节要介绍的 findContextInbound 方法和我们在上篇文章[《一文聊透 Netty 发送数据全流程》] 中介绍的 findContextOutbound 方法均是 netty 异步事件在 pipeline 中传播的核心所在。

事件传播的核心问题就是需要高效的在 pipeline 中按照事件的传播方向,找到下一个具有响应事件资格的 ChannelHandler 。

比如:这里我们在 pipeline 中传播的 ChannelRead 事件,我们就需要在 pipeline 中找到下一个对 ChannelRead 事件感兴趣的 ChannelInboundHandler ,并执行该 ChannelInboudnHandler 的 ChannelRead 事件回调,在 ChannelRead 事件回调中对事件进行业务处理,并决定是否通过 ctx.fireChannelRead(msg) 将 ChannelRead 事件继续向后传播。

   private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));

        return ctx;
    }

参数 mask 表示我们正在传播的 ChannelRead 事件掩码 MASK_CHANNEL_READ 。

    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_INACTIVE = 1 << 4;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
    static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
    static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;

通过 ctx = ctx.next 在 pipeline 中找到下一个 ChannelHandler ,并通过 skipContext 方法判断下一个 ChannelHandler 是否具有响应事件的资格。如果没有则跳过继续向后查找。

比如:下一个 ChannelHandler 如果是一个 ChannelOutboundHandler,或者下一个 ChannelInboundHandler 对 ChannelRead 事件不感兴趣,那么就直接跳过。

8.3 skipContext

该方法主要用来判断下一个 ChannelHandler 是否具有 mask 代表的事件的响应资格。

    private static boolean skipContext(
            AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {

        return (ctx.executionMask & (onlyMask | mask)) == 0 ||
                (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
    }

以上就是 skipContext 方法的核心逻辑,这里表达的核心语义是:

这里大部分同学可能会对 ctx.executor() == currentExecutor 这个条件感到很疑惑。加上这个条件,其实对我们这里的核心语义并没有多大影响。

public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {

    @Skip
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }

    @Skip
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

    @Skip
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.disconnect(promise);
    }

    @Skip
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.close(promise);
    }

    @Skip
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }

    @Skip
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }

    @Skip
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    @Skip
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

而这里之所以需要加入 ctx.executor() == currentExecutor 条件的判断,是为了防止 HttpContentCompressor 在被指定不同的 executor 情况下无法正确的创建压缩内容,导致的一些异常。但这个不是本文的重点,大家只需要理解这里的核心语义就好,这种特殊情况的特殊处理了解一下就好。

8.4 Outbound事件的传播

关于 Outbound 类事件的传播,笔者在上篇文章[《一文搞懂 Netty 发送数据全流程》] 中已经进行了详细的介绍,本小节就不在赘述。

8.5 ExceptionCaught事件的传播

在最后我们来介绍下异常事件在 pipeline 中的传播,ExceptionCaught 事件和 Inbound 类事件一样都是在 pipeline 中从前往后开始传播。

ExceptionCaught 事件的触发有两种情况:一种是 netty 框架内部产生的异常,这时 netty 会直接在 pipeline 中触发 ExceptionCaught 事件的传播。异常事件会在 pipeline 中从 HeadContext 开始一直向后传播直到 TailContext。

比如 netty 在 read loop 中读取数据时发生异常:

     try {
               ...........

               do {
                          ............               
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));

                          ............

                    //客户端NioSocketChannel的pipeline中触发ChannelRead事件
                    pipeline.fireChannelRead(byteBuf);

                } while (allocHandle.continueReading());

                         ...........
        }  catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
       } 

这时会 netty 会直接从 pipeline 中触发 ExceptionCaught 事件的传播。

      private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                RecvByteBufAllocator.Handle allocHandle) {

                    .............

            pipeline.fireExceptionCaught(cause);

                    .............

        }

和 Inbound 类事件一样,ExceptionCaught 事件会在 pipeline 中从 HeadContext 开始一直向后传播。

    @Override
    public final ChannelPipeline fireExceptionCaught(Throwable cause) {
        AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
        return this;
    }

第二种触发 ExceptionCaught 事件的情况是,当 Inbound 类事件或者 flush 事件在 pipeline 中传播的过程中,在某个 ChannelHandler 中的事件回调方法处理中发生异常,这时该 ChannelHandler 的 exceptionCaught 方法会被回调。用户可以在这里处理异常事件,并决定是否通过 ctx.fireExceptionCaught(cause) 继续向后传播异常事件。

比如我们在 ChannelInboundHandler 中的 ChannelRead 回调中处理业务请求时发生异常,就会触发该 ChannelInboundHandler 的 exceptionCaught 方法。

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                //触发channelHandler的exceptionCaught回调
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                  ........
        } else {
                  ........
        }
    }

再比如:当我们在 ChannelOutboundHandler 中的 flush 回调中处理业务结果发送的时候发生异常,也会触发该 ChannelOutboundHandler 的 exceptionCaught 方法。

   private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    }

我们可以在 ChannelHandler 的 exceptionCaught 回调中进行异常处理,并决定是否通过 ctx.fireExceptionCaught(cause) 继续向后传播异常事件。

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {

        .........异常处理.......

        ctx.fireExceptionCaught(cause);
    }
    @Override
    public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
        invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
        return this;
    }

   static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
        ObjectUtil.checkNotNull(cause, "cause");
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeExceptionCaught(cause);
        } else {
            try {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeExceptionCaught(cause);
                    }
                });
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to submit an exceptionCaught() event.", t);
                    logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
                }
            }
        }
    }

8.6 ExceptionCaught 事件和 Inbound 类事件的区别

虽然 ExceptionCaught 事件和 Inbound 类事件在传播方向都是在 pipeline 中从前向后传播。但是大家这里注意区分这两个事件的区别。

在 Inbound 类事件传播过程中是会查找下一个具有事件响应资格的 ChannelInboundHandler 。遇到 ChannelOutboundHandler 会直接跳过。

而 ExceptionCaught 事件无论是在哪种类型的 channelHandler 中触发的,都会从当前异常 ChannelHandler 开始一直向后传播,ChannelInboundHandler 可以响应该异常事件,ChannelOutboundHandler 也可以响应该异常事件。

由于无论异常是在 ChannelInboundHandler 中产生的还是在 ChannelOutboundHandler 中产生的, exceptionCaught 事件都会在 pipeline 中是从前向后传播,并且不关心 ChannelHandler 的类型。所以我们一般将负责统一异常处理的 ChannelHandler 放在 pipeline 的最后,这样它对于 inbound 类异常和 outbound 类异常均可以捕获得到。

异常事件的传播.png


总结

本文涉及到的内容比较多,通过 netty 异步事件在 pipeline 中的编排和传播这条主线,我们相当于将之前的文章内容重新又回顾总结了一遍。

本文中我们详细介绍了 pipeline 的组成结构,它主要是由 ChannelHandlerContext 类型节点组成的双向链表。ChannelHandlerContext 包含了 ChannelHandler 执行上下文的信息,从而可以使 ChannelHandler 只关注于 IO 事件的处理,遵循了单一原则和开闭原则。

此外 pipeline 结构中还包含了一个任务链表,用来存放执行 ChannelHandler 中的 handlerAdded 回调和 handlerRemoved 回调。pipeline 还持有了所属 channel 的引用。

我们还详细介绍了 Netty 中异步事件的分类:Inbound 类事件,Outbound 类事件,ExceptionCaught 事件。并详细介绍了每种分类下的所有事件的触发时机和在 pipeline 中的传播路径。

最后介绍了 pipeline 的结构以及创建和初始化过程,以及对 pipeline 相关操作的源码实现。

中间我们又穿插介绍了 ChannelHanderContext 的结构,介绍了 ChannelHandlerContext 具体封装了哪些关于 ChannelHandler 执行的上下文信息。

本文的内容到这里就结束了,感谢大家的观看,我们下篇文章见~~~

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8