C++如何加速(使用x86 SIMD)批量可变长度整数编码/解码(可运行基准测试)

wixjitnu  于 11个月前  发布在  其他
关注(0)|答案(2)|浏览(95)

我有这样一种编码方法,它的工作原理是将16 x int64_t的小块编码为16个标志半字节的小块,这些标志半字节被打包成8个字节,然后为每个输入int64_t提供1个或更多字节的有效载荷:

  • 使用半字节(4位)表示flag
  • flag保留1个字节(1个字节可以存储2个值的标志)
  • sign(value)存储在标志中(半字节中的高位)
  • abs(value)存储在缓冲区中,字节数可变
  • 在标志中存储所需的字节数。(半字节中的3个低位)

这个示例基准是简化的。我已经用实际数据测试了和:它比LEB 128(又名varint)和lz 4更快,压缩效果也更好。所以让我们专注于这种编码方法或类似的方法。
如何改进这段代码?我主要关心解压缩速度。如果可以提高速度,可以对编码/解码格式(或其他格式)进行一些更改。我想知道在这种情况下是否可以使用SIMD。

特别说明:

  • 大多数abs(value)的长度<= 2字节。
  • 99%的情况下,4个连续的abs(value)可以用<= 7字节表示。同样,99%的情况下,16个连续的abs(value)可以用<= 31字节表示。
  • 两个重要的函数是encodedecode_original
//  ##### from @aqrit's answer, tweaked by @petercordes #######

#include <cstddef>  // size_t
#include <cstdint> // uint64_t, int64_t
#include <immintrin.h> // lzcnt intrinsic
#include <cstring> // memcpy

