我正在给一个已有的求值器添加一个性能分析器(profiler)。求值器是用 C++ 实现的(C++17、x86、MSVC),我需要存储 3 个 int32、1 个 uint16 和 1 个 uint8 来表示执行帧的上下文。由于被分析的是解释执行的代码,我无法通过编写内核级驱动程序来获取快照,因此必须把分析逻辑加到 eval 循环里。与所有性能分析一样,我们希望尽量避免拖慢实际的计算。以上就是这个问题的动机和背景。
从高层来看,行为如下:求值器在执行每条指令之前,都会调用一个小函数(“嘿,我现在在这里”);而采样分析器每隔 X 毫秒才需要读取一次该帧位置(也可以是微秒级,但那很可能是在挑战极限)。因此我们需要 2 个线程(这里先忽略序列化):一个写入非常频繁的写入线程(单个线程),以及一个读取不频繁的读取线程(另一个单独的线程)。我们希望把写入端的性能损失降到最低。注意,写入线程有时会变慢,可能在同一帧上停留好几秒,我们希望能够观察到这种情况。
为了解决这个问题,我编写了一个小型基准测试设置。
#include <immintrin.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
using namespace std;
// Position of the evaluator inside the interpreted program: two 32-bit values,
// one 16-bit value and one 8-bit value. The benchmark treats the fields as
// opaque payload; only their sizes matter.
//
// Every member is zero-initialised by default so that a frame which has not
// been written yet never exposes indeterminate values to a reader.
struct frame {
    int32_t a = 0;
    int32_t b = 0;
    uint16_t c = 0;
    uint8_t d = 0;
};
// Interface shared by every profiler variant measured in this benchmark.
class ProfilerBase {
public:
    // Virtual so that instances can be destroyed through a ProfilerBase pointer.
    virtual ~ProfilerBase() = default;

    // Label printed by the benchmark driver for this variant.
    virtual std::string Name() = 0;

    // Called by the benchmark loop on every iteration with the current frame values.
    virtual void EnterFrame(int32_t a, int32_t b, uint16_t c, uint8_t d) = 0;

    // Called once by the benchmark driver after the loop has finished.
    virtual void Stop() = 0;
};
// Baseline variant: every hook is empty, so the benchmark measures only the
// cost of the virtual call itself.
class NoOp : public ProfilerBase {
public:
    std::string Name() override
    {
        return "NoOp";
    }

    void EnterFrame(int32_t a, int32_t b, uint16_t c, uint8_t d) override
    {
        // Intentionally does nothing.
    }

    void Stop() override
    {
        // Nothing to flush or join.
    }
};
// Variant that only copies the frame values into a member, with no
// synchronisation and no reader. It gives the cost of the plain stores.
class JustStore : public ProfilerBase {
private:
    frame _current = { 0 };
public:
    std::string Name() override { return "OnlyStoreInMember"; }

    // Records the four frame values in the member, one field at a time.
    void EnterFrame(int32_t a, int32_t b, uint16_t c, uint8_t d) override {
        _current.a = a;
        _current.b = b;
        _current.c = c;
        _current.d = d;
    }

    // Reads the stored frame back so the stores above have an observable use.
    // The sum is formed in unsigned 32-bit arithmetic: with the values written
    // by the benchmark loop (a close to 1e9, b close to 2e9) a signed sum
    // would overflow, which is undefined behaviour. Unsigned wrap-around is
    // well defined and the comparison stays true exactly when b + c + d wraps
    // to zero.
    void Stop() override {
        const uint32_t first = static_cast<uint32_t>(_current.a);
        const uint32_t total = first + static_cast<uint32_t>(_current.b) + _current.c + _current.d;
        if (total == first) {
            std::cout << "Make sure optimizer keeps the record around";
        }
    }
};
class WithSampler : public ProfilerBase {
private:
unique_ptr<thread> _sampling;
atomic<bool> _keepSampling = true;
protected:
const chrono::milliseconds _sampleEvery;
virtual void _snap() = 0;
virtual string _subname() = 0;
public:
WithSampler(chrono::milliseconds sampleEvery): _sampleEvery(sampleEvery) {
_sampling = make_unique<thread>(&WithSampler::_sampler, this);
}
void Stop() override {
_keepSampling = false;
_sampling->join();
}
string Name() override {
return _subname() + to_string(_sampleEvery.count()) + "ms";
}
private:
void _sampler() {
auto nextTick = chrono::steady_clock::now();
while (_keepSampling)
{
const auto sleepTime = nextTick - chrono::steady_clock::now();
if (sleepTime > chrono::milliseconds(0))
{
this_thread::sleep_for(sleepTime);
}
_snap();
nextTick += _sampleEvery;
}
}
};
// A frame together with a checksum of its fields, used to detect torn reads:
// the writer stores the sum of the four fields in `check`, the reader
// recomputes the sum and compares.
//
// Both members are zero-initialised so that an instance which has never been
// written is already consistent (all fields 0, checksum 0) instead of
// holding indeterminate values.
struct checkedFrame {
    frame actual{};
    int32_t check = 0;
};
// Test-and-test-and-set spinlock, taken from https://rigtorp.se/spinlock/
struct spinlock {
    // true while some thread holds the lock.
    std::atomic<bool> lock_{false};

