c++ 如何有效地按键合并k个已排序的成对键/值向量?

mxg2im7a  于 2023-01-22  发布在  其他
关注(0)|答案(5)|浏览(156)

我想按键合并k排序的成对键/值向量。通常,向量的大小n非常大(例如,n >= 4,000,000,000)。
考虑k = 2的以下示例:

// Input
keys_1 = [1, 2, 3, 4], values_1 = [11, 12, 13, 14]
keys_2 = [3, 4, 5, 6], values_2 = [23, 24, 25, 26]

// Output
merged_keys = [1, 2, 3, 3, 4, 4, 5, 6], merged_values = [11, 12, 13, 23, 14, 24, 25, 26]

由于__gnu_parallel::multiway_merge是一种高效的k路合并算法,因此我尝试利用最先进的zip迭代器(https://github.com/dpellegr/ZipIterator)来“合并”键值对向量。

#include <iostream>
#include <vector>
#include <parallel/algorithm>

#include "ZipIterator.hpp"

int main(int argc, char* argv[]) {
  std::vector<int> keys_1   = {1, 2, 3, 4};
  std::vector<int> values_1 = {11, 12, 13, 14};
  std::vector<int> keys_2   = {3, 4, 5, 6};
  std::vector<int> values_2 = {23, 24, 25, 26};

  std::vector<int> merged_keys(8);
  std::vector<int> merged_values(8);

  auto kv_it_1 = Zip(keys_1, values_1);
  auto kv_it_2 = Zip(keys_2, values_2);
  auto mkv_it = Zip(merged_keys, merged_values);

  auto it_pairs = {std::make_pair(kv_it_1.begin(), kv_it_1.end()),
                   std::make_pair(kv_it_2.begin(), kv_it_2.end())};

  __gnu_parallel::multiway_merge(it_pairs.begin(), it_pairs.end(), mkv_it.begin(), 8, std::less<>());
  
  for (size_t i = 0; i < 8; ++i) {
    std::cout << merged_keys[i] << ":" << merged_values[i] << (i == 7 ? "\n" : ", ");
  }

  return 0;
}

但是,我得到各种编译错误(用-O3构建):
错误:无法将类型为“std::__迭代器特性〈ZipIter〈__gnu_cxx::__normal_iterator〈int*,std::向量〈int,std::分配器〉〉,__gnu_cxx::_normal_iterator〈int*,std::向量〈int,std::分配器〉〉,void〉::value_type&' {aka 'std::tuple〈int,int〉&'}”的非常数左值引用绑定到类型为“std::tuple〈int,int〉”的右值
错误:无法将“ZipIter〈__gnu_cxx::__normal_iterator〈整数 ,标准::向量〈整数,标准::分配器〉〉,__gnu_cxx::_normal_iterator〈整数 ,标准::向量〈整数,标准::分配器〉〉::引用 ”{又称“ZipRef〈整数,整数〉”}转换为“_ValueType”{又称“标准::元组〈整数,整数〉”}

是否可以修改ZipIterator以使其正常工作?
或者,是否有更有效的方法按键合并k排序的成对键/值向量?

  • 考虑的替代方案 *

1.使用int keyint value成员以及operator<operator<=运算符定义KeyValuePairstruct。将键/值向量的元素移到std::vector<KeyValuePair>中。在std::vector<KeyValuePair>上调用__gnu_parallel::multiway_merge。将合并后的元素移回键/值向量中。[判定:执行速度慢,内存开销高,即使使用-O3也是如此]
1.使用std::merge(std::execution::par_unseq, kv_it_1.begin(), kv_it_1.end(), kv_it_2.begin(), kv_it_2.end(), mkv_it.begin());代替__gnu_parallel::multiway_merge[判定:仅支持两个键/值向量]

ux6nzvsh

ux6nzvsh1#

是否可以修改ZipIterator使其工作?
可以,但需要修补__gnu_parallel::multiway_merge。错误来源是以下行:

/** @brief Dereference operator.
      *  @return Referenced element. */
      typename std::iterator_traits<_RAIter>::value_type&
      operator*() const
      { return *_M_current; }

这是_GuardedIterator的一个成员函数--multiway_merge实现中使用的一个辅助结构。它 Package 了_RAIter类,在您的例子中是ZipIter。根据定义,当迭代器被取消引用时(*_M_current),则返回表达式的类型应该是reference类型。但是,此代码期望它是value_type&。在大多数情况下,这些都是相同的类型。实际上,当你解引用一个项目时,你希望得到一个对这个项目的引用。然而,这对zip迭代器是不可能的,因为它的元素是虚拟的,它们是在运行中创建的。这就是为什么ZipIterreference类型根本不是一个引用类型,它实际上是一个名为ZipRef的值类型:

using reference = ZipRef<std::remove_reference_t<typename std::iterator_traits<IT>::reference>...>;

与(非常讨厌的)vector<bool>使用的做法类似。
所以,ZipIterator没有问题,或者你如何使用算法,它是算法本身的一个重要需求,下一个问题是,我们能摆脱它吗?
答案是肯定的,你可以修改_GuardedIterator::operator*(),返回reference而不是value_type&,那么你会在下面这一行看到一个编译错误:

// Default value for potentially non-default-constructible types.
      _ValueType* __arbitrary_element = 0;

      for (_SeqNumber __t = 0; __t < __k; ++__t)
        {
          if(!__arbitrary_element
             && _GLIBCXX_PARALLEL_LENGTH(__seqs_begin[__t]) > 0)
            __arbitrary_element = &(*__seqs_begin[__t].first);
        }

这里的一个元素的地址是某个__arbitrary_element的,我们可以存储这个元素的一个副本,因为我们知道ZipRef复制起来很便宜,而且它是默认构造的:

// Local copy of the element
      _ValueType __arbitrary_element_val;
      _ValueType* __arbitrary_element = 0;

      for (_SeqNumber __t = 0; __t < __k; ++__t)
        {
          if(!__arbitrary_element
             && _GLIBCXX_PARALLEL_LENGTH(__seqs_begin[__t]) > 0) {
            __arbitrary_element_val = *__seqs_begin[__t].first;
            __arbitrary_element = &__arbitrary_element_val;
          }
        }

同样的错误会出现在文件multiseq_selection.h中的几个地方,例如here和here。使用类似的技术修复所有错误。
然后,您将看到多个错误,如下所示:

./parallel/multiway_merge.h:879:29: error: passing ‘const ZipIter<__gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator<int> > >, __gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator<int> > > >’ as ‘this’ argument discards qualifiers [-fpermissive]

它们是关于const不正确的。它们是由于您将it_pairs声明为auto,在这个特定的场景中,它推导出类型为std::inializer_list。这是一个非常特殊的类型。例如,它只提供对其成员的 constant 访问。即使它本身没有声明为const。这就是这些错误的来源。将auto更改为std::vector,这些错误就会消失。
它应该在这里编译find。只是不要忘记用-fopenmp编译,否则你会得到“undefined reference to 'omp_get_thread_num'”错误。
下面是我看到的输出:

$ ./a.out
1:11, 2:12, 3:13, 3:23, 4:14, 4:24, 5:25, 6:26
8ehkhllq

8ehkhllq2#

由于需要低内存开销,一个可能的解决方案是让multiway_merge算法只对唯一的范围标识符和范围索引进行操作,并将比较和复制操作符作为lambda函数提供,这样合并算法就完全独立于实际使用的容器类型以及键和值类型。
下面是一个C++17解决方案,它基于这里描述的基于堆的算法:

#include <cassert>
#include <cstdint>
#include <functional>
#include <initializer_list>
#include <iostream>
#include <iterator>
#include <queue>
#include <vector>

using range_type = std::pair<std::uint32_t,std::size_t>;

void multiway_merge(
    std::initializer_list<std::size_t> range_sizes,
    std::function<bool(const range_type&, const range_type&)> compare_func,
    std::function<void(const range_type&)> copy_func)
{
    // lambda compare function for priority queue of ranges
    auto queue_less = [&](const range_type& range1, const range_type& range2) {
        // reverse comparison order of range1 and range2 here,
        // because we require the smallest element to be on top
        return compare_func(range2, range1);
    };
    // create priority queue from all non-empty ranges
    std::priority_queue<
        range_type, std::vector<range_type>, 
        decltype(queue_less)> queue{ queue_less };
    for (std::uint32_t range_id = 0; range_id < range_sizes.size(); ++range_id) {
        if (std::data(range_sizes)[range_id] > 0) {
            queue.emplace(range_id, 0);
        }
    }
    // merge ranges until priority queue is empty
    while (!queue.empty()) {
        range_type top_range = queue.top();
        queue.pop();
        copy_func(top_range);
        if (++top_range.second != std::data(range_sizes)[top_range.first]) {
            // re-insert non-empty range
            queue.push(top_range);
        }
    }
}

int main() {
    std::vector<int> keys_1   = { 1, 2, 3, 4 };
    std::vector<int> values_1 = { 11, 12, 13, 14 };
    std::vector<int> keys_2   = { 3, 4, 5, 6, 7 };
    std::vector<int> values_2 = { 23, 24, 25, 26, 27 };

    std::vector<int> merged_keys;
    std::vector<int> merged_values;

    multiway_merge(
        { keys_1.size(), keys_2.size() },
        [&](const range_type& left, const range_type& right) {
            if (left == right) return false;
            switch (left.first) {
                case 0:
                    assert(right.first == 1);
                    return keys_1[left.second] < keys_2[right.second];
                case 1:
                    assert(right.first == 0);
                    return keys_2[left.second] < keys_1[right.second];
            }
            return false;
        },
        [&](const range_type& range) {
            switch (range.first) {
                case 0:
                    merged_keys.push_back(keys_1[range.second]);
                    merged_values.push_back(values_1[range.second]);
                    break;
                case 1:
                    merged_keys.push_back(keys_2[range.second]);
                    merged_values.push_back(values_2[range.second]);
                    break;
            }
        });
    // copy result to stdout
    std::cout << "keys: ";
    std::copy(
        merged_keys.cbegin(), merged_keys.cend(), 
        std::ostream_iterator<int>(std::cout, " "));
    std::cout << "\nvalues: ";
    std::copy(
        merged_values.cbegin(), merged_values.cend(), 
        std::ostream_iterator<int>(std::cout, " "));
    std::cout << "\n";
}

该算法具有 O(nlog(k)) 的时间复杂度和 O(k) 的空间复杂度,其中 n 是所有范围的总大小,k 是范围的数目。
所有输入范围的大小都需要作为初始化列表传递。这个例子只传递了你例子中的两个输入范围。扩展这个例子到两个以上的范围是很简单的。

n53p2ov0

n53p2ov03#

由于我在移动中,没有时间测试这个。这只是一段简明的代码来说明一个可能的解决方案:

class Solution{
    
    std::unordered_map<int,std::set<int>> keyValues;
    std::set<int> outputKeys;
    int outputVectorSize = 0;
    
public:
    
    void addKeyValueSet(const std::vector<int>& keys,const std::vector<int>& values){
        for(int i=0;i<keys.size();i++){
            const int& key = keys.at(i);
            outputKeys.insert(keys.at(i));
            if(keyValues.find(key)==keyValues.end()){
                std::set<int> newSet;
                keyValues[key] = newSet;
            }
            auto& newSet = keyValues[key];
            newSet.insert(values.at(i));
        }
    }
};

看起来您需要对两件事进行排序:

  • 钥匙
  • 如果有多个键值对具有相同的键,则也要对这些值进行排序。

这意味着,两个键都需要排序,如果有多个键-值对具有相同的键,则值也需要排序。
有序集将在您插入值时对其进行排序。一旦添加了所有值,您就可以迭代键,并按排序顺序提取该键的相应值集。
内存方面,所有的值存储一次,键存储两次,性能取决于在有序集中的插入。
编辑:
下面是另一个版本,我将从概念上对它进行图示,因为根据您的需求和资源,实现可能会有很大的不同,而且问题陈述也不是很详细或很好地描述。
这种方法涉及在多个合作节点之间划分密钥空间。每个节点存储总密钥空间的子集。所需要的是计算特定密钥"属于"哪个节点的方法,任何合作节点都应该能够做到这一点。一旦它可以计算节点,它就可以将密钥交给,值对分配给该节点,以便在该节点负责键之间进行排序。最后一步,只需从每个节点收集排序后的键,无论您希望最终收集在何处。
1.因此,你必须首先有一个所有向量或键的源。这个源可以是一个节点,也可以是几个节点,每个节点持有任意数量的键。它甚至不必是每个节点相等的数量。这些源充当"生产者"。源需要有一个公开的函数,消费者可以请求键。如果它们的键是作为整个向量提供的,还是单独的键值对,这完全取决于您的选择。
1.然后你需要一些"消费者"节点,这些消费者只需要知道他们可以请求键值对或向量的源列表,这取决于你的需求。
1.你需要一些节点,让我们调用"排序器",每个节点负责键空间中的一个范围。消费者应该能够调用一个函数,该函数可以"计算"向哪个排序器提供键值对。
1.一旦使用者有了键-值对(在您放弃整个向量的情况下,使用者仍将一次处理一个键-值对),使用者就可以使用"compute"函数来确定哪个使用者是负责排序的特定键。
1.在知道要将键值对交给哪个排序器之后,使用者必须将值交给该排序器,然后由排序器对自己内部的各个键值对进行排序。
1.在没有更多的对可以使用之后,所需要做的就是将值合并在一起。由于每个排序器都将在其键空间的指定范围内包含已排序的值,所以所需要做的就是按顺序查询每个排序器并组装值。这也可以通过多线程/分布式方式完成,因为您所做的就是将每个已排序的集合合并在一起。
由于问题不够详细,没有明确的需求,所以在提问者提出更明确的需求之前,我不会提供代码实现。
祝你好运。

1u4esq0p

1u4esq0p4#

您必须实现一个完全符合您的情况的内存,对于如此大的数组,如果您能够负担得起分配数组的完整或接近完整的副本,则多威胁可能不是那么好,您可以做的一个优化是使用large pages,并确保您访问的内存不是paged(如果您计划满负荷运行,则不使用交换并不理想)。
这个简单的低内存示例工作得很好,很难击败顺序 * i/o *,它的主要瓶颈是使用realloc,当将使用的值从arrs替换到ret时,在每个step_size处生成多个reallocs,但只有一个是昂贵的,X1M,N1X可能消耗"大量"时间,这仅仅是因为缩短缓冲区总是可用的,而扩展缓冲区可能不可用,并且OS可能需要进行多次存储器移动。

#include <vector>
#include <chrono>
#include <stdio.h>

template<typename Pair, typename bool REVERSED = true>
std::vector<Pair> multi_merge_lm(std::vector<std::vector<Pair>>& arrs, float step){
    size_t final_size = 0, max, i;
    for (i = 0; i < arrs.size(); i++){
        final_size += arrs[i].size();
    }

    float original = (float)final_size;
    size_t step_size = (size_t)((float)(final_size) * step);

    printf("Merge of %zi (%zi bytes) with %zi step size \n", 
        final_size, sizeof(Pair), step_size
    );
    printf("Merge operation size %.*f mb + %.*f mb \n",
        3, ((float)(sizeof(Pair) * (float)final_size) / 1000000),
        3, ((float)(sizeof(Pair) * (float)final_size * step) / 1000000)
    );

    std::vector<Pair> ret;
    while (final_size --> 0){

        for (max = 0, i = 0; i < arrs.size(); i++){
            // select the next biggest item from all the arrays
            if (arrs[i].back().first > arrs[max].back().first){
                max = i;
            }
        }

        // This does not actualy resize the vector 
        // unless the capacity is too small
        ret.push_back(arrs[max].back());
        arrs[max].pop_back();

        // This check could be extracted of the while
        // with a unroll and sort to little
        for (i = 0; i < arrs.size(); i++){
            if (arrs[i].empty()){
                arrs[i] = arrs.back();
                arrs.pop_back();
                break;
            }
        }

        if (ret.size() == ret.capacity()) {
            // Remove the used memory from the arrs and
            // realloc more to the ret
            for (std::vector<Pair>& chunk : arrs){
                chunk.shrink_to_fit();
            }
            ret.reserve(ret.size() + step_size);

            // Dont move this to the while loop, it will slow down
            // the execution, leave it just for debugging
            printf("\rProgress %i%c / Merge size %zi", 
                (int)((1 - ((float)final_size / original) ) * 100), 
                '%', ret.size()
            );
        }
    }

    printf("\r%*c\r", 40, ' ');
    ret.shrink_to_fit();
    arrs.clear();

    if (REVERSED){
        std::reverse(ret.begin(), ret.end());
    }

    return ret;
}

int main(void) {

    typedef std::pair<uint64_t, uint64_t> Pair;

    int inc = 1;
    int increment = 100000;
    int test_size = 40000000;
    float step_size = 0.05f;

    auto arrs = std::vector<std::vector<Pair>>(5);
    for (auto& chunk : arrs){

        // makes the arrays big and asymmetric and adds 
        // some data to check if it works
        chunk.resize(test_size + increment * inc++);
        for (int i = 0; i < chunk.size(); i++){
            chunk[i] = std::make_pair(i, i * -1);
        }

    }
    printf("Generation done \n");

    auto start = std::chrono::steady_clock::now();
    auto merged = multi_merge_lm<Pair>(arrs, step_size);
    auto end = std::chrono::steady_clock::now();

    printf("Time taken: %lfs \n", 
        (std::chrono::duration<double>(end - start)).count()
    );
    for (size_t i = 1; i < merged.size(); i++){
        if (merged[i - 1] > merged[i]){
            printf("Miss placed at index: %zi \n", i - 1);
        }
    }

    merged.clear();
    return 0;
}
Merge of 201500000 (16 bytes) with 10075000 step size
Merge operation size 3224.000 mb + 161.200 mb
Time taken: 166.197639s

通过分析器(在我的例子中是ANDuProf)运行这个过程表明,调整大小的开销相当大,step_size越大,效率就越高。


(名称重复,因为它们来自调用相同函数的代码的不同部分,在本例中,是std函数进行的调用)
这次重新运行的速度是0.5倍,大约快了2倍,但现在函数消耗的内存比以前多了10倍,您应该记住,这些值不是通用的,它们可能会根据您运行的硬件而变化,但比例不会变化太多。

Merge of 201500000 (16 bytes) with 100750000 step size
Merge operation size 3224.000 mb + 1612.000 mb
Time taken: 72.062857s

另外两件事你不应该忘记的是,std::vector是动态的,它的实际大小可能更大,O2不能真正做很多优化堆内存访问,如果你不能使它安全,那么指令只能等待。

q35jwt9p

q35jwt9p5#

我几乎不记得这个了,但是你可能会发现它很有帮助--我很确定我见过合并K排序链表的问题。它使用了类似于分治的东西,接近对数时间复杂度。我怀疑有没有可能得到一个更好的时间复杂度。
这背后的逻辑是最小化合并列表的迭代。如果你合并第一和第二个列表,然后将它与第三个合并涉及到更长的合并列表。这个方法避免了这种情况,首先合并所有的小列表,然后移动到(我喜欢称之为)“第二层合并”合并一次合并列表。
这样,如果你的列表的平均长度是n,你最多需要logn个迭代器,结果是K*log(n)复杂度,其中K是你拥有的列表的数量。
很抱歉我说的有点“不太准确”,但是我想你可能会发现这条信息很有用,尽管我对gnu的multiway_merge不熟悉,所以我说的话可能也是无用的。

相关问题