unsigned char* encode_buf_bmi(size_t n, const int64_t* src, unsigned char* dst) {
    unsigned char* key_ptr = dst;
    size_t num_chunks = n >> 3;
    unsigned char* data_ptr = &dst[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
    while (num_chunks--) {
        uint64_t key = 0;
        for (size_t i = 0; i < 8; i++) {
            uint64_t v = (uint64_t)*src++;
            memcpy(data_ptr, &v, sizeof(v)); // assumes little endian.

            v = (v + v) ^ v; // clear redundant sign bits (and trash everything else)
            v = (v << 8) | v; // combine lengths of 7 and 8
            v = (_lzcnt_u64(v | 1) ^ 63) >> 3; // use BSR on intel... 
            data_ptr += v + ((v + 1) >> 3); // key of 7 is length of 8
            key |= v << (i * 3);
        }
        memcpy(key_ptr, &key, 3); // assumes little endian.  Probably faster with overlapping 4-byte stores, except for the last chunk to avoid stepping on data
        key_ptr += 3;
    }
    // TODO: tail loop...
    
    return data_ptr;  // end of used part of buffer
}
    
void decode_buf_bmi(size_t n, const unsigned char* src, int64_t* dst) {
    const unsigned char* key_ptr = src;
    size_t num_chunks = n >> 3;
    const unsigned char* data_ptr = &src[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
    if (n >= 19) { // if has at least 8 bytes of padding before data_ptr
        data_ptr -= sizeof(uint64_t); // left aligned the truncated val in register (little endian) 
        while (num_chunks--) {
            unsigned keys;
            //memcpy(&keys, key_ptr, 3);   // big slowdown with GCC13.2: like 258 ms per buffer on Skylake 3.9GHz,  n_msg = 100'000'000
        memcpy(&keys, key_ptr, 4);     // 138 ms per buffer with 4-byte loads.  (Clang 16: 138 ms per buffer either way.)
            key_ptr += 3;
            for (size_t i = 0; i < 8; i++) {
                uint64_t v;
                size_t k = keys & 0x07;
                size_t len = k + ((k + 1) >> 3); // k==7 is len=8
                uint64_t mask = (uint64_t)0 - ((k + 7) >> 3); // k ? -1 : 0
                size_t shift = (64 - (len * 8)) & (size_t)mask; 

                keys >>= 3;     
                data_ptr += len; 
                memcpy(&v, data_ptr, sizeof(v));
                v &= mask;
                *dst++ = (int64_t)v >> shift;
            }
        }
        data_ptr += sizeof(uint64_t);
    }

    // TODO: tail loop...
}

// ############## From @Soonts' answer ##############
#include <immintrin.h>
#include <stdint.h>

// Exclusive prefix sum of unsigned bytes
// When the sum of all bytes exceeds 0xFF, the output is garbage
// Which is fine here because our bytes are in [0..8] interval
inline __m128i exclusivePrefixSum( const __m128i src, size_t& sum )
{
    __m128i v = src;
    // https://en.wikipedia.org/wiki/Prefix_sum#/media/File:Hillis-Steele_Prefix_Sum.svg
    v = _mm_add_epi8( v, _mm_slli_si128( v, 1 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 2 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 4 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 8 ) );

    // So far we have computed inclusive prefix sum
    // Keep the last byte which is total sum of bytes in the vector
    uint16_t tmp = _mm_extract_epi16( v, 7 );
    tmp >>= 8;
    sum = tmp;

    // Subtract original numbers to make exclusive sum = byte offsets to load
    return _mm_sub_epi8( v, src );
}

// Load 4 uint64 numbers from ( rsi + offsets[ i ] ), keep the lowest bits[ i ] bits in the numbers
inline __m256i loadAndMask( const uint8_t* rsi, uint32_t offsets, __m128i bits )
{
    // Load 4 uint64_t numbers from the correct locations, without AVX2 gathers
    const int64_t* s0 = (const int64_t*)( rsi + (uint8_t)offsets );
    const int64_t* s1 = (const int64_t*)( rsi + (uint8_t)( offsets >> 8 ) );
    const int64_t* s2 = (const int64_t*)( rsi + (uint8_t)( offsets >> 16 ) );
    const int64_t* s3 = (const int64_t*)( rsi + ( offsets >> 24 ) );
    __m256i r = _mm256_setr_epi64x( *s0, *s1, *s2, *s3 );

    // Mask away the higher pieces in the loaded numbers
    __m256i shift = _mm256_cvtepu8_epi64( bits );
    const __m256i ones = _mm256_set1_epi32( -1 );
    __m256i mask = _mm256_sllv_epi64( ones, shift );
    return _mm256_andnot_si256( mask, r );
}

inline __m256i applySigns( __m256i v, __m128i signs )
{
    // Sign extend the masks from bytes to int64
    __m256i mask = _mm256_cvtepi8_epi64( signs );
    // Conditionally negate the numbers
    __m256i neg = _mm256_sub_epi64( _mm256_setzero_si256(), v );
    return _mm256_blendv_epi8( v, neg, mask );
}

class BlockDecoder
{
    // Load offsets, in bytes
    __m128i offsetBytes;
    // Payload bits per element, the bytes are in [ 0 .. 64 ] interval
    __m128i payloadBits;
    // 16 bytes with the 0x80 bit set when the corresponding input was negative; the rest of the bits are unused
    __m128i signs;
    // Count of payload bytes in the complete block
    size_t payloadBytes;

    // Decode the block header
    inline void loadHeader( const uint8_t* rsi )
    {
        // Load 8 bytes, and zero extend them into uint16_t
        const __m128i v = _mm_cvtepu8_epi16( _mm_loadu_si64( rsi ) );

        // Unpack lengths
        const __m128i seven = _mm_set1_epi8( 7 );
        const __m128i l4 = _mm_slli_epi16( v, 4 );
        __m128i lengths = _mm_or_si128( v, l4 );
        lengths = _mm_and_si128( lengths, seven );
        // Transform 7 into 8
        __m128i tmp = _mm_cmpeq_epi8( lengths, seven );
        lengths = _mm_sub_epi8( lengths, tmp );

        // Byte offsets to load 16 numbers, and count of payload bytes in the complete block
        offsetBytes = exclusivePrefixSum( lengths, payloadBytes );
        // Count of payload bits in the loaded numbers, lengths * 8
        payloadBits = _mm_slli_epi16( lengths, 3 );
        // Signs vector, we only use the highest 0x80 bit in these bytes
        signs = _mm_or_si128( _mm_slli_epi16( v, 8 ), l4 );
    }

    // Decode and store 4 numbers
    template<int slice>
    inline void decodeSlice( int64_t* rdi, const uint8_t* payload ) const
    {
        uint32_t off;
        __m128i bits, s;
        if constexpr( slice != 0 )
        {
            off = (uint32_t)_mm_extract_epi32( offsetBytes, slice );
            constexpr int imm = _MM_SHUFFLE( slice, slice, slice, slice );
            bits = _mm_shuffle_epi32( payloadBits, imm );
            s = _mm_shuffle_epi32( signs, imm );
        }
        else
        {
            off = (uint32_t)_mm_cvtsi128_si32( offsetBytes );
            // For the first slice of the block, the 4 lowest bytes are in the correct locations already
            bits = payloadBits;
            s = signs;
        }

        __m256i v = loadAndMask( payload, off, bits );
        v = applySigns( v, s );
        _mm256_storeu_si256( ( __m256i* )rdi, v );
    }

public:

    // Decode and store a block of 16 numbers, return pointer to the next block
    const uint8_t* decode( int64_t* rdi, const uint8_t* rsi )
    {
        loadHeader( rsi );
        rsi += 8;

        decodeSlice<0>( rdi, rsi );
        decodeSlice<1>( rdi + 4, rsi );
        decodeSlice<2>( rdi + 8, rsi );
        decodeSlice<3>( rdi + 12, rsi );

        return rsi + payloadBytes;
    }
};

//   ############ reference version and updated benchmark framework ###########
#include <iostream>
#include <string>
#include <cstring>
#include <algorithm>
#include <vector>
#include <chrono>
#include <iomanip>
using namespace std;

// https://en.wikipedia.org/wiki/Xorshift   IIRC, wikipedia used to have different constants for some PRNG, maybe some of these, vs. the upstream site; I just copied wikipedia
#include <stdint.h>
uint64_t rol64(uint64_t x, int k)
{
    return (x << k) | (x >> (64 - k));
}

struct xoshiro256ss_state {
    uint64_t s[4];
};

static xoshiro256ss_state rng_state; // init by main
uint64_t xoshiro256ss(struct xoshiro256ss_state *state)
{
    uint64_t *s = state->s;
    uint64_t const result = rol64(s[1] * 5, 7) * 9;
    uint64_t const t = s[1] << 17;

    s[2] ^= s[0];
    s[3] ^= s[1];
    s[1] ^= s[2];
    s[0] ^= s[3];

    s[2] ^= t;
    s[3] = rol64(s[3], 45);

    return result;
}

namespace {
    class MyTimer {
    std::chrono::time_point<std::chrono::system_clock> start;

public:
    void startCounter() {
        start = std::chrono::system_clock::now();
    }

    int64_t getCounterNs() {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now() - start).count();
    }

    int64_t getCounterMs() {
        return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - start).count();
    }

    double getCounterMsPrecise() {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now() - start).count()
                / 1000000.0;
    }
};
}

