网络编程 AIO

x33g5p2x  于2021-12-06 转载在 其他  
字(4.9k)|赞(0)|评价(0)|浏览(537)

AIO模型介绍

与NIO模型不同,一读写操作为例,只需直接调用read和write的API即可。对于读操作:当有流可读时,系统会将可读的流传入到read方法的缓冲区,并通知应用程序。读写都是异步的,完成之后会主动调用回调函数。

  • AsynchronousSocketChannel:异步操作TCP通道,主要连接AsynchronousServerSocketChannel,一般在客户端实现;
  • AsynchronousServerSocketChannel:异步操作TCP通道,主要接收客户端的连接,一般在服务端实现;
  • AsynchronousFileChannel:操作文件;
  • AsynchronousDatagramChanel:异步操作UDP的通道。

AsynchronousServerSocketChannel:AIO中网络通信服务端的socket

异步IO中有两种实现方式:

1、future方法

  1. Future<AsynchronousSocketChannel> accept();

提交一个IO操作请求(Accept/read/write),返回future,就可以对future进行检查,future.get()方法,future方法会让用户程序阻塞直至操作正常完成,使用future方法比较简单,但是future.get()是同步的,使用该方式容易进入到同步的编程模式,这种方式会使AIO的异步操作成为摆设。

2、callback回调方式

  1. <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler)

开始接收客户端的连接,连接成功或者失败都是触发CompletionHandler对象的响应方法。

CompletionHandler接口提供了两个方法:

  1. void completed(V result, A attachment);

当IO完成时触发该方法,该方法的第一个参数代表IO操作的返回的对象,第二个参数代表发起IO操作时传入的附加参数。

  1. void failed(Throwable exc, A attachment);

当IO失败时触发该方法,第一个参数表示IO操作失败引起的异常或错误,第二个参数代表发起IO操作时传入的附加参数。

即提交一个IO操作请求(Accept/read/write),指定一个CompletionHandler,当异步IO操作完成时,发送一个通知,这个时候CompletionHandler对象的completed或者failed方法将会被调用。

AIO的实现需要充分调用操作系统参数,IO需要操作系统的支持,并发也同样需要操作系统的支持,所以性能方面不同的操作系统差异会比较明显。

AIO 的回调方式编程

AIO服务端(Server.java):

  1. /**
  2. * AIO服务端代码
  3. * @Author : quwenjing
  4. * @Date : 2021/11/18 19:01
  5. **/
  6. public class Server {
  7.    public static void main(String[] args) {
  8.        try {
  9.            //创建服务端通道
  10.            AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
  11.            //绑定端口
  12.            asynchronousServerSocketChannel.bind(new InetSocketAddress(6666));
  13.            System.out.println("服务端启动");
  14.            //接受客户端的连接accept
  15.            asynchronousServerSocketChannel.accept(null,new AcceptCompletionHandler(asynchronousServerSocketChannel));
  16.            //BIO accept操作返回Socket实例
  17.            //AIO accept操作返回AsynchronousSocketChannel
  18.            //accept是异步操作,防止当前程序直接执行结束
  19.            //方法一:while(ture) +sleep
  20.            while (true){
  21.                try {
  22.                    Thread.sleep(1000);
  23.               } catch (InterruptedException e) {
  24.                    e.printStackTrace();
  25.               }
  26.           }
  27.       } catch (IOException e) {
  28.            e.printStackTrace();
  29.       }
  30.   }
  31. }

