C++: how to reduce the performance impact (on the writer side) of sharing a small struct between two threads?

flseospp, asked 2022-11-27

I am adding a profiler to an existing evaluator. The evaluator is implemented in C++ (C++17, x86, MSVC), and I need to store 3 int32s, 1 uint16 and 1 uint8 to represent the context of the execution frame. Since this is interpreted code, I can't write a kernel-level driver to take snapshots, so the bookkeeping has to go into the eval loop itself. As with all profiling, we want to avoid slowing down the actual computation. So much for the motivation/context of this question.
At a high level, the behaviour is as follows: before the evaluator executes each instruction, it calls a small function ("hey, I'm right here at the moment"), while a sampling profiler is only interested in that frame position once every X milliseconds (or µs, but that is probably pushing it). So we need two threads (ignoring serialization for now): a very frequent writer (a single thread) and an infrequent reader (a different single thread). We want to keep the performance penalty on the writer side as low as possible. Note that the writer sometimes slows down and may sit on one frame for several seconds, and we want to be able to observe that.
To explore this, I wrote a small benchmark setup.

#include <memory>
#include <chrono>
#include <string>
#include <iostream>
#include <thread>
#include <immintrin.h>
#include <atomic>
#include <cstring>
#include <cstdint>

using namespace std;

typedef struct frame {
    int32_t a;
    int32_t b;
    uint16_t c;
    uint8_t d;
} frame;

class ProfilerBase {
public:
    virtual void EnterFrame(int32_t a, int32_t b, uint16_t c, uint8_t d) = 0;
    virtual void Stop() = 0;
    virtual ~ProfilerBase() {}
    virtual string Name() = 0;
};

class NoOp : public ProfilerBase {
public:
    void EnterFrame(int32_t a, int32_t b, uint16_t c, uint8_t d) override {}
    void Stop() override {}
    string Name() override { return "NoOp"; }
};

class JustStore : public ProfilerBase {
private:
    frame _current = { 0 };
public:
    string Name() override { return "OnlyStoreInMember"; }
    void EnterFrame(int32_t a, int32_t b, uint16_t c, uint8_t d) override {
        _current.a = a;
        _current.b = b;
        _current.c = c;
        _current.d = d;
    }
    void Stop() override {
        if ((_current.a + _current.b + _current.c + _current.d) == _current.a) {
            cout << "Make sure optimizer keeps the record around";
        }
    }
};

class WithSampler : public ProfilerBase {
private:
    unique_ptr<thread> _sampling;
    atomic<bool> _keepSampling = true;
protected:
    const chrono::milliseconds _sampleEvery;
    virtual void _snap() = 0;
    virtual string _subname() = 0;
public:
    WithSampler(chrono::milliseconds sampleEvery): _sampleEvery(sampleEvery) {
        _sampling = make_unique<thread>(&WithSampler::_sampler, this);
    }
    void Stop() override {
        _keepSampling = false;
        _sampling->join();
    }

    string Name() override { 
        return _subname() + to_string(_sampleEvery.count()) + "ms"; 
    }
private:
    void _sampler() {
        auto nextTick = chrono::steady_clock::now();
        while (_keepSampling)
        {
            const auto sleepTime = nextTick - chrono::steady_clock::now();
            if (sleepTime > chrono::milliseconds(0))
            {
                this_thread::sleep_for(sleepTime);
            }
            _snap();
            nextTick += _sampleEvery;
        }

    }
};

struct checkedFrame {
    frame actual;
    int32_t check;
};

// https://rigtorp.se/spinlock/
struct spinlock {
    std::atomic<bool> lock_ = { 0 };

    void lock() noexcept {
        for (;;) {
            // Optimistically assume the lock is free on the first try
            if (!lock_.exchange(true, std::memory_order_acquire)) {
                return;
            }
            // Wait for lock to be released without generating cache misses
            while (lock_.load(std::memory_order_relaxed)) {
                // Issue X86 PAUSE or ARM YIELD instruction to reduce contention between
                // hyper-threads
                _mm_pause(); 
            }
        }
    }

    void unlock() noexcept {
        lock_.store(false, std::memory_order_release);
    }
};

class Spinlock : public WithSampler {
private:
    spinlock _loc;
    checkedFrame _current;
public:
    using WithSampler::WithSampler;

    string _subname() override { return "Spinlock"; }

    void EnterFrame(int32_t a, int32_t b, uint16_t c, uint8_t d) override {
        _loc.lock();
        _current.actual.a = a;
        _current.actual.b = b;
        _current.actual.c = c;
        _current.actual.d = d;
        _current.check = a + b + c + d;
        _loc.unlock();
    }

protected:
    void _snap() override {
        _loc.lock();
        auto snap = _current;
        _loc.unlock();
        if ((snap.actual.a + snap.actual.b + snap.actual.c + snap.actual.d) != snap.check) {
            cout << "Corrupted snap!!\n";
        }
    }
};

static constexpr int32_t LOOP_MAX = 1000 * 1000 * 1000;

int measure(unique_ptr<ProfilerBase> profiler) {
    cout << "Running profiler: " << profiler->Name() << "\n ";
    cout << "\tProgress: ";
    auto start_time = std::chrono::steady_clock::now();
    int r = 0;
    for (int32_t x = 0; x < LOOP_MAX; x++)
    {
        profiler->EnterFrame(x, x + x, x & 0xFFFF, x & 0xFF);
        r += x;
        if (x % (LOOP_MAX / 1000) == 0)
        {
            this_thread::sleep_for(chrono::nanoseconds(10)); // simulate that sometimes we do other work besides storing
        }
        if (x % (LOOP_MAX / 10) == 0)
        {
            cout << static_cast<int>((static_cast<double>(x) / LOOP_MAX) * 10);
        }
        if (x % 1000 == 0) {
            _mm_pause(); // give the other threads some time
        }
        if (x == (LOOP_MAX / 2)) {
            // the first half of the loop we take as warmup
            // so now we take the actual time
            start_time = std::chrono::steady_clock::now();
        }
    }
    cout << "\n";
    const auto done_calc = std::chrono::steady_clock::now();
    profiler->Stop();
    const auto done_writing = std::chrono::steady_clock::now();
    cout << "\tcalc: " << chrono::duration_cast<chrono::milliseconds>(done_calc - start_time).count() << "ms\n";
    cout << "\tflush: " << chrono::duration_cast<chrono::milliseconds>(done_writing - done_calc).count() << "ms\n";
    return r;
}

int main() {
    measure(make_unique<NoOp>());
    measure(make_unique<JustStore>());
    measure(make_unique<Spinlock>(chrono::milliseconds(1)));
    measure(make_unique<Spinlock>(chrono::milliseconds(10)));
    return 0;
}

Compiling this on my machine in x86 mode with /O2 gives the following output:

Running profiler: NoOp
        Progress: 0123456789
        calc: 1410ms
        flush: 0ms
Running profiler: OnlyStoreInMember
        Progress: 0123456789
        calc: 1368ms
        flush: 0ms
Running profiler: Spinlock1ms
        Progress: 0123456789
        calc: 3952ms
        flush: 4ms
Running profiler: Spinlock10ms
        Progress: 0123456789
        calc: 3985ms
        flush: 11ms

(This was compiled with MSVC in VS 2022, but I think g++ --std=c++17 -O2 -m32 -pthread -o testing small-test-case.cpp should be close enough.)
Here we see that the spinlock-based sampler adds roughly 2.5x overhead compared to the versions with no synchronization at all. I have profiled it, and as expected a lot of the time goes into acquiring the lock, even though most of the time it is uncontended and not actually needed.
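
To check that it really is the lock acquisition rather than the stores that costs this much, one small extra case could be added to the benchmark (the LockOnlyNoStore case below is only illustrative and reuses the spinlock and WithSampler classes from above): it takes and releases the lock on every EnterFrame but skips the payload stores.

class LockOnly : public WithSampler {
private:
    spinlock _loc;
public:
    using WithSampler::WithSampler;

    string _subname() override { return "LockOnlyNoStore"; }

    void EnterFrame(int32_t, int32_t, uint16_t, uint8_t) override {
        _loc.lock();   // same uncontended atomic exchange as the Spinlock version...
        _loc.unlock(); // ...but without any payload stores
    }

protected:
    void _snap() override {
        _loc.lock();   // keep the reader thread doing comparable work
        _loc.unlock();
    }
};

Adding measure(make_unique<LockOnly>(chrono::milliseconds(1))); to main should show whether the atomic read-modify-write of the lock by itself already accounts for most of the extra time.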

djmepvbi 1#

One idea I had was to use a circular buffer and only synchronize on its index, so that in most cases the reader and the writer work on separate parts of memory. My understanding was that the release & acquire pair would also make sure the per-CPU caches of the _frames memory block are flushed correctly, but that may well be a mistake on my side.

static constexpr unsigned int BUFFER_SIZE = 1024;

class CircularBuffer : public WithSampler {
private:
    atomic<unsigned int> _index = 0;
    checkedFrame _frames[BUFFER_SIZE];
public:
    CircularBuffer(chrono::milliseconds sampleEvery) : WithSampler(sampleEvery) {
        memset(_frames, 0, sizeof(_frames));
    }

    string _subname() override { return "CircularBuffer"; }

    void EnterFrame(int32_t a, int32_t b, uint16_t c, uint8_t d) override {
        auto ix = _index.load(memory_order_relaxed) + 1; // we are the only one mutating the index
        auto& rec = _frames[ix % BUFFER_SIZE];
        rec.actual.a = a;
        rec.actual.b = b;
        rec.actual.c = c;
        rec.actual.d = d;
        rec.check = ix;
        _index.store(ix, memory_order_release);
    }

protected:
    void _snap() override {
        auto ix = _index.load(memory_order_acquire);
        auto snap = _frames[ix % BUFFER_SIZE];
        if (snap.check != ix && ix != 0) {
            // we ignore the ix == 0 case, as that might be the sampler starting too soon
            cout << "Corrupted snap!!\n";
        }
    }
};

That is quite a bit faster:

Running profiler: NoOp
        Progress: 0123456789
        calc: 1504ms
        flush: 0ms
Running profiler: OnlyStoreInMember
        Progress: 0123456789
        calc: 1594ms
        flush: 0ms
Running profiler: Spinlock1ms
        Progress: 0123456789
        calc: 4032ms
        flush: 2ms
Running profiler: Spinlock10ms
        Progress: 0123456789
        calc: 4045ms
        flush: 16ms
Running profiler: CircularBuffer1ms
        Progress: 0123456789
        calc: 1594ms
        flush: 13ms
Running profiler: CircularBuffer10ms
        Progress: 0123456789
        calc: 1575ms
        flush: 10ms

But @PeterCordes pointed out in my original question that this is not the correct way to reason about the C++ memory model.
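
For completeness: a pattern that is often suggested for exactly this single-writer / occasional-reader snapshot problem is a seqlock. The writer bumps a sequence counter to an odd value before its stores and back to an even value after them; the reader copies the data and retries until it sees the same even counter value before and after the copy, so it never hands out a torn snapshot. Below is a minimal sketch along those lines (illustrative only; strictly speaking the plain copy of _current in the reader is still a formal data race in the C++ abstract machine, so a fully conforming version would make the fields std::atomic and access them with memory_order_relaxed).

class Seqlock : public WithSampler {
private:
    atomic<uint32_t> _seq = 0;
    frame _current = { 0 };   // written only by the single writer thread
public:
    using WithSampler::WithSampler;

    string _subname() override { return "Seqlock"; }

    void EnterFrame(int32_t a, int32_t b, uint16_t c, uint8_t d) override {
        const uint32_t s = _seq.load(memory_order_relaxed); // we are the only writer
        _seq.store(s + 1, memory_order_relaxed);            // odd: a write is in progress
        atomic_thread_fence(memory_order_release);          // data stores may not become visible before the odd store
        _current.a = a;
        _current.b = b;
        _current.c = c;
        _current.d = d;
        _seq.store(s + 2, memory_order_release);            // even again: published only after the data stores
    }

protected:
    void _snap() override {
        frame snap;
        uint32_t s1, s2;
        do {
            s1 = _seq.load(memory_order_acquire);
            snap = _current;                                 // may tear; detected by the recheck below
            atomic_thread_fence(memory_order_acquire);       // data loads may not move past the recheck
            s2 = _seq.load(memory_order_relaxed);
        } while ((s1 & 1) || s1 != s2);                      // retry while a write was in flight
        if ((snap.a + snap.b + snap.c + snap.d) == -1) {
            cout << "Make sure optimizer keeps the snapshot around";
        }
    }
};

The writer's hot path is then just two plain stores to the counter plus the data stores, with no atomic read-modify-write, so measure(make_unique<Seqlock>(chrono::milliseconds(1))); should land much closer to OnlyStoreInMember than to the spinlock numbers.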
