kcp 为什么我写的KCP测试只能接收窗口大小条消息,超过了无法接收,

8wtpewkr  于 2022-10-25  发布在  其他
关注(0)|答案(8)|浏览(667)

请教如下问题
代码里面有KCP传消息和纯UDP传消息的部分
问题一:为什么我写的KCP测试只能接收窗口大小条消息,超过了无法接收。(本代码是只能接收128条)
问题二:感觉KCP传递的效率明显低于UDP

  1. # include <chrono>
  2. # include <future>
  3. # include <ikcp.h>
  4. # if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
  5. # include <winsock2.h>
  6. # else
  7. # include <sys/socket.h>
  8. # endif
  9. static long long
  10. GetNowTime()
  11. {
  12. return std::chrono::duration_cast<std::chrono::milliseconds>(
  13. std::chrono::high_resolution_clock::now().time_since_epoch())
  14. .count();
  15. }
  16. static long long
  17. iclock64()
  18. {
  19. return GetNowTime();
  20. }
  21. static unsigned int
  22. iclock()
  23. {
  24. return (unsigned int)(iclock64() & 0xfffffffful);
  25. }
  26. struct IKCP
  27. {
  28. ikcpcb* kcp{nullptr};
  29. size_t index{0};
  30. long current{0};
  31. long slap{0};
  32. };
  33. class SocketSender
  34. {
  35. public:
  36. virtual ~SocketSender() { clear(); }
  37. SocketSender(const std::string& net_addr, int port, bool bind_addr = false)
  38. {
  39. WSAStartup(MAKEWORD(2, 2), &wsd);
  40. m_send_socket = socket(AF_INET, SOCK_DGRAM, 0); // UDP
  41. if (INVALID_SOCKET == m_send_socket) {
  42. WSACleanup();
  43. }
  44. memset(&socket_addr, 0, sizeof(SOCKADDR_IN));
  45. socket_addr.sin_family = AF_INET;
  46. if (net_addr == "*" || net_addr.empty()) {
  47. socket_addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY); // inet_addr(net_addr.c_str());
  48. } else {
  49. socket_addr.sin_addr.S_un.S_addr = inet_addr(net_addr.c_str());
  50. }
  51. //socket_addr.sin_addr.S_un.S_addr = inet_addr(net_addr.c_str());
  52. socket_addr.sin_port = htons(port);
  53. int imode{1};
  54. // 设置为非阻塞模式 1
  55. int ret = ioctlsocket(m_send_socket, FIONBIO, (u_long*)&imode);
  56. if (SOCKET_ERROR == ret) {
  57. closesocket(m_send_socket);
  58. WSACleanup();
  59. }
  60. if (bind_addr){
  61. ret = bind(m_send_socket, (sockaddr*)&socket_addr, sizeof(sockaddr));
  62. if (SOCKET_ERROR == ret) {
  63. closesocket(m_send_socket);
  64. WSACleanup();
  65. }
  66. }
  67. }
  68. // 清除数据
  69. void
  70. clear()
  71. {
  72. closesocket(m_send_socket);
  73. WSACleanup();
  74. }
  75. // 发送数据
  76. int
  77. send(const void* data, int size)
  78. {
  79. // send
  80. return sendto(m_send_socket, (char*)data, size, 0, (sockaddr*)&(socket_addr), skt_addr_len);
  81. }
  82. // 接收数据
  83. int
  84. recv(void* data, int maxsize)
  85. {
  86. return recvfrom(m_send_socket, (char*)data, maxsize, 0, (sockaddr*)&socket_addr, &skt_addr_len);
  87. }
  88. protected:
  89. SOCKET m_send_socket;
  90. SOCKADDR_IN socket_addr; // 服务器套接字地址
  91. int skt_addr_len = sizeof(SOCKADDR_IN);
  92. WSADATA wsd;
  93. };
  94. int
  95. udp_output(const char* buf, int len, ikcpcb* kcp, void* user)
  96. {
  97. SocketSender* send_net = (SocketSender*)user;
  98. int ret = send_net->send(buf, len);
  99. return ret;
  100. }
  101. # define PUREUDP 0
  102. void
  103. Publisher1()
  104. {
  105. SocketSender* send_net;
  106. send_net = new SocketSender("127.0.0.1", 6677);
  107. IKCP kcp;
  108. kcp.kcp = ikcp_create(0x11223344, (void*)send_net);
  109. kcp.kcp->output = udp_output;
  110. ikcp_wndsize(kcp.kcp, 128, 128);
  111. ikcp_nodelay(kcp.kcp, 2, 10, 2, 1);
  112. kcp.kcp->rx_minrto = 10;
  113. kcp.kcp->fastresend = 1;
  114. kcp.index = 0;
  115. kcp.current = iclock();
  116. kcp.slap = kcp.current + 20;
  117. std::string str =
  118. "分配string为指向size + 1大小的heap空间,那个多出来的1字节是'\\0'的空间 原始文字是人类用来纪录特 ";
  119. int len = str.size()+1;
  120. char msgdata[2000];
  121. # if PUREUDP
  122. // 纯UDP
  123. while (1) {
  124. ((IUINT32*)msgdata)[0] = kcp.index++;
  125. ((IUINT32*)msgdata)[1] = iclock();
  126. memcpy(msgdata + 8, str.data(), len * sizeof(char));
  127. //printf("Pure UDP Send message: idx = %u, current tm = %u, msg = %s\n", ((IUINT32*)msgdata)[0], ((IUINT32*)msgdata)[1],
  128. // msgdata + 8);
  129. send_net->send(msgdata, str.size()+9);
  130. }
  131. # else
  132. while (1) {
  133. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  134. auto clk = iclock();
  135. kcp.current = clk;
  136. ikcp_update(kcp.kcp, clk);
  137. ((IUINT32*)msgdata)[0] = kcp.index++;
  138. ((IUINT32*)msgdata)[1] = kcp.current;
  139. memcpy(msgdata + 8, str.data(), len * sizeof(char));
  140. int ret = ikcp_send(kcp.kcp, (char*)msgdata, (len + 8) * sizeof(char));
  141. // printf("Send message: idx = %u, current tm = %u, msg = %s\n", ((IUINT32*)msgdata)[0], ((IUINT32*)msgdata)[1],
  142. // msgdata + 8);
  143. }
  144. # endif
  145. delete send_net;
  146. ikcp_release(kcp.kcp);
  147. }
  148. void
  149. Subscriber1()
  150. {
  151. SocketSender* receive_net;
  152. receive_net = new SocketSender("*", 6677, true);
  153. IKCP kcp;
  154. kcp.kcp = ikcp_create(0x11223344, (void*)receive_net);
  155. kcp.kcp->output = udp_output;
  156. ikcp_wndsize(kcp.kcp, 128, 128);
  157. ikcp_nodelay(kcp.kcp, 2, 10, 2, 1);
  158. kcp.kcp->rx_minrto = 10;
  159. kcp.kcp->fastresend = 1;
  160. kcp.current = iclock();
  161. kcp.slap = kcp.current + 20;
  162. const int max_kcp_buff_size = 2000;
  163. const int max_rev_size = 1400;
  164. char data[max_kcp_buff_size]{0};
  165. std::vector<char> data_vec;
  166. # if PUREUDP
  167. // 纯UDP
  168. while (1) {
  169. receive_net->recv(data, 2000);
  170. printf("Pure UDP Receive message: idx = %u, current tm = %u, msg = %s\n", ((IUINT32*)data)[0], ((IUINT32*)data)[1],
  171. data + 8);
  172. }
  173. # else
  174. while (1) {
  175. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  176. auto clk = iclock();
  177. kcp.current = clk;
  178. ikcp_update(kcp.kcp, clk);
  179. memset(data, 0, sizeof(char) * max_kcp_buff_size);
  180. // idx > 128后无法接收成功
  181. while (1) {
  182. int hr = receive_net->recv(data, max_rev_size);
  183. if (hr < 0)
  184. break;
  185. hr = ikcp_input(kcp.kcp, data, hr);
  186. }
  187. data_vec.clear();
  188. memset(data, 0, sizeof(char) * max_kcp_buff_size);
  189. while (1) {
  190. int hr = ikcp_recv(kcp.kcp, data, max_rev_size);
  191. if (hr < 0)
  192. break;
  193. data_vec.insert(data_vec.end(), data, data + hr);
  194. }
  195. printf("KCP Receive message: idx = %u, current tm = %u, msg = %s\n", ((IUINT32*)data)[0], ((IUINT32*)data)[1],
  196. data + 8);
  197. }
  198. # endif
  199. delete receive_net;
  200. ikcp_release(kcp.kcp);
  201. }
  202. int
  203. main()
  204. {
  205. auto thread2 = std::async(std::launch::async, Subscriber1 );
  206. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  207. auto thread1 = std::async(std::launch::async, Publisher1 );
  208. thread1.wait();
  209. thread2.wait();
  210. return 0;
  211. }
