为什么在生产者/消费者多线程C++程序中会随机发生死锁?

vvppvyoh  于 2023-02-14  发布在  其他
关注(0)|答案(2)|浏览(200)

在下面的代码中,我们的任务是使用多线程创建一个消费者/生产者程序,并找到一种防止死锁的方法。

#include <iostream>
#include <stack>
#include <time.h>
#include <mutex>
#include <condition_variable>
#include <random>
#include <thread>
#include <windows.h>
#include <algorithm>
#include <stack>

#define MAX 100
#define MIN 0

std::condition_variable cv;
std::mutex mtx;

class ProducerConsumer{

    public:

        // Creates a stack
        std::stack<int> stack_array;

        // Facilitates checking of values
        int producer_sum = 0;
        int consumer_sum = 0;

        // Generates random number
        int random_number_generator(){
            std::random_device rd;
            std::default_random_engine re{rd()};

            return re();
        }

        // Produces values and pushes them into the stack       
        void producer(){
            for (int i = 0 ; i < MAX ; i++){

                std::unique_lock<std::mutex> lg(mtx);

                if (stack_array.size() == MAX){
                    std::cout << "Producer Thread " << std::this_thread::get_id() << " is waiting" << std::endl;
                    cv.wait(lg);
                }

                int rand_num = random_number_generator()%10+1;
                int rand_num_sleep = random_number_generator()%100+1;
                
                std::cout << "Current Stack Size -  " << stack_array.size() <<  " | Pushing: " << rand_num << "... " << "and sleeping for: " << rand_num_sleep << " milliseconds" << std::endl;

                producer_sum += rand_num;

                stack_array.push(rand_num); 

                std::this_thread::sleep_for(std::chrono::milliseconds(rand_num_sleep)); 
                
                cv.notify_all();
                lg.unlock();
            }
        }

        void consumer(){
            for (int i = 0 ; i < MAX ; i++) {

                std::unique_lock<std::mutex> lg(mtx);

                if (stack_array.size() == MIN){
                    std::cout << "Consumer Thread " << std::this_thread::get_id() << " is waiting" << std::endl;
                    cv.wait(lg);
                }

                int rand_num_sleep = random_number_generator()%100+1;
            
                std::cout << "Current Stack Size - " << stack_array.size() << " | Popping: " << stack_array.top() << "... " << "and sleeping for: " << rand_num_sleep << " milliseconds" << std::endl;

                consumer_sum += stack_array.top();

                stack_array.pop();  

                std::this_thread::sleep_for(std::chrono::milliseconds(rand_num_sleep));
                
                cv.notify_all();
                lg.unlock();
            }

        }
};

int main(){

    ProducerConsumer pc;

    std::thread producer_one (&ProducerConsumer::producer, &pc);
    std::thread consumer_one (&ProducerConsumer::consumer, &pc);
    std::thread producer_two (&ProducerConsumer::producer, &pc);
    std::thread consumer_two (&ProducerConsumer::consumer, &pc);

    producer_one.join();
    producer_two.join();
    consumer_one.join();
    consumer_two.join();

    std::cout << "Producer sum: " << pc.producer_sum << std::endl;
    std::cout << "Consumer sum: " << pc.consumer_sum << std::endl;
    std::cout << "Stack size: " << pc.stack_array.size() << std::endl;
}

当尝试运行代码时,有时候代码会遇到死锁,因为我认为程序是从生产者线程开始的(这是可以理解的,因为我没有编写一种方法来绕过它)。但有时候,代码运行得很好。其他时候,程序会遇到分段错误或在执行过程中遇到死锁。
我假设我的互斥实现是条件变量是正确的。如果不是,谁能解释一下我的代码有什么问题吗?任何帮助都是感激的。非常感谢!

ddhy6vgd

ddhy6vgd1#

条件变量wait总是必须在循环中,因为它可能在另一个线程已经获取了该项时醒来,或者它可能无缘无故地醒来。
当然,如果没有循环,编译器不会告诉你错误,我的意思是没有循环的变量是无用的,因为它不可靠。
你的生产者使用notify_all,你有两个消费者,这意味着在添加一个项目后,你唤醒两个消费者(和两个生产者),他们都试图获得一个项目。当然,其中一个不会有一个项目-除非另一个生产者也生产了一个项目(偶然)在同一时间。
取代:

if (stack_array.size() == MAX){
                std::cout << "Producer Thread " << std::this_thread::get_id() << " is waiting" << std::endl;
                cv.wait(lg);
            }

您只需将if更改为while-在使用者中也是如此。
注意,即使你很小心地平衡了通知和消耗的数量,循环也是需要的,因为wait可能会无缘无故地返回。在一些操作系统上,有时候,wait不确定通知是否发生了,所以它无论如何都会停止等待。这被称为"伪唤醒"。然后你检查队列是否满了,如果仍然满了,你就再等一会儿。

xqkwcwgp

xqkwcwgp2#

你需要生产者等到某样东西被消费掉,消费者等到某样东西被生产出来。
现在,不管发生了什么,只要线程被唤醒,线程就会继续运行。例如,消费者在唤醒时不会检查堆栈上是否有任何内容,但另一个消费者可能已经获取了它休眠时生成的所有内容。
然后它试图弹出堆栈,程序就爆炸了。
您还需要防止“伪唤醒”。
幸运的是,有一个wait重载可以帮助处理这两个问题:

if (stack_array.size() == MAX){
    std::cout << "Producer Thread " << std::this_thread::get_id() << " is waiting" << std::endl;
    cv.wait(lg, [&stack_array]() { return stack_array.size() < MAX; });
}

以及

if (stack_array.size() == MIN){
    std::cout << "Consumer Thread " << std::this_thread::get_id() << " is waiting" << std::endl;
    cv.wait(lg, [&stack_array]() { return stack_array.size() > MIN; });
}

相关问题