通过选择器 Selector 开发高性能聊天室

x33g5p2x  于2022-05-23 转载在 其他  
字(7.2k)|赞(0)|评价(0)|浏览(276)

一 点睛

选择器(Selector)的核心作用:可以在一个选择器上注册多个通道,并且可以通过选择器切换使用这些通道。

如果不使用选择器,服务端的 I/O 代码,会给每个客户端创建一个新线程,也就是用 N 个线程去处理 N 个客户端请求。因此,如果有 1 万个客户请求,就会创建 1 万个线程,这显然是不合理地。而 NIO 处理这种问题的思路是,用一个线程处理全部请求,并通过 Selector 切换处理不同的请求通道。

二 实战

1 需求

使用 NIO 实现一个聊天室功能。要求如下:服务端启动后可以接收多个客户端连接,每个客户端都可以向服务端发送消息;服务端接受到消息后,会在控制台打印此客户端的信息,并且将此消息发给全部的客户端。

2 服务端

服务端只创建一个处理请求的线程

  1. package nio;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.net.ServerSocket;
  5. import java.nio.ByteBuffer;
  6. import java.nio.channels.SelectionKey;
  7. import java.nio.channels.Selector;
  8. import java.nio.channels.ServerSocketChannel;
  9. import java.nio.channels.SocketChannel;
  10. import java.nio.charset.Charset;
  11. import java.util.HashMap;
  12. import java.util.Iterator;
  13. import java.util.Map;
  14. import java.util.Set;
  15. /**
  16. * @className: ChatServer
  17. * @description: 服务器
  18. * @date: 2022/5/23
  19. * @author: cakin
  20. */
  21. public class ChatServer {
  22. /*
  23. clientsMap:保存所有的客户端
  24. key:客户端的名字
  25. value:客户端连接服务端的Channel
  26. */
  27. private static Map<String, SocketChannel> clientsMap = new HashMap();
  28. public static void main(String[] args) throws IOException {
  29. int[] ports = new int[]{7777, 8888, 9999};
  30. Selector selector = Selector.open();
  31. for (int port : ports) {
  32. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  33. serverSocketChannel.configureBlocking(false);
  34. ServerSocket serverSocket = serverSocketChannel.socket();
  35. // 将聊天服务绑定到 7777、8888和9999 三个端口上
  36. serverSocket.bind(new InetSocketAddress(port));
  37. System.out.println("服务端启动成功,端口" + port);
  38. // 在服务端的选择器上,注册一个通道,并标识该通道所感兴趣的事件是:接收客户端连接(接收就绪)
  39. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  40. }
  41. while (true) {
  42. // 一直阻塞,直到选择器上存在已经就绪的通道(包含感兴趣的事件)
  43. selector.select();
  44. // selectionKeys包含了所有通道与选择器之间的关系(接收连接、读、写)
  45. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  46. Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
  47. // 如果selector中有多个就绪通道(接收就绪、读就绪、写就绪等),则遍历这些通道
  48. while (keyIterator.hasNext()) {
  49. SelectionKey selectedKey = keyIterator.next();
  50. String receive = null;
  51. // 与客户端交互的通道
  52. SocketChannel clientChannel;
  53. try {
  54. // 接收就绪(已经可以接收客户端的连接了)
  55. if (selectedKey.isAcceptable()) {
  56. ServerSocketChannel server = (ServerSocketChannel) selectedKey.channel();
  57. clientChannel = server.accept();
  58. // 切换到非阻塞模式
  59. clientChannel.configureBlocking(false);
  60. // 再在服务端的选择器上,注册第二个通道,并标识该通道所感兴趣的事件是:接收客户端发来的消息(读就绪)
  61. clientChannel.register(selector, SelectionKey.OP_READ);
  62. // 用“key四位随机数”的形式模拟客户端的key值
  63. String key = "key" + (int) (Math.random() * 9000 + 1000);
  64. // 将该建立完毕连接的 通道 保存到clientsMap中
  65. clientsMap.put(key, clientChannel);
  66. // 读就绪(已经可以读取客户端发来的信息了)
  67. } else if (selectedKey.isReadable()) {
  68. clientChannel = (SocketChannel) selectedKey.channel();
  69. ByteBuffer readBuffer = ByteBuffer.allocate(1024);
  70. int result = -1;
  71. try {
  72. // 将服务端读取到的客户端消息,放入 readBuffer 中
  73. result = clientChannel.read(readBuffer);
  74. // 如果终止客户端,则 read() 会抛出 IOException 异常,可以依次判断是否有客户端退出。
  75. } catch (IOException e) {
  76. // 获取退出连接的 client 对应的 key
  77. String clientKey = getClientKey(clientChannel);
  78. System.out.println("客户端" + clientKey + "退出聊天室");
  79. clientsMap.remove(clientKey);
  80. clientChannel.close();
  81. selectedKey.cancel();
  82. continue;
  83. }
  84. if (result > 0) {
  85. readBuffer.flip();
  86. Charset charset = Charset.forName("utf-8");
  87. receive = String.valueOf(charset.decode(readBuffer).array());
  88. // 将读取到的客户端消息,打印在服务端的控制台
  89. System.out.println(clientChannel + ":" + receive);
  90. // 处理客户端第一次发来的连接测试信息
  91. if ("connecting".equals(receive)) {
  92. receive = "新客户端加入聊天!";
  93. }
  94. // 将读取到的客户消息保存在 attachment 中,用于后续向所有客户端转发此消息
  95. selectedKey.attach(receive);
  96. // 将通道所感兴趣的事件标识为:向客户端发送消息(写就绪)
  97. selectedKey.interestOps(SelectionKey.OP_WRITE);
  98. }
  99. // 写就绪
  100. } else if (selectedKey.isWritable()) {
  101. clientChannel = (SocketChannel) selectedKey.channel();
  102. // 获取发送消息从client对应的key
  103. String sendKey = getClientKey(clientChannel);
  104. // 将接收到的消息,拼接成“发送消息的客户端Key:消息”的形式,再广播给所有client
  105. for (Map.Entry<String, SocketChannel> entry : clientsMap.entrySet()) {
  106. SocketChannel eachClient = entry.getValue();
  107. ByteBuffer broadcastMsg = ByteBuffer.allocate(1024);
  108. broadcastMsg.put((sendKey + ":" + selectedKey.attachment()).getBytes());
  109. broadcastMsg.flip();
  110. eachClient.write(broadcastMsg);
  111. }
  112. selectedKey.interestOps(SelectionKey.OP_READ);
  113. }
  114. } catch (Exception e) {
  115. e.printStackTrace();
  116. }
  117. }
  118. selectionKeys.clear();
  119. }
  120. }
  121. public static String getClientKey(SocketChannel clientChannel) {
  122. String sendKey = null;
  123. //很多client在发下消息,通过for找到是哪个client在发消息,找到该client的key
  124. for (Map.Entry<String, SocketChannel> entry : clientsMap.entrySet()) {
  125. if (clientChannel == entry.getValue()) {
  126. //找到发送消息的client所对应的key
  127. sendKey = entry.getKey();
  128. break;
  129. }
  130. }
  131. return sendKey;
  132. }
  133. }

