c++ 关于Leetcode 1117的无锁与互斥解决方案的问题:H2O大楼

xlpyo6sf  于 2023-05-24  发布在  其他
关注(0)|答案(1)|浏览(212)

我正在练习C++并发性,并学习使用互斥锁与无锁原子的实际应用。我正在处理Leetcode中的以下问题:https://leetcode.com/problems/building-h2o/description/
基本上,不同的线程负责释放氢原子或氧原子,并且您希望同步,以便每个连续的3个原子组由2个氢和1个氧原子组成。
我用两种方法实现了这个解决方案,一种使用互斥锁,另一种使用原子。
互斥解决方案:

class H2O {
    mutex m;
    condition_variable m_cond;
    int idx;
public:
    H2O() : idx(0) {
    }

    void hydrogen(function<void()> releaseHydrogen) {
        unique_lock<mutex> mlock(m);
        m_cond.wait(mlock, [this](){return idx != 0;});
        // releaseHydrogen() outputs "H". Do not change or remove this line.
        releaseHydrogen();
        idx = (idx+1) % 3;
        m_cond.notify_all();
    }

    void oxygen(function<void()> releaseOxygen) {
        unique_lock<mutex> mlock(m);
        m_cond.wait(mlock, [this](){return idx == 0;});
        // releaseOxygen() outputs "O". Do not change or remove this line.
        releaseOxygen();
        idx = (idx+1)%3;
        m_cond.notify_all();
    }
};

原子溶液:

class H2O {
    /* state is one of {00, 01, 10, 20, 21} where the first digit represents the number of hydrogen atoms acquires and the second digit is the number of oxygen atoms acquired */
    atomic<int> state_{0};
    /* stores the number of atoms that we have finished processing, increments from 0 to 3 and resets back to 3*/
    atomic<int> completedCount_{0};

public:
    H2O() {}

    void acquireHydrogen(){
        int curState = state_.load();
        do{
            while(curState/10 == 2){
                // full, 2 hydrogen atoms have already been acquired
                curState = state_.load();
            }
        } while(!state_.compare_exchange_weak(curState, curState + 10));
            // modify the state to show that 1 more hydrogen has been acquired
    }

    void acquireOxygen(){
        int curState = state_.load();
        do{
            while(curState % 10 == 1){
                // full, 1 oxygen has already been acquired
                curState = state_.load();
            }
        } while(!state_.compare_exchange_weak(curState, curState + 1));
            // modify the state to show that 1 oxygen has been acquired
    }

    void complete(){
        // increment count of completed
        completedCount_.fetch_add(1);
        int expected = 3;
        /* The thread that resets the completed count back to 0 is responsible for resetting the acquired state as well.
        If more than 1 acquired thread tries to reset state, in between 2 of these resets a new set of atoms might already be acquired which we don't want to write over. */
        if(completedCount_.compare_exchange_strong(expected, 0)){
            state_.store(0);
        }
    }
    void hydrogen(function<void()> releaseHydrogen) {
        acquireHydrogen();
        releaseHydrogen(); // prints "H"
        complete();
    }

    void oxygen(function<void()> releaseOxygen) {
        acquireOxygen();
        releaseOxygen(); // prints "O"
        complete();
    }
};

当我提交给Leetcode时,使用互斥锁的代码要简单得多,并且平均运行速度比使用原子的代码快约20倍。我试图更好地理解什么时候使用锁/互斥锁,什么时候更喜欢原子/无锁。我有以下问题:
1.在这种情况下,我不知道Leetcode服务器实际上是如何运行线程来执行测试的,也不知道它有多少处理器/内核可用。我的理解是,使用原子,您应该获得更好的吞吐量,因为更少的线程“等待”获得锁。然而,我猜在这个问题中,因为只能有连续的2个氢原子和1个氧原子被释放,如果有许多线程正在运行,那么最多只有3个线程可以同时释放相应的原子。有没有一个例子,说明这个问题的原子解决方案何时可能比基于互斥锁的解决方案性能更高/更快?或者这是一个问题的例子,你会期望互斥锁更好地开始,一般来说?
1.有没有一种方法可以更有效地使用原子来编写解决方案?也许有些while循环和CAS操作是不需要的,或者可以用不同的方式来构造?
1.我还尝试为原子解决方案指定内存顺序,我将reads -> memory_order_acquire,writes -> memory_order_released和rmw -> memory_order_acq_rel。当我多次提交代码时,似乎放松内存顺序使代码平均快了1-2倍。一般来说,当使用原子编写代码时,您通常可以如上所述指定内存顺序吗?你如何决定是否需要所有原子操作之间的真正顺序一致性,而不是根据它是读、写还是rmw来放松语义?
我知道这是一篇很长的文章,但我真的很感激任何想法!

dzjeubhm

dzjeubhm1#

我不认为您可以在没有看到测试工具的情况下根据竞争测试来确定任何性能。这些结果似乎到处都是。
我做了一个简单的测试装置,简单地从两个不同的线程喷出氢气和氧气,比例正确。我的目标是隔离同步代码的性能和线程开销。
正如预期的那样,在这个问题上,std::atomicstd::mutex快得多(732us对16ms)。根据我的经验,std::atomic可能更快(更低的延迟),而std::mutex可能消耗更少的功率(有效的睡眠)。通常的警告适用于您实际上只需要测量您的用例,操作系统和硬件的性能。
对于std::atomic代码,我建议的一件事是在while循环中抛出std::this_thread::yield(),以便线程在无法前进时释放其时间片。如果有许多线程试图获取资源,这将减少线程争用。

