我正在尝试实现一个程序,它由一个生产者线程和多个消费者线程组成,生产者线程向std::vector
添加对象,消费者线程从同一个向量中删除对象,直到它为空。我使用一个condition_variable
来让消费者知道新的对象已经产生了。问题是,在最后一次迭代中(存储中剩下n个项目,其中n是消费者线程的数量),消费者线程在等待一个条件变量时卡住了,即使该条件不应该被满足(storage
不是空的->至少这是我在一些调试日志中发现的)。
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#define CONSUMER_COUNT 4
#define STORAGE_SIZE CONSUMER_COUNT * 10000
class Foo {
private:
int _id;
public:
Foo(int id) : _id(id) {}
int getId() const { return _id; }
};
std::vector<Foo> storage;
std::mutex storageMutex;
std::condition_variable storageCV;
void Producer(int limit) {
for (int i = 0; i < limit; ++i) {
std::lock_guard<std::mutex> lg{storageMutex};
storage.emplace_back(Foo(i));
storageCV.notify_one();
}
storageCV.notify_all();
}
void Consumer(int id) {
while (true) {
std::unique_lock<std::mutex> ul{storageMutex};
storageCV.wait(ul, []() { return !storage.empty(); });
if (storage.empty())
return;
storage.pop_back();
}
}
int main(int argc, char *argv[]) {
std::vector<std::thread> consumers;
consumers.reserve(CONSUMER_COUNT);
auto producer = std::thread(Producer, STORAGE_SIZE);
for (int i = 0; i < CONSUMER_COUNT; ++i) {
consumers.emplace_back(std::thread(Consumer, i));
}
producer.join();
for (auto &consumer : consumers)
consumer.join();
storageCV.notify_all();
std::cout << "[MAIN] Done!" << std::endl;
std::cout << "Storage is left with " << storage.size() << " items!"
<< std::endl;
return 0;
}
我尝试添加一个简单的布尔标志,一旦生产者完成添加所有项目,它将切换,但我不确定(逻辑上)我应该如何在消费者线程中设置条件。简单地在当前检查之上添加该检查是不够的,因为这样即使存储中仍然有一些项,线程也可能停止运行。
1条答案
按热度按时间cbjzeqam1#
正如您所发现的,问题在于消费者被卡住了。问题就在这里:
请注意,
std::condition::variable::wait(lock, condition)
只是一个方便的函数,这段代码相当于:很容易看出条件
return
是毫无意义的,因为当我们到达它时,条件总是为假。条件必须在循环内部才能产生任何效果。这些调整是必要的:
1.我们必须修改这个循环,这样消费者就不会无限等待了。
1.一旦处理完所有元素,生产者线程必须唤醒当前等待的消费者。
while (storage.empty())
之前,或storageCV.wait()
之前...所以它不会被
notify_all()
唤醒,因为它还没有等待,而是即将等待。我们必须重复.notify_all()
来唤醒每个线程,即使是那些还没有等待的线程。Producer更改
消费者更改