c++ 条件变量notify_one是否一直尝试,直到它到达带有正 predicate 的线程等待?

vojdkbi0  于 2023-11-19  发布在  其他
关注(0)|答案(1)|浏览(89)

我正在用std::condition_variable测试边缘情况,我测试了一个线程饥饿的场景。场景是有99个生产者和只有一个消费者,他们都在一个最大大小的队列上工作。notify_one(在达到队列的最大大小之后)将击中消费者接近1%。所以这意味着它将击中另一个生产者,它将检查 predicate ,然后等待互斥体。
我希望程序在这一点上挂起,但我看到的是,程序尝试尽可能多的 predicate ,直到最后它击中消费者。如果没有消费者,那么 predicate 将永远在所有等待的线程中检查。
我的问题是:notify_one()会尝试用正 predicate 结果通知第一个等待的线程,这是标准定义的吗?那么为什么它在没有预测的情况下与消费者一起工作(注解代码)。或者它是一个虚假的唤醒,试图唤醒我的等待线程?或者其他什么?
在Windows上使用Clang和MSVC进行测试。
我的代码重现了这个案例:

#include <condition_variable>
#include <mutex>
#include <queue>
#include <vector>
#include <iostream>

class CVSimple
{

public:
    static void test() {
        std::queue<int> que;
        std::mutex m;
        std::condition_variable cv;
        int max_size = 10;
        bool working = true;

        auto producer = [&]() {
            while (working) {
                std::unique_lock<std::mutex> lock(m);
                std::chrono::milliseconds t(200);
                auto predicate = [&que, &max_size]() { 
                    if (que.size() < max_size) {
                        std::cout << "T";
                        return true;
                    } 
                    std::cout << "F";
                    return false;
                };
                if (cv.wait_for(lock, t, predicate)) {
                    std::cout << "+";
                    std::this_thread::sleep_for(std::chrono::milliseconds(50));
                    que.push(1);
                    lock.unlock();
                    std::cout << "N";
                    cv.notify_one();
                }
                else {
                    //std::cout << "P";
                }

            }
        };

        auto consumer = [&]() {
            while (working) {
                std::unique_lock<std::mutex> lock(m);
                std::chrono::milliseconds t(200);
                auto predicate = [&que]() {
                    if (!que.empty()) {
                        std::cout << "t";
                        return true;
                    }
                    else {
                        std::cout << "f";
                        return false;
                    };
                };
                //cv.wait(lock, predicate);
                //std::cout << "-";
                //std::this_thread::sleep_for(std::chrono::milliseconds(50));
                //que.pop();
                //lock.unlock();
                //std::cout << "n";
                //cv.notify_one();

                if (cv.wait_for(lock, t, predicate)) {
                    std::cout << "-";
                    std::this_thread::sleep_for(std::chrono::milliseconds(50));
                    que.pop();
                    lock.unlock();
                    std::cout << "n";
                    cv.notify_one();
                }
                else {
                    std::cout << "o";
                }
            }
        };

        int nprod = 100;
        int ncons = 1;

        std::cout << "Start producers" << std::endl;
        std::vector<std::thread> threads;
        for (int i = 0; i < nprod; ++i) {
            threads.emplace_back(producer);
        }

        std::cout << "Start consumers" << std::endl;
        for (int i = 0; i < ncons; ++i) {
            threads.emplace_back(consumer);
        }

        std::this_thread::sleep_for(std::chrono::seconds(20));
        std::cout << "Stop working" << std::endl;
        working = false;
        for (auto& th : threads) {
            if (th.joinable()) {
                th.join();
            }
        }
        return;
    }
};

个字符
产出:

Start producers
T+Start consumers
NT+NT+NT+NT+NT+NT+NT+NT+NT+NFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFt-nT+NFFFFFFFFFFt-nT+NFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF


所以当队列满了,在最后一个元素被添加并且通知被发送之后,第一个生产者的 predicate 失败(输出中的第一个F)。然后我期望它等待另一个通知(N),这不会发生,但是会发生很多检查。这意味着sb触发它,这是虚假唤醒吗?
我已经尝试了问题中提供的代码,它工作,但我不知道为什么。

3phpmpom

3phpmpom1#

“FFFFFFFFFF”发生在队列满了之后。下面是我的想法:
你的producer函数调用cv.wait_for(lock, t, predicate),其中t是一个 timeout 值。一旦队列满了,你所有的生产者线程都会 * 超时 * 等待一个永远不会到来的通知。每次wait_for调用超时,生产者除了回到循环的顶部并再次调用wait_for之外,什么也不做。对wait_for的每次调用将测试给定的predicate一次,每次打印一个“F”,因为队列仍然是满的。
有了99个生产者线程和200毫秒的超时,我估计你应该每秒看到大约495个“F“,偶尔会有一个“t-nT+N”,每次消费者线程得到一个幸运的通知。
我添加了一个main例程,然后运行程序:

int main(int argc, char* argv[]) {
    CVSimple cvs;
    cvs.test();
}

字符串
我实际上看到的是长时间的“FFFFFFF”,偶尔会出现“t-nt-nt-n......”(通常重复9或10次),然后是相等数量的“T+N”,然后回到“FFFFFFF”。
但是等等!(我听到你哭了。)为什么当消费者的wait_for(...)呼叫超时时,每秒没有打印五个“o“?
我不知道,但也许消费者正渴望得到lock。记住,即使超时后,wait_for(lock,...)也不能返回,直到它重新获得lock,并且有99个其他线程都在为它的所有权而互相争斗。
通常,当消费者 * 确实 * 开始运行时,它几乎总是在任何生产者被允许再次运行之前耗尽整个队列,这一事实强烈暗示您已经遇到了饥饿问题。
†如果你取消注解std::cout << "P"行,它会做一些事情。

相关问题