Netty之旅:你想要的NIO知识点,这里都有!

328次阅读  |  发布于3年以前

大家好, 本篇给大家带来的是 Netty 核心知识。建议收藏起来慢慢看~

NIO思维导图.png

一、网络编程基础回顾

1. Socket

Socket本身有“插座”的意思,不是Java中特有的概念,而是一个语言无关的标准,任何可以实现网络编程的编程语言都有Socket。在Linux环境下,用于表示进程间网络通信的特殊文件类型,其本质为内核借助缓冲区形成的伪文件。既然是文件,那么理所当然的,我们可以使用文件描述符引用套接字。

与管道类似的,Linux系统将其封装成文件的目的是为了统一接口,使得读写套接字和读写文件的操作一致。区别是管道主要应用于本地进程间通信,而套接字多应用于网络进程间数据的传递。

可以这么理解:Socket就是网络上的两个应用程序通过一个双向通信连接实现数据交换的编程接口API。

Socket通信的基本流程具体步骤如下所示:

(1)服务端通过Listen开启监听,等待客户端接入。

(2)客户端的套接字通过Connect连接服务器端的套接字,服务端通过Accept接收客户端连接。在connect-accept过程中,操作系统将会进行三次握手。

(3)客户端和服务端通过writeread发送和接收数据,操作系统将会完成TCP数据的确认、重发等步骤。

(4)通过close关闭连接,操作系统会进行四次挥手。

针对Java编程语言,java.net包是网络编程的基础类库。其中ServerSocketSocket是网络编程的基础类型。

SeverSocket是服务端应用类型。Socket是建立连接的类型。当连接建立成功后,服务器和客户端都会有一个Socket对象示例,可以通过这个Socket对象示例,完成会话的所有操作。对于一个完整的网络连接来说,Socket是平等的,没有服务器客户端分级情况。

2. IO模型介绍

对于一次IO操作,数据会先拷贝到内核空间中,然后再从内核空间拷贝到用户空间中,所以一次read操作,会经历两个阶段:

(1)等待数据准备

(2)数据从内核空间拷贝到用户空间

基于以上两个阶段就产生了五种不同的IO模式。

  1. 阻塞IO:从进程发起IO操作,一直等待上述两个阶段完成,此时两阶段一起阻塞。
  2. 非阻塞IO:进程一直询问IO准备好了没有,准备好了再发起读取操作,这时才把数据从内核空间拷贝到用户空间。第一阶段不阻塞但要轮询,第二阶段阻塞。
  3. 多路复用IO:多个连接使用同一个select去询问IO准备好了没有,如果有准备好了的,就返回有数据准备好了,然后对应的连接再发起读取操作,把数据从内核空间拷贝到用户空间。两阶段分开阻塞。
  4. 信号驱动IO:进程发起读取操作会立即返回,当数据准备好了会以通知的形式告诉进程,进程再发起读取操作,把数据从内核空间拷贝到用户空间。第一阶段不阻塞,第二阶段阻塞。
  5. 异步IO:进程发起读取操作会立即返回,等到数据准备好且已经拷贝到用户空间了再通知进程拿数据。两个阶段都不阻塞。

这五种IO模式不难发现存在这两对关系:同步和异步、阻塞和非阻塞。那么稍微解释一下:

同步和异步

同步和异步的区别最大在于异步的话调用者不需要等待处理结果,被调用者会通过回调等机制来通知调用者其返回结果。

阻塞和非阻塞

阻塞和非阻塞是针对进程在访问数据的时候,根据IO操作的就绪状态来采取的不同方式,说白了是一种读取或者写入操作方法的实现方式,阻塞方式下读取或者写入函数将一直等待,而非阻塞方式下,读取或者写入方法会立即返回一个状态值。

如果组合后的同步阻塞(blocking-IO)简称BIO、同步非阻塞(non-blocking-IO)简称NIO和异步非阻塞(asynchronous-non-blocking-IO)简称AIO又代表什么意思呢?

