我正在用std::condition_variable
测试边缘情况,我测试了一个线程饥饿的场景。场景是有99个生产者和只有一个消费者,他们都在一个最大大小的队列上工作。notify_one(在达到队列的最大大小之后)将击中消费者接近1%。所以这意味着它将击中另一个生产者,它将检查 predicate ,然后等待互斥体。
我希望程序在这一点上挂起,但我看到的是,程序尝试尽可能多的 predicate ,直到最后它击中消费者。如果没有消费者,那么 predicate 将永远在所有等待的线程中检查。
我的问题是:notify_one()会尝试用正 predicate 结果通知第一个等待的线程,这是标准定义的吗?那么为什么它在没有预测的情况下与消费者一起工作(注解代码)。或者它是一个虚假的唤醒,试图唤醒我的等待线程?或者其他什么?
在Windows上使用Clang和MSVC进行测试。
我的代码重现了这个案例:
#include <condition_variable>
#include <mutex>
#include <queue>
#include <vector>
#include <iostream>
class CVSimple
{
public:
static void test() {
std::queue<int> que;
std::mutex m;
std::condition_variable cv;
int max_size = 10;
bool working = true;
auto producer = [&]() {
while (working) {
std::unique_lock<std::mutex> lock(m);
std::chrono::milliseconds t(200);
auto predicate = [&que, &max_size]() {
if (que.size() < max_size) {
std::cout << "T";
return true;
}
std::cout << "F";
return false;
};
if (cv.wait_for(lock, t, predicate)) {
std::cout << "+";
std::this_thread::sleep_for(std::chrono::milliseconds(50));
que.push(1);
lock.unlock();
std::cout << "N";
cv.notify_one();
}
else {
//std::cout << "P";
}
}
};
auto consumer = [&]() {
while (working) {
std::unique_lock<std::mutex> lock(m);
std::chrono::milliseconds t(200);
auto predicate = [&que]() {
if (!que.empty()) {
std::cout << "t";
return true;
}
else {
std::cout << "f";
return false;
};
};
//cv.wait(lock, predicate);
//std::cout << "-";
//std::this_thread::sleep_for(std::chrono::milliseconds(50));
//que.pop();
//lock.unlock();
//std::cout << "n";
//cv.notify_one();
if (cv.wait_for(lock, t, predicate)) {
std::cout << "-";
std::this_thread::sleep_for(std::chrono::milliseconds(50));
que.pop();
lock.unlock();
std::cout << "n";
cv.notify_one();
}
else {
std::cout << "o";
}
}
};
int nprod = 100;
int ncons = 1;
std::cout << "Start producers" << std::endl;
std::vector<std::thread> threads;
for (int i = 0; i < nprod; ++i) {
threads.emplace_back(producer);
}
std::cout << "Start consumers" << std::endl;
for (int i = 0; i < ncons; ++i) {
threads.emplace_back(consumer);
}
std::this_thread::sleep_for(std::chrono::seconds(20));
std::cout << "Stop working" << std::endl;
working = false;
for (auto& th : threads) {
if (th.joinable()) {
th.join();
}
}
return;
}
};
个字符
产出:
Start producers
T+Start consumers
NT+NT+NT+NT+NT+NT+NT+NT+NT+NFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFt-nT+NFFFFFFFFFFt-nT+NFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
型
所以当队列满了,在最后一个元素被添加并且通知被发送之后,第一个生产者的 predicate 失败(输出中的第一个F)。然后我期望它等待另一个通知(N),这不会发生,但是会发生很多检查。这意味着sb触发它,这是虚假唤醒吗?
我已经尝试了问题中提供的代码,它工作,但我不知道为什么。
1条答案
按热度按时间3phpmpom1#
“FFFFFFFFFF”发生在队列满了之后。下面是我的想法:
你的
producer
函数调用cv.wait_for(lock, t, predicate)
,其中t
是一个 timeout 值。一旦队列满了,你所有的生产者线程都会 * 超时 * 等待一个永远不会到来的通知。每次wait_for
调用超时,生产者除了回到循环的顶部并再次调用wait_for
之外,什么也不做。对wait_for
的每次调用将测试给定的predicate
一次,每次打印一个“F”,因为队列仍然是满的。有了99个生产者线程和200毫秒的超时,我估计你应该每秒看到大约495个“F“,偶尔会有一个“t-nT+N”,每次消费者线程得到一个幸运的通知。
我添加了一个
main
例程,然后运行程序:字符串
我实际上看到的是长时间的“FFFFFFF”,偶尔会出现“t-nt-nt-n......”(通常重复9或10次),然后是相等数量的“T+N”,然后回到“FFFFFFF”。
但是等等!(我听到你哭了。)为什么当消费者的
wait_for(...)
呼叫超时时,每秒没有打印五个“o“?我不知道,但也许消费者正渴望得到
lock
。记住,即使超时后,wait_for(lock,...)
也不能返回,直到它重新获得lock
,并且有99个其他线程都在为它的所有权而互相争斗。通常,当消费者 * 确实 * 开始运行时,它几乎总是在任何生产者被允许再次运行之前耗尽整个队列,这一事实强烈暗示您已经遇到了饥饿问题。
†如果你取消注解
std::cout << "P"
行,它会做一些事情。