C++ Asio: co_await multiple outsourced asynchronous tasks

x3naxklr, posted on 2023-03-25 in Other

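I have a single-threaded io_context that serves as an event loop. I also have multiple heavy tasks that must not block the main event loop, but that should run heavily in parallel.
All tasks in the main loop are asio::awaitable<void>. These tasks can create arbitrary new awaitables, and can also spawn them onto the io_context's executor (asio::co_spawn).
The concurrent tasks are posted to a thread_pool, but they do not start as soon as the awaitable is created; they only start when I try to await them. This is very annoying, especially when the creating coroutine wants to create and spawn several more concurrent tasks and await them only when needed.
How can I start these tasks right after the awaitable has been created?
Here is some minimal example code that shows the problem: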

#define ASIO_STANDALONE 1
#define ASIO_HAS_CO_AWAIT 1
#define ASIO_HAS_STD_COROUTINE 1
//

#include <iostream>
#include <thread>

#include <asio/awaitable.hpp>
#include <asio/bind_cancellation_slot.hpp>
#include <asio/cancellation_signal.hpp>
#include <asio/co_spawn.hpp>
#include <asio/io_service.hpp>
#include <asio/post.hpp>
#include <asio/steady_timer.hpp>
#include <asio/thread_pool.hpp>

using namespace std;

auto my_async(auto& context, auto&& func, auto&& handler) {
    return async_initiate<decltype(handler), void(std::error_code e)>(
        [&context](auto&& handler, auto&& func) {
            asio::post(context, [func = std::forward<decltype(func)>(func)]() { func(); });
        },
        handler, std::forward<decltype(func)>(func));
}

auto main() -> int {
    std::cout << "start program\n";

    asio::thread_pool pool(4);
    asio::io_service io_service{};
    std::thread jthrd{};
    asio::executor_work_guard<asio::io_context::executor_type> guard{io_service.get_executor()};
    jthrd = std::thread{[&io_service]() {
        std::cout << "start loop\n";
        io_service.run();
    }};

    asio::steady_timer timer{asio::system_executor{}, asio::steady_timer::duration::max()};

    asio::cancellation_signal signal;
    auto cancel = signal.slot();

    auto expansive_task_generator = [&](std::string coroname) {
        return [&, coroname = std::move(coroname)] {
            std::cout << "begin : " << coroname << " with tid: " << std::this_thread::get_id() << std::endl;
            auto timer = asio::steady_timer(io_service);
            timer.expires_from_now(std::chrono::seconds(10));
            timer.wait();
            std::cout << "10s timer expired" << std::endl;
            std::cout << "end : " << coroname << " with tid: " << std::this_thread::get_id() << std::endl;
        };
    };

    auto mainloop_awaitable = [&]() -> asio::awaitable<void> {
        std::cout << "run mainloop event\n";
        auto awaitable1 = my_async(pool, expansive_task_generator("aw1"), asio::use_awaitable);
        auto awaitable2 = my_async(pool, expansive_task_generator("aw2"), asio::use_awaitable);
        // < possible medium expansive work here>
        auto awaitable3 = my_async(pool, expansive_task_generator("aw3"), asio::use_awaitable);
        auto awaitable4 = my_async(
            pool, [&timer]() { timer.expires_at(asio::steady_timer::time_point::min()); }, asio::use_awaitable);

        auto timer = asio::steady_timer(io_service);
        timer.expires_from_now(std::chrono::seconds(10));
        co_await timer.async_wait(asio::use_awaitable);

        std::cout << "before co_await\n";    // My expectation: tasks are running already
        co_await std::move(awaitable1);      // Problem: These are all executed serially
        // < possible medium expansive work here>
        co_await std::move(awaitable2);    // Problem: These are all executed serially
        co_await std::move(awaitable3);    // Problem: These are all executed serially
        // < possible medium expansive work here>
        co_await std::move(awaitable4);    // Problem: These are all executed serially
        co_await timer.async_wait(asio::use_awaitable);
    };

    auto mainloop_completion = [](std::exception_ptr e) {
        if (e) {
            try {
                std::rethrow_exception(e);
            } catch (const std::exception& e) {
                std::cerr << "mainloop failed with: " << e.what() << std::endl;
            } catch (...) {
                std::cerr << "mainloop failed with unknown exception" << std::endl;
            }
        }
    };

    asio::co_spawn(io_service.get_executor(), std::move(mainloop_awaitable),
                   asio::bind_cancellation_slot(cancel, mainloop_completion));
    guard.reset();
    jthrd.join();
    return 0;
}

Another good alternative would be some kind of asio::future::async_wait(asio::use_awaitable).

mf98qq94 #1

std::cout << "before co_await" << std::endl; // My expectation: tasks are running already

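awaitable<>s do not start until they are co_await-ed.

  • According to the changelog, experimental::use_promise allows eager execution and synchronisation of asynchronous operations, but I haven't used it yet.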
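The lazy-versus-eager distinction can be felt without Asio at all. The following is only a standard-library analogy, not Asio code: std::launch::deferred behaves roughly like an awaitable that runs when it is waited on, while std::launch::async starts immediately. The helper name starts_before_wait and the 200ms observation window are assumptions made for this sketch.

```cpp
#include <chrono>
#include <future>

// Hypothetical helper: reports whether a task launched with `policy`
// begins running before anyone waits on the task itself.
bool starts_before_wait(std::launch policy) {
    std::promise<void> started;
    std::future<void> started_signal = started.get_future();

    // The task only records that it began; real code would do heavy work here.
    std::future<void> task = std::async(policy, [&started] { started.set_value(); });

    // Observe the start signal without touching `task`.
    bool const began =
        started_signal.wait_for(std::chrono::milliseconds(200)) == std::future_status::ready;

    task.get(); // a deferred task runs only now, on the calling thread
    return began;
}
```

Calling starts_before_wait(std::launch::async) should report that the task began on its own, whereas std::launch::deferred should not begin until task.get() is reached, which mirrors what the question observes with use_awaitable.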

I see some issues with my_async:

auto my_async(auto& context, auto&& func, auto&& handler) {
    return async_initiate<decltype(handler), void(std::error_code e)>(
        [&context](auto&& handler, auto&& func) {
            asio::post(context, [func = std::forward<decltype(func)>(func)]() { func(); });
        },
        handler, std::forward<decltype(func)>(func));
}
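  • It never invokes the completion handler obtained from the completion token
  • It passes the execution context by reference, where an executor would be more general and idiomatic
  • Minor concern: I wonder what its purpose is, since it doesn't seem to do anything more than post

Reading through the rest, I note: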

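  • Oddly, the code uses mostly modern features, but then uses the deprecated io_service type
  • The code mixes execution contexts without a clear goal (the timer on system_executor in particular confuses me)
  • The cancellation slot is never actually used
  • There are multiple timer variables shadowing the one in main. Some of them actually aren't, and others are used exclusively synchronously, so I'd replace
auto timer = asio::steady_timer(io_service);
timer.expires_from_now(10s);
timer.wait();

with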

std::this_thread::sleep_for(10s);

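Also note that one timer is initialized with duration::max() but later reset to time_point::min(). These are inconsistent and may lead to subtle bugs, depending on the comparison used.

Awaiting sets of awaitables

You can do this with a parallel_group (see make_parallel_group) or with the awaitable operators: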

asio::steady_timer local_timer(io_service, 5s);
 auto delay = local_timer.async_wait(use_awaitable);

 trace("before co_await");
 co_await (std::move(delay) && std::move(aw1) && std::move(aw2) && std::move(aw3) && std::move(aw4));
 trace("after co_await");

A benefit of using make_parallel_group manually may be that it allows range construction.

Demo

Addressing most of the issues above, and adding some timing and tracing for fun:
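To make "range construction" concrete, here is what starting and then waiting on a runtime-sized collection of tasks looks like in plain standard C++. This is an analogy only: it does not use make_parallel_group, the helper name run_all is made up for illustration, and unlike an awaitable it blocks the calling thread, so it would not be suitable on the event loop itself.

```cpp
#include <functional>
#include <future>
#include <vector>

// Hypothetical helper: start every task immediately, then collect the results in order.
std::vector<int> run_all(std::vector<std::function<int()>> const& tasks) {
    std::vector<std::future<int>> pending;
    pending.reserve(tasks.size());
    for (auto const& t : tasks)
        pending.push_back(std::async(std::launch::async, t)); // each task starts right away

    std::vector<int> results;
    results.reserve(pending.size());
    for (auto& f : pending)
        results.push_back(f.get()); // blocks until that particular task is done
    return results;
}
```

The number of tasks is only known at run time here, which is the situation where chaining a fixed number of operands with && no longer fits.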

//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
//#define BOOST_ASIO_STANDALONE        1
//#define BOOST_ASIO_HAS_CO_AWAIT      1
//#define BOOST_ASIO_HAS_STD_COROUTINE 1

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <atomic>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
namespace asio = boost::asio;
using namespace std::chrono_literals;

auto my_async(auto executor, auto task, auto&& token) {
    return async_initiate<decltype(token), void(std::error_code e)>(
        [executor](auto handler, auto task) {
            asio::post( //
                executor, [h = std::move(handler), t = std::move(task)]() mutable {
                    t();
                    std::move(h)(std::error_code{});
                });
        },
        std::forward<decltype(token)>(token), std::move(task));
}

static std::atomic_int thread_counter = 0;

thread_local int const t_id  = thread_counter++;
static auto const      now   = std::chrono::steady_clock::now;
static auto const      start = now();

static void trace(auto const&... args) {
    static std::mutex mx;

    std::lock_guard lk(mx);
    std::cout << std::setw(10) << (now() - start) / 1ms << "ms t_id:" << t_id << " ";
    (std::cout << ... << args) << std::endl;
}

int main()  {
    trace("start program");

    asio::thread_pool pool(4);
    asio::io_context  io_service{};

    auto guard = make_work_guard(io_service);

    std::thread thrd = std::thread{[&io_service]() {
        trace("start io_service loop");
        io_service.run();
        trace("end io_service loop");
    }};

    asio::steady_timer main_timer{io_service, asio::steady_timer::time_point::max()};

    asio::cancellation_signal cancel;

    auto make_task = [&](std::string name, auto cost) { // generates expensive tasks
        return [=] {
            trace(name, " begin ", cost/1.s, "s work");
            std::this_thread::sleep_for(cost);
            trace(name, " end ", cost/1.s, "s work");
        };
    };

    using asio::use_awaitable;
    using namespace asio::experimental::awaitable_operators;

    auto mainloop = [&]() -> asio::awaitable<void> {
        trace("run mainloop event");
        auto ex  = pool.get_executor();
        auto aw1 = my_async(ex, make_task("aw1", 6s), use_awaitable);
        auto aw2 = my_async(ex, make_task("aw2", 2s), use_awaitable);
        // < possible medium expansive work here>
        auto aw3 = my_async(ex, make_task("aw3", 4s), use_awaitable);
        auto aw4 = my_async(
            ex,
            [&main_timer]() {
                std::this_thread::sleep_for(3s);
                trace("aw4 cancels main_timer");
                main_timer.expires_at(asio::steady_timer::time_point::min());
            },
            use_awaitable);

        asio::steady_timer local_timer(io_service, 5s);
        auto delay = local_timer.async_wait(use_awaitable);

        trace("before co_await");
        co_await (std::move(delay) && std::move(aw1) && std::move(aw2) && std::move(aw3) && std::move(aw4));
        trace("after co_await");
    };

    trace(std::boolalpha, "main_timer.expiry() == max()? ",
          main_timer.expiry() == std::chrono::steady_clock::time_point::max());

    asio::co_spawn(io_service.get_executor(), mainloop(), [&](std::exception_ptr e) {
        if (e) {
            try {
                std::rethrow_exception(e);
            } catch (const std::exception& e) {
                trace("mainloop failed with: ", e.what());
            } catch (...) {
                trace("mainloop failed with unknown exception");
            }
        }

        trace(std::boolalpha, "main_timer.expiry() == min()? ",
              main_timer.expiry() == std::chrono::steady_clock::time_point::min());
    });

    guard.reset();
    thrd.join();
    trace("bye");
}

Sample output:

     0ms t_id:0 start program
     0ms t_id:0 main_timer.expiry() == max()? true
     0ms t_id:1 start io_service loop
     0ms t_id:1 run mainloop event
     0ms t_id:1 before co_await
     0ms t_id:2 aw1 begin 6s work
     0ms t_id:3 aw2 begin 2s work
     1ms t_id:4 aw3 begin 4s work
  2001ms t_id:3 aw2 end 2s work
  3001ms t_id:5 aw4 cancels main_timer
  4001ms t_id:4 aw3 end 4s work
  6001ms t_id:2 aw1 end 6s work
  6001ms t_id:1 after co_await
  6001ms t_id:1 main_timer.expiry() == min()? true
  6001ms t_id:1 end io_service loop
  6001ms t_id:0 bye

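The corresponding handler tracking visualization (image not included).

Simpler?

From the presence of the cancellation slot and main_timer, I get the impression that you may want global deadline semantics. I would write it like this: http://coliru.stacked-crooked.com/a/e80581a7d26c279a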

//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <atomic>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
namespace asio = boost::asio;
using namespace std::chrono_literals;
using asio::use_awaitable;
using namespace asio::experimental::awaitable_operators;

static std::atomic_int thread_id_gen = 0;
thread_local int const t_id  = thread_id_gen++;
static auto const      now   = std::chrono::steady_clock::now;
static auto const      start = now();

static void trace(auto const&... args) {
    static std::mutex mx;
    std::lock_guard lk(mx);
    std::cout << std::setw(10) << (now() - start) / 1ms << "ms t_id:" << t_id << " ";
    (std::cout << ... << args) << std::endl;
}

auto my_async(auto ex, auto task, auto&& token) {
    return async_initiate<decltype(token), void(std::string)>(
        [ex, work = make_work_guard(ex)](auto handler, auto task) {
            asio::post( //
                ex,
                [h = std::move(handler), t = std::move(task)]() mutable //
                { std::move(h)(t()); });
        },
        std::forward<decltype(token)>(token), std::move(task));
}

auto task(std::string name, auto cost) { // expensive task
    return [=]() -> std::string {
        trace(name, " begin ", cost / 1.s, "s work");
        std::this_thread::sleep_for(cost); // NOTE: anti-pattern to block in async task!
        trace(name, " end ", cost / 1.s, "s work");
        return name + " returned value";
    };
}

asio::awaitable<void> mainloop(auto pool) {
    trace("run mainloop event");
    auto aw1 = my_async(pool, task("aw1", 6s), use_awaitable);
    auto aw2 = my_async(pool, task("aw2", 2s), use_awaitable);
    auto aw3 = my_async(pool, task("aw3", 4s), use_awaitable);

    asio::steady_timer local_timer(pool, 5s);
    auto delay = local_timer.async_wait(use_awaitable);

    trace("before co_await");
    auto x = co_await (std::move(delay) && std::move(aw1) && std::move(aw2) && std::move(aw3));
    trace(get<0>(x));
    trace(get<1>(x));
    trace(get<2>(x));
    trace("after co_await");
}

asio::awaitable<void> deadline(auto expiry) {
    co_await asio::steady_timer(co_await asio::this_coro::executor, expiry).async_wait(use_awaitable);
    trace("deadline expired");
}

int main()  {
    asio::thread_pool pool(10);
    asio::io_context ioc;
    trace("start program");

    boost::system::error_code ec;
    asio::co_spawn(ioc, /*deadline(3s) || */ mainloop(pool.get_executor()),
                   redirect_error(asio::detached, ec));

    ioc.run();
    trace("mainloop completed (", ec.message(), ")");

    pool.join();
    trace("work pool joined");
}

Prints, e.g.:

     0ms t_id:0 start program
     0ms t_id:0 run mainloop event
     0ms t_id:0 before co_await
     0ms t_id:1 aw1 begin 6s work
     0ms t_id:2 aw3 begin 4s work
     0ms t_id:3 aw2 begin 2s work
  2001ms t_id:3 aw2 end 2s work
  4000ms t_id:2 aw3 end 4s work
  6000ms t_id:1 aw1 end 6s work
  6001ms t_id:0 aw1 returned value
  6001ms t_id:0 aw2 returned value
  6001ms t_id:0 aw3 returned value
  6001ms t_id:0 after co_await
  6001ms t_id:0 mainloop completed (Success)
  6001ms t_id:0 work pool joined

Uncommenting the deadline, like so:

asio::co_spawn(ioc, deadline(3s) || mainloop(pool.get_executor()),
               redirect_error(asio::detached, ec));

prints, e.g.:

     0ms t_id:0 start program
     0ms t_id:0 run mainloop event
     0ms t_id:0 before co_await
     0ms t_id:1 aw1 begin 6s work
     0ms t_id:2 aw2 begin 2s work
     0ms t_id:3 aw3 begin 4s work
  2001ms t_id:2 aw2 end 2s work
  3000ms t_id:0 deadline expired
  4001ms t_id:3 aw3 end 4s work
  6000ms t_id:1 aw1 end 6s work
  6001ms t_id:0 mainloop completed (Success)
  6001ms t_id:0 work pool joined
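In the trace above the deadline fires at 3000ms, yet completion is only reported at 6001ms. One plausible reading is that blocking work already handed to the pool cannot be interrupted, so everything still waits for it. The standard-library sketch below shows the same effect outside Asio; run_with_deadline and DeadlineResult are hypothetical names, and the behaviour relies on the fact that a future returned by std::async blocks in its destructor until the task has finished.

```cpp
#include <chrono>
#include <future>
#include <thread>

struct DeadlineResult {
    bool timed_out;                    // the deadline passed before the work finished
    std::chrono::milliseconds elapsed; // time until the work had actually finished
};

// Hypothetical helper, not part of Asio: waits for blocking work with a deadline.
DeadlineResult run_with_deadline(std::chrono::milliseconds work,
                                 std::chrono::milliseconds deadline) {
    auto const begin = std::chrono::steady_clock::now();
    bool timed_out = false;
    {
        std::future<void> f =
            std::async(std::launch::async, [work] { std::this_thread::sleep_for(work); });
        timed_out = f.wait_for(deadline) == std::future_status::timeout;
        // Leaving this scope destroys f, which blocks until the work is done:
        // noticing the deadline does not stop the blocking task.
    }
    auto const end = std::chrono::steady_clock::now();
    return {timed_out, std::chrono::duration_cast<std::chrono::milliseconds>(end - begin)};
}
```

With 200ms of work and a 50ms deadline, the result reports a timeout while the elapsed time is still at least the full 200ms. Work that should stop early has to check for cancellation itself instead of sleeping or blocking.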

To get more accurate feedback about the completion, do the following:

asio::co_spawn(ioc, mainloop(pool.get_executor()) || deadline(3s), [](std::exception_ptr e, auto r) {
    if (!e)
        trace("mainloop ", 0 == r.index() ? "completed" : "timed out");
    else
        try {
            std::rethrow_exception(e);
        } catch (boost::system::system_error const& se) {
            trace("mainloop error (", se.code().message(), ")");
        }
});

Printing:

....
  6001ms t_id:0 mainloop timed out
  6001ms t_id:0 work pool joined