java 中的 BIONIOAIO理解为是 Java 语言在操作系统层面对这三种 IO 模型的封装。程序员在使用这些 封装API 的时候,不需要关心操作系统层面的知识,也不需要根据不同操作系统编写不同的代码,只需要使用Java的API就可以了。由此,为了使读者对这三种模型有个比较具体和递推式的了解,并且和本文主题NIO有个清晰的对比,下面继续延伸。

Java BIO

BIO编程方式通常是是Java的上古产品,自JDK 1.0-JDK1.4就有的东西。编程实现过程为:首先在服务端启动一个ServerSocket来监听网络请求,客户端启动Socket发起网络请求,默认情况下SeverSocket会建立一个线程来处理此请求,如果服务端没有线程可用,客户端则会阻塞等待或遭到拒绝。服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理。大致结构如下:

如果要让 BIO 通信模型能够同时处理多个客户端请求,就必须使用多线程(主要原因是 socket.accept()socket.read()socket.write() 涉及的三个主要函数都是同步阻塞的),也就是说它在接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的 一请求一应答通信模型 。我们可以设想一下如果这个连接不做任何事情的话就会造成不必要的线程开销,不过可以通过线程池机制改善,线程池还可以让线程的创建和回收成本相对较低。使用线程池机制改善后的 BIO 模型图如下:

BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,是JDK1.4以前的唯一选择,但程序直观简单易懂。Java BIO编程示例网上很多,这里就不进行coding举例了,毕竟后面NIO才是重点。

Java NIO

NIO(New IO或者No-Blocking IO),从JDK1.4 开始引入的非阻塞IO,是一种非阻塞+ 同步的通信模式。这里的No Blocking IO用于区分上面的BIO

NIO本身想解决 BIO的并发问题,通过Reactor模式的事件驱动机制来达到Non Blocking的。当 socket 有流可读或可写入 socket 时,操作系统会相应的通知应用程序进行处理,应用再将流读取到缓冲区或写入操作系统。

也就是说,这个时候,已经不是一个连接就 要对应一个处理线程了,而是有效的请求,对应一个线程,当连接没有数据时,是没有工作线程来处理的。

当一个连接创建后,不需要对应一个线程,这个连接会被注册到 多路复用器上面,所以所有的连接只需要一个线程就可以搞定,当这个线程中的多路复用器 进行轮询的时候,发现连接上有请求的话,才开启一个线程进行处理,也就是一个请求一个线程模式。

NIO提供了与传统BIO模型中的SocketServerSocket相对应的SocketChannelServerSocketChannel两种不同的套接字通道实现,如下图结构所示。这里涉及的Reactor设计模式、多路复用SelectorBuffer等暂时不用管,后面会讲到。

NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局 限于应用中,编程复杂,JDK1.4 开始支持。同时,NIO和普通IO的区别主要可以从存储数据的载体、是否阻塞等来区分:

NIO和普通IO区别.png

Java AIO

NIO 不同,当进行读写操作时,只须直接调用 API 的 readwrite 方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入 read 方 法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将 write 方法传递的流写入完毕时,操作系统主动通知应用程序。即可以理解为,read/write 方法都是异步的,完成后会主动调用回调函数。在 JDK7 中,提供了异步文件通道和异步套接字通道的实现,这部分内容被称作 NIO.

AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。

目前来说 AIO 的应用还不是很广泛,Netty 之前也尝试使用过 AIO,不过又放弃了。

二、NIO核心组件介绍

1. Channel

NIO中,基本所有的IO操作都是从Channel开始的,Channel通过Buffer(缓冲区)进行读写操作。

read()表示读取通道中数据到缓冲区,write()表示把缓冲区数据写入到通道。

Channel和Buffer互相操作.png

Channel有好多实现类,这里有三个最常用:

其中SocketChannelServerSocketChannel是网络编程中最常用的,一会在最后的示例代码中会有讲解到具体用法。

2. Buffer

概念

Buffer也被成为内存缓冲区,本质上就是内存中的一块,我们可以将数据写入这块内存,之后从这块内存中读取数据。也可以将这块内存封装成NIO Buffer对象,并提供一组常用的方法,方便我们对该块内存进行读写操作。