template <typename T>
void leb128_encode(uint8_t* data, T value, size_t &idx)
{
  bool more = true;
  bool negative = value < 0;
  constexpr int size = sizeof(value) * 8;

  while (more) {
    uint8_t byte = value & 0x7F;
    value >>= 7;
    if (negative) value |= (~0LL << (size - 7));

    if ((value == 0 && (byte & 0x40) == 0) || (value == -1 && (byte & 0x40) != 0)) {
      more = false;
    } else {
        byte |= 0x80;
    }

    data[idx++] = byte;
  }
}

template <typename T>
void leb128_decode(uint8_t* data, T &value, size_t &idx)
{
  value = 0;
  int shift = 0;
  constexpr int size = sizeof(value) * 8;
  uint8_t byte;

  do {
    byte = data[idx++];
    value |= int64_t(byte & 0x7F) << shift;
    shift += 7;
  } while (byte & 0x80);

  if (shift < size && (byte & 0x40)) {
    value |= (~0LL << shift);
  }
}

int64_t n_msg = 100'000'000;
constexpr int BLK_SIZE = 16;
uint8_t* buffer;

int64_t *original;
int64_t *decoded;
int64_t *dummy_array;

// encode 1 value using variable length encoding,
// storing the result in a flag
void encode(int64_t value, uint8_t &flag, size_t &idx)
{
  bool negative = value < 0;
  value = abs(value);

  uint8_t byte_cnt = 0;
  while (value > 0) {
    buffer[idx] = value & 0xFF; // lowest byte
    idx++;
    value >>= 8;
    byte_cnt++;
  }

  // special cases. Since we only have 3 bits to represent 9 values (0->8),
  // we choose to group case 7-8 together because they're rarest.  
  if (byte_cnt == 7) {
    buffer[idx] = 0;
    idx++;
  } else if (byte_cnt == 8) {
    // since we only have 3 bits, we use 7 to represent 8 bytes
    byte_cnt = 7;
  }

  flag = byte_cnt; // bit 0-2 represent length
  if (negative) flag |= 0b1000; // bit 3 represent sign (1 means negative)  
}