示例代码

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class H2O {
    std::mutex m;
    std::condition_variable m_cond;
    int idx;
public:
    H2O() : idx(0) {
    }

    void hydrogen(std::function<void()> releaseHydrogen) {
        std::unique_lock<std::mutex> mlock(m);
        m_cond.wait(mlock, [this](){return idx != 0;});
        // releaseHydrogen() outputs "H". Do not change or remove this line.
        releaseHydrogen();
        idx = (idx+1) % 3;
        m_cond.notify_all();
    }

    void oxygen(std::function<void()> releaseOxygen) {
        std::unique_lock<std::mutex> mlock(m);
        m_cond.wait(mlock, [this](){return idx == 0;});
        // releaseOxygen() outputs "O". Do not change or remove this line.
        releaseOxygen();
        idx = (idx+1)%3;
        m_cond.notify_all();
    }
};

class H2OAtomic {
    /* state is one of {00, 01, 10, 20, 21} where the first digit represents the number of hydrogen\
 atoms acquires and the second digit is the number of oxygen atoms acquired */
    std::atomic<int> state_{0};
    /* stores the number of atoms that we have finished processing, increments from 0 to 3 and rese\
ts back to 3*/
    std::atomic<int> completedCount_{0};

public:
    H2OAtomic() {}

    void acquireHydrogen(){
        int curState = state_.load();
        do{
            while(curState/10 == 2){
                // full, 2 hydrogen atoms have already been acquired
                curState = state_.load();
            }
        } while(!state_.compare_exchange_weak(curState, curState + 10));
            // modify the state to show that 1 more hydrogen has been acquired
    }

    void acquireOxygen(){
        int curState = state_.load();
        do{
            while(curState % 10 == 1){
                // full, 1 oxygen has already been acquired
                curState = state_.load();
            }
        } while(!state_.compare_exchange_weak(curState, curState + 1));
            // modify the state to show that 1 oxygen has been acquired
    }

    void complete(){
        // increment count of completed
        completedCount_.fetch_add(1);
        int expected = 3;
        /* The thread that resets the completed count back to 0 is responsible for resetting the ac\
quired state as well.
        If more than 1 acquired thread tries to reset state, in between 2 of these resets a new set\
 of atoms might already be acquired which we don't want to write over. */
        if(completedCount_.compare_exchange_strong(expected, 0)){
            state_.store(0);
        }
    }
    void hydrogen(std::function<void()> releaseHydrogen) {
        acquireHydrogen();
        releaseHydrogen(); // prints "H"
        complete();
    }

    void oxygen(std::function<void()> releaseOxygen) {
        acquireOxygen();
        releaseOxygen(); // prints "O"
        complete();
    }
};

template<class Clock = std::chrono::high_resolution_clock>
class StopWatch
{
public:
    StopWatch()
        : start_tp_(Clock::now())
        , last_tp_(start_tp_)
    { }

    auto now() const
    {
        std::atomic_thread_fence(std::memory_order_relaxed);
        auto current_tp = Clock::now();
        std::atomic_thread_fence(std::memory_order_relaxed);
        return current_tp;
    }

    auto mark()
    {
        auto current_tp = now();
        auto elapsed = current_tp - last_tp_;
        last_tp_ = current_tp;
        return elapsed;
    }

    template<class Units = typename Clock::duration>
    auto elapsed_duration()
    {
        auto elapsed = mark();
        return std::chrono::duration_cast<Units>(elapsed);
    }

    template<class Units = typename Clock::duration>
    auto elapsed_time()
    {
        auto elapsed = mark();
        return std::chrono::duration_cast<Units>(elapsed).count();
    }

private:
    typename Clock::time_point start_tp_;
    typename Clock::time_point last_tp_;
};

using std::cout, std::endl;

void release_hydrogen() {
    // cout << "H";
}

void release_oxygen() {
    // cout << "O";
}

template<class Builder, class T, class U>
void build_water(int n, T&& h, U&& o) {
    Builder builder;
    auto h0th = std::thread([&]() {
        for (auto i = 0; i < n; ++i)
            builder.hydrogen(h);
    });
    auto h1th = std::thread([&]() {
        for (auto i = 0; i < n; ++i)
            builder.hydrogen(h);
    });
    auto oth = std::thread([&]() {
        for (auto i = 0; i < n; ++i)
            builder.oxygen(o);
    });

    h0th.join();
    h1th.join();
    oth.join();
}

template<class Work>
void measure(std::string_view desc, Work&& work) {
    StopWatch timer;
    timer.mark();
    work();
    auto n = timer.elapsed_duration<std::chrono::microseconds>().count();
    cout << desc << " : " << n << endl;
}

int main(int argc, const char *argv[]) {
    measure("mutex", [&]() {
        build_water<H2O>(3000, release_hydrogen, release_oxygen);
    });
    measure("atomic", [&]() {
        build_water<H2OAtomic>(3000, release_hydrogen, release_oxygen);
    });

    return 0;
}

输出

mutex : 16447
atomic : 732

相关问题