单生产者多消费者C++

m3eecexj  于 2023-06-25  发布在  其他
关注(0)|答案(1)|浏览(198)

我正在尝试实现一个程序,它由一个生产者线程和多个消费者线程组成,生产者线程向std::vector添加对象,消费者线程从同一个向量中删除对象,直到它为空。我使用一个condition_variable来让消费者知道新的对象已经产生了。问题是,在最后一次迭代中(存储中剩下n个项目,其中n是消费者线程的数量),消费者线程在等待一个条件变量时卡住了,即使该条件不应该被满足(storage不是空的->至少这是我在一些调试日志中发现的)。

  1. #include <chrono>
  2. #include <condition_variable>
  3. #include <functional>
  4. #include <iostream>
  5. #include <mutex>
  6. #include <thread>
  7. #include <vector>
  8. #define CONSUMER_COUNT 4
  9. #define STORAGE_SIZE CONSUMER_COUNT * 10000
  10. class Foo {
  11. private:
  12. int _id;
  13. public:
  14. Foo(int id) : _id(id) {}
  15. int getId() const { return _id; }
  16. };
  17. std::vector<Foo> storage;
  18. std::mutex storageMutex;
  19. std::condition_variable storageCV;
  20. void Producer(int limit) {
  21. for (int i = 0; i < limit; ++i) {
  22. std::lock_guard<std::mutex> lg{storageMutex};
  23. storage.emplace_back(Foo(i));
  24. storageCV.notify_one();
  25. }
  26. storageCV.notify_all();
  27. }
  28. void Consumer(int id) {
  29. while (true) {
  30. std::unique_lock<std::mutex> ul{storageMutex};
  31. storageCV.wait(ul, []() { return !storage.empty(); });
  32. if (storage.empty())
  33. return;
  34. storage.pop_back();
  35. }
  36. }
  37. int main(int argc, char *argv[]) {
  38. std::vector<std::thread> consumers;
  39. consumers.reserve(CONSUMER_COUNT);
  40. auto producer = std::thread(Producer, STORAGE_SIZE);
  41. for (int i = 0; i < CONSUMER_COUNT; ++i) {
  42. consumers.emplace_back(std::thread(Consumer, i));
  43. }
  44. producer.join();
  45. for (auto &consumer : consumers)
  46. consumer.join();
  47. storageCV.notify_all();
  48. std::cout << "[MAIN] Done!" << std::endl;
  49. std::cout << "Storage is left with " << storage.size() << " items!"
  50. << std::endl;
  51. return 0;
  52. }

我尝试添加一个简单的布尔标志,一旦生产者完成添加所有项目,它将切换,但我不确定(逻辑上)我应该如何在消费者线程中设置条件。简单地在当前检查之上添加该检查是不够的,因为这样即使存储中仍然有一些项,线程也可能停止运行。

cbjzeqam

cbjzeqam1#

正如您所发现的,问题在于消费者被卡住了。问题就在这里:

  1. storageCV.wait(ul, []() { return !storage.empty(); });
  2. if (storage.empty())
  3. return;

请注意,std::condition::variable::wait(lock, condition)只是一个方便的函数,这段代码相当于:

  1. while (storage.empty())
  2. storageCV.wait();
  3. if (storage.empty())
  4. return;

很容易看出条件return是毫无意义的,因为当我们到达它时,条件总是为假。条件必须在循环内部才能产生任何效果。
这些调整是必要的:
1.我们必须修改这个循环,这样消费者就不会无限等待了。
1.一旦处理完所有元素,生产者线程必须唤醒当前等待的消费者。

  • 一个消费者线程当前可能是:
  • while (storage.empty())之前,或
  • storageCV.wait()之前

...所以它不会被notify_all()唤醒,因为它还没有等待,而是即将等待。我们必须重复.notify_all()来唤醒每个线程,即使是那些还没有等待的线程。

Producer更改

  1. // We create an atomic counter to keep track of how many
  2. // consumers remain.
  3. // When this hits zero, the producer can stop notifying the
  4. // remaining consumers.
  5. std::atomic_int consumer_stop_counter;
  6. // The producer needs to receive the number of consumers from the main thread.
  7. // It wouldn't be safe to communicate this via an atomic counter which
  8. // is modified by the consumers, because it's possible that the producer
  9. // is done pushing all of its items to storage before any consumers
  10. // "register themselves".
  11. // In such a scenario, the producer would think that there are no
  12. // consumers that need to be notified anymore and all works has been completed.
  13. void Producer(int limit, int consumers) {
  14. for (int i = 0; i < limit; ++i) {
  15. std::lock_guard<std::mutex> lg{storageMutex};
  16. storage.emplace_back(Foo(i));
  17. storageCV.notify_one();
  18. }
  19. consumer_stop_counter = 0;
  20. while (consumer_stop_counter < consumers) {
  21. storageCV.notify_all();
  22. // optional: mitigate effects of busy wait
  23. std::this_thread::yield();
  24. }
  25. }

消费者更改

  1. void Consumer(int id) {
  2. while (true) {
  3. std::unique_lock<std::mutex> ul{storageMutex};
  4. // Transform cv waiting into a regular while-loop.
  5. while (storage.empty()) {
  6. storageCV.wait(ul);
  7. // If we got woken up and the storage is empty, that means that we exit.
  8. // We hold the lock after calling .wait(), so it is safe to access the storage.
  9. if (storage.empty()) {
  10. consumer_stop_counter++;
  11. return;
  12. }
  13. }
  14. storage.pop_back();
  15. }
  16. }
展开查看全部

相关问题