c++ 避免通知std::condition_variable,如果它是不必要的

tct7dpnv  于 2023-08-09  发布在  其他
关注(0)|答案(2)|浏览(89)

我正在写一个线程池。主线程通过设置一个原子指针来给辅助线程一个任务。worker检查指针,如果它是nullptr,它就等待std::condition_variable,以避免不必要地消耗CPU。如果不是,它将运行作业。
因此,主线程在给工作线程一个作业后,对工作线程的条件变量调用notify_one(),即使工作线程当前正在运行一个作业,而不是等待(即等待)。下一个循环将拾取下一个作业)。
下面是原始代码:

struct Worker
{
    bool wake_up {false};
    std::condition_variable condition;
    std::mutex mutex;

    std::atomic<Job*> next_job {nullptr};
    std::atomic<bool> stop {false};

    void run()
    {
        while (! stop.load())
        {
            auto* job = next_job.exchange(nullptr);

            if (job != nullptr)
                job->run();
            else
                wait_for_job();
        }
    }

    void wait_for_job()
    {
        std::unique_lock lock(mutex);
        if (! wake_up)
            condition.wait(lock, [this] { return wake_up; });
    }

    bool push_job (Job* job)
    {
        Job* expected = nullptr;

        if (w.next_job.compare_exchange_weak(expected, job))
        {
            std::lock_guard lock(w.mutex);
            w.wake_up = true;
            w.condition.notify_all();
            return true;
        }

        return false;
    }
};

int main()
{
    Worker w;
    std::thread t ([&w] { w.run(); });

    // The main thread runs an event loop, where
    // certain events launch background jobs.
    // This is a stand-in for the event loop.
    for (int i = 0; i < 10000; ++i)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds (50));
        w.push_job (chooseJob());
    }

    w.stop.store(true);
    t.join();
    return 0;
}

字符串
这工作得相当好,但是分析表明主线程在notify_one()调用上花费了大量的时间,我想知道是否可以减少这一点。所以我尝试了这个(省略main(),因为它保持不变):

enum class WaitState
{
    waiting,
    running,
    woken
};

struct Worker
{
    std::atomic<WaitState> waiting {WaitState::running};
    std::condition_variable condition;
    std::mutex mutex;

    std::atomic<Job*> next_job {nullptr};
    std::atomic<bool> stop {false};

    void run()
    {
        while (! stop.load())
        {
            auto* job = next_job.exchange(nullptr);

            if (job != nullptr)
                job->run();
            else
                wait_for_job();
        }
    }

    void wait_for_job()
    {
        std::unique_lock lock(mutex);
        auto w = WaitState::running;

        if (waiting.compare_exchange_strong(w, WaitState::waiting))
        {
            const auto stop_waiting = [this] { return waiting.load() == WaitState::woken; };
            condition.wait_for(lock, std::chrono::milliseconds(500), stop_waiting);
        }

        waiting.store(WaitState::running);
    }

    bool push_job (Job* job)
    {
        Job* expected = nullptr;

        if (w.next_job.compare_exchange_weak(expected, job))
        {
            std::lock_guard lock (w.mutex);

            auto w = WaitState::waiting;

            if (waiting.compare_exchange_strong (w, WaitState::woken))
                w.condition.notify_all();
        }
    }
};


但现在我的工作似乎并不总是正确运行,几乎就像工人不再在正确的时间被唤醒一样。我做错了什么?

vlju58qv

vlju58qv1#

不需要原子指针。

struct Worker
{
    std::condition_variable condition;
    std::mutex mutex;

    Job* next_job {nullptr};
    bool stop {false};

    void run()
    {
        std::unique_lock lock(mutex);
    
        while (!stop)
        {
            if (next_job)
                next_job->run();
            next_job = nullptr;
            condition.wait(lock, []{ return next_job || stop; });
        }
    }

    bool push_job (Job* job)
    {
        std::lock_guard lock(w.mutex);

        // If we are here, the worker thread is waiting for the condition
        // and next_job is nullptr.

        next_job = job;
        w.condition.notify_all();
        return true;
    }
};

int main()
{
    Worker w;
    std::thread t ([&w] { w.run(); });

    for (int i = 0; i < 10000; ++i)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds (50));
        w.push_job (chooseJob());
    }

    w.stop = true;
    {
      std::lock_guard lock(w.mutex);
      w.condition.notify_all();
    }
    t.join();
    return 0;
}

字符串

vkc1a9a2

vkc1a9a22#

FYI:你的设计有更深层次的问题。下面是对您尝试执行的操作的高级描述:

10000 times:
    - worker thread waits for next job
    - main thread chooses a job
    - main thread "pushes" job
    - main thread waits while worker thread performs job

字符串
你的主线程消耗了大量的CPU周期,因为工作线程需要完成busy waiting。但是拥有一个工作线程的意义是什么呢?在该循环中的每个时刻,要么一个线程在等待,要么另一个线程在等待。你为什么不干脆这样做呢?

int main() {
    for (int i = 0; i < 10000; ++i) {
        chooseJob()->run();
    };
    return 0;
}


这是四行代码,而不是你的三十五行,它可以更有效地完成同样的工作。如果你不能让多个线程同时做一些有趣的事情,那么就没有理由使用多个线程。在你最初的例子和其他人在这里提出的例子中,主线程和辅助线程只是轮流做事情。每当你看到线程轮流执行时,你可以通过在一个线程中完成所有的工作来改进程序。

相关问题