c++ 我可以执行get `std::future`并等待它吗?

jxct1oxe  于 2023-05-20  发布在  其他
关注(0)|答案(2)|浏览(112)

因此,您可以创建一个std::future,它在调用.get()之前不工作:

auto f_deferred = std::async( std::launch::deferred, []{ std::cout << "I ran\n"; } );

你也可以写一个可等待的std::future,并且可以在任何线程中的任何时候通过代码准备好:

std::packaged_task<void()> p( []( std::cout << "I also ran\n"; } );
auto f_waitable = p.get_future();

如果你调用f_deferred.wait_for(1ms),它不会等待。如果调用f_deferred.get(),则执行您选择的lambda(在本例中,是打印"I ran\n"的lambda)。
如果调用f_waitable.get(),管理任务的代码无法知道有人在等待未来。但是如果调用f_deferred.wait(1ms);,则会立即得到future_status::deferred
有没有办法把这两个结合起来?
一个具体的用例是线程池在人们排队任务时返回future。如果一个未排队的future是.get() 'd,我想使用被阻塞的线程来执行任务,而不是让它空闲。另一方面,我希望具有返回的期货的人能够确定任务是否完成,甚至等待有限的时间来完成任务。(如果你正在等待,我可以让你的线程在等待期间空闲)
如果做不到这一点,那么在即将到来的提案中是否有解决方案可以比让我的线程池返回一个具有所有限制的未来更好地解决这个问题?我听说未来是没有未来的,而未来所解决的问题有更好的解决方案。

f45qwnt8

f45qwnt81#

我不确定这是否正是你所需要的,但它的目的是说明我在评论中提出的建议。至少,我希望它能给你一些想法来实现你需要的东西,如果它不能满足你的所有需求。
免责声明:这是非常粗糙的。很多事情当然可以做得更优雅和更有效。

#include <iostream>
#include <thread>
#include <future>
#include <memory>
#include <functional>
#include <queue>
#include <random>
#include <chrono>
#include <mutex>

typedef std::packaged_task<void()> task;
typedef std::shared_ptr<task> task_ptr;
typedef std::lock_guard<std::mutex> glock;
typedef std::unique_lock<std::mutex> ulock;
typedef unsigned int uint;
typedef unsigned long ulong;

// For sync'd std::cout
std::mutex cout_mtx;

// For task scheduling
std::mutex task_mtx;
std::condition_variable task_cv;

// Prevents main() from exiting
// before the last worker exits
std::condition_variable kill_switch;

// RNG engine
std::mt19937_64 engine;

// Random sleep (in ms)
std::uniform_int_distribution<int> sleep(100, 10000);

// Task queue
std::queue<task_ptr> task_queue;

static uint tasks = 0;
static std::thread::id main_thread_id;
static uint workers = 0;

template<typename T>
class Task
{
    // Not sure if this needs
    // to be std::atomic.
    // A simple bool might suffice.
    std::atomic<bool> working;
    task_ptr tp;

public:

    Task(task_ptr _tp)
        :
          working(false),
          tp(_tp)
    {}

    inline T get()
    {
        working.store(true);
        (*tp)();
        return tp->get_future().get();
    }

    inline bool is_working()
    {
        return working.load();
    }
};

auto task_factory()
{
    return std::make_shared<task>([&]
    {
        uint task_id(0);
        {
            glock lk(cout_mtx);
            task_id = ++tasks;
            if (std::this_thread::get_id() == main_thread_id)
            {
                std::cout << "Executing task " << task_id << " in main thread.\n";
            }
            else
            {
                std::cout << "Executing task " << task_id << " in worker " << std::this_thread::get_id() << ".\n";
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep(engine)));
        {
            glock lk(cout_mtx);
            std::cout << "\tTask " << task_id << " completed.\n";
        }
    });
}

auto func_factory()
{
    return [&]
    {

        while(true)
        {
            ulock lk(task_mtx);
            task_cv.wait(lk, [&]{ return !task_queue.empty(); });
            Task<void> task(task_queue.front());
            task_queue.pop();

            // Check if the task has been assigned
            if (!task.is_working())
            {
                // Sleep for a while and check again.
                // If it is still not assigned after 1 s,
                // start working on it.
                // You can also place these checks
                // directly in Task::get()
                {
                    glock lk(cout_mtx);
                    std::cout << "\tTask not started, waiting 1 s...\n";
                }
                lk.unlock();
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
                lk.lock();
                if (!task.is_working())
                {
                    {
                        glock lk(cout_mtx);
                        std::cout << "\tTask not started after 1 s, commencing work...\n";
                    }
                    lk.unlock();
                    task.get();
                    lk.lock();
                }

                if (task_queue.empty())
                {
                    break;
                }
            }
        }
    };
}

int main()
{
    engine.seed(std::chrono::high_resolution_clock::now().time_since_epoch().count());

    std::cout << "Main thread: " << std::this_thread::get_id() << "\n";
    main_thread_id = std::this_thread::get_id();

    for (int i = 0; i < 50; ++i)
    {
        task_queue.push(task_factory());
    }

    std::cout << "Tasks enqueued: " << task_queue.size() << "\n";

    // Spawn 5 workers
    for (int i = 0; i < 5; ++i)
    {
        std::thread([&]
        {
            {
                ulock lk(task_mtx);
                ++workers;
                task_cv.wait(lk);
                {
                    glock lk(cout_mtx);
                    std::cout << "\tWorker started\n";
                }
            }

            auto fn(func_factory());
            fn();

            ulock lk(task_mtx);
            --workers;
            if (workers == 0)
            {
                kill_switch.notify_all();
            }

        }).detach();
    }

    // Notify all workers to start processing the queue
    task_cv.notify_all();

    // This is the important bit:
    // Tasks can be executed by the main thread
    // as well as by the workers.
    // In fact, any thread can grab a task from the queue,
    // check if it is running and start working
    // on it if it is not.
    auto fn(func_factory());
    fn();

    ulock lk(task_mtx);
    if (workers > 0)
    {
        kill_switch.wait(lk);
    }

    return 0;
}

这是我的CMakeLists.txt

cmake_minimum_required(VERSION 3.2)

project(tp_wait)

set(CMAKE_CXX_COMPILER "clang++")
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

set(CMAKE_BUILD_TYPE "Debug" CACHE STRING "Build type" FORCE)

find_package(Threads REQUIRED)

add_executable(${PROJECT_NAME} "main.cpp")
target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT})
9njqaruj

9njqaruj2#

Taskflow实现了一个工作窃取调度器。
tf::Executor::corun等待子任务流完成,但不会阻塞工作线程。正在运行等待任务的工作线程然后开始从其他工作线程窃取作业。这发生在corun方法本身内部。
https://taskflow.github.io/
下面的代码创建了1000个任务,这些任务本身等待1000个其他任务完成。同时有2个线程在工作,没有死锁。

tf::Executor executor(2);
tf::Taskflow taskflow;
std::array<tf::Taskflow, 1000> others;

std::atomic<size_t> counter{0};

for(size_t n=0; n<1000; n++) {
  for(size_t i=0; i<1000; i++) {
    others[n].emplace([&](){ counter++; });
  }
  taskflow.emplace([&executor, &tf=others[n]](){
    executor.corun(tf); // Does not block the worker thread ( no deadlock )
    //executor.run(tf).wait();  <- blocking the worker without doing anything
    //                             will introduce deadlock
  });
}
executor.run(taskflow).wait();

相关问题