我有一个线程池,同时使用相同的io_context
来运行一个WebSocket流,我这样做是因为首先,我实际上有两个websocket流(我抽象了这个,因为通过测试它似乎不是问题),并且因为我想运行除了websocket之外的其他io操作,即async_read
和async_write
。
每个WebSocket流都使用自己的串,并使用额外的锁定来确保async_read
(分别为async_write
)在另一个到达处理程序之前不会执行。
所以基本上
io_context context;
std::vector<std::thread> pool(std::thread::hardware_concurrency());
...
wss(make_strand(context),ssl);
...
wss.async_read(&loop_read_handler);
...
for(auto& th:pool)
th=std::thread([&]{
try{
start_read_loop();//give work to do to each thread
context.run();
}catch(...){}
wss.close(...);//closing the websocket stream, expected to cancel all threads
context.stop();//with or without it, no change
});
for(auto& th:pool)
th.join();//hangs here since the other threads did not return from run()
当我想让程序停止时,我close(boost::beast::websocket::close_code::normal,ec)
流,这有效地取消了当前线程中的io操作(接收到错误代码为boost::beast::websocket::error::closed
的空消息),但在其他线程中没有:而不是被取消,他们挂起。
深入到代码中,我排除了自己的死锁假设,发现context.run()
没有注意到WebSocket流被关闭,而是继续等待传入消息。
当然,当池被限制为单个线程时,问题就消失了。
从外部或内部调用close(...)
一个io操作不改变这问题.调用context.stop()
对这问题没有任何影响,无论它是被调用外部或内部.
问题是什么?我应该如何让上下文在一个优雅的WebSocket关闭时停止运行?
==============================使用解决方案编辑
多亏了上面的答案,我成功地修改了代码。我没有在每个线程中启动读循环,而是在池初始化之后做一次,但是添加了auto work=make_work_guard(context);
和work.reset()
:
io_context context;
auto work=make_work_guard(context);//<<<<<<<<<<<<<<
std::vector<std::thread> pool(std::thread::hardware_concurrency());
...
wss(make_strand(context),ssl);//I keep it because I will add other streams
...
for(auto& th:pool)
th=std::thread([&]{
try{ context.run(); }catch(...){} //<<<<<<<<<<<<<<<<<<<
close_wss_streams_once_each(...);//cancels all threads
});
start_async_read_loop();//<<<<<<<<<<<<<<
work.reset();//<<<<<<<<<<<<<<<<<
for(auto& th:pool)
th.join();
显然,我不应该在每个线程中发布IO操作,我决定这样做是为了给予所有线程都有工作要做。相反,使用work guqrd可以防止线程过早返回。
1条答案
按热度按时间vd2z7a6w1#
相同io_context来运行网络套接字流
流不是一个进程(甚至不是一个操作)。你不能"运行一个[websocket]流"。你基本上只会运行一个执行排队处理程序的事件循环,除了同步代码。
其他线程:他们没有被取消,而是被挂在
显示的代码回避了相反的问题:为什么所有的线程all不立即返回(因为在启动线程之前不存在任何工作)?很明显,您的实际代码有很大的不同,因此不会发生这种情况。
也许你甚至有一个明确的
work_guard
。如果是这样,这当然解释了为什么事情没有关闭。当然,当池被限制为单个线程时,问题就消失了。
我不确定这对我是否有帮助。从逻辑上讲,线程越少,死锁的可能性就越大。不管怎样,这不是你的问题。
想象的问题代码,但有效
下面是我的设想,只是添加了工作保护,使线程在您发布第一个async_read之前不会全部完成:
现在,让我们构造、连接、ssl握手和ws握手一个websocket客户端(为简单起见,同步进行):
现在让我们添加
loop_read_handler
,显然这是某种(成员)函数,但是我们这里没有类,所以让我们添加一个闭包:当然,我们必须开始第一次阅读:
现在,我可以设置一个计时器,但实际上您希望在应用程序接收到终止信号时正常关闭,因此让我们在演示中这样做:
就这样!现在,我们要做的就是等待。这样,我们就可以把脚手架拆了:
完整列表
在一个简单的演示WSS服务器上运行它:
并且在终端中用Ctrl-C终止,或者向其发送SIGTERM信号:
奖金
整个线程池可以用
asio::thread_pool
替换(更正确!):这样,您就根本不必干预工作守卫(或担心异常的正确处理)。