单生产者多消费者C++

m3eecexj  于 2023-06-25  发布在  其他
关注(0)|答案(1)|浏览(149)

我正在尝试实现一个程序,它由一个生产者线程和多个消费者线程组成,生产者线程向std::vector添加对象,消费者线程从同一个向量中删除对象,直到它为空。我使用一个condition_variable来让消费者知道新的对象已经产生了。问题是,在最后一次迭代中(存储中剩下n个项目,其中n是消费者线程的数量),消费者线程在等待一个条件变量时卡住了,即使该条件不应该被满足(storage不是空的->至少这是我在一些调试日志中发现的)。

#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

#define CONSUMER_COUNT 4
#define STORAGE_SIZE CONSUMER_COUNT * 10000

class Foo {
private:
  int _id;

public:
  Foo(int id) : _id(id) {}
  int getId() const { return _id; }
};

std::vector<Foo> storage;
std::mutex storageMutex;
std::condition_variable storageCV;

void Producer(int limit) {
  for (int i = 0; i < limit; ++i) {
    std::lock_guard<std::mutex> lg{storageMutex};
    storage.emplace_back(Foo(i));
    storageCV.notify_one();
  }
  storageCV.notify_all();
}

void Consumer(int id) {
  while (true) {
    std::unique_lock<std::mutex> ul{storageMutex};
    storageCV.wait(ul, []() { return !storage.empty(); });
    if (storage.empty())
      return;
    storage.pop_back();
  }
}

int main(int argc, char *argv[]) {
  std::vector<std::thread> consumers;
  consumers.reserve(CONSUMER_COUNT);

  auto producer = std::thread(Producer, STORAGE_SIZE);

  for (int i = 0; i < CONSUMER_COUNT; ++i) {
    consumers.emplace_back(std::thread(Consumer, i));
  }

  producer.join();
  for (auto &consumer : consumers)
    consumer.join();

  storageCV.notify_all();

  std::cout << "[MAIN] Done!" << std::endl;
  std::cout << "Storage is left with " << storage.size() << " items!"
            << std::endl;

  return 0;
}

我尝试添加一个简单的布尔标志,一旦生产者完成添加所有项目,它将切换,但我不确定(逻辑上)我应该如何在消费者线程中设置条件。简单地在当前检查之上添加该检查是不够的,因为这样即使存储中仍然有一些项,线程也可能停止运行。

cbjzeqam

cbjzeqam1#

正如您所发现的,问题在于消费者被卡住了。问题就在这里:

storageCV.wait(ul, []() { return !storage.empty(); });
if (storage.empty())
    return;

请注意,std::condition::variable::wait(lock, condition)只是一个方便的函数,这段代码相当于:

while (storage.empty())
    storageCV.wait();
if (storage.empty())
    return;

很容易看出条件return是毫无意义的,因为当我们到达它时,条件总是为假。条件必须在循环内部才能产生任何效果。
这些调整是必要的:
1.我们必须修改这个循环,这样消费者就不会无限等待了。
1.一旦处理完所有元素,生产者线程必须唤醒当前等待的消费者。

  • 一个消费者线程当前可能是:
  • while (storage.empty())之前,或
  • storageCV.wait()之前

...所以它不会被notify_all()唤醒,因为它还没有等待,而是即将等待。我们必须重复.notify_all()来唤醒每个线程,即使是那些还没有等待的线程。

Producer更改

// We create an atomic counter to keep track of how many
// consumers remain.
// When this hits zero, the producer can stop notifying the
// remaining consumers.
std::atomic_int consumer_stop_counter;

// The producer needs to receive the number of consumers from the main thread.
// It wouldn't be safe to communicate this via an atomic counter which
// is modified by the consumers, because it's possible that the producer
// is done pushing all of its items to storage before any consumers
// "register themselves".
// In such a scenario, the producer would think that there are no
// consumers that need to be notified anymore and all works has been completed.
void Producer(int limit, int consumers) {
  for (int i = 0; i < limit; ++i) {
    std::lock_guard<std::mutex> lg{storageMutex};
    storage.emplace_back(Foo(i));
    storageCV.notify_one();
  }

  consumer_stop_counter = 0;
  while (consumer_stop_counter < consumers) {
    storageCV.notify_all();

    // optional: mitigate effects of busy wait
    std::this_thread::yield();
  }
}

消费者更改

void Consumer(int id) {
  while (true) {
    std::unique_lock<std::mutex> ul{storageMutex};
    // Transform cv waiting into a regular while-loop.
    while (storage.empty()) {
      storageCV.wait(ul);

      // If we got woken up and the storage is empty, that means that we exit.
      // We hold the lock after calling .wait(), so it is safe to access the storage.
      if (storage.empty()) {
        consumer_stop_counter++;
        return;
      }
    }

    storage.pop_back();
  }
}

相关问题