exdqitrt

exdqitrt1#

作者你好。窗口不可能无限设大。拆包拼包是说把一个大文件(比如几兆上百M的拆分成不大于窗口份数)然后依次发送和接收?接收完毕后(接收128次后),似乎"窗口满了",如何“刷新”/“清空”窗口(清理发送端还是接收端)?原作者您的example里面也是128的窗口,在同一个循环体内部发送和接收,确是可以无上限ikcp_recv的。

q7solyqu

q7solyqu2#

直接把文件拆分成 1KB 一个包。

krcsximq

krcsximq3#

作者你好。窗口不可能无限设大。拆包拼包是说把一个大文件(比如几兆上百M的拆分成不大于窗口份数)然后依次发送和接收?接收完毕后(接收128次后),似乎"窗口满了",如何“刷新”/“清空”窗口(清理发送端还是接收端)?原作者您的example里面也是128的窗口,在同一个循环体内部发送和接收,确是可以无上限ikcp_recv的。

是的,为什么呢,假如在外面设置了这个拆包分包逻辑,为什么KCP里还有拆包分包呢?KCP里面不会把接收到的数据清除掉留给下一个序号的数据嘛,很疑惑

9avjhtql

9avjhtql4#

感觉作者理解错你想问的了,你的意思是接收了128次后后面的数据就接收不了了,但是作者写的测试的例子可以一直接收,而不是一个包超过窗口大小就接不了

hs1rzwqc

hs1rzwqc5#

时间太久了,有点不记得但是的想法。也是我提问没有描述清楚,应是你说的,假设有1万条消息,接受128条消息后,后续消息没有被接受了。

xwmevbvl

xwmevbvl6#

时间太久了,有点不记得但是的想法。也是我提问没有描述清楚,应是你说的,假设有1万条消息,接受128条消息后,后续消息没有被接受了。

这个要在客户端接收服务端的ACK包然后推进kcp里,才能把snd_buf的窗口往后移,就是把收到ACK包的数据从缓存中清除,就能发送新数据

0qx6xfy6

0qx6xfy67#

时间太久了,有点不记得但是的想法。也是我提问没有描述清楚,应是你说的,假设有1万条消息,接受128条消息后,后续消息没有被接受了。
您没有在客户端进行udp的recvfrom然后input进kcp处理ACK包

e5nszbig

e5nszbig8#

对的,没错,是这样,你可以自己把接收窗口设置大一点,或者你自己实现一个拆包拼包的逻辑。

相关问题