c++ 将64位整数范围[from,to]转换为伪随机顺序的最快方法,在所有平台上都具有相同的结果?

mgdq6dx1  于 2023-05-02  发布在  其他
关注(0)|答案(2)|浏览(133)

bounty还有6天到期。回答此问题可获得+100声望奖励。xamid想要引起更多关注这个问题:仍然在寻找正确的答案。

给定索引的某个区间[a,B](64位无符号整数),我想快速获得一个数组,该数组包含根据均匀分布的散列函数排序的所有这些索引,* 看起来随机 * 但实际上在每个系统上都是相同的,而不管使用的C++实现。
目标是找到高度优化的此类方法。您可以通过英特尔的oneTBB使用共享内存并行来提高性能。
就像

vector<uint64_t> distributeIndices(uint64_t from, uint64_t to) {
    unordered_set<uint64_t> uset;
    for (uint64_t i = from; i <= to; i++)
        uset.insert(i);
    return vector<uint64_t>(uset.begin(), uset.end());
}

如果unordered_set<uint64_t>在每个实现上总是使用相同的和伪随机分布的散列函数,则将生成期望的结果,而这两种情况都不是这样。这也将是一个低效的解决方案。TBB等效物:

tbb::concurrent_vector<uint64_t> distributeIndices(uint64_t from, uint64_t to) {
    tbb::concurrent_unordered_set<uint64_t> uset;
    tbb::parallel_for(from, to + 1, [&uset](uint64_t i) {
        uset.insert(i);
    }); // NOTE: This is only to illustrate a parallel loop, sequential insertion is actually faster.
    return tbb::concurrent_vector<uint64_t>(uset.begin(), uset.end());
}

请注意,distributeIndices(from, to)应该返回{from,...,到}。

考虑从this answer得到transform。值得注意的是,循环分布不是伪随机分布:

  • 排序{from,...,to} w.r.t. (uint64_t a, uint64_t b) { return transform(a) < transform(b) }
  • distributeIndices(42, 42+99999999)[0, ..., 999]看起来一点也不随机:

  • 排序{from,...,to} w.r.t. (uint64_t a, uint64_t b) { return transform(a) % n < transform(b) % n }
  • distributeIndices(42, 42+99999999)[0, ..., 999]看起来一点也不随机:

  • 将{from,...,to}到transform(x - from) % n + from

  • distributeIndices(42, 42+99999999)恰好是双射的(因为10000000039293coprime),但是distributeIndices(42, 42+99999999)[0, ..., 999]看起来一点也不随机:

  • distributeIndices(42, 42+3929299)不是双射。它只分配100个不同的元素,以100为周期循环:

  • 将{from,...,to}到transform(x - from) + from
  • distributeIndices(42, 42+99999999)不是双射,e.例如,它分配3929375282657 > 42+99999999
  • 特别地,linear congruential generator一般不是双射。但如果你能让它在每个区间[from,to]都是这样,同时也隐藏它的循环性质,* 怎么做 *?

因此,答案应该提供一个特定的散列函数(以及为什么它是快速和均匀分布的),如何有效地利用它来计算distributeIndices(from, to)
同样,无论distributeIndices(from, to)在哪里运行,使用什么编译器,它都有相同的结果,这一点是至关重要的,根据C++标准必须保证的。但是,如果distributeIndices(0,2)1分配了与distributeIndices(0,3)不同的索引,则没有问题。
可接受的返回类型为std::vectortbb::concurrent_vectordynamic arrays,由uint64_t类型的元素组成。
该函数应该在包含数十亿个索引的范围上表现良好。
[如果你想知道为什么这可能是有用的:考虑到在不同的计算节点上有不同的进程,通过Message Passing Interface进行通信,它们不应该发送实际的数据(很大),而只发送它们正在处理的数据条目的索引。同时,处理数据的顺序应该是伪随机化的,以便进度速度不会“反弹”太多(当沿着有序索引处理时,它确实如此)。这对于可靠地预测整个计算将花费多长时间至关重要。因此每个节点必须知道哪个转换索引引用哪个实际索引i。即每个节点必须为distributeIndices(from, to)计算相同的结果。]

最快的正确工作的解决方案赢得了公认的答案。

  • 没有C/C++代码,没有可接受的答案。

