C++线程互斥同步

0x6upsns  于 2023-05-02  发布在  其他
关注(0)|答案(2)|浏览(158)

我是并行编程的新手,我正在尝试在多线程上分解迭代矩阵计算:每次迭代由两个顺序作业A和B组成;它需要等待所有线程在启动作业B之前完成作业A。
我使用互斥锁在所有线程之间进行同步,下面是一个示例脚本。当我多次运行脚本时,输出会随机变化,所以我猜我搞砸了线程同步。有人能给点建议吗?谢谢!

static std::mutex mutex_A, mutex_B;
static std::condition_variable cv_A, cv_B;

int main()
{
    int Nthread=2;
    std::vector<std::thread> threads;
    threads.reserve(Nthread);
    for(int thread_id=0; thread_id < Nthread; thread_id++)
    {
        threads.emplace_back([](){
            for(int iter=0; iter<3000; iter++){
                
                // launch job A

                std::unique_lock<std::mutex> lock_A{mutex_A};
                cv_A.wait(lock_A, [=](){return true;});
                cv_A.notify_all();

                // launch job B

                std::unique_lock<std::mutex> lock_B{mutex_B};
                cv_B.wait(lock_B, [=](){return true;});
                cv_B.notify_all();
            }
        })
    }
    for(auto &thread: threads)
        thread.join();

    return 0;
}
gev0vcfq

gev0vcfq1#

我认为问题在于您通知了条件变量而没有释放锁--导致线程无限期地阻塞

static std::mutex mutex_A, mutex_B;
static std::condition_variable cv_A, cv_B;

int main()
{
    int Nthread = 2;
    std::vector<std::thread> threads;
    std::atomic<int> count_A(0);
    std::atomic<int> count_B(0);

    for(int thread_id = 0; thread_id < Nthread; thread_id++)
    {
        threads.emplace_back([&](){
            for(int iter = 0; iter < 3000; iter++){

                // launch job A
                {
                    std::unique_lock<std::mutex> lock_A(mutex_A);
                    // Wait for all threads to finish job A
                    if(++count_A == Nthread) {
                        count_A.store(0);
                        cv_A.notify_all();
                    } else {
                        cv_A.wait(lock_A, [&](){return count_A == 0;});
                    }
                }

                // launch job B
                {
                    std::unique_lock<std::mutex> lock_B(mutex_B);
                    // Wait for all threads to finish job B
                    if(++count_B == Nthread) {
                        count_B.store(0);
                        cv_B.notify_all();
                    } else {
                        cv_B.wait(lock_B, [&](){return count_B == 0;});
                    }
                }
            }
        });
    }

    for(auto &thread : threads)
        thread.join();

    return 0;
}
  • 使用2个原子计数器(count_A + count_B)--跟踪有多少线程完成了作业A|B分别

线程完成作业A|B -递增相应的计数器+检查它是否是最后一个完成的线程(最后一个线程-重置计数器+通知所有等待相应条件变量的线程-否则-等待条件变量直到所有线程完成)

  • lambda函数--检查等待条件变量的条件(count_A == 0|count_B == 0确保所有线程都完成了相应的作业)

希望在这些更改之后,输出应该是确定性

2vuwiymt

2vuwiymt2#

不要尝试使用互斥体来协调不同线程的活动。只会让你发疯。使用互斥锁的唯一目的是防止不同的线程同时访问相同的共享数据。
如果你使用的是C++20,那么你可以使用std::barrier来实现你的目标。

int main()
{
    int Nthread=2;
    std::barrier barrier(Nthread);
    std::vector<std::thread> threads;
    threads.reserve(Nthread);
    for(int thread_id=0; thread_id < Nthread; thread_id++)
    {
        threads.emplace_back([](){
            for(int iter=0; iter<3000; iter++){
                
                jobA();
                barrier.arrive_and_wait();

                // No thread can get _past_ the barrier until Nthread 
                // threads have arrived _at_ the barrier (I.E., not until
                // all of the threads have completed jobA()).
                //
                // Once Nthread threads have arrived, the threads all are 
                // released to go on to perform jobB(), and the barrier is
                // automatically reset, ready to be used again.

                jobB();
                barrier.arrive_and_wait();

                // No thread can get past the barrier until all threads have
                // completed jobB();
            }
        });
    }
    ...
}

如果你必须使用比C20更早的C版本,那么你的下一个最佳选择是使用boost::barrier。如果你不能做到这一点,那么也许可以找到一个barrier的开源实现,并将源代码合并到你的程序中。

相关问题