Java I/O模型——NIO


发布于 2016-04-09 / 39 阅读 / 0 评论 /
NIO全称non-blocking I/O,是jdk1.4及以上版本里提供的新api,也称为New I/O。

NIO为原始类型(boolean除外)提供缓存支持的数据容器,使用他们可以提供非阻塞式的高伸缩性网络。

在 Linux 系统下,socket 连接默认是阻塞模式,可以将 socket 设置成非阻塞模式。

1.同步非阻塞IO流程

在 NIO 模型中,应用程序一旦开始 IO 系统调用,就会出现以下两种情况:

(1)在内核缓冲区中没有数据的情况下,系统调用会立即返回一个调用失败的信息。

(2)在内核缓冲区中有数据的情况下,在数据的复制过程中系统调用是阻塞的,直到完成数据从内核缓冲区复制到用户缓冲区。复制完成后,系统调用返回成功,用户进程(或者线程)可以开始处理用户空间的缓冲区数据。

NIO流程如下图所示:

举个例子,发起一个非阻塞 socket 的 read 操作的系统调用,流程如下:

(1)在内核数据没有准备好的阶段,用户线程发起 IO 请求时立即返回。所以,为了读取最终的数据,用户进程(或者线程)需要不断地发起 IO 系统调用。

(2)内核数据到达后,用户进程(或者线程)发起系统调用,用户进程(或者线程)阻塞。内核开始复制数据,它会将数据从内核缓冲区复制到用户缓冲区,然后内核返回结果(例如返回复制到的用户缓冲区的字节数)。

(3)用户进程(或者线程)读到数据后,才会解除阻塞状态,重新运行起来。也就是说,用户空间需要经过多次尝试才能保证最终真正读到数据,而后继续执行。

1.1.NIO 的特点

应用程序的线程需要不断地进行 IO 系统调用,轮询数据是否已经准备好,如果没有准备好就继续轮询,直到完成 IO 系统调用为止。

1.2.NIO 的优点

每次发起的 IO 系统调用在内核等待数据过程中可以立即返回,用户线程不会阻塞,实时性较好。

1.3.NIO 的缺点

不断地轮询内核,这将占用大量的 CPU 时间,效率低下。

2.同步非阻塞IO模型

模型如下图所示:

应用程序可从buffer中读取数据。

NIO三大组件为:Channel、Buffer、Selector。

2.1.Channel

Channel是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流,而且他们面向缓冲区(Buffer)的。所有数据都通过Buffer对象来处理,永远不会将字节直接写入通道中,而是将数据写入包含一个或多个字节的缓冲区。也不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。

Channel是读写数据的双向通道,可以从Channel将数据读取Buffer,也可将Buffer的数据写入Channel,而之前的Stream要么是输入(InputStream)、要么是输出(OutputStream),只在一个方向上流通。 而通道(Channel)可以用于读、写或者同时用于读写。

常见的Channel有以下四种:

(1)FileChannel:文件通道,用于文件的读写。

(2)DatagramChannel:UDP通道,用于监听UDP请求和发送UDP请求。

(3)SocketChannel:TCP通道,用于发起TCP请求。

(4)ServerSocketChannel:用于服务端监听外部过来的TCP请求。

NIO的读操作就是将数据从Channel读到Buffer中,进行后续处理,调用方法如下:

channel.read(buffer)

NIO的写操作就是将数据从Buffer中写入到Channel中,调用方法如下:

channel.write(buffer)

2.2.Buffer

Buffer的本质就是内存块,所有的数据读写都要依赖这个,我们可以将数据写入到这个内存块,然后从这个内存块读取数据。

Buffer缓冲区用来读写数据,常见的Buffer有:ByteBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer、CharBuffer。

每一种Buffer都可以理解为一个数组,有几个重要属性:position、limit、capacity。如下图所示:

position 的初始值是 0,每往 Buffer 中写入一个值,position 就自动加 1,代表下一次的写入位置。读操作的时候也是类似的,每读一个值,position 就自动加 1。

Buffer可以读写模式切换,从写操作模式到读操作模式切换的时候(flip),position 都会归零,这样就可以从头开始读写了。

写操作模式下,limit 代表的是最大能写入的数据,这个时候 limit 等于 capacity。写结束后,切换到读模式,此时的 limit 等于 Buffer 中实际的数据大小,因为 Buffer 不一定被写满了。

Buffer的使用案例如下代码所示:

// 创建一个Buffer对象
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 从Channel中读取数据到Buffer中
int num = channel.read(buf);
// 将Buffer中的数据写入到Channel中。
int num = channel.write(buf);