(除非它证明问题无法有效解决。)
我将使用GCC 11测试解决方案。3 -O3在我的旧i7- 3610 QM笔记本电脑上,8个硬件线程在1亿个索引上(即。即distributeIndices(c, c + 99999999)),并且当将来的答案提供更好的执行解决方案时,可以改变接受的答案。
Testing code(最多运行10次,选择最快执行):

int main(int argc, char* argv[]) {
    uint64_t c = argc < 3 ? 42 : atoll(argv[1]);
    uint64_t s = argc < 3 ? 99999 : atoll(argv[2]); // 99999999 for performance testing
    for (unsigned i = 0; i < 10; i++) {
        chrono::time_point<chrono::system_clock> startTime = chrono::system_clock::now();
        auto indices = distributeIndices(c, c + s);
        chrono::microseconds dur = chrono::duration_cast<chrono::microseconds>(chrono::system_clock::now() - startTime);
        cout << durationStringMs(dur) << endl;
        // [... some checks ...]
#if 0 // bijectivity check
        set<uint64_t> m = set<uint64_t>(indices.begin(), indices.end());
        cout << "min: " << *m.begin() << " , max: " << *prev(m.end()) << ", #elements: " << m.size() << endl;
#endif
        cout << "required average: " << round((2.0L * c + s) / 2, 2) << endl;
        long double avg = accumulate(indices.begin(), indices.end(), __uint128_t(0)) / static_cast<long double>(indices.size());
        string sa = round(avg, 2);
        cout << "actual average:   " << sa << endl;
        auto printTrendlineHelpers = [](uint64_t minX, string avgX, uint64_t maxX, uint64_t minY, string avgY, uint64_t maxY) {
            cout << "Trendline helpers:" << endl;
            cout << "[max] " << minX << " " << maxY << " " << avgX << " " << maxY << " " << maxX << " " << maxY << endl;
            cout << "[avg] " << minX << " " << avgY << " " << avgX << " " << avgY << " " << maxX << " " << avgY << endl;
            cout << "[min] " << minX << " " << minY << " " << avgX << " " << minY << " " << maxX << " " << minY << endl;
        };
        // Print some plottable data, for e.g. https://www.rapidtables.com/tools/scatter-plot.html
        unsigned plotAmount = 2000;
        auto printPlotData = [&](uint64_t start, uint64_t end) {
            long double rng = static_cast<long double>(end - start);
            long double avg = accumulate(indices.begin() + start, indices.begin() + end, __uint128_t(0)) / rng;
            cout << "\ndistributeIndices(" << c << ", " << c << "+" << s << ")[" << start << ", ..., " << end - 1 << "]: (average " << round(avg, 2) << ")" << endl;
            stringstream ss;
            for (unsigned i = start; i < end; i++)
                ss << i << " " << indices[i] << (i + 1 == end ? "" : " ");
            cout << ss.str() << endl;
            printTrendlineHelpers(start, round(start + rng / 2, 2), end - 1, c, sa, c + s);
        };
        printPlotData(0, plotAmount); // front
        printPlotData(indices.size() / 2 - plotAmount / 2, indices.size() / 2 + plotAmount / 2); // middle
        printPlotData(indices.size() - plotAmount, indices.size()); // back
#if 1 // Print average course
        if (s >= 1000000)
            plotAmount *= 10;
        stringstream ss;
        for (uint64_t start = 0; start < indices.size(); start += plotAmount) {
            uint64_t end = min(start + plotAmount, indices.size());
            uint64_t i = start + (end - start) / 2;
            long double avg = accumulate(indices.begin() + start, indices.begin() + end, __uint128_t(0)) / static_cast<long double>(end - start);
            ss << i << " " << round(avg, 2) << (end == indices.size() ? "" : " ");
        }
        cout << "\nAverage course of distributeIndices(" << c << ", " << c << "+" << s << ") with slices of size " << plotAmount << ":\n" << ss.str() << endl;
        printTrendlineHelpers(c, sa, c + s, c, sa, c + s);
        break;
#endif
    }
    return 0;
}
  • 结果的存储(e.例如,通过静态变量)显然是不允许的。
  • uint64_t fromuint64_t to * 不能 * 被认为是constexpr

我的两个(不合适的)例子是14482.83 ms (14 s 482.83 ms)186812.68 ms (3 min 6 s 812.68 ms)
第二种方法看起来非常慢,但仔细检查后,它是我的系统上唯一真正分发值的方法:

  • unordered_set<uint64_t>变体:

例如100000041、100000040、100000039、100000038、100000037、... // bad

  • tbb::concurrent_unordered_set<uint64_t>变体:

例如67108864、33554432、16777216、83886080、50331648、...//分布良好,但看起来不随机

  • distributeIndices(42, 42+99999999)[0, ..., 999]与多项式趋势线:

上面的分布看起来是有序的,而不是随机的。
表明随机性的示例性分布可以从olegarch's answer获得,截至2023年4月30日。

// LCG params from: https://nuclear.llnl.gov/CNP/rng/rngman/node4.html
std::vector<uint64_t> distributeIndices(uint64_t lo, uint64_t hi) {
    uint64_t size = hi - lo + 1;
    std::vector<uint64_t> vec(size);
    for(uint64_t i = 0; i < size; i++)
        vec[i] = i + lo;
    uint64_t rnd = size ^ 0xBabeCafeFeedDad;
    for(uint64_t i = 0; i < size; i++) {
        rnd = rnd * 2862933555777941757ULL + 3037000493;
        uint64_t j = rnd % size;
        uint64_t tmp = vec[i]; vec[i] = vec[j]; vec[j] = tmp;
    }
    return std::move(vec);
}

请注意,该解决方案仍然是不正确的,因为它没有为所有范围提供均匀分布,如下所示。它也不使用并行计算,但它表现良好:在我的i7- 3610 QM上计算1亿个索引需要3235.18 ms

  • 正面; distributeIndices(42, 42+99999999)[0, ..., 1999],多项式趋势线:

  • 中间; distributeIndices(42, 42+99999999)[49999000, ..., 50000999],具有多项式趋势线:

  • 背面; distributeIndices(42, 42+99999999)[99998000, ..., 99999999],带多项式趋势线:

上面的分布看起来是随机的,即使平均值在开始时似乎有点低和有弹性。其全球趋势如下。

  • 多项式趋势线distributeIndices(42, 42+99999999)的平均过程:

多项式趋势线与全球平均值的偏差高达5%,因此分布不均匀。
对于某些范围,情况变得更糟:

  • 多项式趋势线distributeIndices(0, 67108863)的平均过程:

  • 正面; distributeIndices(0, 67108863)[0, ..., 1999],多项式趋势线:

这显然不是一个可接受的分布。
具有无缺陷趋势线的示例性分布可以从Severin Pappadeux's answer获得,截至2023年4月30日。根据建议,我添加了一些并行化。

uint64_t m = 0xd1342543de82ef95ULL; // taken from https://arxiv.org/pdf/2001.05304.pdf
uint64_t c = 0x1ULL;
inline auto lcg(uint64_t xi) -> uint64_t { // as LCG as it gets
    return m*xi + c;
}
inline auto cmp_lcg(uint64_t a, uint64_t b) -> bool {
    return lcg(a) < lcg(b);
}
auto distributeIndices(uint64_t from, uint64_t to) -> std::vector<uint64_t> {
    uint64_t size = to - from + 1;
    std::vector<uint64_t> z(size);
    tbb::parallel_for(uint64_t(0), size, [&](uint64_t i) {
        z[i] = from + i;
    }); // instead of std::iota(z.begin(), z.end(), from);
    tbb::parallel_sort(z.begin(), z.end(), cmp_lcg); // instead of std::sort(z.begin(), z.end(), cmp_lcg);
    return z;
}

