传统的BIO模型(同步阻塞IO模型)+线程池(多线程)模式:适合活动连接次数不是特别高。该模式是1:1,即每次连接每个线程。
处理步骤:客户端发送请求,接收器Acceptor每接收一个请求,就创建一个新线程,处理完成之后,再通过输出流返回到客户端,然后销毁线程。
缺陷:一个客户端请求,就对应一个线程,客户端的请求和服务端的线程就成为1:1的比例,当请求过多的时候,线程越来越多,最后导致JVM的内存被大量的占用,堆栈溢出,发生异常。
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ChatServer
{
private final int SERVER_PORT=8080;
private final String QUIT="quit";
private ServerSocket serverSocket;
//key是当前客户端对应的端口号,value是服务器与当前客户端之前的关联输出字符流
private Map<Integer, Writer> connectedClients;
public ChatServer()
{
//初始化map集合
connectedClients=new ConcurrentHashMap<>();
}
//新增客户端
public synchronized void addClient(Socket socket) throws IOException {
if(socket!=null)
{
//添加进集合
connectedClients.put(socket.getPort(),new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())));
System.out.println("当前客户端["+socket.getPort()+"]已成功连接到服务器");
}
}
//移除客户端
public synchronized void removeClient(Socket socket) throws IOException {
if(socket!=null)
{
//从集合中移除,并关闭相关流
if(connectedClients.containsKey(socket.getPort()))
{
connectedClients.get(socket.getPort()).close();
connectedClients.remove(socket.getPort());
System.out.println("客户端[" + socket.getPort() + "]已断开连接");
}
}
}
//群发消息给其他客户端
public synchronized void forwardMessage(Socket socket,String msg) throws IOException {
//遍历集合,挨个转发消息--但是不发给自己
// connectedClients.forEach(
// (port,client)->
// {
// if(!port.equals(socket.getPort()))
// {
// System.out.println("当前客户端["+port+"]");
// //这里最好还是把异常外抛出去,因为这里使用lambda写法,因此没法抛出去
// try
// {
// client.write(msg+"\n");
// client.flush();
// }
// catch (IOException e)
// {
// e.printStackTrace();
// }
// }
// }
// );
for (Integer id : connectedClients.keySet()) {
if (!id.equals(socket.getPort())) {
Writer writer = connectedClients.get(id);
//这个地方必须要加\n,否则readline读取不到换行符,会阻塞住
writer.write(msg+"\n");
writer.flush();
}
}
}
//客户端是否准备好退出连接
public boolean clientReadyToQuit(String msg)
{
return QUIT.equals(msg);
}
//关闭服务器端口
public synchronized void close() throws IOException {
serverSocket.close();
System.out.println("关闭socket服务器");
}
//启动服务器
public void start()
{
try
{
//绑定服务器与对应的端口
ServerSocket serverSocket = new ServerSocket(SERVER_PORT);
System.out.println("服务器启动,对应的端口为: "+SERVER_PORT);
//等待客户端连接
while(true)
{
//如果没有客户端连接,这边就会阻塞住
System.out.println("等待客户端连接...");
Socket socket = serverSocket.accept();
System.out.println("客户端连接中...");
//有客户端连接后,创建ChatHandler线程
new Thread(new ChatHandler(this,socket)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
//启动服务
public static void main(String[] args) {
ChatServer chatServer=new ChatServer();
chatServer.start();
}
}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
public class ChatHandler implements Runnable
{
//保存服务器对象和对应的客户端socket连接对象
private ChatServer chatServer;
private Socket socket;
ChatHandler(ChatServer chatServer,Socket socket)
{
this.chatServer=chatServer;
this.socket=socket;
}
@Override
public void run()
{
try
{
//保存新用户连接进在线用户集合
chatServer.addClient(socket);
//读取用户发送的信息
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//判断当前用户是否发送了消息
String msg=null;
//bufferedReader.readLine()是阻塞式的,直到收到客户端发送过来的一条信息
//读取直到信息是以换行符或者回车结尾,然后返回换行符之前的所有信息
//当客户端断开连接后,此时readLine函数会返回null值
while((msg=bufferedReader.readLine())!=null)
{
//检查当前用户是否准备退出
if(chatServer.clientReadyToQuit(msg))
{
break;
}
String clientMsg = "客户端[" + socket.getPort() + "]的消息: " + msg;
System.out.println(clientMsg);
//转发消息
chatServer.forwardMessage(socket,clientMsg);
}
}
catch (IOException e)
{
e.printStackTrace();
}finally
{
try {
chatServer.removeClient(socket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
'
//客户端
public class ChatClient
{
private final String DEFAULT_SERVER_HOST = "127.0.0.1";
private final int DEFAULT_SERVER_PORT = 8080;
private final String QUIT = "quit";
private Socket socket;
private BufferedReader reader;
private BufferedWriter writer;
// 发送消息给服务器
public void send(String msg) throws IOException
{
//当前当前客户端与服务器端之间的输出流是否关闭
if (!socket.isOutputShutdown())
{
writer.write(msg + "\n");
writer.flush();
}
}
// 从服务器接收消息
public String receive() throws IOException {
String msg = null;
//判断当前客户端与服务器端之间的输入流是否关闭
if (!socket.isInputShutdown()) {
msg = reader.readLine();
}
return msg;
}
// 检查用户是否准备退出
public boolean readyToQuit(String msg) {
return QUIT.equals(msg);
}
public void close() {
if (writer != null) {
try {
System.out.println("关闭socket");
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void start()
{
try {
// 创建socket
socket = new Socket(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
// 创建IO流
reader = new BufferedReader(
new InputStreamReader(socket.getInputStream())
);
writer = new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream())
);
// 处理用户的输入
new Thread(new UserInputHandler(this)).start();
// 读取服务器转发的消息
String msg = null;
while ((msg = receive()) != null) {
System.out.println(msg);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
close();
}
}
public static void main(String[] args) {
ChatClient chatClient = new ChatClient();
chatClient.start();
}
}
public class UserInputHandler implements Runnable {
private ChatClient chatClient;
public UserInputHandler(ChatClient chatClient) {
this.chatClient = chatClient;
}
@Override
public void run() {
try {
// 等待用户输入消息
BufferedReader consoleReader =
new BufferedReader(new InputStreamReader(System.in));
while (true) {
String input = consoleReader.readLine();
// 向服务器发送消息
chatClient.send(input);
// 检查用户是否准备退出
if (chatClient.readyToQuit(input)) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
上面版本的聊天室缺陷:
复习java提供的线程池操作:
代码实现
public class ChatServer
{
private final int SERVER_PORT=8080;
private final String QUIT="quit";
private ServerSocket serverSocket;
//key是当前客户端对应的端口号,value是服务器与当前客户端之前的关联输出字符流
private Map<Integer, Writer> connectedClients;
//维护一个线程池对象
private ExecutorService executorService;
public ChatServer()
{
//初始化map集合
connectedClients=new ConcurrentHashMap<>();
//线程池对象初始化---线程池里面固定线程数量
executorService=Executors.newFixedThreadPool(10);
}
//新增客户端
public synchronized void addClient(Socket socket) throws IOException {
if(socket!=null)
{
//添加进集合
connectedClients.put(socket.getPort(),new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())));
System.out.println("当前客户端["+socket.getPort()+"]已成功连接到服务器");
}
}
//移除客户端
public synchronized void removeClient(Socket socket) throws IOException {
if(socket!=null)
{
//从集合中移除,并关闭相关流
if(connectedClients.containsKey(socket.getPort()))
{
connectedClients.get(socket.getPort()).close();
connectedClients.remove(socket.getPort());
System.out.println("客户端[" + socket.getPort() + "]已断开连接");
}
}
}
//群发消息给其他客户端
public synchronized void forwardMessage(Socket socket,String msg) throws IOException {
//遍历集合,挨个转发消息--但是不发给自己
// connectedClients.forEach(
// (port,client)->
// {
// if(!port.equals(socket.getPort()))
// {
// System.out.println("当前客户端["+port+"]");
// //这里最好还是把异常外抛出去,因为这里使用lambda写法,因此没法抛出去
// try
// {
// client.write(msg+"\n");
// client.flush();
// }
// catch (IOException e)
// {
// e.printStackTrace();
// }
// }
// }
// );
for (Integer id : connectedClients.keySet()) {
if (!id.equals(socket.getPort())) {
Writer writer = connectedClients.get(id);
//这个地方必须要加\n,否则readline读取不到换行符,会阻塞住
writer.write(msg+"\n");
writer.flush();
}
}
}
//客户端是否准备好退出连接
public boolean clientReadyToQuit(String msg)
{
return QUIT.equals(msg);
}
//关闭服务器端口
public synchronized void close() throws IOException {
serverSocket.close();
System.out.println("关闭socket服务器");
}
//启动服务器
public void start()
{
try
{
//绑定服务器与对应的端口
ServerSocket serverSocket = new ServerSocket(SERVER_PORT);
System.out.println("服务器启动,对应的端口为: "+SERVER_PORT);
//等待客户端连接
while(true)
{
//如果没有客户端连接,这边就会阻塞住
System.out.println("等待客户端连接...");
Socket socket = serverSocket.accept();
System.out.println("客户端连接中...");
//有客户端连接后,创建ChatHandler线程
//new Thread(new ChatHandler(this,socket)).start();
//使用线程池管理客户端连接---这里excute会调用线程的start方法
executorService.execute(new ChatHandler(this,socket));
}
} catch (IOException e) {
e.printStackTrace();
}
}
//启动服务
public static void main(String[] args) {
ChatServer chatServer=new ChatServer();
chatServer.start();
}
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/m0_53157173/article/details/121756354
内容来源于网络,如有侵权,请联系作者删除!