    // Busy-waits until the lock has been acquired.
    void lock() noexcept {
        // Try to grab the lock right away; the exchange returns the previous
        // value, so `false` means we now own it.
        while (lock_.exchange(true, std::memory_order_acquire)) {
            // Somebody else holds it: spin on a plain load, which keeps the
            // cache line shared instead of bouncing it with repeated writes.
            while (lock_.load(std::memory_order_relaxed)) {
                // x86 PAUSE: be polite to the sibling hyper-thread while spinning.
                _mm_pause();
            }
        }
    }

    // Releases the lock; pairs with the acquire in lock().
    void unlock() noexcept {
        lock_.store(false, std::memory_order_release);
    }
};
class Spinlock : public WithSampler {
private:
spinlock _loc;
checkedFrame _current;
public:
using WithSampler::WithSampler;
string _subname() override { return "Spinlock"; }
void EnterFrame(int32_t a, int32_t b, uint16_t c, uint8_t d) override {
_loc.lock();
_current.actual.a = a;
_current.actual.b = b;
_current.actual.c = c;
_current.actual.d = d;
_current.check = a + b + c + d;
_loc.unlock();
}
protected:
void _snap() override {
_loc.lock();
auto snap = _current;
_loc.unlock();
if ((snap.actual.a + snap.actual.b + snap.actual.c + snap.actual.d) != snap.check) {
cout << "Corrupted snap!!\n";
}
}
};
// Number of iterations of the benchmark loop (one billion).
static constexpr int32_t LOOP_MAX = 1'000'000'000;
// Runs the benchmark loop against one profiler variant and prints timings.
//
// The loop calls profiler->EnterFrame() LOOP_MAX times. The first half of the
// loop is treated as warm-up: the clock is restarted at the midpoint, so
// "calc" covers only the second half. "flush" is the time spent in Stop().
//
// @param profiler  variant under test; owned by this function.
// @return the sum of all loop counters modulo 2^32, converted to int. It
//         exists only so the loop has a result that depends on every
//         iteration.
int measure(unique_ptr<ProfilerBase> profiler) {
    std::cout << "Running profiler: " << profiler->Name() << "\n ";
    std::cout << "\tProgress: ";
    auto start_time = std::chrono::steady_clock::now();
    // Accumulated in unsigned arithmetic: the true sum (about 5e17) does not
    // fit in an int, and signed overflow would be undefined behaviour.
    uint32_t r = 0;
    for (int32_t x = 0; x < LOOP_MAX; x++)
    {
        profiler->EnterFrame(x, x + x, x & 0xFFFF, x & 0xFF);
        r += static_cast<uint32_t>(x);
        if (x % (LOOP_MAX / 1000) == 0)
        {
            // Simulate that the evaluator sometimes does other work than storing frames.
            std::this_thread::sleep_for(std::chrono::nanoseconds(10));
        }
        if (x % (LOOP_MAX / 10) == 0)
        {
            // One progress digit (0-9) per tenth of the loop; integer division
            // is exact, unlike a round trip through double.
            std::cout << x / (LOOP_MAX / 10);
        }
        if (x % 1000 == 0) {
            _mm_pause(); // give the other threads some time
        }
        if (x == (LOOP_MAX / 2)) {
            // The first half of the loop is warm-up, so the real measurement
            // starts here.
            start_time = std::chrono::steady_clock::now();
        }
    }
    std::cout << "\n";
    const auto done_calc = std::chrono::steady_clock::now();
    profiler->Stop();
    const auto done_writing = std::chrono::steady_clock::now();
    std::cout << "\tcalc: " << std::chrono::duration_cast<std::chrono::milliseconds>(done_calc - start_time).count() << "ms\n";
    std::cout << "\tflush: " << std::chrono::duration_cast<std::chrono::milliseconds>(done_writing - done_calc).count() << "ms\n";
    return static_cast<int>(r);
}
// Benchmarks each profiler variant in turn: the two unsynchronised baselines
// first, then the spinlock variant at 1 ms and 10 ms sampling periods.
int main()
{
    using namespace std::chrono_literals;

    measure(std::make_unique<NoOp>());
    measure(std::make_unique<JustStore>());
    measure(std::make_unique<Spinlock>(1ms));
    measure(std::make_unique<Spinlock>(10ms));
}
在我的机器上以 x86 模式、使用 /O2 编译此代码,得到以下输出:
Running profiler: NoOp
Progress: 0123456789
calc: 1410ms
flush: 0ms
Running profiler: OnlyStoreInMember
Progress: 0123456789
calc: 1368ms
flush: 0ms
Running profiler: Spinlock1ms
Progress: 0123456789
calc: 3952ms
flush: 4ms
Running profiler: Spinlock10ms
Progress: 0123456789
calc: 3985ms
flush: 11ms
(虽然这是在 VS 2022 中用 MSVC 编译的,但我认为 `g++ --std=c++17 -O2 -m32 -pthread -o testing small-test-case.cpp` 应该足够接近。)
从这里可以看到,与不做任何工作的基准相比,基于 Spinlock 的采样器带来了约 2.5 倍的开销。我对它做了性能分析,正如预期,大量时间花在了获取锁上(而在绝大多数情况下其实并不需要加锁)。
1条答案
按热度按时间djmepvbi1#
我的一个想法是使用循环缓冲区,并且只在它的索引上做同步,这样在大多数情况下,读取线程和写入线程访问的是内存中互不相同的部分。我的理解是,release/acquire 配对也能确保 `_frames` 内存块在各个 CPU 缓存中的内容被正确刷新,但我的这个理解可能有误。这种做法快了不少:
但是 @PeterCordes 在我的原问题中指出,这并不是理解 C++ 内存模型的正确方式。