异步非阻塞式数据传输——AIO的两种实现方式

x33g5p2x  于2022-05-25 转载在 其他  
字(4.1k)|赞(0)|评价(0)|浏览(275)

一 点睛

AIO 是自 JDK 1.7 开始提供,本质是对 NIO 中 Channel 进行的一些扩展,因此 AIO 也称为 NIO.2。具体地讲,AIO 就是在 NIO 的基础上,新增加了下表的 3个 Channel 实现类,这 3 个类也称为异步通道。

| <br>异步通道<br> | <br>简介<br> |
| <br>AsynchronousFileChannel<br> | <br>用于文件的异步读写<br> |
| <br>AsynchronousServerSocketChannel<br> | <br>服务端异步 socket 通道<br> |
| <br>AsynchronousSokectChannel<br> | <br>客户端异步 socket 通道<br> |

NIO 是同步非阻塞方式的 I/O,而 AIO 是异步非阻塞方式的 I/O。以服务端读取客户端的数据为例,服务端使用 NIO 和 AIO 的区别如下所示。

  • NIO:如果某个 Channel 已经准备好了客户端发来的消息,再通知我。
  • AIO:如果某个 Channel 已经将客户端发来的消息读取完毕,再通知我。

AIO 可以通过 "Future模式" 和 "回调函数"两种方式来实现。

二 实战

package aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

public class AIODemo {
    // Future模式:读
    public static void test1() throws Exception {
        Path filePath = Paths.get("g:\\abc.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(filePath);
        // 定义一个buffer,用于存放文件的内容
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        /*
            1 read()的作用
              将 abc.txt 通过 channel 读入 buffer 中(从第0位开始读取)
            2.read()是一个异步的方法:
             (1)会开启一个新线程,并且在这个新线程中读取文件;新线程将文件内容读取完毕前提
                a future.isDone() 的返回值为 true
                b future.get() 方法不再阻塞
             (2)其他线程(此时的main线程)可以执行其他事情
         */
        Future<Integer> future = channel.read(buffer, 0);

        while (!future.isDone()) {
            System.out.println("在 read() 的同时,可以处理其他事情...");
        }
        // future.get():
        // 1 如果读取文件的线程 将文件内容读取完毕,则 get() 会返回读取到的字节数;
        // 2 如果没有读取完毕 get() 方法会一直阻塞;
        Integer readNumber = future.get();
        buffer.flip();
        String data = new String(buffer.array(), 0, buffer.limit());
        System.out.println("read number:" + readNumber);
        System.out.println(data);
    }

    // 回调模式:读
    public static void test2() throws Exception {
        Path path = Paths.get("g:\\abc.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // 在read() 方法将文件全部读取到 buffer 之前,main 线程可以异步进行其他操作
        channel.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                buffer.flip();
                String data = new String(buffer.array(), 0, buffer.limit());
                System.out.println(data);
                System.out.println("read() 完毕!");
            }

            @Override
            public void failed(Throwable e, ByteBuffer attachment) {
                System.out.println("异常...");
            }
        });

        while (true) {
            System.out.println("在 read() 完毕以前,可以异步处理其他事情...");
            Thread.sleep(100);
        }
    }

    // Future模式:写
    public static void test3() throws Exception {
        Path path = Paths.get("d:\\abc3.txt");
        AsynchronousFileChannel fileChannel =
                AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        long position = 0;

        buffer.put("hello world".getBytes());
        buffer.flip();

        Future<Integer> future = fileChannel.write(buffer, position);
        buffer.clear();

        while (!future.isDone()) {
            System.out.println("other thing....");
        }
        Integer result = future.get();
        System.out.println("写完毕!共写入字节数:" + result);
    }

    // 回调模式:写
    public static void test4() throws Exception {
        Path path = Paths.get("d:\\abc4.txt");
        AsynchronousFileChannel fileChannel =
                AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("hello the world".getBytes());
        buffer.flip();
        fileChannel.write(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                System.out.println("写完毕!共写入的字节数: " + result);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("发生了异常...");
            }
        });
        for (; ; ) {
            System.out.println("other things...");
            Thread.sleep(1000);
        }
    }

    public static void main(String[] args) throws Exception {
        test4();
    }
}

三 测试结果

1 读 "Future模式"

在 read() 的同时,可以处理其他事情...

read number:5

hello

2 读 "回调函数"

在 read() 完毕以前,可以异步处理其他事情...

hello

read() 完毕!

在 read() 完毕以前,可以异步处理其他事情...

在 read() 完毕以前,可以异步处理其他事情...

在 read() 完毕以前,可以异步处理其他事情...

在 read() 完毕以前,可以异步处理其他事情...

在 read() 完毕以前,可以异步处理其他事情...

在 read() 完毕以前,可以异步处理其他事情...

3 写 "Future模式"

other thing....

写完毕!共写入字节数:11

4 写 "回调函数"

other things...

写完毕!共写入的字节数: 15

other things...

other things...

other things...

相关文章