// returns compression ratio
double __attribute__ ((noinline)) encode_all(int type = 0)
{
  if (type == 0) {
    // Soonts version uses the same format as this reference version
    size_t idx = 0;
    // encode in blocks of 16 values
    for (size_t i = 0; i < n_msg; i += 16) {
      size_t flag_idx = idx;
      idx += 8; // first 8 bytes of a block are for sign/length flags
      for (int t = 0; t < BLK_SIZE; t += 2) {
        uint8_t low_flag, high_flag;
        encode(original[i + t], low_flag, idx);
        encode(original[i + t + 1], high_flag, idx);
        buffer[flag_idx + t / 2] = (high_flag << 4) | low_flag;
      }
    }
    return (double(idx) / (n_msg * 8));
  } else if (type == 1) {
    const unsigned char *end = encode_buf_bmi(n_msg, original, buffer);
    return (double(end - buffer) / (n_msg * 8));    
  } else if (type == 2) {
    size_t idx = 0;
    for (int i = 0; i < n_msg; i++) leb128_encode(buffer, original[i], idx);
    return (double(idx) / (n_msg * 8));
  }

  cout << "encode_all unknown type " << type << std::endl;
  exit(1);
}

template <typename T>
void extract_flag(uint8_t flag, T &low_sign, T &low_length, T &high_sign, T &high_length)
{
  low_sign = flag & 0b1000;
  low_length = flag & 0b0111;
  high_sign = (flag >> 4) & 0b1000;
  high_length = (flag >> 4) & 0b0111;
}

void __attribute__ ((noinline)) decode_original()
{
  static constexpr int64_t mult[] = {
    1, 1LL << 8, 1LL << 16, 1LL << 24, 1LL << 32, 1LL << 40, 1LL << 48, 1LL << 56
  };

  size_t idx = 0, num_idx = 0;
  for (size_t i = 0; i < n_msg; i += 16) {
    // first 8 bytes of a block are flags
    int signs[BLK_SIZE], lens[BLK_SIZE];
    for (int t = 0; t < BLK_SIZE; t += 2) {
      extract_flag(buffer[idx], signs[t], lens[t], signs[t + 1], lens[t + 1]);
      idx++;
    }
    for (int t = 0; t < BLK_SIZE; t++) if (lens[t] == 7) lens[t] = 8; // special case

    // decode BLK_SIZE values  
    for (int t = 0; t < BLK_SIZE; t++) {
      int64_t value = 0;
      for (int i = 0; i < lens[t]; i++) value += mult[i] * buffer[idx + i];
      if (signs[t]) value = -value;
      decoded[num_idx + t] = value;
      idx += lens[t];
    }
    num_idx += BLK_SIZE;
  }
}

void __attribute__ ((noinline)) decode_soonts()
{
  BlockDecoder dec;
  const uint8_t* rsi = buffer;
  int64_t* rdi = decoded;
  for( size_t i = 0; i < n_msg; i += 16 )
  {
      rsi = dec.decode( rdi, rsi );
      rdi += 16;
  }
}

void __attribute__ ((noinline)) decode_aqrit()
{
  decode_buf_bmi(n_msg, buffer, decoded);
}

void __attribute__ ((noinline)) decode_leb()
{
  size_t idx = 0;
  for (int i = 0; i < n_msg; i++) {
    leb128_decode(buffer, decoded[i], idx);
  }
}

