我有一个单线程的io_context,它作为一个事件循环。我还有多个繁重的任务,它们不能阻塞主事件循环,但也应该并行大量运行。
主循环中的所有任务都是asio::awaitables<void>
。这些任务可以创建任意新的可等待任务,也可以将它们派生到io_context的执行程序(asio::co_spawn
)中。
并发任务被发布到thread_pool,但是它们并不是在创建了可等待任务后立即开始,而是在我尝试等待它们的时候开始。这是非常烦人的,特别是当创建协程想要创建并产生更多并发任务以在需要时等待它们时。
我如何在创建了等待对象后立即启动这些任务?
下面是一些最小的示例代码,显示了这个问题:
#define ASIO_STANDALONE 1
#define ASIO_HAS_CO_AWAIT 1
#define ASIO_HAS_STD_COROUTINE 1
//
#include <iostream>
#include <thread>
#include <asio/awaitable.hpp>
#include <asio/bind_cancellation_slot.hpp>
#include <asio/cancellation_signal.hpp>
#include <asio/co_spawn.hpp>
#include <asio/io_service.hpp>
#include <asio/post.hpp>
#include <asio/steady_timer.hpp>
#include <asio/thread_pool.hpp>
using namespace std;
auto my_async(auto& context, auto&& func, auto&& handler) {
return async_initiate<decltype(handler), void(std::error_code e)>(
[&context](auto&& handler, auto&& func) {
asio::post(context, [func = std::forward<decltype(func)>(func)]() { func(); });
},
handler, std::forward<decltype(func)>(func));
}
auto main() -> int {
std::cout << "start program\n";
asio::thread_pool pool(4);
asio::io_service io_service{};
std::thread jthrd{};
asio::executor_work_guard<asio::io_context::executor_type> guard{io_service.get_executor()};
jthrd = std::thread{[&io_service]() {
std::cout << "start loop\n";
io_service.run();
}};
asio::steady_timer timer{asio::system_executor{}, asio::steady_timer::duration::max()};
asio::cancellation_signal signal;
auto cancel = signal.slot();
auto expansive_task_generator = [&](std::string coroname) {
return [&, coroname = std::move(coroname)] {
std::cout << "begin : " << coroname << " with tid: " << std::this_thread::get_id() << std::endl;
auto timer = asio::steady_timer(io_service);
timer.expires_from_now(std::chrono::seconds(10));
timer.wait();
std::cout << "10s timer expired" << std::endl;
std::cout << "end : " << coroname << " with tid: " << std::this_thread::get_id() << std::endl;
};
};
auto mainloop_awaitable = [&]() -> asio::awaitable<void> {
std::cout << "run mainloop event\n";
auto awaitable1 = my_async(pool, expansive_task_generator("aw1"), asio::use_awaitable);
auto awaitable2 = my_async(pool, expansive_task_generator("aw2"), asio::use_awaitable);
// < possible medium expansive work here>
auto awaitable3 = my_async(pool, expansive_task_generator("aw3"), asio::use_awaitable);
auto awaitable4 = my_async(
pool, [&timer]() { timer.expires_at(asio::steady_timer::time_point::min()); }, asio::use_awaitable);
auto timer = asio::steady_timer(io_service);
timer.expires_from_now(std::chrono::seconds(10));
co_await timer.async_wait(asio::use_awaitable);
std::cout << "before co_await\n"; // My expectation: tasks are running already
co_await std::move(awaitable1); // Problem: These are all executed serially
// < possible medium expansive work here>
co_await std::move(awaitable2); // Problem: These are all executed serially
co_await std::move(awaitable3); // Problem: These are all executed serially
// < possible medium expansive work here>
co_await std::move(awaitable4); // Problem: These are all executed serially
co_await timer.async_wait(asio::use_awaitable);
};
auto mainloop_completion = [](std::exception_ptr e) {
if (e) {
try {
std::rethrow_exception(e);
} catch (const std::exception& e) {
std::cerr << "mainloop failed with: " << e.what() << std::endl;
} catch (...) {
std::cerr << "mainloop failed with unknown exception" << std::endl;
}
}
};
asio::co_spawn(io_service.get_executor(), std::move(mainloop_awaitable),
asio::bind_cancellation_slot(cancel, mainloop_completion));
guard.reset();
jthrd.join();
return 0;
}
另一个好的替代方案是某种asio::future::async_wait(asio::use_awaitable)
。
1条答案
按热度按时间mf98qq941#
awaitable<>
s直到co_await
-ed才开始。experimental::use_promise
允许异步操作的即时执行和同步,但我还没有使用过它。我看到了
my_async
的一些问题:post
的事情阅读通过其余的我点
io_service
类型system_executor
上的计时器让我感到困惑)timer
变量隐藏了main中的变量。它们中的一些实际上不是,其他的被独占地同步使用,所以我将替换作者
另请注意,一个是用
duration::max()
初始化的,但重置为time_point::min()
-这些是不一致的,可能会导致微妙的错误,具体取决于所使用的比较。正在等待等待设置
您可以使用
parallel_group
(请参阅make_parallel_group
)或使用awaitable运算符来完成:手动使用
make_parallel_group
的一个好处可能是它允许范围构造演示
解决上述大部分问题,并添加有趣的计时和跟踪它们:
**第一个e第一个f第一个x
输出示例:
相应的处理程序跟踪可视化:
更简单?
从取消槽和
main_timer
的存在中,我得到的印象是你可能想要全局截止日期语义。我会这样写:http://coliru.stacked-crooked.com/a/e80581a7d26c279a打印,例如:
取消注解截止日期,如:
打印e.g.
要获得关于完成的更准确的反馈,请执行以下操作:
图纸