NIO 中多缓冲区实战

x33g5p2x  于2022-05-23 转载在 其他  
字(4.4k)|赞(0)|评价(0)|浏览(252)

一 点睛

在 scatter-and-gather 场景下,可以将数据写入多个 Buffer 中。在 NIO 中,也能够同时操作多个缓冲区。在很多 Channel 实现类中,都提供了多个重载的 read() 和 write() 方法,下表介绍了 SocketChannel 的 read() 和 write() 方法。

1 read 的重载方法

| <br>read() 的重载方法<br> | <br>简介<br> |
| <br>int read(ByteBuffer dst)<br> | <br>将 Channel 读取到的数据存入一个 ByteBuffer 中<br> |
| <br>long read(ByteBuffer[] dsts,int offset,int length)                                  <br><br>long read(ByteBuffer[] dsts)<br> | <br>将 Channel 读取到的数据存入一个 ByteBuffer 数组中      <br> |

2 write 的重载方法

| <br>write() 的重载方法<br> | <br>简介<br> |
| <br>int write(ByteBuffer srcs)<br> | <br>将 ByteBuffer 中的数据写入 Channel 中<br> |
| <br>long write(ByteBuffer[] srcs,int offset,int length)<br><br>long write(ByteBuffer[] srcs)<br> | <br>将 ByteBuffer 数组中的所有数据写入 Channel 中<br> |

二 实战

1 服务端

  1. package nio;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.net.ServerSocket;
  5. import java.nio.ByteBuffer;
  6. import java.nio.channels.ServerSocketChannel;
  7. import java.nio.channels.SocketChannel;
  8. public class NIOServerWith2Buffers {
  9. public static void main(String[] args) throws IOException {
  10. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  11. ServerSocket serverSocket = serverSocketChannel.socket();
  12. serverSocket.bind(new InetSocketAddress(8888)) ;
  13. ByteBuffer[] buffers = new ByteBuffer[2] ;
  14. buffers[0] = ByteBuffer.allocate(4) ;
  15. buffers[1] = ByteBuffer.allocate(8) ;
  16. int bufferSum = 4 + 8 ;
  17. SocketChannel socketChannel = serverSocketChannel.accept();
  18. while(true){
  19. /*
  20. 读取客户端的消息:
  21. eachReadbytes:每次读取到的字节数
  22. totalReadBytes:当前时刻,一共读取的字节数
  23. 如果 totalReadBytes 小于 "buffers能够容纳的最大字节数",则循环累加读取;否则,清空buffers,重新读取
  24. */
  25. int totalReadBytes = 0 ;
  26. while(totalReadBytes < bufferSum){
  27. long eachReadbytes = socketChannel.read(buffers);
  28. totalReadBytes += eachReadbytes ;
  29. System.out.println("读取到的数据大小:" + eachReadbytes);
  30. }
  31. // 如果buffers已满
  32. for(ByteBuffer buffer : buffers) {
  33. buffer.flip() ;
  34. }
  35. }
  36. }
  37. }

2 客户端

  1. package nio;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import java.net.InetSocketAddress;
  6. import java.nio.ByteBuffer;
  7. import java.nio.channels.SelectionKey;
  8. import java.nio.channels.Selector;
  9. import java.nio.channels.SocketChannel;
  10. import java.util.Iterator;
  11. import java.util.Set;
  12. /**
  13. * @className: ChatClient
  14. * @description: 客户端
  15. * @date: 2022/5/23
  16. * @author: cakin
  17. */
  18. public class ChatClient {
  19. public static void main(String[] args) {
  20. try {
  21. SocketChannel socketChannel = SocketChannel.open();
  22. // 切换到非阻塞模式
  23. socketChannel.configureBlocking(false);
  24. Selector selector = Selector.open();
  25. // 在客户端的选择器上,注册一个通道,并标识该通道所感兴趣的事件是:向服务端发送连接(连接就绪)。对应于服务端的OP_ACCEPT事件
  26. socketChannel.register(selector, SelectionKey.OP_CONNECT);
  27. socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
  28. while (true) {
  29. selector.select();
  30. // selectionKeys 包含了所有通道与选择器之间的关系(请求连接、读、写)
  31. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  32. Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
  33. while (keyIterator.hasNext()) {
  34. SelectionKey selectedKey = keyIterator.next();
  35. // 判断是否连接成功
  36. if (selectedKey.isConnectable()) {
  37. ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
  38. // 创建一个用于和服务端交互的 Channel
  39. SocketChannel client = (SocketChannel) selectedKey.channel();
  40. // 如果状态是:正在连接中...
  41. if (client.isConnectionPending()) {
  42. boolean isConnected = client.finishConnect();
  43. if (isConnected) {
  44. System.out.println("连接成功!访问的端口是:" + port);
  45. // 向服务端发送一条测试消息
  46. sendBuffer.put("connecting".getBytes());
  47. sendBuffer.flip();
  48. client.write(sendBuffer);
  49. }
  50. // 在“聊天室”中,对于客户端而言,可以随时向服务端发送消息(写操作),因此,需要建立一个单独写线程
  51. new Thread(() -> {
  52. while (true) {
  53. try {
  54. sendBuffer.clear();
  55. // 接收用户从控制台输入的内容,并发送给服务端
  56. InputStreamReader reader = new InputStreamReader(System.in);
  57. BufferedReader bReader = new BufferedReader(reader);
  58. String message = bReader.readLine();
  59. sendBuffer.put(message.getBytes());
  60. sendBuffer.flip();
  61. client.write(sendBuffer);
  62. } catch (Exception e) {
  63. e.printStackTrace();
  64. }
  65. }
  66. }).start();
  67. }
  68. // 标记通道感兴趣的事件是:读取服务端消息(读就绪)
  69. client.register(selector, SelectionKey.OP_READ);
  70. // 客户端读取服务端的反馈消息
  71. } else if (selectedKey.isReadable()) {
  72. SocketChannel client = (SocketChannel) selectedKey.channel();
  73. ByteBuffer readBuffer = ByteBuffer.allocate(1024);
  74. // 将服务端的反馈消息放入 readBuffer中
  75. int len = client.read(readBuffer);
  76. if (len > 0) {
  77. String receive = new String(readBuffer.array(), 0, len);
  78. System.out.println(receive);
  79. }
  80. }
  81. }
  82. selectionKeys.clear();
  83. }
  84. } catch (IOException e) {
  85. e.printStackTrace();
  86. }
  87. }
  88. }

三 测试

1 启动服务端,然后启动客户端,会发送测试消息"connecting",共 10 个字节。

2 客户端向服务端发送一个"helloserver",一共有 11 个字节。

3 客户端向服务端发送"byebye",一共有 6 个字节。

4 整个过程中,客户端的打印

连接成功!访问的端口是:8888

helloserver

byebye

5 整个过程中,服务端的打印如下

读取到的数据大小:10

读取到的数据大小:2

读取到的数据大小:9

读取到的数据大小:3

读取到的数据大小:3

相关文章