在异步IO模型中,当用户线程接收到通知时,数据已经被内核读取完毕并放在了用户缓冲区,内核在IO完成后通知用户线程直接使用即可。
异步IO类似于Java中典型的回调模式,用户进程(或线程)向内核空间注册了各种IO事件的回调函数,由内核去主动调用。
1.异步IO流程
异步IO地基本流程是:
(1)用户线程通过系统调用向内核注册某个IO操作。
(2)内核在整个IO操作(包括数据准备、数据复制等)完成后通知用户程序。
(3)用户线程执行后续的业务操作。
如下图所示:
在整个异步IO模型中,整个内核的数据处理过程(包括内核将数据从网络无力设备(网卡)读取到内核缓冲区、将内核缓冲区数据复制到用户缓冲区)中,用户程序都不需要阻塞。
举个例子,发起一个异步 IO 的 read 操作的系统调用,流程如下:
(1)当用户线程发起了 read 系统调用后,立刻就可以去做其他的事,用户线程不阻塞。
(2)内核开始 IO 的第一个阶段:准备数据。准备好数据,内核就会将数据从内核缓冲区复制到用户缓冲区。
(3)内核会给用户线程发送一个信号(Signal),或者回调用户线程注册的回调方法,告诉用户线程 read 系统调用已经完成,数据已经读入用户缓冲区。
(4)用户线程读取用户缓冲区的数据,完成后续的业务操作。
异步 IO 模型的特点是在内核等待数据和复制数据的两个阶段,用户线程都不是阻塞的。用户线程需要接收内核的 IO 操作完成的事件,或者用户线程需要注册一个 IO 操作完成的回调函数。正因为如此,异步 IO 有的时候也被称为信号驱动IO。
异步 IO 模型的缺点是应用程序仅需要进行事件的注册与接收,其余的工作都留给了操作系统,也就是说需要底层内核提供支持。
理论上来说,异步 IO 是真正的异步输入输出,它的吞吐量高于 IO 多路复用模型的吞吐量。就目前而言,Windows 系统下通过 IOCP 实现了真正的异步 IO。在 Linux 系统下,异步 IO 模型在 2.6 版本才引入,JDK 对它的支持目前并不完善,因此异步 IO 在性能上没有明显的优势。
2.AIO简单demo
异步IO两种实现方式:返回 Future 实例;提供 CompletionHandler 回调函数。
2.1.Server端demo
包含Server和ServerChannelHandler。
Server代码如下所示:
public class Server {
public static void main(String[] args) throws IOException {
// 实例化,并监听端口
AsynchronousServerSocketChannel server =
AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));
// 自己定义一个 Attachment 类,用于传递一些信息
Attachment att = new Attachment();
att.setServer(server);
server.accept(att, new CompletionHandler<AsynchronousSocketChannel, Attachment>() {
@Override
public void completed(AsynchronousSocketChannel client, Attachment att) {
try {
SocketAddress clientAddr = client.getRemoteAddress();
System.out.println("收到新的连接:" + clientAddr);
// 收到新的连接后,server 应该重新调用 accept 方法等待新的连接进来
att.getServer().accept(att, this);
Attachment newAtt = new Attachment();
newAtt.setServer(server);
newAtt.setClient(client);
newAtt.setReadMode(true);
newAtt.setBuffer(ByteBuffer.allocate(2048));
// 这里也可以继续使用匿名实现类,不过代码不好看,所以这里专门定义一个类
client.read(newAtt.getBuffer(), newAtt, new ChannelHandler());
} catch (IOException ex) {
ex.printStackTrace();
}
}
@Override
public void failed(Throwable t, Attachment att) {
System.out.println("accept failed");
}
});
// 为了防止 main 线程退出
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
}
}
}
ServerChannelHandler代码如下所示:
public class ServerChannelHandler implements CompletionHandler<Integer, Attachment> {
@Override
public void completed(Integer result, Attachment att) {
if (att.isReadMode()) {
// 读取来自客户端的数据
ByteBuffer buffer = att.getBuffer();
buffer.flip();
byte bytes[] = new byte[buffer.limit()];
buffer.get(bytes);
String msg = new String(buffer.array()).toString().trim();
System.out.println("收到来自客户端的数据: " + msg);
// 响应客户端请求,返回数据
buffer.clear();
buffer.put("Response from server!".getBytes(Charset.forName("UTF-8")));
att.setReadMode(false);
buffer.flip();
// 写数据到客户端也是异步
att.getClient().write(buffer, att, this);
} else {
// 到这里,说明往客户端写数据也结束了,有以下两种选择:
// 1. 继续等待客户端发送新的数据过来
// att.setReadMode(true);
// att.getBuffer().clear();
// att.getClient().read(att.getBuffer(), att, this);
// 2. 既然服务端已经返回数据给客户端,断开这次的连接
try {
att.getClient().close();
} catch (IOException e) {
}
}
}
@Override
public void failed(Throwable t, Attachment att) {
System.out.println("连接断开");
}
}
2.2.Client端demo
包含Client和ClientChannelHandler。
Client代码如下所示:
public class Client {
public static void main(String[] args) throws Exception {
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
// 来个 Future 形式的
Future<?> future = client.connect(new InetSocketAddress(8080));
// 阻塞一下,等待连接成功
future.get();
Attachment att = new Attachment();
att.setClient(client);
att.setReadMode(false);
att.setBuffer(ByteBuffer.allocate(2048));
byte[] data = "I am obot!".getBytes();
att.getBuffer().put(data);
att.getBuffer().flip();
// 异步发送数据到服务端
client.write(att.getBuffer(), att, new ClientChannelHandler());
// 这里休息一下再退出,给出足够的时间处理数据
Thread.sleep(2000);
}
}
ClientChannelHandler代码如下所示:
public class ClientChannelHandler implements CompletionHandler<Integer, Attachment> {
@Override
public void completed(Integer result, Attachment att) {
ByteBuffer buffer = att.getBuffer();
if (att.isReadMode()) {
// 读取来自服务端的数据
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
String msg = new String(bytes, Charset.forName("UTF-8"));
System.out.println("收到来自服务端的响应数据: " + msg);
// 接下来,有以下两种选择:
// 1. 向服务端发送新的数据
// att.setReadMode(false);
// buffer.clear();
// String newMsg = "new message from client";
// byte[] data = newMsg.getBytes(Charset.forName("UTF-8"));
// buffer.put(data);
// buffer.flip();
// att.getClient().write(buffer, att, this);
// 2. 关闭连接
try {
att.getClient().close();
} catch (IOException e) {
}
} else {
// 写操作完成后,会进到这里
att.setReadMode(true);
buffer.clear();
att.getClient().read(buffer, att, this);
}
}
@Override
public void failed(Throwable t, Attachment att) {
System.out.println("服务器无响应");
}
}
从代码可以看出来:
(1)BIO从连接请求(accept)就开始阻塞一直到通道读写完成
(2)NIO从连接请求(accept)之前一直阻塞,连接之后非阻塞,通过注册读写事件,委托工作线程执行
(3)AIO从连接请求(accept)之前就是异步的,通过回调函数或者Future实现。