Bufferjava.nio中被定义为抽象类:

Buffer结构体系.png

我们可以将Buffer理解为一个数组的封装,我们最常用的ByteBuffer对应的数据结构就是byte[]

属性

Buffer中有4个非常重要的属性:capacity、limit、position、mark

Buffer中基本属性.png

capacity.png

上图中,初始化Buffer的容量为8(图中从0~7,共8个元素),所以capacity = 8

NIO属性概念.png

从上图就能很清晰看出,读写模式下capacity、limit、position的关系了。

这几个重要属性讲完,我们可以再来回顾下:

0 <= mark <= position <= limit <= capacity

现在应该很清晰这几个属性的关系了~

Buffer常见操作

创建Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);

例子中创建的ByteBuffer是基于堆内存的一个对象。

wrap方法可以将数组包装成一个Buffer对象:

ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
channel.write(buffer);

通过allocateDirect方法也可以快速实例化一个Buffer对象,和allocate很相似,这里区别的是allocateDirect创建的是基于堆外内存的对象。

堆外内存不在JVM堆上,不受GC的管理。堆外内存进行一些底层系统的IO操作时,效率会更高。

Buffer写操作

Buffer写入可以通过put()channel.read(buffer)两种方式写入。

通常我们NIO的读操作的时候,都是从Channel中读取数据写入Buffer,这个对应的是Buffer写操作

Buffer读操作

Buffer读取可以通过get()channel.write(buffer)两种方式读入。

还是同上,我们对Buffer的读入操作,反过来说就是对Channel写操作。读取Buffer中的数据然后写入Channel中。

Channel和Buffer互相操作.png

其他常见方法
public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}
public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}
public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}
public final Buffer mark() {
    mark = position;
    return this;
}

public final Buffer reset() {
    int m = mark;
    if (m < 0)
        throw new InvalidMarkException();
    position = m;
    return this;
}

常用的读写方法可以用一张图总结一下:

Buffer读写操作.png

3. Selector

概念

Selector是NIO中最为重要的组件之一,我们常常说的多路复用器就是指的Selector组件。Selector组件用于轮询一个或多个NIO Channel的状态是否处于可读、可写。通过轮询的机制就可以管理多个Channel,也就是说可以管理多个网络连接。

Selector原理图.png

轮询机制

  1. 首先,需要将Channel注册到Selector上,这样Selector才知道需要管理哪些Channel
  2. 接着Selector会不断轮询其上注册的Channel,如果某个Channel发生了读或写的时间,这个Channel就会被Selector轮询出来,然后通过SelectionKey可以获取就绪的Channel集合,进行后续的IO操作。

轮询机制.png

属性操作

1 . 创建Selector

通过open()方法,我们可以创建一个Selector对象。

Selector selector = Selector.open();

2 . 注册Channel到Selector中

我们需要将Channel注册到Selector中,才能够被Selector管理。

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

某个Channel要注册到Selector中,那么该Channel必须是非阻塞,所有上面代码中有个configureBlocking()的配置操作。

register(Selector selector, int interestSet)方法的第二个参数,标识一个interest集合,意思是Selector对哪些事件感兴趣,可以监听四种不同类型的事件:

public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << ;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

Channel触发了一个事件,表明该时间已经准备就绪:

当然,Selector是可以同时对多个事件感兴趣的,我们使用或运算即可组合多个事件:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

Selector其他一些操作

选择Channel
public abstract int select() throws IOException;
public abstract int select(long timeout) throws IOException;
public abstract int selectNow() throws IOException;

当Selector执行select()方法就会产生阻塞,等到注册在其上的Channel准备就绪就会立即返回,返回准备就绪的数量。

select(long timeout)则是在select()的基础上增加了超时机制。selectNow()立即返回,不产生阻塞。

有一点非常需要注意:select 方法返回的 int 值,表示有多少 Channel 已经就绪。