//------------------
//------------------
//------------------  MAIN

void __attribute__ ((noinline)) gen_data()
{
  for (size_t i = 0; i < n_msg; i++) {
    uint64_t modifier = xoshiro256ss(&rng_state);  // low 32 bits decide magnitude range, high bits decide sign, to avoid correlation with %100 over the full thing.
    if ( uint32_t(modifier) % 100 == 0) {
      original[i] = int64_t(xoshiro256ss(&rng_state)) * 1'000'000'000;
    } else {
      original[i] = xoshiro256ss(&rng_state) % 70'000;
    }
    if ((modifier>>62) == 0)
      original[i] *= -1;
  }
}

void __attribute__ ((noinline)) check(const string &name)
{
  for (size_t i = 0; i < n_msg; i++) if (original[i] != decoded[i]) {
    cout << name << " wrong at " << i << " " << original[i] << " " << decoded[i] << "\n";
    exit(1);
  }
}

int64_t volatile dummy = 42;
constexpr int N_DUMMY = 8'000'000;
void __attribute__ ((noinline)) clear_cache()
{
#if 1
  for (int i = 0; i < N_DUMMY; i++) dummy_array[i] = dummy;
  std::sort(dummy_array, dummy_array + N_DUMMY);
  dummy = dummy_array[rand() % N_DUMMY];
#else
    asm("" ::: "memory");
#endif
}

using FuncPtr = void (*)();
int64_t __attribute__ ((noinline)) test(FuncPtr func, const string &name)
{
  clear_cache();
  MyTimer timer;
  timer.startCounter();
  func();
  int64_t cost = timer.getCounterNs();
  check(name);
  return cost;
}

void printTableRow(const std::vector<std::string>& columns, int columnWidth) {
    for (const std::string& column : columns) {
        std::cout << std::left << std::setw(columnWidth) << column << " | ";
    }
    std::cout << std::endl;
}

int main()
{
  // srand(time(NULL));
  rng_state.s[0] = time(nullptr);  // other elements left zero.  This is terrible but sufficient for our purposes

  MyTimer timer;
  buffer = new uint8_t[n_msg * 10];
  original = new int64_t[n_msg];
  decoded = new int64_t[n_msg];
  dummy_array = new int64_t[N_DUMMY]; // enough to flush cache
  
  memset(buffer, 0, n_msg * 10);
  memset(original, 0, n_msg * 8);
  memset(decoded, 0, n_msg * 8);
  memset(dummy_array, 0, N_DUMMY * 8);

  int n_test = 10;
  constexpr int L = 4;
  string names[L] = {"original", "soonts", "aqrit", "leb"};
  int64_t total_costs[L] = {0, 0, 0, 0};

  for (int t = 0; t < n_test; t++) {    
    gen_data();
    double ratio_original = encode_all(0);
    int64_t costs[L];

    costs[0] = test(decode_original, names[0]);
    costs[1] = test(decode_soonts, names[1]);

    double ratio_aqrit = encode_all(1);
    costs[2] = test(decode_aqrit, names[2]);

    double ratio_leb = encode_all(2);
    costs[3] = test(decode_leb, names[3]);
    
    if (t == 0) {
      cout << "Compression ratios: " << ratio_original << " " << ratio_aqrit << " " << ratio_leb << "\n";
    }

    cout << t << ": ";
    for (int i = 0; i < L; i++) {
      cout << (double(costs[i]) / 1'000'000) << " ";
      total_costs[i] += costs[i];
    }
    cout << "\n";
  }

  vector<string> headers = {"name", "cost (ms)", "cost/int64 (ns)", "cost/byte (ns)"};
  printTableRow(headers, 15);
  for (int i = 0; i < L; i++) {    
    double average_cost_ms = double(total_costs[i]) / n_test / 1'000'000;
    double cost_int64 = double(total_costs[i]) / (n_msg * n_test);
    double cost_byte = cost_int64 / 8;
    
    vector<string> row;
    row.push_back(names[i]);
    row.push_back(to_string(average_cost_ms));
    row.push_back(to_string(cost_int64));
    row.push_back(to_string(cost_byte));
    printTableRow(row, 15);
  }

  return 0;
}

