为什么boost::asio::io_context::run()在boost::beast::WebSocket::stream::close()之后仍然阻塞?

ddrv8njm  于 2023-01-21  发布在  其他
关注(0)|答案(1)|浏览(455)

我有一个线程池,同时使用相同的io_context来运行一个WebSocket流,我这样做是因为首先,我实际上有两个websocket流(我抽象了这个,因为通过测试它似乎不是问题),并且因为我想运行除了websocket之外的其他io操作,即async_readasync_write
每个WebSocket流都使用自己的串,并使用额外的锁定来确保async_read(分别为async_write)在另一个到达处理程序之前不会执行。
所以基本上

  1. io_context context;
  2. std::vector<std::thread> pool(std::thread::hardware_concurrency());
  3. ...
  4. wss(make_strand(context),ssl);
  5. ...
  6. wss.async_read(&loop_read_handler);
  7. ...
  8. for(auto& th:pool)
  9. th=std::thread([&]{
  10. try{
  11. start_read_loop();//give work to do to each thread
  12. context.run();
  13. }catch(...){}
  14. wss.close(...);//closing the websocket stream, expected to cancel all threads
  15. context.stop();//with or without it, no change
  16. });
  17. for(auto& th:pool)
  18. th.join();//hangs here since the other threads did not return from run()

当我想让程序停止时,我close(boost::beast::websocket::close_code::normal,ec)流,这有效地取消了当前线程中的io操作(接收到错误代码为boost::beast::websocket::error::closed的空消息),但在其他线程中没有:而不是被取消,他们挂起。
深入到代码中,我排除了自己的死锁假设,发现context.run()没有注意到WebSocket流被关闭,而是继续等待传入消息。
当然,当池被限制为单个线程时,问题就消失了。
从外部或内部调用close(...)一个io操作不改变这问题.调用context.stop()对这问题没有任何影响,无论它是被调用外部或内部.
问题是什么?我应该如何让上下文在一个优雅的WebSocket关闭时停止运行?
==============================使用解决方案编辑
多亏了上面的答案,我成功地修改了代码。我没有在每个线程中启动读循环,而是在池初始化之后做一次,但是添加了auto work=make_work_guard(context);work.reset()

  1. io_context context;
  2. auto work=make_work_guard(context);//<<<<<<<<<<<<<<
  3. std::vector<std::thread> pool(std::thread::hardware_concurrency());
  4. ...
  5. wss(make_strand(context),ssl);//I keep it because I will add other streams
  6. ...
  7. for(auto& th:pool)
  8. th=std::thread([&]{
  9. try{ context.run(); }catch(...){} //<<<<<<<<<<<<<<<<<<<
  10. close_wss_streams_once_each(...);//cancels all threads
  11. });
  12. start_async_read_loop();//<<<<<<<<<<<<<<
  13. work.reset();//<<<<<<<<<<<<<<<<<
  14. for(auto& th:pool)
  15. th.join();

显然,我不应该在每个线程中发布IO操作,我决定这样做是为了给予所有线程都有工作要做。相反,使用work guqrd可以防止线程过早返回。

vd2z7a6w

vd2z7a6w1#

相同io_context来运行网络套接字流
流不是一个进程(甚至不是一个操作)。你不能"运行一个[websocket]流"。你基本上只会运行一个执行排队处理程序的事件循环,除了同步代码。
其他线程:他们没有被取消,而是被挂在
显示的代码回避了相反的问题:为什么所有的线程all不立即返回(因为在启动线程之前不存在任何工作)?很明显,您的实际代码有很大的不同,因此不会发生这种情况。
也许你甚至有一个明确的work_guard。如果是这样,这当然解释了为什么事情没有关闭。
当然,当池被限制为单个线程时,问题就消失了。
我不确定这对我是否有帮助。从逻辑上讲,线程越少,死锁的可能性就越大。不管怎样,这不是你的问题。

想象的问题代码,但有效

下面是我的设想,只是添加了工作保护,使线程在您发布第一个async_read之前不会全部完成:

  1. net::io_context ioc;
  2. std::vector<std::thread> pool(std::thread::hardware_concurrency());
  3. auto work = make_work_guard(ioc);
  4. for (auto& th : pool)
  5. th = std::thread{[&ioc] { try { ioc.run(); } catch (...) { } }};

现在,让我们构造、连接、ssl握手和ws握手一个websocket客户端(为简单起见,同步进行):

  1. sctx ctx(sctx::tlsv13_client);
  2. Ws wss(make_strand(ioc), ctx);
  3. auto& s = beast::get_lowest_layer(wss);
  4. s.connect({{}, 8989});
  5. wss.next_layer().handshake(Ws::next_layer_type::handshake_type::client);
  6. wss.handshake("localhost", "/");

