c++ 原子计数器标准::内存顺序

8ehkhllq  于 2022-12-20  发布在  其他
关注(0)|答案(1)|浏览(210)

我知道这个问题已经被问过很多次了。我已经读过和看过很多,但是仍然不能完全理解如何正确地在原子变量上使用内存顺序类型。我想在分离线程上使用原子计数器和条件变量来等待。每个原子操作应该如何编写?

#include <random>
#include <thread>
#include <condition_variable>

int blockAndGet(int a, int b);

int main()
{
    std::jthread producer{[](std::stop_token st)
    {
        std::mutex m;
        std::condition_variable cond;
        std::atomic<std::uint16_t> counter;

        while (!st.stop_requested()) {
            // Get some data to handle(sometimes slower, generally faster than handling it)
            int data = blockAndGet(1, 40);

            // #1 - I'm not interested with the result, are workers too?
            counter.fetch_add(1, std::memory_order::relaxed);

            std::thread{[&](std::stop_token st, int data)
            {
                blockAndGet(30, 100); // Do some work

                if (!st.stop_requested())
                    // #2 - I'm not interested with the result too but do other workers care about?
                    counter.fetch_sub(1, std::memory_order::release);
                else {
                    std::lock_guard l{m};
                    // #3 - If I'm the one who has to signal the producer to wake up I had to get the newest value.
                    if (counter.fetch_sub(1, std::memory_order::acq_rel) == 1)
                        cond.notify_one();
                }    
            }, st, data}
            .detach();
        }

        std::unique_lock lock{m};
        // #4 I must be sure all workers finished their execution
        cond.wait(lock, [&]{ return counter.load(std::memory_order::acquire) == 0; });
    }};

    // Should spawn ~100 worker thread before stop signal
    std::this_thread::sleep_for(std::chrono::seconds{2});
    return 0;
}

int blockAndGet(int a, int b) {
    // You know what this is intended for
    // No need to think about the random_device's surprises within the scope of this question
    thread_local std::mt19937 engine(std::random_device{}());
    std::uniform_int_distribution<int> dist{a, b};
    int val{dist(engine)};
    std::this_thread::sleep_for(std::chrono::milliseconds{val});
    return val;
}

https://godbolt.org/z/f46YMcPTj
我把我的想法写在评论里了。你能看到任何潜在的bug或改进吗?
编辑:
看起来我做的抽象让人们感到困惑和分心。对此我很抱歉。你可以把它当作一个服务器程序。First blockAndGet只是一个接受()系统调用并返回一个文件描述符。使用该整数创建一个新线程,并在其中处理客户端(第二个blockAndGet)。当它完成时,如果停止没有被请求线程计数器递减。如果停止被请求那么生产者(侦听器)将等待所有工作线程完成,这也意味着最后一个工作线程负责额外唤醒生产者。
我不希望监听线程忙于等待,这就是为什么有condition_variable和mutex的原因。我怀疑atomic::wait会这样做。
在这个例子中,stop信号来自jthread的析构函数main()函数的末尾,但它也可能来自另一个线程(就像一个管理器,只为某个端口和自定义协议上的特权管理员服务)。
当然,这可以用更简单的方法来实现,但我认为利用c++的这个特性是一个很好的例子。
我希望这篇文章能帮助你认识到这个问题。谢谢你的时间和回应。

e0bqpujr

e0bqpujr1#

TLDR(如果您要对RCU进行编程,则需要阅读标准)。
这需要理解缓存一致性、CPU内存访问顺序、无序执行和推测执行,否则只需使用seq_cst,或者如果只是计数,也可以放松。
放松--就像你认为的那样工作,所有的内存操作都应该工作,所以如果你在不同的线程中数到10000,它最终会数到10000,而不是9996,就像我在没有atomic的测试中所做的那样。
获取-释放,在获取之后做一些事情,在我们释放的时候停止。我们确信我们在开始计算的时候有一个值,当我们完成的时候更新到其他值。
acq_rel -我们之前完成了所有的内存访问,然后做我们的事情,然后下一个事情会发生。
seq_cst -我们完成之前所有的mem访问,然后做我们的事情,然后下一个事情可以发生,我们有总排序。如果你想确保正确性,而不是真正考虑它,这是非常好的,但有点慢(在一些架构上非常慢)。
consume -我们没有一个总的内存顺序,所以编译器需要在这里帮助我们(我想是Arm、Itanium、PowerPC和Alpha)。大多数编译器只是想...让我们给予它们一个seq_cst,它们就会被服务。
为什么要这么多东西?(凭记忆,所以吃这个要加一勺盐)

  • Dec Alpha:仅在请求时进行缓存一致性和内存排序...
  • PowerPC:你知道你做对了什么吗?这样就不需要完美的内存排序了,只要在需要的时候就可以了。
  • Arm:哦,我有更多的内核,我最好做一些一致性,我们将使用C++11的定义修复Arm 8中的所有内容!!!
  • x86 -我们的用户希望我们帮助他们不要搬起石头砸自己的脚,所以我们要求总订购!!!你想搬起石头砸自己的脚吗?让我们添加一些说明来做到这一点!!!

排序方式理论上使最快的执行变为最慢的,但也考虑到维护正确性的难度。如果您不对每个写入进行缓存一致性检查,您可能会更快一些,并获得更有趣的结果...
如果我们没有这些,我们需要知道编译器和CPU是如何重新排序所有内存访问和每个CPU所需的所有额外指令的。

相关问题