字符串

**编辑:**我刚刚发现有人在2017年发现了几乎相同的编码/解码格式(但对于int 32)。多么有趣的巧合,两个人在两个不同的时间独立地“发明”了同样的东西:D

Soonts答案的第二个版本(funcinl funcvpgather)代码here。它看起来更干净,并且不会读取输入缓冲区的末尾,但速度较慢,所以我保留了第一个版本。

  • func:此版本不需要解码器对象。force_inline = 0, use_gather = 0
  • inl funcforce_inline = 1, use_gather = 0
  • vpgatherforce_inline = 0, use_gather = 1

编译命令:g++ -o main test.cpp -std=c+=17 -O3 -mavx2 -mlzcnt -march=native
. -march=native上的基准测试提供了可衡量的性能提升(最佳情况下约8%)
达到10次运行的平均值的时间(10次运行的平均值):

// AMD EPYC 75F3 (Zen 3 x86-64), 2950MHz
Compression ratios: 0.327437 0.369956
0: 447.651 65.0414 110.171 
1: 442.853 64.5952 105.635 
2: 434.715 64.1977 108.984 
3: 430.09 63.0572 104.074 
4: 424.451 64.6397 103.604 
5: 436.631 65.0076 104.04 
6: 429.59 64.1896 102.936 
7: 434.184 64.0522 104.035 
8: 430.223 69.3877 105.922 
9: 420.659 63.7519 105.563
// AMD EPYC 75F3 -march=native
name            | cost (ms)       | cost/int64 (ns) | cost/byte (ns)  | 
original        | 433.104635      | 4.331046        | 0.541381        | 
soonts          | 64.792034       | 0.647920        | 0.080990        | 
aqrit           | 105.496544      | 1.054965        | 0.131871        |
leb             | 438.236469      | 4.382365        | 0.547796        |
soonts (func)   | 68.403465       | 0.684035        | 0.085504        | 
soon (inl func) | 71.202335       | 0.712023        | 0.089003        |
soon (vpgather) | 78.308508       | 0.783085        | 0.097886        |

// AMD EPYC 75F3 without -march=native
name            | cost (ms)       | cost/int64 (ns) | cost/byte (ns)  | 
original        | 424.631426      | 4.246314        | 0.530789        | 
soonts          | 69.623498       | 0.696235        | 0.087029        | 
aqrit           | 109.895916      | 1.098959        | 0.137370        | 

// Intel(R) Xeon(R) Gold 6252N CPU (supposedly 2.3 GHz, I can't change/lock frequency on this borrowed machine)
name            | cost (ms)       | cost/int64 (ns) | cost/byte (ns)  | 
original        | 549.280441      | 5.492804        | 0.686601        | 
soonts          | 118.401718      | 1.184017        | 0.148002        | 
aqrit           | 151.811314      | 1.518113        | 0.189764        |
soon (func)     | 122.440794      | 1.224408        | 0.153051        |
soon (inl func) | 120.559447      | 1.205594        | 0.150699        |
soon (vpgather) | 115.704298      | 1.157043        | 0.144630        |
aqrit           | 151.811314      | 1.518113        | 0.189764        |

wfsdck30

wfsdck301#

这是相对棘手的,但仍然可以矢量化你的解码器。这里有一个实现,在我的电脑与AMD Zen 3 CPU优于您的参考版本的2.8倍。

#include <immintrin.h>
#include <stdint.h>

// Exclusive prefix sum of unsigned bytes
// When the sum of all bytes exceeds 0xFF, the output is garbage
// Which is fine here because our bytes are in [0..8] interval
inline __m128i exclusivePrefixSum( const __m128i src, size_t& sum )
{
    __m128i v = src;
    // https://en.wikipedia.org/wiki/Prefix_sum#/media/File:Hillis-Steele_Prefix_Sum.svg
    v = _mm_add_epi8( v, _mm_slli_si128( v, 1 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 2 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 4 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 8 ) );

    // So far we have computed inclusive prefix sum
    // Keep the last byte which is total sum of bytes in the vector
    uint16_t tmp = _mm_extract_epi16( v, 7 );
    tmp >>= 8;
    sum = tmp;

    // Subtract original numbers to make exclusive sum = byte offsets to load
    return _mm_sub_epi8( v, src );
}