为了通过多线程来提高性能,在我的i7- 3610 QM上计算1亿个索引时,顺序地使用了15925.91 ms,并行地使用了3666.21 ms(在8个硬件线程上)。
在使用Intel Xeon Platinum 8160处理器的计算群集上,我测量了(#cpu,duration[ms])结果(1,19174.65)(2,9862.29)(4,5580.47)(8,3402.05)(12,2119.28)(24,1606.78)(48,1330.20)
还应该注意的是,当将cmp_lcg转换为lambda函数时,代码得到了更好的优化,运行速度也快得多。例如auto cmp_lcg = [](uint64_t a, uint64_t b) -> bool { return lcg(a) < lcg(b); };。这样,它在我的i7- 3610 QM上的2608.15 ms上表现最好。当将全局变量mc声明为constexpr,或者使它们成为局部变量或文字变量时,可以达到更好的性能,这导致持续时间为2542.14 ms

  • 多项式趋势线distributeIndices(42, 42+99999999)的平均过程:

但是当我们看实际的分布时,很明显它不是随机的,而是有序的:

  • 正面; distributeIndices(42, 42+99999999)[0, ..., 1999],多项式趋势线:

该解决方案对于分布应该看起来是随机的任务是不正确的,但由于其大多数是随机分布,它对于前面提到的MPI用例特别有用。因此,如果没有任何完全正确的解决方案,它也将成为一个可接受的答案-只要不会给出任何合理的范围,其中它具有非均匀分布。这里的 Plausible 意味着,算法将运行至少几天的值可以被忽略。
因子0xd1342543de82ef95显示了spectral test的一些弱点,我还没有找到不使用0x9e3779b97f4a7c15的理由。
在这一切之后,应该清楚任务是合并意味着什么
1.感知的随机性
1.均匀双射分布
1.合理的任意间隔,其中
1.高性能。
我很好奇,是否存在任何正确和良好的解决方案的问题!
对这个问题的否定回答,加上相应的证明,当然也是可以接受的。
甚至比distributeIndices(uint64_t, uint64_t) -> vector<uint64_t>更好的方法是不必创建向量,而只是以伪随机顺序迭代索引,但这需要每个伪随机索引可以从其实际索引有效地计算(而无需迭代它之前的所有元素)。如果这是可能的,我会感到惊讶,但我很乐意感到惊讶。这样的方法总是被认为比向量构造的方法更好,并且通过迭代1亿个索引的持续时间来相互比较。

dgtucam1

dgtucam11#

一个简单的想法:用增量索引填充数组,然后-使用自己的,独立于系统的随机生成器对其进行 Shuffle 。
您没有请求加密保护的置换,在本例中我只使用了一个简单的LCG。如果你需要保持你的 Shuffle 加密安全,我建议你使用RC4。它在安全性和性能之间取得了很好的平衡。

#include <vector>
#include <algorithm>
#include <stdint.h>
#include <stdio.h>

// LCG parameters from: https://nuclear.llnl.gov/CNP/rng/rngman/node4.html
std::vector<uint64_t> distributeIndices(uint64_t lo, uint64_t hi) {
    uint64_t size = hi - lo + 1;
    std::vector<uint64_t> vec(size);
    for(uint64_t i = 0; i < size; i++)
        vec[i] = i + lo;
    uint64_t rnd = size ^ 0xBabeCafeFeedDad;
    for(uint64_t i = size - 1; i != 0; i--) {
        rnd = rnd * 2862933555777941757ULL + 3037000493;
        uint64_t j = rnd % i;
        uint64_t tmp = vec[i]; vec[i] = vec[j]; vec[j] = tmp;
    }
    return std::move(vec);
}

int main(int argc, char **argv) {
    uint64_t lo = atoll(argv[1]);
    uint64_t hi = atoll(argv[2]);

    std::vector<uint64_t> vec = distributeIndices(lo, hi);
    for (uint64_t x : vec)
        printf("%lu\n", x);
    return 0;
}
disho6za

disho6za2#

最简单的方法是使用一个LCG和一个整数[0]对它进行排序。.264)到自身的唯一Map,如果LCG参数服从Hull-Dobell定理条件。良好的光谱参数取自 * Computationally easy, spectrally good multipliers for congruential pseudorandom number generators *。
你可以很容易地将它应用于TBB向量和并行排序。
沿着

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <vector>

#define func auto

uint64_t m = 0xd1342543de82ef95ULL; // taken from https://arxiv.org/pdf/2001.05304.pdf
uint64_t c = 0x1ULL;

inline func lcg(uint64_t xi) -> uint64_t { // as LCG as it gets
    return m*xi + c;
}

inline func cmp_lcg(uint64_t a, uint64_t b) -> bool {
    return lcg(a) < lcg(b);
}

func distributeIndices(uint64_t from, uint64_t to) -> std::vector<uint64_t> {
    std::vector<uint64_t> z(to - from + 1);

    std::iota(z.begin(), z.end(), from);

    std::sort(z.begin(), z.end(), cmp_lcg);

    return z;
}

static const char NL = '\n';

func main() -> int {

    auto q = distributeIndices(7, 23);

    for(auto v: q)
        std::cout << v << " " << lcg(v) << NL;
    std::cout << NL;

    return 0;
}

相关问题