Java网络编程系列之基于BIO的多人聊天室设计与实现

x33g5p2x  于2021-12-07 转载在 Java  
字(8.4k)|赞(0)|评价(0)|浏览(615)

BIO模型

传统的BIO模型(同步阻塞IO模型)+线程池(多线程)模式:适合活动连接次数不是特别高。该模式是1:1,即每次连接每个线程。

处理步骤:客户端发送请求,接收器Acceptor每接收一个请求,就创建一个新线程,处理完成之后,再通过输出流返回到客户端,然后销毁线程。

缺陷:一个客户端请求,就对应一个线程,客户端的请求和服务端的线程就成为1:1的比例,当请求过多的时候,线程越来越多,最后导致JVM的内存被大量的占用,堆栈溢出,发生异常。

多人聊天室功能概述

多人聊天室设计UML建模之时序图

  • 1.服务器端: 通过ServerSocket对象,绑定端口,调用accept函数,等待客户端连接
  • 2.服务器端维护一个map集合,通过每个客户端的端口号,来唯一识别每一个客户端对象
  • 2.当有客户端连接成功后,通过ChatHandler创建一个新的线程用以处理当前客户端的连接
  • 4.ChatHandler负责将当前连接成功的客户端放入当前在线用户集合中,然后保持与当前客户端的线程连接,直到当前客户端主动退出连接
  • 5.客户端ChatClient通过服务器ip和端口与之建立连接,然后等待接收服务器发送过来的消息
  • 6.同时客户端创建一个单独的线程UserInputHandler,负责发送消息,当客户端这边输入quit指令的时候,表示客户端要退出连接

服务端代码实现

ChatServer

  1. import java.io.BufferedWriter;
  2. import java.io.IOException;
  3. import java.io.OutputStreamWriter;
  4. import java.io.Writer;
  5. import java.net.ServerSocket;
  6. import java.net.Socket;
  7. import java.util.Map;
  8. import java.util.concurrent.ConcurrentHashMap;
  9. public class ChatServer
  10. {
  11. private final int SERVER_PORT=8080;
  12. private final String QUIT="quit";
  13. private ServerSocket serverSocket;
  14. //key是当前客户端对应的端口号,value是服务器与当前客户端之前的关联输出字符流
  15. private Map<Integer, Writer> connectedClients;
  16. public ChatServer()
  17. {
  18. //初始化map集合
  19. connectedClients=new ConcurrentHashMap<>();
  20. }
  21. //新增客户端
  22. public synchronized void addClient(Socket socket) throws IOException {
  23. if(socket!=null)
  24. {
  25. //添加进集合
  26. connectedClients.put(socket.getPort(),new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())));
  27. System.out.println("当前客户端["+socket.getPort()+"]已成功连接到服务器");
  28. }
  29. }
  30. //移除客户端
  31. public synchronized void removeClient(Socket socket) throws IOException {
  32. if(socket!=null)
  33. {
  34. //从集合中移除,并关闭相关流
  35. if(connectedClients.containsKey(socket.getPort()))
  36. {
  37. connectedClients.get(socket.getPort()).close();
  38. connectedClients.remove(socket.getPort());
  39. System.out.println("客户端[" + socket.getPort() + "]已断开连接");
  40. }
  41. }
  42. }
  43. //群发消息给其他客户端
  44. public synchronized void forwardMessage(Socket socket,String msg) throws IOException {
  45. //遍历集合,挨个转发消息--但是不发给自己
  46. // connectedClients.forEach(
  47. // (port,client)->
  48. // {
  49. // if(!port.equals(socket.getPort()))
  50. // {
  51. // System.out.println("当前客户端["+port+"]");
  52. // //这里最好还是把异常外抛出去,因为这里使用lambda写法,因此没法抛出去
  53. // try
  54. // {
  55. // client.write(msg+"\n");
  56. // client.flush();
  57. // }
  58. // catch (IOException e)
  59. // {
  60. // e.printStackTrace();
  61. // }
  62. // }
  63. // }
  64. // );
  65. for (Integer id : connectedClients.keySet()) {
  66. if (!id.equals(socket.getPort())) {
  67. Writer writer = connectedClients.get(id);
  68. //这个地方必须要加\n,否则readline读取不到换行符,会阻塞住
  69. writer.write(msg+"\n");
  70. writer.flush();
  71. }
  72. }
  73. }
  74. //客户端是否准备好退出连接
  75. public boolean clientReadyToQuit(String msg)
  76. {
  77. return QUIT.equals(msg);
  78. }
  79. //关闭服务器端口
  80. public synchronized void close() throws IOException {
  81. serverSocket.close();
  82. System.out.println("关闭socket服务器");
  83. }
  84. //启动服务器
  85. public void start()
  86. {
  87. try
  88. {
  89. //绑定服务器与对应的端口
  90. ServerSocket serverSocket = new ServerSocket(SERVER_PORT);
  91. System.out.println("服务器启动,对应的端口为: "+SERVER_PORT);
  92. //等待客户端连接
  93. while(true)
  94. {
  95. //如果没有客户端连接,这边就会阻塞住
  96. System.out.println("等待客户端连接...");
  97. Socket socket = serverSocket.accept();
  98. System.out.println("客户端连接中...");
  99. //有客户端连接后,创建ChatHandler线程
  100. new Thread(new ChatHandler(this,socket)).start();
  101. }
  102. } catch (IOException e) {
  103. e.printStackTrace();
  104. }
  105. }
  106. //启动服务
  107. public static void main(String[] args) {
  108. ChatServer chatServer=new ChatServer();
  109. chatServer.start();
  110. }
  111. }

