深入分析 Java IO (四)AIO

这是我参与8月更文挑战的第4天,活动详情查看:8月更文挑战

前言

前面我们对 Java IOBIONIO 进行了分析,相关文章链接如下:

深入分析 Java IO (一)概述

深入分析 Java IO (二)BIO

深入分析 Java IO (三)NIO

本篇文章我们就来讲讲 AIO的相关知识。

概述

Java 1.7开始,Java 提供了 AIO(异步I/O)。Java AIO 也被称为 NIO2.0,提供了异步I/O的方式,用法和标准的I/O有非常大的差异。

Java AIO 采用订阅-通知模式:即应用程序向操作系统注册IO监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,在主动通知应用程序,触发相应的函数。

AIO

Java AIO 的处理流程图如下:

image-20210802162741731

  • 和同步IO一样,异步IO也是由操作系统进行支持的。微软的windows系统提供了一种异步IO技术:IOCP(I/O CompletionPort,I/O完成端口)
  • Linux下由于没有这种异步IO技术,所以使用的是epoll(上文介绍过的一种多路复用IO技术的实现)对异步IO进行模拟。

代码示例

客户端:

 package org.example.aio;
 ​
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousSocketChannel;
 import java.nio.charset.Charset;
 import java.util.concurrent.ExecutionException;
 ​
 public class AioClient {
 ​
     public static void main(String[] args) throws IOException, InterruptedException {
         //打开一个客户端通道
         AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
         //与服务端建立连接
         channel.connect(new InetSocketAddress("127.0.0.1", 9988));
         //睡眠一秒,等待与服务端的连接
         Thread.sleep(1000);
 ​
         try {
             //向服务端发送数据
             channel.write(ByteBuffer.wrap("Hello,我是客户端".getBytes())).get();
         } catch (ExecutionException e) {
             e.printStackTrace();
         }
 ​
         try {
             //从服务端读取返回的数据
             ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
             channel.read(byteBuffer).get();//将通道中的数据写入缓冲Buffer
             byteBuffer.flip();
             String result = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
             System.out.println("客户端收到服务端返回的内容:" + result);//服务端返回的数据
         } catch (ExecutionException e) {
             e.printStackTrace();
         }
     }
 }
复制代码

服务端:

 package org.example.aio;
 ​
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousServerSocketChannel;
 import java.nio.channels.AsynchronousSocketChannel;
 import java.nio.channels.CompletionHandler;
 import java.nio.charset.Charset;
 ​
 public class AioServer {
 ​
     public AsynchronousServerSocketChannel serverSocketChannel;
 ​
     public void listen() throws Exception {
         //打开一个服务端通道
         serverSocketChannel = AsynchronousServerSocketChannel.open();
         serverSocketChannel.bind(new InetSocketAddress(9988));//监听9988端口
         //监听
         serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AioServer>() {
             @Override
             public void completed(AsynchronousSocketChannel client, AioServer attachment) {
                 try {
                     if (client.isOpen()) {
                         System.out.println("接收到新的客户端的连接,地址:" + client.getRemoteAddress());
                         final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                         //读取客户端发送的数据
                         client.read(byteBuffer, client, new CompletionHandler<Integer, AsynchronousSocketChannel>() {
                             @Override
                             public void completed(Integer result, AsynchronousSocketChannel attachment) {
                                 try {
                                     //读取请求,处理客户端发送的数据
                                     byteBuffer.flip();
                                     String content = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
                                     System.out.println("服务端接受到客户端发来的数据:" + content);
                                     //向客户端发送数据
                                     ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                                     writeBuffer.put("Server send".getBytes());
                                     writeBuffer.flip();
                                     attachment.write(writeBuffer).get();
                                 } catch (Exception e) {
                                     e.printStackTrace();
                                 }
                             }
 ​
                             @Override
                             public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
                                 try {
                                     exc.printStackTrace();
                                     attachment.close();
                                 } catch (IOException e) {
                                     e.printStackTrace();
                                 }
                             }
                         });
                     }
                 } catch (Exception e) {
                     e.printStackTrace();
                 } finally {
                     //当有新的客户端接入的时候,直接调用accept的方法,递归执行下去,这样可以保证多个客户端都可以阻塞
                     attachment.serverSocketChannel.accept(attachment, this);
                 }
             }
 ​
             @Override
             public void failed(Throwable exc, AioServer attachment) {
                 exc.printStackTrace();
             }
         });
     }
 ​
 ​
     public static void main(String[] args) throws Exception {
         new AioServer().listen();
         Thread.sleep(Integer.MAX_VALUE);
     }
 ​
 }
复制代码

这种组合方式用起来比较复杂,只有在一些非常复杂的分布式情况下使用,像集群之间的消息同步机制一般用这种 I/O 组合方式。如 Cassandra 的 Gossip 通信机制就是采用异步非阻塞的方式。

BIO/NIO/AIO比较

BIO NIO AIO
IO模型 同步阻塞 同步非阻塞(多路复用) 异步非阻塞
编程难度 简单 复杂 复杂
可靠性
吞吐量

BIO、NIO、AIO适用场景

  • BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。

  • NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。

  • AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

总结

Java BIO: 同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。

Java NIO : 同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。

Java AIO(NIO.2): 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理。

结尾

我是一个正在被打击还在努力前进的码农。如果文章对你有帮助,记得点赞、关注哟,谢谢!

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享