3 客户端

  1. package nio;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import java.net.InetSocketAddress;
  6. import java.nio.ByteBuffer;
  7. import java.nio.channels.SelectionKey;
  8. import java.nio.channels.Selector;
  9. import java.nio.channels.SocketChannel;
  10. import java.util.Iterator;
  11. import java.util.Set;
  12. /**
  13. * @className: ChatClient
  14. * @description: 客户端
  15. * @date: 2022/5/23
  16. * @author: cakin
  17. */
  18. public class ChatClient {
  19. public static void main(String[] args) {
  20. try {
  21. SocketChannel socketChannel = SocketChannel.open();
  22. // 切换到非阻塞模式
  23. socketChannel.configureBlocking(false);
  24. Selector selector = Selector.open();
  25. // 在客户端的选择器上,注册一个通道,并标识该通道所感兴趣的事件是:向服务端发送连接(连接就绪)。对应于服务端的OP_ACCEPT事件
  26. socketChannel.register(selector, SelectionKey.OP_CONNECT);
  27. // 随机连接到服务端提供的一个端口上
  28. int[] ports = {7777, 8888, 9999};
  29. int port = ports[(int) (Math.random() * 3)];
  30. socketChannel.connect(new InetSocketAddress("127.0.0.1", port));
  31. while (true) {
  32. selector.select();
  33. // selectionKeys 包含了所有通道与选择器之间的关系(请求连接、读、写)
  34. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  35. Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
  36. while (keyIterator.hasNext()) {
  37. SelectionKey selectedKey = keyIterator.next();
  38. // 判断是否连接成功
  39. if (selectedKey.isConnectable()) {
  40. ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
  41. // 创建一个用于和服务端交互的 Channel
  42. SocketChannel client = (SocketChannel) selectedKey.channel();
  43. // 如果状态是:正在连接中...
  44. if (client.isConnectionPending()) {
  45. boolean isConnected = client.finishConnect();
  46. if (isConnected) {
  47. System.out.println("连接成功!访问的端口是:" + port);
  48. // 向服务端发送一条测试消息
  49. sendBuffer.put("connecting".getBytes());
  50. sendBuffer.flip();
  51. client.write(sendBuffer);
  52. }
  53. // 在“聊天室”中,对于客户端而言,可以随时向服务端发送消息(写操作),因此,需要建立一个单独写线程
  54. new Thread(() -> {
  55. while (true) {
  56. try {
  57. sendBuffer.clear();
  58. // 接收用户从控制台输入的内容,并发送给服务端
  59. InputStreamReader reader = new InputStreamReader(System.in);
  60. BufferedReader bReader = new BufferedReader(reader);
  61. String message = bReader.readLine();
  62. sendBuffer.put(message.getBytes());
  63. sendBuffer.flip();
  64. client.write(sendBuffer);
  65. } catch (Exception e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. }).start();
  70. }
  71. // 标记通道感兴趣的事件是:读取服务端消息(读就绪)
  72. client.register(selector, SelectionKey.OP_READ);
  73. // 客户端读取服务端的反馈消息
  74. } else if (selectedKey.isReadable()) {
  75. SocketChannel client = (SocketChannel) selectedKey.channel();
  76. ByteBuffer readBuffer = ByteBuffer.allocate(1024);
  77. // 将服务端的反馈消息放入 readBuffer中
  78. int len = client.read(readBuffer);
  79. if (len > 0) {
  80. String receive = new String(readBuffer.array(), 0, len);
  81. System.out.println(receive);
  82. }
  83. }
  84. }
  85. selectionKeys.clear();
  86. }
  87. } catch (IOException e) {
  88. e.printStackTrace();
  89. }
  90. }
  91. }

三 测试

依次启动服务端和2个客户端,并发送消息

1 服务端打印

服务端启动成功,端口7777

服务端启动成功,端口8888

服务端启动成功,端口9999

java.nio.channels.SocketChannel[connected local=/127.0.0.1:7777 remote=/127.0.0.1:52781]:connecting

java.nio.channels.SocketChannel[connected local=/127.0.0.1:7777 remote=/127.0.0.1:52892]:connecting

java.nio.channels.SocketChannel[connected local=/127.0.0.1:7777 remote=/127.0.0.1:52892]:你好    

java.nio.channels.SocketChannel[connected local=/127.0.0.1:7777 remote=/127.0.0.1:52781]:你来自哪里?            

2 第1个客户端打印

连接成功!访问的端口是:7777

key9664:新客户端加入聊天!

key4978:新客户端加入聊天!

key4978:你好    

你来自哪里?

key9664:你来自哪里?            

3 第2个客户端打印

连接成功!访问的端口是:7777

key4978:新客户端加入聊天!

你好

key4978:你好    

key9664:你来自哪里?            

相关文章