接受连接accept的回调(AcceptCompletionHandler.java):

  1. /**
  2. * 接收连接accept的回调
  3. * @Author : quwenjing
  4. * @Date : 2021/11/18 19:07
  5. **/
  6. public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,Object > {
  7. //返回参数,传入参数
  8.    private AsynchronousServerSocketChannel channel;
  9.    public AcceptCompletionHandler(AsynchronousServerSocketChannel channel){
  10.        this.channel = channel;
  11.   }
  12.    @Override
  13.    public void completed(AsynchronousSocketChannel result, Object attachment) {
  14.        System.out.println("有新客户的连接");
  15.        //完成accept连接操作
  16.        //读写操作,需要使用buffer
  17.        //创建新的buffer
  18.        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  19.        //读客户端的数据,读操作是异步操作,需要实现CompletionHandler对象
  20.        /*
  21.        void read(ByteBuffer dst,A attachment,CompletionHandler<Integer,? super A> handler);
  22.         读操作异步方式方法解读
  23.         dst:数据读取目的地   attachment:给读回调传递的信息
  24.         CompletionHandler:当读数据完成后CompletionHandler对象
  25.         */
  26.        result.read(byteBuffer,byteBuffer,new ReadCompletionHandler(result));
  27.        //再次接受其他客户端的连接,调用accept方法
  28.        channel.accept(null,new AcceptCompletionHandler(channel));
  29.   }
  30.    @Override
  31.    public void failed(Throwable exc, Object attachment) {
  32.   }
  33. }

读操作的回调(ReadCompletionHandler.java):

  1. /**
  2. * 读操作的回调
  3. * @Author : quwenjing
  4. * @Date : 2021/11/18 19:21
  5. **/
  6. public class ReadCompletionHandler implements CompletionHandler<Integer,ByteBuffer> {
  7.    //读操作返回的结果是读取的个数,应该是Integer
  8.    //用户接收或者发送操作的通道
  9.    private AsynchronousSocketChannel asynchronousSocketChannel;
  10.    public ReadCompletionHandler(AsynchronousSocketChannel channel){
  11.        this.asynchronousSocketChannel = channel;
  12.   }
  13.    @Override
  14.    public void completed(Integer result, ByteBuffer attachment) {
  15.        //读数据完成后
  16.        //数据已经完成并写入ByteBuffer类型的result变量中
  17.        attachment.flip();
  18.        byte[] bytes = new byte[attachment.remaining()];
  19.        attachment.get(bytes);
  20.        String string = null;
  21.        try {
  22.            string = new String(bytes,"utf-8");
  23.       } catch (UnsupportedEncodingException e) {
  24.            e.printStackTrace();
  25.       }
  26.        System.out.println("服务端接收的数据:"+string);
  27.        attachment.clear();
  28.        //重复接收消息,再次调用异步读操作
  29.        this.asynchronousSocketChannel.read(attachment,attachment,new ReadCompletionHandler(this.asynchronousSocketChannel));
  30.   }
  31.    @Override
  32.    public void failed(Throwable exc, ByteBuffer attachment) {
  33.   }
  34. }

客户端(Client.java):

  1. public class Client {
  2.    public static void main(String[] args) {
  3.        //创建异步通道
  4.        try {
  5.            AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open();
  6.            //连接服务端异步方式
  7.            asynchronousSocketChannel.connect(new InetSocketAddress("127.0.0.1",6666),asynchronousSocketChannel,new ConnectionCompletionHandler());
  8.            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  9.            //写操作
  10.            Scanner scanner = new Scanner(System.in);
  11.            while (scanner.hasNext()){
  12.                String msg = scanner.nextLine();
  13.                if (msg!=null && !"".equals(msg.trim())){
  14.                    byteBuffer.put(msg.getBytes());
  15.                    byteBuffer.flip();
  16.                    asynchronousSocketChannel.write(byteBuffer);
  17.                    byteBuffer.clear();
  18.               }
  19.           }
  20.       } catch (IOException e) {
  21.            e.printStackTrace();
  22.       }
  23.   }
  24. }

连接服务端(ConnectionCompletionHandler.java):

  1. public class ConnectionCompletionHandler implements CompletionHandler<Void, AsynchronousSocketChannel> {
  2.    @Override
  3.    public void completed(Void result, AsynchronousSocketChannel attachment) {
  4.        //连接服务端成功
  5.        System.out.println("连接服务端成功");
  6.   }
  7.    @Override
  8.    public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
  9.   }
  10. }

相关文章