c++ 为什么这段代码偶尔挂在std::condition_variable::wait()上?

tag5nh1u  于 2023-05-02  发布在  其他
关注(0)|答案(2)|浏览(109)

我用C++实现了一个非常标准的单消费者多生产者模式,并增加了队列中任务数量的限制。
Worker在单独的线程上运行消息队列。任务从生产者发送到Worker。如果队列中已经有max_num_tasks_,则生产者必须等待。
在VS2022中使用工具集v143为Windows x64编译。
下面的代码在Worker::Send()中的cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });语句处偶尔挂起。有人知道我做错了什么吗?
工人的实现

#pragma once
#include <queue>
#include <mutex>
#include <functional>

class Worker {
public:
    ~Worker() {
        Send([this] {done = true; });
        thd.join();
    }

    Worker(size_t max_num_tasks) : max_num_tasks_(max_num_tasks), done(false), thd([this] {
        while (!done) {
            mq.PopFront()();
            cv_.notify_all();
        }
        })
    { }

    void Send(std::function<void()>&& m) {
        {
            std::unique_lock<std::mutex> lock(m_);
            cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });
        }
        mq.PushBack(std::move(m));
    }
private:
    bool done;
    size_t max_num_tasks_;
    ThreadSafeQueue<std::function<void()>> mq;
    std::thread thd;
    std::mutex m_;
    std::condition_variable cv_;
};

线程安全队列的实现

#pragma once
#include <queue>
#include <mutex>
#include <functional>

template <typename T>
class ThreadSafeQueue {
public:
    void PushBack(T&& val) {
        {
            std::unique_lock<std::mutex> lock(q_mutex);
            q.push(std::move(val));
        }
        cv.notify_one();
    }

    void PushBack(const T& val) {
        {
            std::unique_lock<std::mutex> lock(q_mutex);
            q.push(val);
        }
        cv.notify_one();
    }

    T PopFront() {
        std::unique_lock<std::mutex> lock(q_mutex);
        cv.wait(lock, [&] { return (!q.empty()); });
        T v = q.front();
        q.pop();
        return std::move(v);
    }

    bool Empty() const {
        std::unique_lock<std::mutex> lock(q_mutex);
        return q.empty();
    }

    size_t Size() const {
        std::unique_lock<std::mutex> lock(q_mutex);
        return q.size();
    }
private:
    mutable std::mutex q_mutex;
    std::condition_variable cv;
    std::queue<T> q;
};

偶尔触发问题的单元测试示例:

TEST(WorkerTests, Compute_PI) {

        std::atomic<double> value;
        auto multiply_by_pi_over_four = [&value] {
            int n = 750;
            double v = 0;
            for (int i = 0; i < n; i++) {
                v += std::pow(-1, i) / (2 * i + 1);
            }
            value = value * v;
        };

        auto add_pi_over_four = [&value] {
            int n = 750;
            double v = 0;
            for (int i = 0; i < n; i++) {
                v += std::pow(-1, i) / (2 * i + 1);
            }
            value = value + v;
        };

        auto add_one = [&value] {
            value = value + 1;
            std::this_thread::sleep_for(std::chrono::milliseconds(3));
        };

        auto multiply_by_three = [&value] {
            value = value * 3;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        };

        for (int i = 0; i < 100; i++) {
            value = 0.0;
            {
                Worker worker(1);
                worker.Send(add_one);
                worker.Send(multiply_by_pi_over_four);
                worker.Send(multiply_by_three);
                worker.Send([]() {});
                worker.Send(add_pi_over_four);
            }
            EXPECT_GE(3.15, value.load());
            EXPECT_LE(3.14, value.load());
        }
    }
8yparm6h

8yparm6h1#

cv_的同步在显示的代码中被破坏。

cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });

wait()执行的事件顺序如下:
1.互斥体的初始条件是锁定的。
1.检查wait()条件。
1.如果condition为false,则互斥体被自动解锁,并等待condition变量。
步骤3是原子的、不可分割的操作,但是步骤2***与步骤3***不可分割。这是一个独立的步骤。
所以:
步骤2发生。return (mq.Size() < max_num_tasks_);的计算结果为false。
B. Worker线程唤醒并快速清空mq中的所有内容,每次都向条件变量发送信号,耗尽mq直到它为空。
另一个线程从休眠中醒来,转到第3步,解锁互斥锁并等待某人向条件变量发送信号。
没有任何东西会发出条件变量的信号。工作线程进入深度睡眠状态,而另一个执行线程在快速清空队列时重复地按响条件变量。
E.对wait()的调用现在等待条件变量的信号,它永远不会。
基本上,这个wait()需要使用***相同的***条件变量和互斥锁,因为工作线程正在使用它来锁定它的线程。这种逻辑,wait()的ing队列下降到其最大值以下。大小,需要移动到事件队列中,并使用相同的互斥体和条件变量。

91zkwejq

91zkwejq2#

我的第二次尝试如下:

class Worker {
public:
    ~Worker() {
        Send([this] {done = true; });
        thd.join();
    }

    Worker(size_t max_num_tasks) : max_num_tasks_(max_num_tasks), done(false), thd([this] {
        while (!done) {
            std::function<void()> f;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [&] { return (!mq.empty()); });
                f = mq.front();
                mq.pop();
            }
            f();
            cv_.notify_all();
        }
        })
    { }

    void Send(std::function<void()>&& m) {
        {
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [&] {return (mq.size() < max_num_tasks_); });
                mq.push(std::move(m));
            }
            cv_.notify_all();
        }
    }
private:
    bool done;
    size_t max_num_tasks_;
    std::queue<std::function<void()>> mq;
    std::thread thd;
    std::mutex m_;
    std::condition_variable cv_;
};

然而,从性能的Angular 来看,使用两个cv来避免在Send()中不必要地唤醒所有消费者线程是有益的。但也许这在宏伟的计划中是微不足道的。..

相关问题