ChatHandler

  1. import java.io.BufferedReader;
  2. import java.io.IOException;
  3. import java.io.InputStreamReader;
  4. import java.net.Socket;
  5. public class ChatHandler implements Runnable
  6. {
  7. //保存服务器对象和对应的客户端socket连接对象
  8. private ChatServer chatServer;
  9. private Socket socket;
  10. ChatHandler(ChatServer chatServer,Socket socket)
  11. {
  12. this.chatServer=chatServer;
  13. this.socket=socket;
  14. }
  15. @Override
  16. public void run()
  17. {
  18. try
  19. {
  20. //保存新用户连接进在线用户集合
  21. chatServer.addClient(socket);
  22. //读取用户发送的信息
  23. BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
  24. //判断当前用户是否发送了消息
  25. String msg=null;
  26. //bufferedReader.readLine()是阻塞式的,直到收到客户端发送过来的一条信息
  27. //读取直到信息是以换行符或者回车结尾,然后返回换行符之前的所有信息
  28. //当客户端断开连接后,此时readLine函数会返回null值
  29. while((msg=bufferedReader.readLine())!=null)
  30. {
  31. //检查当前用户是否准备退出
  32. if(chatServer.clientReadyToQuit(msg))
  33. {
  34. break;
  35. }
  36. String clientMsg = "客户端[" + socket.getPort() + "]的消息: " + msg;
  37. System.out.println(clientMsg);
  38. //转发消息
  39. chatServer.forwardMessage(socket,clientMsg);
  40. }
  41. }
  42. catch (IOException e)
  43. {
  44. e.printStackTrace();
  45. }finally
  46. {
  47. try {
  48. chatServer.removeClient(socket);
  49. } catch (IOException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. }
  54. }

'

客户端代码实现

ChatClient

  1. //客户端
  2. public class ChatClient
  3. {
  4. private final String DEFAULT_SERVER_HOST = "127.0.0.1";
  5. private final int DEFAULT_SERVER_PORT = 8080;
  6. private final String QUIT = "quit";
  7. private Socket socket;
  8. private BufferedReader reader;
  9. private BufferedWriter writer;
  10. // 发送消息给服务器
  11. public void send(String msg) throws IOException
  12. {
  13. //当前当前客户端与服务器端之间的输出流是否关闭
  14. if (!socket.isOutputShutdown())
  15. {
  16. writer.write(msg + "\n");
  17. writer.flush();
  18. }
  19. }
  20. // 从服务器接收消息
  21. public String receive() throws IOException {
  22. String msg = null;
  23. //判断当前客户端与服务器端之间的输入流是否关闭
  24. if (!socket.isInputShutdown()) {
  25. msg = reader.readLine();
  26. }
  27. return msg;
  28. }
  29. // 检查用户是否准备退出
  30. public boolean readyToQuit(String msg) {
  31. return QUIT.equals(msg);
  32. }
  33. public void close() {
  34. if (writer != null) {
  35. try {
  36. System.out.println("关闭socket");
  37. writer.close();
  38. } catch (IOException e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. }
  43. public void start()
  44. {
  45. try {
  46. // 创建socket
  47. socket = new Socket(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
  48. // 创建IO流
  49. reader = new BufferedReader(
  50. new InputStreamReader(socket.getInputStream())
  51. );
  52. writer = new BufferedWriter(
  53. new OutputStreamWriter(socket.getOutputStream())
  54. );
  55. // 处理用户的输入
  56. new Thread(new UserInputHandler(this)).start();
  57. // 读取服务器转发的消息
  58. String msg = null;
  59. while ((msg = receive()) != null) {
  60. System.out.println(msg);
  61. }
  62. } catch (IOException e) {
  63. e.printStackTrace();
  64. } finally {
  65. close();
  66. }
  67. }
  68. public static void main(String[] args) {
  69. ChatClient chatClient = new ChatClient();
  70. chatClient.start();
  71. }
  72. }

UserInputHandler

  1. public class UserInputHandler implements Runnable {
  2. private ChatClient chatClient;
  3. public UserInputHandler(ChatClient chatClient) {
  4. this.chatClient = chatClient;
  5. }
  6. @Override
  7. public void run() {
  8. try {
  9. // 等待用户输入消息
  10. BufferedReader consoleReader =
  11. new BufferedReader(new InputStreamReader(System.in));
  12. while (true) {
  13. String input = consoleReader.readLine();
  14. // 向服务器发送消息
  15. chatClient.send(input);
  16. // 检查用户是否准备退出
  17. if (chatClient.readyToQuit(input)) {
  18. break;
  19. }
  20. }
  21. } catch (IOException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }

伪异步IO编程模型简析

上面版本的聊天室缺陷:

  • 服务端会为每一个客户端都创建一个新的线程用来通信,这样比较耗费资源,好的做法是创建一个线程池用来使用

复习java提供的线程池操作:

代码实现

  1. public class ChatServer
  2. {
  3. private final int SERVER_PORT=8080;
  4. private final String QUIT="quit";
  5. private ServerSocket serverSocket;
  6. //key是当前客户端对应的端口号,value是服务器与当前客户端之前的关联输出字符流
  7. private Map<Integer, Writer> connectedClients;
  8. //维护一个线程池对象
  9. private ExecutorService executorService;
  10. public ChatServer()
  11. {
  12. //初始化map集合
  13. connectedClients=new ConcurrentHashMap<>();
  14. //线程池对象初始化---线程池里面固定线程数量
  15. executorService=Executors.newFixedThreadPool(10);
  16. }
  17. //新增客户端
  18. public synchronized void addClient(Socket socket) throws IOException {
  19. if(socket!=null)
  20. {
  21. //添加进集合
  22. connectedClients.put(socket.getPort(),new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())));
  23. System.out.println("当前客户端["+socket.getPort()+"]已成功连接到服务器");
  24. }
  25. }
  26. //移除客户端
  27. public synchronized void removeClient(Socket socket) throws IOException {
  28. if(socket!=null)
  29. {
  30. //从集合中移除,并关闭相关流
  31. if(connectedClients.containsKey(socket.getPort()))
  32. {
  33. connectedClients.get(socket.getPort()).close();
  34. connectedClients.remove(socket.getPort());
  35. System.out.println("客户端[" + socket.getPort() + "]已断开连接");
  36. }
  37. }
  38. }
  39. //群发消息给其他客户端
  40. public synchronized void forwardMessage(Socket socket,String msg) throws IOException {
  41. //遍历集合,挨个转发消息--但是不发给自己
  42. // connectedClients.forEach(
  43. // (port,client)->
  44. // {
  45. // if(!port.equals(socket.getPort()))
  46. // {
  47. // System.out.println("当前客户端["+port+"]");
  48. // //这里最好还是把异常外抛出去,因为这里使用lambda写法,因此没法抛出去
  49. // try
  50. // {
  51. // client.write(msg+"\n");
  52. // client.flush();
  53. // }
  54. // catch (IOException e)
  55. // {
  56. // e.printStackTrace();
  57. // }
  58. // }
  59. // }
  60. // );
  61. for (Integer id : connectedClients.keySet()) {
  62. if (!id.equals(socket.getPort())) {
  63. Writer writer = connectedClients.get(id);
  64. //这个地方必须要加\n,否则readline读取不到换行符,会阻塞住
  65. writer.write(msg+"\n");
  66. writer.flush();
  67. }
  68. }
  69. }
  70. //客户端是否准备好退出连接
  71. public boolean clientReadyToQuit(String msg)
  72. {
  73. return QUIT.equals(msg);
  74. }
  75. //关闭服务器端口
  76. public synchronized void close() throws IOException {
  77. serverSocket.close();
  78. System.out.println("关闭socket服务器");
  79. }
  80. //启动服务器
  81. public void start()
  82. {
  83. try
  84. {
  85. //绑定服务器与对应的端口
  86. ServerSocket serverSocket = new ServerSocket(SERVER_PORT);
  87. System.out.println("服务器启动,对应的端口为: "+SERVER_PORT);
  88. //等待客户端连接
  89. while(true)
  90. {
  91. //如果没有客户端连接,这边就会阻塞住
  92. System.out.println("等待客户端连接...");
  93. Socket socket = serverSocket.accept();
  94. System.out.println("客户端连接中...");
  95. //有客户端连接后,创建ChatHandler线程
  96. //new Thread(new ChatHandler(this,socket)).start();
  97. //使用线程池管理客户端连接---这里excute会调用线程的start方法
  98. executorService.execute(new ChatHandler(this,socket));
  99. }
  100. } catch (IOException e) {
  101. e.printStackTrace();
  102. }
  103. }
  104. //启动服务
  105. public static void main(String[] args) {
  106. ChatServer chatServer=new ChatServer();
  107. chatServer.start();
  108. }
  109. }

相关文章

最新文章

更多