自上次调用select 方法后有多少 Channel 变成就绪状态。如果调用 select 方法,因为有一个 Channel 变成就绪状态则返回了 1 ;

若再次调用 select 方法,如果另一个 Channel 就绪了,它会再次返回1。

获取可操作的Channel
Set selectedKeys = selector.selectedKeys();

当有新增就绪的Channel,调用select()方法,就会将key添加到Set集合中。

三、代码示例

前面铺垫了这么多,主要是想让大家能够看懂NIO代码示例,也方便后续大家来自己手写NIO 网络编程的程序。创建NIO服务端的主要步骤如下:

1. 打开ServerSocketChannel,监听客户端连接
2. 绑定监听端口,设置连接为非阻塞模式
3. 创建Reactor线程,创建多路复用器并启动线程
4. 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
5. Selector轮询准备就绪的key
6. Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路
7. 设置客户端链路为非阻塞模式
8. 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
9. 异步读取客户端消息到缓冲区
10.对Buffer编解码,处理半包消息,将解码成功的消息封装成Task
11.将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端

NIOServer.java

public class NIOServer {


    private static Selector selector;

    public static void main(String[] args) {
        init();
        listen();
    }

    private static void init() {
        ServerSocketChannel serverSocketChannel = null;

        try {
            selector = Selector.open();

            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(9000));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("NioServer 启动完成");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void listen() {
        while (true) {
            try {
                selector.select();
                Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                while (keysIterator.hasNext()) {
                    SelectionKey key = keysIterator.next();
                    keysIterator.remove();
                    handleRequest(key);
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }

    private static void handleRequest(SelectionKey key) throws IOException {
        SocketChannel channel = null;
        try {
            if (key.isAcceptable()) {
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                channel = serverSocketChannel.accept();
                channel.configureBlocking(false);
                System.out.println("接受新的 Channel");
                channel.register(selector, SelectionKey.OP_READ);
            }

            if (key.isReadable()) {
                channel = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int count = channel.read(buffer);
                if (count > 0) {
                    System.out.println("服务端接收请求:" + new String(buffer.array(), 0, count));
                    channel.register(selector, SelectionKey.OP_WRITE);
                }
            }

            if (key.isWritable()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                buffer.put("收到".getBytes());
                buffer.flip();

                channel = (SocketChannel) key.channel();
                channel.write(buffer);
                channel.register(selector, SelectionKey.OP_READ);
            }
        } catch (Throwable t) {
            t.printStackTrace();
            if (channel != null) {
                channel.close();
            }
        }
    }
}

NIOClient.java

public class NIOClient {

    public static void main(String[] args) {
        new Worker().start();
    }

    static class Worker extends Thread {
        @Override
        public void run() {
            SocketChannel channel = null;
            Selector selector = null;
            try {
                channel = SocketChannel.open();
                channel.configureBlocking(false);

                selector = Selector.open();
                channel.register(selector, SelectionKey.OP_CONNECT);
                channel.connect(new InetSocketAddress(9000));
                while (true) {
                    selector.select();
                    Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                    while (keysIterator.hasNext()) {
                        SelectionKey key = keysIterator.next();
                        keysIterator.remove();

                        if (key.isConnectable()) {
                            System.out.println();
                            channel = (SocketChannel) key.channel();

                            if (channel.isConnectionPending()) {
                                channel.finishConnect();

                                ByteBuffer buffer = ByteBuffer.allocate(1024);
                                buffer.put("你好".getBytes());
                                buffer.flip();
                                channel.write(buffer);
                            }

                            channel.register(selector, SelectionKey.OP_READ);
                        }

                        if (key.isReadable()) {
                            channel = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            int len = channel.read(buffer);

                            if (len > 0) {
                                System.out.println("[" + Thread.currentThread().getName()
                                        + "]收到响应:" + new String(buffer.array(), 0, len));
                                Thread.sleep(5000);
                                channel.register(selector, SelectionKey.OP_WRITE);
                            }
                        }

                        if(key.isWritable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            buffer.put("你好".getBytes());
                            buffer.flip();

                            channel = (SocketChannel) key.channel();
                            channel.write(buffer);
                            channel.register(selector, SelectionKey.OP_READ);
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally{
                if(channel != null){
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                if(selector != null){
                    try {
                        selector.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

打印结果:

// Server端
NioServer 启动完成
接受新的 Channel
服务端接收请求:你好
服务端接收请求:你好
服务端接收请求:你好

// Client端
[Thread-0]收到响应:收到
[Thread-0]收到响应:收到
[Thread-0]收到响应:收到

四、总结

回顾一下使用 NIO 开发服务端程序的步骤:

  1. 创建 ServerSocketChannel 和业务处理线程池。
  2. 绑定监听端口,并配置为非阻塞模式。
  3. 创建 Selector,将之前创建的 ServerSocketChannel 注册到 Selector 上,监听 SelectionKey.OP_ACCEPT
  4. 循环执行 Selector.select()`` 方法,轮询就绪的Channel`。
  5. 轮询就绪的 Channel 时,如果是处于 OP_ACCEPT 状态,说明是新的客户端接入,调用 ServerSocketChannel.accept 接收新的客户端。
  6. 设置新接入的 SocketChannel 为非阻塞模式,并注册到 Selector 上,监听 OP_READ
  7. 如果轮询的 Channel 状态是 OP_READ,说明有新的就绪数据包需要读取,则构造 ByteBuffer 对象,读取数据。

那从这些步骤中基本知道开发者需要熟悉的知识点有:

  1. jdk-nio提供的几个关键类:Selector , SocketChannel , ServerSocketChannel , FileChannel ,ByteBuffer ,SelectionKey
  2. 需要知道网络知识:tcp粘包拆包 、网络闪断、包体溢出及重复发送等
  3. 需要知道linux底层实现,如何正确的关闭channel,如何退出注销selector ,如何避免selector太过于频繁
  4. 需要知道如何让client端获得server端的返回值,然后才返回给前端,需要如何等待或在怎样作熔断机制
  5. 需要知道对象序列化,及序列化算法
  6. 省略等等,因为我已经有点不舒服了,作为程序员的我习惯了舒舒服服简单的API,不用太知道底层细节,就能写出比较健壮和没有Bug的代码...

NIO 原生 API 的弊端 :

① NIO 组件复杂 : 使用原生 NIO 开发服务器端与客户端 , 需要涉及到 服务器套接字通道 ( ServerSocketChannel ) , 套接字通道 ( SocketChannel ) , 选择器 ( Selector ) , 缓冲区 ( ByteBuffer ) 等组件 , 这些组件的原理 和API 都要熟悉 , 才能进行 NIO 的开发与调试 , 之后还需要针对应用进行调试优化

② NIO 开发基础 :NIO 门槛略高 , 需要开发者掌握多线程、网络编程等才能开发并且优化 NIO 网络通信的应用程序

③ 原生 API 开发网络通信模块的基本的传输处理 : 网络传输不光是实现服务器端和客户端的数据传输功能 , 还要处理各种异常情况 , 如 连接断开重连机制 , 网络堵塞处理 , 异常处理 , 粘包处理 , 拆包处理 , 缓存机制 等方面的问题 , 这是所有成熟的网络应用程序都要具有的功能 , 否则只能说是入门级的 Demo

④ NIO BUG :NIO 本身存在一些 BUG , 如 Epoll , 导致 选择器 ( Selector ) 空轮询 , 在 JDK 1.7 中还没有解决

NettyNIO 的基础上 , 封装了 Java 原生的 NIO API , 解决了上述哪些问题呢 ?

相比 Java NIO,使用 Netty 开发程序,都简化了哪些步骤呢?...等等这系列问题也都是我们要问的问题。不过因为这篇只是介绍NIO相关知识,没有介绍Netty API的使用,所以介绍Netty API使用简单开发门槛低等优点有点站不住脚。那么就留到后面跟大家一起开启Netty学习之旅,探讨人人说好的Netty到底是不是江湖传言的那么好。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8