现在让我们添加loop_read_handler,显然这是某种(成员)函数,但是我们这里没有类,所以让我们添加一个闭包:

  1. std::function<void(error_code, size_t)> loop_read_handler;
  2. beast::flat_buffer buf;
  3. loop_read_handler = [&](error_code ec, size_t n) {
  4. std::cout << "loop_read_handler " << ec.message() << ", " << n << std::endl;
  5. if (n)
  6. std::cout << "Received " << quoted(beast::buffers_to_string(buf.cdata())) << std::endl;
  7. if (!ec) {
  8. buf.consume(n);
  9. wss.async_read(buf, loop_read_handler);
  10. }
  11. };

当然,我们必须开始第一次阅读:

  1. wss.async_read(buf, loop_read_handler); // not on strand, because nothing is yet on the pool

现在,我可以设置一个计时器,但实际上您希望在应用程序接收到终止信号时正常关闭,因此让我们在演示中这样做:

  1. net::signal_set ss(ioc, SIGINT, SIGTERM); // SIGINT e.g. from Ctrl-C in a terminal
  2. ss.async_wait([&](error_code ec, int sig) {
  3. std::cout << "signal " << ::strsignal(sig) << " (" << ec.message() << ")" << std::endl;
  4. if (!ec) {
  5. // on strand:
  6. post(wss.get_executor(), [&wss] { wss.close(websocket::normal); });
  7. }
  8. });

就这样!现在,我们要做的就是等待。这样,我们就可以把脚手架拆了:

  1. // from this point we're okay returning, as soon as the read loop stops
  2. work.reset();
  3. std::cout << "waiting for graceful shutdown" << std::endl;
  4. for (auto& th : pool)
  5. th.join();
  6. std::cout << "graceful shutdown complete" << std::endl;

完整列表

    • 生活在科里鲁**
  1. #include <boost/asio.hpp>
  2. #include <boost/asio/ssl.hpp>
  3. #include <boost/beast.hpp>
  4. #include <boost/beast/websocket/ssl.hpp>
  5. #include <iomanip>
  6. #include <iostream>
  7. namespace net = boost::asio;
  8. namespace ssl = net::ssl;
  9. namespace beast = boost::beast;
  10. namespace websocket = beast::websocket;
  11. using boost::system::error_code;
  12. using net::ip::tcp;
  13. using sctx = ssl::context;
  14. using Ws = websocket::stream<ssl::stream<tcp::socket>>;
  15. int main() {
  16. net::io_context ioc;
  17. std::vector<std::thread> pool(std::thread::hardware_concurrency());
  18. auto work = make_work_guard(ioc);
  19. for (auto& th : pool)
  20. th = std::thread{[&ioc] { try { ioc.run(); } catch (...) { } }};
  21. sctx ctx(sctx::tlsv13_client);
  22. Ws wss(make_strand(ioc), ctx);
  23. auto& s = beast::get_lowest_layer(wss);
  24. s.connect({{}, 8989});
  25. wss.next_layer().handshake(Ws::next_layer_type::handshake_type::client);
  26. wss.handshake("localhost", "/");
  27. std::function<void(error_code, size_t)> loop_read_handler;
  28. beast::flat_buffer buf;
  29. loop_read_handler = [&](error_code ec, size_t n) {
  30. std::cout << "loop_read_handler " << ec.message() << ", " << n << std::endl;
  31. if (n)
  32. std::cout << "Received " << quoted(beast::buffers_to_string(buf.cdata())) << std::endl;
  33. if (!ec) {
  34. buf.consume(n);
  35. wss.async_read(buf, loop_read_handler);
  36. }
  37. };
  38. wss.async_read(buf, loop_read_handler); // not on strand, because nothing is yet on the pool
  39. net::signal_set ss(ioc, SIGINT, SIGTERM); // SIGINT e.g. from Ctrl-C in a terminal
  40. ss.async_wait([&](error_code ec, int sig) {
  41. std::cout << "signal " << ::strsignal(sig) << " (" << ec.message() << ")" << std::endl;
  42. if (!ec) {
  43. // on strand:
  44. post(wss.get_executor(), [&wss] { wss.close(websocket::normal); });
  45. }
  46. });
  47. // from this point we're okay returning, as soon as the read loop stops
  48. work.reset();
  49. std::cout << "waiting for graceful shutdown" << std::endl;
  50. for (auto& th : pool)
  51. th.join();
  52. std::cout << "graceful shutdown complete" << std::endl;
  53. }

在一个简单的演示WSS服务器上运行它:

  1. websocketd -port 8989 -ssl --sslcert server.pem --sslkey server.pem ping www.google.com

并且在终端中用Ctrl-C终止,或者向其发送SIGTERM信号:

奖金

整个线程池可以用asio::thread_pool替换(更正确!):

  1. int main() {
  2. net::thread_pool ioc;
  3. // ...
  4. Ws wss(make_strand(ioc), ctx);
  5. // ...
  6. // from this point we're okay returning, as soon as the read loop stops
  7. std::cout << "waiting for graceful shutdown" << std::endl;
  8. ioc.join();
  9. std::cout << "graceful shutdown complete" << std::endl;
  10. }

这样,您就根本不必干预工作守卫(或担心异常的正确处理)。

展开查看全部

相关问题