2.3.Selector

​在多线程模式下,BIO时,一个线程只能处理一个请求,比如http请求,当请求响应式关闭连接,释放线程资源。Selector选择器的作用就是配合一个线程来管理多个Channel,获取这些Channel上发生的事件,这些Channel工作在非阻塞模式下,不会让线程一直在一个Channel上,适合连接数特别多,但流量低的场景。

调用Selector的select()方法会阻塞直到Channel发送了读写就绪事件,这些事件发生,select()方法就会返回这些事件交给thread来处理。

Selector的使用案例如下代码所示:

Selector selector = Selector.open();
// 将通道设置为非阻塞模式,因为默认都是阻塞模式的
channel.configureBlocking(false);
// 注册
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

Selector有四个事件:

(1)SelectKey.OP_ACCEPT:接收TCP连接

(2)SelectKey.OP_READ:通道有数据可以读

(3)SelectKey.OP_WRITE:通道有数据可以写

(4)SelectKey.OP_CONNECT:成功建立TCP连接

非阻塞IO核心是一个Selector管理多个通道,将各个通道注册到 Selector 上,指定监听的事件,之后可以只用一个线程来轮询这个 Selector,看看上面是否有通道是准备好的,当通道准备好可读或可写,然后才去开始真正的读写,这样速度就很快了。

Selector底层实现有三种方式。

模式

select

poll

epoll

操作方式

遍历

遍历

回调

底层实现

数组

链表

哈希表

IO效率

每次调用都进行线性遍历,时间复杂度为O(n)

每次调用都进行线性遍历,时间复杂度为O(n)

事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到rdllist里面。时间复杂度为O(1)

最大连接数

1024(x86)

2048(x64)

无上限

无上限

Fd拷贝

每次调用select,都需要把fd集合从用户态拷贝到内核态

每次调用poll,都需要把fd集合从用户态拷贝到内核态

调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝

2.3.1.select模式

缺点:

(1)单个进程能够监视的文件描述符的数量存在最大限制

(2)内核 / 用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销

(3)Select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件

(4)select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。

2.3.2.poll模式

相比select模型,poll使用链表保存文件描述符,因此没有了监视文件数量的限制,但其他三个缺点依然存在。

select 和 poll 都有一个共同的问题,那就是它们都只会告诉你有几个通道准备好了,但是不会告诉你具体是哪几个通道。所以,一旦知道有通道准备好以后,自己还是需要进行一次扫描,显然这个不太好,通道少的时候还行,一旦通道的数量是几十万个以上的时候,扫描一次的时间都很可观了,时间复杂度 O(n)。

2.3.3.epoll模式

通过红黑树和双链表数据结构,并结合回调机制,造就了epoll的高效,解决了select/poll缺点。

每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件,这些事件都会挂载在红黑树中。而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。

(1)调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)

(2)调用epoll_ctl向epoll对象中添加这100万个连接的套接字

(3)调用epoll_wait收集发生的事件的连接

3.NIO简单demo

Server端代码如下所示:

public class SelectorServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.socket().bind(new InetSocketAddress(8080));
        // 将其注册到 Selector 中,监听 OP_ACCEPT 事件
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            // 需要不断地去调用 select() 方法获取最新的准备好的通道
            int readyChannels = selector.select();
            if (readyChannels == 0) {
                continue;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            // 遍历
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    // 有已经接受的新的到服务端的连接
                    SocketChannel socketChannel = server.accept();
                    // 有新的连接并不代表这个通道就有数据,
                    // 这里将这个新的 SocketChannel 注册到 Selector,监听 OP_READ 事件,等待数据
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // 有数据可读
                    // 上面一个 if 分支中注册了监听 OP_READ 事件的 SocketChannel
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int num = socketChannel.read(readBuffer);
                    if (num > 0) {
                        // 处理进来的数据...
                        System.out.println("收到数据:" + new String(readBuffer.array()).trim());
                        socketChannel.register(selector, SelectionKey.OP_WRITE);
                    } else if (num == -1) {
                        // -1 代表连接已经关闭
                        socketChannel.close();
                    }
                } else if (key.isWritable()) {
                    // 通道可写
                    // 给用户返回数据的通道可以进行写操作了
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.wrap("返回给客户端的数据...".getBytes());
                    socketChannel.write(buffer);
                    // 重新注册这个通道,监听 OP_READ 事件,客户端还可以继续发送内容过来
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
            }
        }
    }
}

Client端代码与BIO相同。