// Load 4 uint64 numbers from ( rsi + offsets[ i ] ), keep the lowest bits[ i ] bits in the numbers
inline __m256i loadAndMask( const uint8_t* rsi, uint32_t offsets, __m128i bits )
{
    // Load 4 uint64_t numbers from the correct locations, without AVX2 gathers
    const int64_t* s0 = (const int64_t*)( rsi + (uint8_t)offsets );
    const int64_t* s1 = (const int64_t*)( rsi + (uint8_t)( offsets >> 8 ) );
    const int64_t* s2 = (const int64_t*)( rsi + (uint8_t)( offsets >> 16 ) );
    const int64_t* s3 = (const int64_t*)( rsi + ( offsets >> 24 ) );
    __m256i r = _mm256_setr_epi64x( *s0, *s1, *s2, *s3 );

    // Mask away the higher pieces in the loaded numbers
    __m256i shift = _mm256_cvtepu8_epi64( bits );
    const __m256i ones = _mm256_set1_epi32( -1 );
    __m256i mask = _mm256_sllv_epi64( ones, shift );
    return _mm256_andnot_si256( mask, r );
}

inline __m256i applySigns( __m256i v, __m128i signs )
{
    // Sign extend the masks from bytes to int64
    __m256i mask = _mm256_cvtepi8_epi64( signs );
    // Conditionally negate the numbers
    __m256i neg = _mm256_sub_epi64( _mm256_setzero_si256(), v );
    return _mm256_blendv_epi8( v, neg, mask );
}

class BlockDecoder
{
    // Load offsets, in bytes
    __m128i offsetBytes;
    // Payload bits per element, the bytes are in [ 0 .. 64 ] interval
    __m128i payloadBits;
    // 16 bytes with the 0x80 bit set when the corresponding input was negative; the rest of the bits are unused
    __m128i signs;
    // Count of payload bytes in the complete block
    size_t payloadBytes;

    // Decode the block header
    inline void loadHeader( const uint8_t* rsi )
    {
        // Load 8 bytes, and zero extend them into uint16_t
        const __m128i v = _mm_cvtepu8_epi16( _mm_loadu_si64( rsi ) );

        // Unpack lengths
        const __m128i seven = _mm_set1_epi8( 7 );
        const __m128i l4 = _mm_slli_epi16( v, 4 );
        __m128i lengths = _mm_or_si128( v, l4 );
        lengths = _mm_and_si128( lengths, seven );
        // Transform 7 into 8
        __m128i tmp = _mm_cmpeq_epi8( lengths, seven );
        lengths = _mm_sub_epi8( lengths, tmp );

        // Byte offsets to load 16 numbers, and count of payload bytes in the complete block
        offsetBytes = exclusivePrefixSum( lengths, payloadBytes );
        // Count of payload bits in the loaded numbers, lengths * 8
        payloadBits = _mm_slli_epi16( lengths, 3 );
        // Signs vector, we only use the highest 0x80 bit in these bytes
        signs = _mm_or_si128( _mm_slli_epi16( v, 8 ), l4 );
    }

    // Decode and store 4 numbers
    template<int slice>
    inline void decodeSlice( int64_t* rdi, const uint8_t* payload ) const
    {
        uint32_t off;
        __m128i bits, s;
        if constexpr( slice != 0 )
        {
            off = (uint32_t)_mm_extract_epi32( offsetBytes, slice );
            constexpr int imm = _MM_SHUFFLE( slice, slice, slice, slice );
            bits = _mm_shuffle_epi32( payloadBits, imm );
            s = _mm_shuffle_epi32( signs, imm );
        }
        else
        {
            off = (uint32_t)_mm_cvtsi128_si32( offsetBytes );
            // For the first slice of the block, the 4 lowest bytes are in the correct locations already
            bits = payloadBits;
            s = signs;
        }

        __m256i v = loadAndMask( payload, off, bits );
        v = applySigns( v, s );
        _mm256_storeu_si256( ( __m256i* )rdi, v );
    }

public:

    // Decode and store a block of 16 numbers, return pointer to the next block
    const uint8_t* decode( int64_t* rdi, const uint8_t* rsi )
    {
        loadHeader( rsi );
        rsi += 8;

        decodeSlice<0>( rdi, rsi );
        decodeSlice<1>( rdi + 4, rsi );
        decodeSlice<2>( rdi + 8, rsi );
        decodeSlice<3>( rdi + 12, rsi );

        return rsi + payloadBytes;
    }
};

字符串
我对代码的测试很少,它有可能进一步提高性能。
如您所见,该实现需要AVX 2,并且是无分支的。
使用示例:

BlockDecoder dec;
const uint8_t* rsi = buffer;
int64_t* rdi = decoded;
for( size_t i = 0; i < n_msg; i += 16 )
{
    rsi = dec.decode( rdi, rsi );
    rdi += 16;
}

**NB!**通常情况下,当流中的最后一个数字没有使用最大8字节时,该实现会在编码缓冲区的末尾加载几个字节。您可以填充压缩数据,或实现特殊情况来解码流的最后一个块,或调整编码器以始终为最后一个块的最后一个数字发出8字节。

P.S.初始版本使用_mm256_i32gather_epi64指令从内存中加载四个int 64数字,使用一个基指针+另一个向量的四个字节偏移量。然而,在AMD CPU上,它比4个标量加载稍慢。如果你的目标是Intel,可能更好地使用初始版本,请参阅编辑历史。

qltillow

qltillow2#

在大多数情况下,标量实现可以是无分支的。
从长远来看,用符号位填充控制位可能会花费位。相反,这里所有的控制位都打包在流的开头(类似于streamvbyte)。
使用可变符号扩展是因为它比字节标量实现的zigzag更便宜。
概念验证假设:

  • 快速非对齐内存访问(和memcpy优化了)
  • 小端存储顺序
  • 算术右移
#include <cstddef>  // size_t
#include <cstdint> // uint64_t, int64_t
#include <immintrin.h> // lzcnt intrinsic
#include <cstring> // memcpy

unsigned char* encode(size_t n, const int64_t* src, unsigned char* dst) {
    unsigned char* key_ptr = dst;
    size_t num_chunks = n >> 3;
    unsigned char* data_ptr = &dst[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
    while (num_chunks--) {
        uint64_t key = 0;
        for (size_t i = 0; i < 8; i++) {
            uint64_t v = (uint64_t)*src++;
            memcpy(data_ptr, &v, sizeof(v)); // assumes little endian

            v = (v + v) ^ v; // clear redundant sign bits (and trash everything else)
            v = (v << 8) | v; // combine lengths of 7 and 8
            v = (_lzcnt_u64(v | 1) ^ 63) >> 3; // use BSR on intel... 
            data_ptr += v + ((v + 1) >> 3); // key of 7 is length of 8
            key |= v << (i * 3);
        }
        memcpy(key_ptr, &key, 3); // assumes little endian
        key_ptr += 3;
    }
    // TODO: tail loop...
    
    return dst;
}
    
void decode(size_t n, const unsigned char* src, int64_t* dst) {
    const unsigned char* key_ptr = src;
    size_t num_chunks = n >> 3;
    const unsigned char* data_ptr = &src[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
    if (n >= 19) { // if has at least 8 bytes of padding before data_ptr
        data_ptr -= sizeof(uint64_t); // left aligned the truncated val in register (little endian) 
        while (num_chunks--) {
            unsigned keys;
            memcpy(&keys, key_ptr, 3);
            key_ptr += 3;
            for (size_t i = 0; i < 8; i++) {
                uint64_t v;
                size_t k = keys & 0x07;
                size_t len = k + ((k + 1) >> 3); // k==7 is len=8
                uint64_t mask = (uint64_t)0 - ((k + 7) >> 3); // k ? -1 : 0
                size_t shift = (64 - (len * 8)) & (size_t)mask; 

                keys >>= 3;     
                data_ptr += len; 
                memcpy(&v, data_ptr, sizeof(v));
                v &= mask;
                *dst++ = (int64_t)v >> shift;
            }
        }
        data_ptr += sizeof(uint64_t);
    }

    // TODO: tail loop...
}

字符串

相关问题