NIO为原始类型(boolean除外)提供缓存支持的数据容器,使用他们可以提供非阻塞式的高伸缩性网络。
在 Linux 系统下,socket 连接默认是阻塞模式,可以将 socket 设置成非阻塞模式。
1.同步非阻塞IO流程
在 NIO 模型中,应用程序一旦开始 IO 系统调用,就会出现以下两种情况:
(1)在内核缓冲区中没有数据的情况下,系统调用会立即返回一个调用失败的信息。
(2)在内核缓冲区中有数据的情况下,在数据的复制过程中系统调用是阻塞的,直到完成数据从内核缓冲区复制到用户缓冲区。复制完成后,系统调用返回成功,用户进程(或者线程)可以开始处理用户空间的缓冲区数据。
NIO流程如下图所示:
举个例子,发起一个非阻塞 socket 的 read 操作的系统调用,流程如下:
(1)在内核数据没有准备好的阶段,用户线程发起 IO 请求时立即返回。所以,为了读取最终的数据,用户进程(或者线程)需要不断地发起 IO 系统调用。
(2)内核数据到达后,用户进程(或者线程)发起系统调用,用户进程(或者线程)阻塞。内核开始复制数据,它会将数据从内核缓冲区复制到用户缓冲区,然后内核返回结果(例如返回复制到的用户缓冲区的字节数)。
(3)用户进程(或者线程)读到数据后,才会解除阻塞状态,重新运行起来。也就是说,用户空间需要经过多次尝试才能保证最终真正读到数据,而后继续执行。
1.1.NIO 的特点
应用程序的线程需要不断地进行 IO 系统调用,轮询数据是否已经准备好,如果没有准备好就继续轮询,直到完成 IO 系统调用为止。
1.2.NIO 的优点
每次发起的 IO 系统调用在内核等待数据过程中可以立即返回,用户线程不会阻塞,实时性较好。
1.3.NIO 的缺点
不断地轮询内核,这将占用大量的 CPU 时间,效率低下。
2.同步非阻塞IO模型
模型如下图所示:
应用程序可从buffer中读取数据。
NIO三大组件为:Channel、Buffer、Selector。
2.1.Channel
Channel是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流,而且他们面向缓冲区(Buffer)的。所有数据都通过Buffer对象来处理,永远不会将字节直接写入通道中,而是将数据写入包含一个或多个字节的缓冲区。也不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。
Channel是读写数据的双向通道,可以从Channel将数据读取Buffer,也可将Buffer的数据写入Channel,而之前的Stream要么是输入(InputStream)、要么是输出(OutputStream),只在一个方向上流通。 而通道(Channel)可以用于读、写或者同时用于读写。
常见的Channel有以下四种:
(1)FileChannel:文件通道,用于文件的读写。
(2)DatagramChannel:UDP通道,用于监听UDP请求和发送UDP请求。
(3)SocketChannel:TCP通道,用于发起TCP请求。
(4)ServerSocketChannel:用于服务端监听外部过来的TCP请求。
NIO的读操作就是将数据从Channel读到Buffer中,进行后续处理,调用方法如下:
channel.read(buffer)
NIO的写操作就是将数据从Buffer中写入到Channel中,调用方法如下:
channel.write(buffer)
2.2.Buffer
Buffer的本质就是内存块,所有的数据读写都要依赖这个,我们可以将数据写入到这个内存块,然后从这个内存块读取数据。
Buffer缓冲区用来读写数据,常见的Buffer有:ByteBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer、CharBuffer。
每一种Buffer都可以理解为一个数组,有几个重要属性:position、limit、capacity。如下图所示:
position 的初始值是 0,每往 Buffer 中写入一个值,position 就自动加 1,代表下一次的写入位置。读操作的时候也是类似的,每读一个值,position 就自动加 1。
Buffer可以读写模式切换,从写操作模式到读操作模式切换的时候(flip),position 都会归零,这样就可以从头开始读写了。
写操作模式下,limit 代表的是最大能写入的数据,这个时候 limit 等于 capacity。写结束后,切换到读模式,此时的 limit 等于 Buffer 中实际的数据大小,因为 Buffer 不一定被写满了。
Buffer的使用案例如下代码所示:
// 创建一个Buffer对象
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 从Channel中读取数据到Buffer中
int num = channel.read(buf);
// 将Buffer中的数据写入到Channel中。
int num = channel.write(buf);
2.3.Selector
在多线程模式下,BIO时,一个线程只能处理一个请求,比如http请求,当请求响应式关闭连接,释放线程资源。Selector选择器的作用就是配合一个线程来管理多个Channel,获取这些Channel上发生的事件,这些Channel工作在非阻塞模式下,不会让线程一直在一个Channel上,适合连接数特别多,但流量低的场景。
调用Selector的select()方法会阻塞直到Channel发送了读写就绪事件,这些事件发生,select()方法就会返回这些事件交给thread来处理。
Selector的使用案例如下代码所示:
Selector selector = Selector.open();
// 将通道设置为非阻塞模式,因为默认都是阻塞模式的
channel.configureBlocking(false);
// 注册
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
Selector有四个事件:
(1)SelectKey.OP_ACCEPT:接收TCP连接
(2)SelectKey.OP_READ:通道有数据可以读
(3)SelectKey.OP_WRITE:通道有数据可以写
(4)SelectKey.OP_CONNECT:成功建立TCP连接
非阻塞IO核心是一个Selector管理多个通道,将各个通道注册到 Selector 上,指定监听的事件,之后可以只用一个线程来轮询这个 Selector,看看上面是否有通道是准备好的,当通道准备好可读或可写,然后才去开始真正的读写,这样速度就很快了。
Selector底层实现有三种方式。
2.3.1.select模式
缺点:
(1)单个进程能够监视的文件描述符的数量存在最大限制
(2)内核 / 用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销
(3)Select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件
(4)select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。
2.3.2.poll模式
相比select模型,poll使用链表保存文件描述符,因此没有了监视文件数量的限制,但其他三个缺点依然存在。
select 和 poll 都有一个共同的问题,那就是它们都只会告诉你有几个通道准备好了,但是不会告诉你具体是哪几个通道。所以,一旦知道有通道准备好以后,自己还是需要进行一次扫描,显然这个不太好,通道少的时候还行,一旦通道的数量是几十万个以上的时候,扫描一次的时间都很可观了,时间复杂度 O(n)。
2.3.3.epoll模式
通过红黑树和双链表数据结构,并结合回调机制,造就了epoll的高效,解决了select/poll缺点。
每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件,这些事件都会挂载在红黑树中。而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。
(1)调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)
(2)调用epoll_ctl向epoll对象中添加这100万个连接的套接字
(3)调用epoll_wait收集发生的事件的连接
3.NIO简单demo
Server端代码如下所示:
public class SelectorServer {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.socket().bind(new InetSocketAddress(8080));
// 将其注册到 Selector 中,监听 OP_ACCEPT 事件
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 需要不断地去调用 select() 方法获取最新的准备好的通道
int readyChannels = selector.select();
if (readyChannels == 0) {
continue;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
// 遍历
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
// 有已经接受的新的到服务端的连接
SocketChannel socketChannel = server.accept();
// 有新的连接并不代表这个通道就有数据,
// 这里将这个新的 SocketChannel 注册到 Selector,监听 OP_READ 事件,等待数据
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 有数据可读
// 上面一个 if 分支中注册了监听 OP_READ 事件的 SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int num = socketChannel.read(readBuffer);
if (num > 0) {
// 处理进来的数据...
System.out.println("收到数据:" + new String(readBuffer.array()).trim());
socketChannel.register(selector, SelectionKey.OP_WRITE);
} else if (num == -1) {
// -1 代表连接已经关闭
socketChannel.close();
}
} else if (key.isWritable()) {
// 通道可写
// 给用户返回数据的通道可以进行写操作了
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.wrap("返回给客户端的数据...".getBytes());
socketChannel.write(buffer);
// 重新注册这个通道,监听 OP_READ 事件,客户端还可以继续发送内容过来
socketChannel.register(selector, SelectionKey.OP_READ);
}
}
}
}
}
Client端代码与BIO相同。