Java I/O模型——AIO


发布于 2016-04-10 / 46 阅读 / 0 评论 /
AIO全称Asynchronous IO,异步IO,指的是用户空间的线程变成被动接收者,而内核空间成为主动调用者。

在异步IO模型中,当用户线程接收到通知时,数据已经被内核读取完毕并放在了用户缓冲区,内核在IO完成后通知用户线程直接使用即可。

异步IO类似于Java中典型的回调模式,用户进程(或线程)向内核空间注册了各种IO事件的回调函数,由内核去主动调用。

1.异步IO流程

异步IO地基本流程是:

(1)用户线程通过系统调用向内核注册某个IO操作。

(2)内核在整个IO操作(包括数据准备、数据复制等)完成后通知用户程序。

(3)用户线程执行后续的业务操作。

如下图所示:

在整个异步IO模型中,整个内核的数据处理过程(包括内核将数据从网络无力设备(网卡)读取到内核缓冲区、将内核缓冲区数据复制到用户缓冲区)中,用户程序都不需要阻塞。

举个例子,发起一个异步 IO 的 read 操作的系统调用,流程如下:

(1)当用户线程发起了 read 系统调用后,立刻就可以去做其他的事,用户线程不阻塞。

(2)内核开始 IO 的第一个阶段:准备数据。准备好数据,内核就会将数据从内核缓冲区复制到用户缓冲区。

(3)内核会给用户线程发送一个信号(Signal),或者回调用户线程注册的回调方法,告诉用户线程 read 系统调用已经完成,数据已经读入用户缓冲区。

(4)用户线程读取用户缓冲区的数据,完成后续的业务操作。

异步 IO 模型的特点是在内核等待数据和复制数据的两个阶段,用户线程都不是阻塞的。用户线程需要接收内核的 IO 操作完成的事件,或者用户线程需要注册一个 IO 操作完成的回调函数。正因为如此,异步 IO 有的时候也被称为信号驱动IO。

异步 IO 模型的缺点是应用程序仅需要进行事件的注册与接收,其余的工作都留给了操作系统,也就是说需要底层内核提供支持。

理论上来说,异步 IO 是真正的异步输入输出,它的吞吐量高于 IO 多路复用模型的吞吐量。就目前而言,Windows 系统下通过 IOCP 实现了真正的异步 IO。在 Linux 系统下,异步 IO 模型在 2.6 版本才引入,JDK 对它的支持目前并不完善,因此异步 IO 在性能上没有明显的优势。

2.AIO简单demo

异步IO两种实现方式:返回 Future 实例;提供 CompletionHandler 回调函数。

2.1.Server端demo

包含Server和ServerChannelHandler。

Server代码如下所示:

public class Server {
    public static void main(String[] args) throws IOException {
          // 实例化,并监听端口
        AsynchronousServerSocketChannel server =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));
        // 自己定义一个 Attachment 类,用于传递一些信息
        Attachment att = new Attachment();
        att.setServer(server);
        server.accept(att, new CompletionHandler<AsynchronousSocketChannel, Attachment>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Attachment att) {
                try {
                    SocketAddress clientAddr = client.getRemoteAddress();
                    System.out.println("收到新的连接:" + clientAddr);
                    // 收到新的连接后,server 应该重新调用 accept 方法等待新的连接进来
                    att.getServer().accept(att, this);
                    Attachment newAtt = new Attachment();
                    newAtt.setServer(server);
                    newAtt.setClient(client);
                    newAtt.setReadMode(true);
                    newAtt.setBuffer(ByteBuffer.allocate(2048));
                    // 这里也可以继续使用匿名实现类,不过代码不好看,所以这里专门定义一个类
                    client.read(newAtt.getBuffer(), newAtt, new ChannelHandler());
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
            @Override
            public void failed(Throwable t, Attachment att) {
                System.out.println("accept failed");
            }
        });
        // 为了防止 main 线程退出
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
        }
    }
}

ServerChannelHandler代码如下所示:

public class ServerChannelHandler implements CompletionHandler<Integer, Attachment> {
    @Override
    public void completed(Integer result, Attachment att) {
        if (att.isReadMode()) {
            // 读取来自客户端的数据
            ByteBuffer buffer = att.getBuffer();
            buffer.flip();
            byte bytes[] = new byte[buffer.limit()];
            buffer.get(bytes);
            String msg = new String(buffer.array()).toString().trim();
            System.out.println("收到来自客户端的数据: " + msg);
            // 响应客户端请求,返回数据
            buffer.clear();
            buffer.put("Response from server!".getBytes(Charset.forName("UTF-8")));
            att.setReadMode(false);
            buffer.flip();
            // 写数据到客户端也是异步
            att.getClient().write(buffer, att, this);
        } else {
            // 到这里,说明往客户端写数据也结束了,有以下两种选择:
            // 1. 继续等待客户端发送新的数据过来
            // att.setReadMode(true);
            // att.getBuffer().clear();
            // att.getClient().read(att.getBuffer(), att, this);
            // 2. 既然服务端已经返回数据给客户端,断开这次的连接
            try {
                att.getClient().close();
            } catch (IOException e) {
            }
        }
    }
    @Override
    public void failed(Throwable t, Attachment att) {
        System.out.println("连接断开");
    }
}

2.2.Client端demo

包含Client和ClientChannelHandler。

Client代码如下所示:

public class Client {
    public static void main(String[] args) throws Exception {
        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
          // 来个 Future 形式的
        Future<?> future = client.connect(new InetSocketAddress(8080));
        // 阻塞一下,等待连接成功
        future.get();
        Attachment att = new Attachment();
        att.setClient(client);
        att.setReadMode(false);
        att.setBuffer(ByteBuffer.allocate(2048));
        byte[] data = "I am obot!".getBytes();
        att.getBuffer().put(data);
        att.getBuffer().flip();
        // 异步发送数据到服务端
        client.write(att.getBuffer(), att, new ClientChannelHandler());
        // 这里休息一下再退出,给出足够的时间处理数据
        Thread.sleep(2000);
    }
}

ClientChannelHandler代码如下所示:

public class ClientChannelHandler implements CompletionHandler<Integer, Attachment> {
    @Override
    public void completed(Integer result, Attachment att) {
        ByteBuffer buffer = att.getBuffer();
        if (att.isReadMode()) {
            // 读取来自服务端的数据
            buffer.flip();
            byte[] bytes = new byte[buffer.limit()];
            buffer.get(bytes);
            String msg = new String(bytes, Charset.forName("UTF-8"));
            System.out.println("收到来自服务端的响应数据: " + msg);
            // 接下来,有以下两种选择:
            // 1. 向服务端发送新的数据
            // att.setReadMode(false);
            // buffer.clear();
            // String newMsg = "new message from client";
            // byte[] data = newMsg.getBytes(Charset.forName("UTF-8"));
            // buffer.put(data);
            // buffer.flip();
            // att.getClient().write(buffer, att, this);
            // 2. 关闭连接
            try {
                att.getClient().close();
            } catch (IOException e) {
            }
        } else {
            // 写操作完成后,会进到这里
            att.setReadMode(true);
            buffer.clear();
            att.getClient().read(buffer, att, this);
        }
    }
    @Override
    public void failed(Throwable t, Attachment att) {
        System.out.println("服务器无响应");
    }
}

从代码可以看出来:

(1)BIO从连接请求(accept)就开始阻塞一直到通道读写完成

(2)NIO从连接请求(accept)之前一直阻塞,连接之后非阻塞,通过注册读写事件,委托工作线程执行

(3)AIO从连接请求(accept)之前就是异步的,通过回调函数或者Future实现。