异步非阻塞式数据传输——AIO的两种实现方式

x33g5p2x  于2022-05-25 转载在 其他  
字(4.1k)|赞(0)|评价(0)|浏览(303)

一 点睛

AIO 是自 JDK 1.7 开始提供,本质是对 NIO 中 Channel 进行的一些扩展,因此 AIO 也称为 NIO.2。具体地讲,AIO 就是在 NIO 的基础上,新增加了下表的 3个 Channel 实现类,这 3 个类也称为异步通道。

| <br>异步通道<br> | <br>简介<br> |
| <br>AsynchronousFileChannel<br> | <br>用于文件的异步读写<br> |
| <br>AsynchronousServerSocketChannel<br> | <br>服务端异步 socket 通道<br> |
| <br>AsynchronousSokectChannel<br> | <br>客户端异步 socket 通道<br> |

NIO 是同步非阻塞方式的 I/O,而 AIO 是异步非阻塞方式的 I/O。以服务端读取客户端的数据为例,服务端使用 NIO 和 AIO 的区别如下所示。

  • NIO:如果某个 Channel 已经准备好了客户端发来的消息,再通知我。
  • AIO:如果某个 Channel 已经将客户端发来的消息读取完毕,再通知我。

AIO 可以通过 "Future模式" 和 "回调函数"两种方式来实现。

二 实战

  1. package aio;
  2. import java.nio.ByteBuffer;
  3. import java.nio.channels.AsynchronousFileChannel;
  4. import java.nio.channels.CompletionHandler;
  5. import java.nio.file.Path;
  6. import java.nio.file.Paths;
  7. import java.nio.file.StandardOpenOption;
  8. import java.util.concurrent.Future;
  9. public class AIODemo {
  10. // Future模式:读
  11. public static void test1() throws Exception {
  12. Path filePath = Paths.get("g:\\abc.txt");
  13. AsynchronousFileChannel channel = AsynchronousFileChannel.open(filePath);
  14. // 定义一个buffer,用于存放文件的内容
  15. ByteBuffer buffer = ByteBuffer.allocate(1024);
  16. /*
  17. 1 read()的作用
  18. 将 abc.txt 通过 channel 读入 buffer 中(从第0位开始读取)
  19. 2.read()是一个异步的方法:
  20. (1)会开启一个新线程,并且在这个新线程中读取文件;新线程将文件内容读取完毕前提
  21. a future.isDone() 的返回值为 true
  22. b future.get() 方法不再阻塞
  23. (2)其他线程(此时的main线程)可以执行其他事情
  24. */
  25. Future<Integer> future = channel.read(buffer, 0);
  26. while (!future.isDone()) {
  27. System.out.println("在 read() 的同时,可以处理其他事情...");
  28. }
  29. // future.get():
  30. // 1 如果读取文件的线程 将文件内容读取完毕,则 get() 会返回读取到的字节数;
  31. // 2 如果没有读取完毕 get() 方法会一直阻塞;
  32. Integer readNumber = future.get();
  33. buffer.flip();
  34. String data = new String(buffer.array(), 0, buffer.limit());
  35. System.out.println("read number:" + readNumber);
  36. System.out.println(data);
  37. }
  38. // 回调模式:读
  39. public static void test2() throws Exception {
  40. Path path = Paths.get("g:\\abc.txt");
  41. AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
  42. ByteBuffer buffer = ByteBuffer.allocate(1024);
  43. // 在read() 方法将文件全部读取到 buffer 之前,main 线程可以异步进行其他操作
  44. channel.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
  45. @Override
  46. public void completed(Integer result, ByteBuffer attachment) {
  47. buffer.flip();
  48. String data = new String(buffer.array(), 0, buffer.limit());
  49. System.out.println(data);
  50. System.out.println("read() 完毕!");
  51. }
  52. @Override
  53. public void failed(Throwable e, ByteBuffer attachment) {
  54. System.out.println("异常...");
  55. }
  56. });
  57. while (true) {
  58. System.out.println("在 read() 完毕以前,可以异步处理其他事情...");
  59. Thread.sleep(100);
  60. }
  61. }
  62. // Future模式:写
  63. public static void test3() throws Exception {
  64. Path path = Paths.get("d:\\abc3.txt");
  65. AsynchronousFileChannel fileChannel =
  66. AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
  67. ByteBuffer buffer = ByteBuffer.allocate(1024);
  68. long position = 0;
  69. buffer.put("hello world".getBytes());
  70. buffer.flip();
  71. Future<Integer> future = fileChannel.write(buffer, position);
  72. buffer.clear();
  73. while (!future.isDone()) {
  74. System.out.println("other thing....");
  75. }
  76. Integer result = future.get();
  77. System.out.println("写完毕!共写入字节数:" + result);
  78. }
  79. // 回调模式:写
  80. public static void test4() throws Exception {
  81. Path path = Paths.get("d:\\abc4.txt");
  82. AsynchronousFileChannel fileChannel =
  83. AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
  84. ByteBuffer buffer = ByteBuffer.allocate(1024);
  85. buffer.put("hello the world".getBytes());
  86. buffer.flip();
  87. fileChannel.write(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
  88. @Override
  89. public void completed(Integer result, ByteBuffer attachment) {
  90. System.out.println("写完毕!共写入的字节数: " + result);
  91. }
  92. @Override
  93. public void failed(Throwable exc, ByteBuffer attachment) {
  94. System.out.println("发生了异常...");
  95. }
  96. });
  97. for (; ; ) {
  98. System.out.println("other things...");
  99. Thread.sleep(1000);
  100. }
  101. }
  102. public static void main(String[] args) throws Exception {
  103. test4();
  104. }
  105. }

三 测试结果

1 读 "Future模式"

在 read() 的同时,可以处理其他事情...

read number:5

hello

2 读 "回调函数"

在 read() 完毕以前,可以异步处理其他事情...

hello

read() 完毕!

在 read() 完毕以前,可以异步处理其他事情...

在 read() 完毕以前,可以异步处理其他事情...

在 read() 完毕以前,可以异步处理其他事情...

在 read() 完毕以前,可以异步处理其他事情...

在 read() 完毕以前,可以异步处理其他事情...

在 read() 完毕以前,可以异步处理其他事情...

3 写 "Future模式"

other thing....

写完毕!共写入字节数:11

4 写 "回调函数"

other things...

写完毕!共写入的字节数: 15

other things...

other things...

other things...

相关文章