为什么我的nio客户端到服务器的连接会中断?

xriantvc  于 2021-07-03  发布在  Java
关注(0)|答案(1)|浏览(495)

我目前有一个桌面服务器,它是使用 Java NIO 库以非阻塞模式编写的。您可以在这里找到完整的服务器项目。出于测试目的,我还编写了一个非阻塞的 NIO 客户端,您可以在这里找到那个项目。该服务器最终将用于一个 Android 即时通讯应用。客户端和服务器都基于发送“数据包”进行通信的思路:也就是说,Packet 类的实例被序列化到字节缓冲区中,并通过套接字通道发送,然后在另一端反序列化并执行。
我当前的问题是:我的测试客户端在连接到服务器之后似乎就断开了连接。我认为问题出在客户端,因为当我在命令行中用 telnet 连接服务器时,连接不会断开。奇怪的是,服务器会打印出连接已建立,但在客户端断开时,它从不提示连接已丢失或已终止。
这是处理所有 NIO 网络通信的客户端类……

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.SelectionKey;
  5. import java.nio.channels.Selector;
  6. import java.nio.channels.SocketChannel;
  7. import java.nio.channels.spi.SelectorProvider;
  8. import java.util.Iterator;
  9. import java.util.LinkedList;
  10. import java.util.List;
  11. import org.baiocchi.enigma.client.test.Engine;
  12. import org.baiocchi.enigma.client.test.databundle.DataBundle;
  13. import org.baiocchi.enigma.client.test.packet.Packet;
  14. import org.baiocchi.enigma.client.test.ui.LogType;
  15. import org.baiocchi.enigma.client.test.ui.Logger;
  16. public class Client extends Thread {
  17. private boolean running;
  18. private final int port;
  19. private SocketChannel connection;
  20. private final ByteBuffer buffer;
  21. private Selector selector;
  22. private List<DataBundle> pendingDataBundleQue;
  23. public Client(int port) {
  24. this.port = port;
  25. pendingDataBundleQue = new LinkedList<DataBundle>();
  26. buffer = ByteBuffer.allocate(8192);
  27. try {
  28. selector = initiateSelector();
  29. connection = initiateConnection();
  30. } catch (IOException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. private Selector initiateSelector() throws IOException {
  35. return SelectorProvider.provider().openSelector();
  36. }
  37. private SocketChannel initiateConnection() throws IOException {
  38. SocketChannel connection = SocketChannel.open();
  39. connection.configureBlocking(false);
  40. connection.connect(new InetSocketAddress("localhost", port));
  41. connection.register(selector, SelectionKey.OP_CONNECT);
  42. return connection;
  43. }
  44. public SocketChannel getConnection() {
  45. return connection;
  46. }
  47. @Override
  48. public void start() {
  49. running = true;
  50. super.start();
  51. }
  52. public void addToPendingDataBundleQue(DataBundle bundle) {
  53. synchronized (pendingDataBundleQue) {
  54. pendingDataBundleQue.add(bundle);
  55. }
  56. }
  57. @Override
  58. public void run() {
  59. while (running) {
  60. System.out.println("loop");
  61. try {
  62. synchronized (pendingDataBundleQue) {
  63. System.out.println("Checking for que changes.");
  64. if (!pendingDataBundleQue.isEmpty()) {
  65. System.out.println("Found que change.");
  66. SelectionKey key = connection.keyFor(selector);
  67. key.interestOps(SelectionKey.OP_WRITE);
  68. }
  69. }
  70. System.out.println("Selecting keys");
  71. selector.select();
  72. System.out.println("Creating selected keys list.");
  73. Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
  74. while (selectedKeys.hasNext()) {
  75. System.out.println("scrolling through list");
  76. SelectionKey key = (SelectionKey) selectedKeys.next();
  77. selectedKeys.remove();
  78. if (!key.isValid()) {
  79. System.out.println("invalid");
  80. continue;
  81. } else if (key.isConnectable()) {
  82. System.out.println("connect");
  83. establishConnection(key);
  84. } else if (key.isReadable()) {
  85. System.out.println("read");
  86. readData(key);
  87. } else if (key.isWritable()) {
  88. System.out.println("write");
  89. writeData(key);
  90. }
  91. }
  92. } catch (IOException | ClassNotFoundException e) {
  93. e.printStackTrace();
  94. }
  95. }
  96. System.out.println("Broke loop");
  97. }
  98. private void writeData(SelectionKey key) throws IOException {
  99. synchronized (pendingDataBundleQue) {
  100. SocketChannel connection = (SocketChannel) key.channel();
  101. for (DataBundle bundle : pendingDataBundleQue) {
  102. System.out.println("sent packet");
  103. connection.write(bundle.getBuffer());
  104. }
  105. pendingDataBundleQue.clear();
  106. if (pendingDataBundleQue.isEmpty()) {
  107. Logger.write("All packets sent.", LogType.CLIENT);
  108. connection.keyFor(selector).interestOps(SelectionKey.OP_READ);
  109. }
  110. }
  111. }
  112. private void readData(SelectionKey key) throws IOException, ClassNotFoundException {
  113. buffer.clear();
  114. int byteCount;
  115. try {
  116. byteCount = connection.read(buffer);
  117. } catch (IOException e) {
  118. Logger.writeException("Connenction closed.", LogType.CLIENT);
  119. connection.close();
  120. key.cancel();
  121. return;
  122. }
  123. if (byteCount == -1) {
  124. Logger.writeException("Connection error. Attempting to terminate connection.", LogType.CLIENT);
  125. key.channel().close();
  126. key.cancel();
  127. }
  128. Engine.getInstance().getPacketProcessor().processData(buffer);
  129. }
  130. private void establishConnection(SelectionKey key) throws IOException {
  131. SocketChannel channel = (SocketChannel) key.channel();
  132. try {
  133. if (channel.finishConnect()) {
  134. Logger.write("Connection established.", LogType.CLIENT);
  135. key.interestOps(SelectionKey.OP_READ);
  136. }
  137. } catch (IOException e) {
  138. Logger.write("Failed to establish connection.", LogType.CLIENT);
  139. key.channel().close();
  140. key.cancel();
  141. return;
  142. }
  143. }
  144. }

这里是处理所有服务器网络的服务器类。

  1. package org.baiocchi.enigma.server.network;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.nio.channels.spi.SelectorProvider;
  10. import java.util.ArrayList;
  11. import java.util.Iterator;
  12. import org.baiocchi.enigma.server.Engine;
  13. import org.baiocchi.enigma.server.databundle.DataBundle;
  14. import org.baiocchi.enigma.server.ui.components.logger.LogType;
  15. import org.baiocchi.enigma.server.ui.components.logger.Logger;
  16. public class Server extends Thread {
  17. private boolean running;
  18. private final int port;
  19. private ServerSocketChannel server;
  20. private final ByteBuffer buffer;
  21. private Selector selector;
  22. private ArrayList<DataBundle> pendingDataBundleQue;
  23. public Server(int port) {
  24. this.port = port;
  25. buffer = ByteBuffer.allocate(8192);
  26. pendingDataBundleQue = new ArrayList<DataBundle>();
  27. try {
  28. server = ServerSocketChannel.open().bind(new InetSocketAddress("localhost", port));
  29. server.configureBlocking(false);
  30. selector = SelectorProvider.provider().openSelector();
  31. server.register(selector, SelectionKey.OP_ACCEPT);
  32. } catch (IOException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. @Override
  37. public void start() {
  38. running = true;
  39. super.start();
  40. }
  41. public void terminateConnection(SocketChannel channel) {
  42. SelectionKey key = channel.keyFor(selector);
  43. try {
  44. key.channel().close();
  45. key.cancel();
  46. } catch (IOException e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. public void addToPendingPacketQue(DataBundle bundle) {
  51. synchronized (pendingDataBundleQue) {
  52. pendingDataBundleQue.add(bundle);
  53. }
  54. }
  55. @Override
  56. public void run() {
  57. while (running) {
  58. try {
  59. synchronized (pendingDataBundleQue) {
  60. if (!pendingDataBundleQue.isEmpty()) {
  61. for (DataBundle bundle : pendingDataBundleQue) {
  62. SelectionKey key = bundle.getChannel().keyFor(selector);
  63. key.interestOps(SelectionKey.OP_WRITE);
  64. }
  65. }
  66. }
  67. selector.select();
  68. Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
  69. while (selectedKeys.hasNext()) {
  70. SelectionKey key = (SelectionKey) selectedKeys.next();
  71. selectedKeys.remove();
  72. if (!key.isValid()) {
  73. continue;
  74. } else if (key.isAcceptable()) {
  75. acceptConnection(key);
  76. } else if (key.isReadable()) {
  77. readData(key);
  78. } else if (key.isWritable()) {
  79. writeData(key);
  80. }
  81. }
  82. } catch (IOException | ClassNotFoundException e) {
  83. Logger.writeException("Internal server error.", LogType.SERVER);
  84. Logger.writeException(e.getMessage(), LogType.SERVER);
  85. }
  86. }
  87. }
  88. private void writeData(SelectionKey key) throws IOException {
  89. DataBundle bundle = null;
  90. for (DataBundle b : pendingDataBundleQue) {
  91. if (b.getChannel().equals((SocketChannel) key.channel())) {
  92. bundle = b;
  93. break;
  94. }
  95. }
  96. if (bundle == null) {
  97. Logger.writeException("Couldn't find out bound packet in list.", LogType.SERVER);
  98. return;
  99. }
  100. SocketChannel connection = bundle.getChannel();
  101. connection.write(bundle.getBuffer());
  102. connection.keyFor(selector).interestOps(SelectionKey.OP_READ);
  103. pendingDataBundleQue.remove(bundle);
  104. }
  105. private void readData(SelectionKey key) throws IOException, ClassNotFoundException {
  106. SocketChannel channel = (SocketChannel) key.channel();
  107. buffer.clear();
  108. int byteCount;
  109. try {
  110. byteCount = channel.read(buffer);
  111. } catch (IOException e) {
  112. Logger.writeException("Connenction terminated.", LogType.SERVER);
  113. channel.close();
  114. key.cancel();
  115. return;
  116. }
  117. if (byteCount == -1) {
  118. Logger.writeException("Connection error. Terminating connection.", LogType.SERVER);
  119. key.channel().close();
  120. key.cancel();
  121. return;
  122. }
  123. Engine.getInstance().getPacketProcessor().processData(buffer, channel);
  124. }
  125. private void acceptConnection(SelectionKey key) throws IOException {
  126. ServerSocketChannel channel = (ServerSocketChannel) key.channel();
  127. SocketChannel connection = channel.accept();
  128. connection.configureBlocking(false);
  129. connection.register(selector, SelectionKey.OP_READ);
  130. Logger.write("Connection established.", LogType.SERVER);
  131. }
  132. }

提前谢谢!

cig3rfwq

cig3rfwq1#

这里没有证据表明客户端已断开连接。如果有,其中一个块将在服务器上执行:

  1. try {
  2. byteCount = channel.read(buffer);
  3. } catch (IOException e) {
  4. Logger.writeException("Connenction terminated.", LogType.SERVER);
  5. channel.close();
  6. key.cancel();
  7. return;
  8. }
  9. if (byteCount == -1) {
  10. Logger.writeException("Connection error. Terminating connection.", LogType.SERVER);
  11. key.channel().close();
  12. key.cancel();
  13. return;
  14. }

我的结论是客户端根本没有断开连接。
我注意到您有一个名为 Logger.writeException() 的方法,但您从未用它来记录真正的异常,而且您的日志消息弄反了: catch (IOException) 表示发生了连接错误,此时应该记录实际的异常;而 byteCount == -1 表示“连接已终止”,这并不是错误。
我还注意到客户端的相应代码中缺少一个 return 。
注意:关闭通道(channel)会自动取消它的选择键(SelectionKey),您不需要自己再调用 cancel()。

展开查看全部

相关问题