我用C++实现了一个非常标准的单消费者多生产者模式,并增加了队列中任务数量的限制。
Worker在单独的线程上运行消息队列。任务从生产者发送到Worker。如果队列中已经有max_num_tasks_
,则生产者必须等待。
在VS2022中使用工具集v143为Windows x64编译。
下面的代码在Worker::Send()
中的cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });
语句处偶尔挂起。有人知道我做错了什么吗?
工人的实现
#pragma once
#include <queue>
#include <mutex>
#include <functional>
class Worker {
public:
~Worker() {
Send([this] {done = true; });
thd.join();
}
Worker(size_t max_num_tasks) : max_num_tasks_(max_num_tasks), done(false), thd([this] {
while (!done) {
mq.PopFront()();
cv_.notify_all();
}
})
{ }
void Send(std::function<void()>&& m) {
{
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });
}
mq.PushBack(std::move(m));
}
private:
bool done;
size_t max_num_tasks_;
ThreadSafeQueue<std::function<void()>> mq;
std::thread thd;
std::mutex m_;
std::condition_variable cv_;
};
线程安全队列的实现
#pragma once
#include <queue>
#include <mutex>
#include <functional>
template <typename T>
class ThreadSafeQueue {
public:
void PushBack(T&& val) {
{
std::unique_lock<std::mutex> lock(q_mutex);
q.push(std::move(val));
}
cv.notify_one();
}
void PushBack(const T& val) {
{
std::unique_lock<std::mutex> lock(q_mutex);
q.push(val);
}
cv.notify_one();
}
T PopFront() {
std::unique_lock<std::mutex> lock(q_mutex);
cv.wait(lock, [&] { return (!q.empty()); });
T v = q.front();
q.pop();
return std::move(v);
}
bool Empty() const {
std::unique_lock<std::mutex> lock(q_mutex);
return q.empty();
}
size_t Size() const {
std::unique_lock<std::mutex> lock(q_mutex);
return q.size();
}
private:
mutable std::mutex q_mutex;
std::condition_variable cv;
std::queue<T> q;
};
偶尔触发问题的单元测试示例:
TEST(WorkerTests, Compute_PI) {
std::atomic<double> value;
auto multiply_by_pi_over_four = [&value] {
int n = 750;
double v = 0;
for (int i = 0; i < n; i++) {
v += std::pow(-1, i) / (2 * i + 1);
}
value = value * v;
};
auto add_pi_over_four = [&value] {
int n = 750;
double v = 0;
for (int i = 0; i < n; i++) {
v += std::pow(-1, i) / (2 * i + 1);
}
value = value + v;
};
auto add_one = [&value] {
value = value + 1;
std::this_thread::sleep_for(std::chrono::milliseconds(3));
};
auto multiply_by_three = [&value] {
value = value * 3;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
};
for (int i = 0; i < 100; i++) {
value = 0.0;
{
Worker worker(1);
worker.Send(add_one);
worker.Send(multiply_by_pi_over_four);
worker.Send(multiply_by_three);
worker.Send([]() {});
worker.Send(add_pi_over_four);
}
EXPECT_GE(3.15, value.load());
EXPECT_LE(3.14, value.load());
}
}
2条答案
按热度按时间8yparm6h1#
cv_
的同步在显示的代码中被破坏。wait()
执行的事件顺序如下:1.互斥体的初始条件是锁定的。
1.检查
wait()
条件。1.如果condition为false,则互斥体被自动解锁,并等待condition变量。
步骤3是原子的、不可分割的操作,但是步骤2***与步骤3***不可分割。这是一个独立的步骤。
所以:
步骤2发生。
return (mq.Size() < max_num_tasks_);
的计算结果为false。B. Worker线程唤醒并快速清空
mq
中的所有内容,每次都向条件变量发送信号,耗尽mq
直到它为空。另一个线程从休眠中醒来,转到第3步,解锁互斥锁并等待某人向条件变量发送信号。
没有任何东西会发出条件变量的信号。工作线程进入深度睡眠状态,而另一个执行线程在快速清空队列时重复地按响条件变量。
E.对
wait()
的调用现在等待条件变量的信号,它永远不会。基本上,这个
wait()
需要使用***相同的***条件变量和互斥锁,因为工作线程正在使用它来锁定它的线程。这种逻辑,wait()
的ing队列下降到其最大值以下。大小,需要移动到事件队列中,并使用相同的互斥体和条件变量。91zkwejq2#
我的第二次尝试如下:
然而,从性能的Angular 来看,使用两个cv来避免在Send()中不必要地唤醒所有消费者线程是有益的。但也许这在宏伟的计划中是微不足道的。..