我试图运行一个Kraken WebSocket客户端的两个符号,澳元/美元和澳元/日元,在单独的IOContexts在单独的线程,使用C++和Boost。野兽图书馆。我有6个核心可用,并希望在一个单独的线程运行每个符号。但是,当我运行代码时,程序立即终止,没有任何错误,并且不向控制台打印任何数据,控制台应该在消息处理程序内部定义。
http://coliru.stacked-crooked.com/a/51f248c085656de7
kraken_config.json
{
"AUD/USD": 0,
"AUD/JPY": 1
}
这两个函数的总体思想是创建一个多线程架构,允许在单独的线程中为大量符号运行多个WebSocket订阅,同时将线程数量限制为可用内核的数量:
void run_event_loop(const std::vector<std::string>& symbols, net::io_context& ioc)
{
ssl::context ctx{ssl::context::tlsv12_client};
ctx.set_verify_mode(ssl::verify_peer);
ctx.set_default_verify_paths();
for (const auto& symbol : symbols) {
std::cout << symbol << std::endl;
auto krakenws = std::make_shared<krakenWS>(ioc.get_executor(), ctx);
krakenws->subscribe_orderbook(symbol, 10);
}
ioc.run(); // this will block until all asynchronous operations have completed
}
void run_threads_in_cores(){
const std::size_t num_cores = std::thread::hardware_concurrency();
std::vector<std::string> symbols;
std::map<std::string, int> partition_map = load_symbols_partition_map();
for (const auto& pair : partition_map) {
symbols.push_back(pair.first);
}
std::vector<std::thread> threads;
// partition symbols into groups based on the number of available cores
std::vector<std::vector<std::string>> symbol_groups(num_cores);
std::size_t i = 0;
for (const auto& symbol : symbols) {
symbol_groups[i++ % num_cores].push_back(symbol);
}
for (const auto& symbol_group : symbol_groups) {
if(symbol_group.empty()){ // if symbols is less than number of cores you dont need to start the thread
continue;
}
net::io_context ioc;
threads.emplace_back([&symbol_group, &ioc]() { run_event_loop(symbol_group, ioc); });
}
std::for_each(threads.begin(), threads.end(), [](std::thread& t) { t.join(); });
}
但是当我调用run_threads_in_core()时,它会为每个符号启动run_event_loop()的两个线程(在本例中,我们有两个符号,num_cores = 6),因为我们的核心数比符号数多,我们可以为每个符号创建线程。 www.example.com ()似乎没有在run_event_loop()函数中运行,因为整个程序立即返回,并将符号打印到控制台。
但是当我在单独的线程中手动实现每个符号WebSocket io_context时,它运行得很好:
http://coliru.stacked-crooked.com/a/2cc33f6037a3b01f输出:
using host_: ws.kraken.com
using host_: ws.kraken.com
Sending : {"event":"subscribe","pair":["AUD/USD"],"subscription":{"depth":10,"name":"book"}}
Sending : {"event":"subscribe","pair":["AUD/JPY"],"subscription":{"depth":10,"name":"book"}}
Kraken Orderbook snapshot : {"channelID":176,"channelName":"book-10","event":"subscriptionStatus","pair":"AUD/USD","status":"subscribed","subscription":{"depth":10,"name":"book"}}
Kraken Orderbook snapshot : {"channelID":144,"channelName":"book-10","event":"subscriptionStatus","pair":"AUD/JPY","status":"subscribed","subscription":{"depth":10,"name":"book"}}
我试着在www. example前睡一秒钟 www.example.com ()
输出:
AUD/JPY
AUD/USD
using host_: ws.kraken.com
using host_: ws.kraken.com
Sending : {"event":"subscribe","pair":["AUD/JPY"],"subscription":{"depth":10,"name":"book"}}
read: Operation canceled
Sending : {"event":"subscribe","pair":["AUD/USD"],"subscription":{"depth":10,"name":"book"}}
Segmentation fault (core dumped)
我怀疑这个问题可能与我如何定义这两个函数(run_threads_in_core,run_event_loop)有关,但我不确定。有人能帮我找出问题并提出解决方案吗?谢谢大家。
1条答案
按热度按时间wgx48brx1#
我从头到尾都看过了。有很多事情引起了人们的关注。
您的实际问题在这里:
不要通过引用捕获。这只是未定义的行为,因为您不确保生存期(
symbol_group
是一个循环变量),并且它会被修改,导致数据竞争。例如使用开箱即用
整个前提似乎是“使用单独的线程会更快”。除非这是真的,因为减少了代码中的锁争用(未显示),否则这是非常试探性的。通常,网络IO受益于单个线程上的多路复用。