我想合并< code>k个按键排序的成对键/值向量。通常,向量的大小< code>n非常大(例如< code>n
请考虑以下示例 k = 2
:
// Input
keys_1 = [1, 2, 3, 4], values_1 = [11, 12, 13, 14]
keys_2 = [3, 4, 5, 6], values_2 = [23, 24, 25, 26]
// Output
merged_keys = [1, 2, 3, 3, 4, 4, 5, 6], merged_values = [11, 12, 13, 23, 14, 24, 25, 26]
因为< code > _ _ GNU _ parallel::multi way _ merge 是一个高效的< code>k路合并算法,所以我尝试使用一个最先进的zip迭代器(https://github.com/dpellegr/ZipIterator)来“组合”键值对向量。
#include <iostream>
#include <vector>
#include <parallel/algorithm>
#include "ZipIterator.hpp"
int main(int argc, char* argv[]) {
std::vector<int> keys_1 = {1, 2, 3, 4};
std::vector<int> values_1 = {11, 12, 13, 14};
std::vector<int> keys_2 = {3, 4, 5, 6};
std::vector<int> values_2 = {23, 24, 25, 26};
std::vector<int> merged_keys(8);
std::vector<int> merged_values(8);
auto kv_it_1 = Zip(keys_1, values_1);
auto kv_it_2 = Zip(keys_2, values_2);
auto mkv_it = Zip(merged_keys, merged_values);
auto it_pairs = {std::make_pair(kv_it_1.begin(), kv_it_1.end()),
std::make_pair(kv_it_2.begin(), kv_it_2.end())};
__gnu_parallel::multiway_merge(it_pairs.begin(), it_pairs.end(), mkv_it.begin(), 8, std::less<>());
for (size_t i = 0; i < 8; ++i) {
std::cout << merged_keys[i] << ":" << merged_values[i] << (i == 7 ? "\n" : ", ");
}
return 0;
}
但是,我得到了各种编译错误(使用< code>-O3构建):
错误:无法绑定类型为“std::__iterator_traits”的非常数左值引用
错误:无法转换'ZipIter
有没有可能修改< code>ZipIterator使其工作?
或者有没有更有效的方法来合并k
按键排序的成对键/值向量?
考虑的替代方案
KeyValuePair
struct
具有int key
和int value
成员以及运算符
是否可以修改压缩迭代器以使其工作?
是的,但这需要修补 __gnu_parallel::multiway_merge
。错误源是以下行:
/** @brief Dereference operator.
* @return Referenced element. */
typename std::iterator_traits<_RAIter>::value_type&
operator*() const
{ return *_M_current; }
这是 _GuardedIterator
的成员函数 - multiway_merge
实现中使用的辅助结构。它包装了_RAIter
类,在您的情况下是ZipIter
。根据定义,当迭代器被取消引用 (*_M_current
) 时,返回表达式的类型应该是引用
类型。但是,此代码希望它value_type
using reference = ZipRef<std::remove_reference_t<typename std::iterator_traits<IT>::reference>...>;
与(非常讨厌的)向量一起使用的做法相同
因此,<code>ZipIterator</code>没有问题,或者您如何使用算法,这对算法本身来说是一个非常重要的要求。下一个问题是,我们能摆脱它吗?
答案是肯定的。您可以更改_GuardedIterator::operator*()
以返回reference
而不是value_type
// Default value for potentially non-default-constructible types.
_ValueType* __arbitrary_element = 0;
for (_SeqNumber __t = 0; __t < __k; ++__t)
{
if(!__arbitrary_element
&& _GLIBCXX_PARALLEL_LENGTH(__seqs_begin[__t]) > 0)
__arbitrary_element = &(*__seqs_begin[__t].first);
}
这里,元素的地址被认为是某个< code>__arbitrary_element。我们可以存储这个元素的副本,因为我们知道复制< code>ZipRef的成本很低,并且它是默认可构造的:
// Local copy of the element
_ValueType __arbitrary_element_val;
_ValueType* __arbitrary_element = 0;
for (_SeqNumber __t = 0; __t < __k; ++__t)
{
if(!__arbitrary_element
&& _GLIBCXX_PARALLEL_LENGTH(__seqs_begin[__t]) > 0) {
__arbitrary_element_val = *__seqs_begin[__t].first;
__arbitrary_element = &__arbitrary_element_val;
}
}
相同的错误将出现在文件multiseq_selection. h
的几个地方,例如这里和这里。使用类似的技术修复所有这些。
然后,您将看到多个错误,如下图所示:
./parallel/multiway_merge.h:879:29: error: passing ‘const ZipIter<__gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator<int> > >, __gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator<int> > > >’ as ‘this’ argument discards qualifiers [-fpermissive]
它们是关于不正确的常数。这是因为您将< code>it_pairs声明为< code>auto,在这个特定的场景中,它将类型推断为< code>std::inializer_list。这是一种非常奇特的类型。例如,它只提供对其成员的常量访问,即使它本身没有声明为const。这就是这些误差的来源。例如,将< code>auto更改为< code>std::vector后,这些错误就消失了。
此时它应该编译查找。只是不要忘记使用 -fopenmp
进行编译,否则您将获得“未定义对'omp_get_thread_num'的引用”错误。
这是我看到的输出:
$ ./a.out
1:11, 2:12, 3:13, 3:23, 4:14, 4:24, 5:25, 6:26
由于您需要较低的内存开销,一个可能的解决方案是让< code>multiway_merge算法只对唯一的范围标识符和范围索引进行操作,并以lambda函数的形式提供比较和复制运算符。这样,合并算法就完全独立于实际使用的容器类型、键和值类型。
这是一个基于此处描述的基于堆的算法的C 17解决方案:
#include <cassert>
#include <cstdint>
#include <functional>
#include <initializer_list>
#include <iostream>
#include <iterator>
#include <queue>
#include <vector>
using range_type = std::pair<std::uint32_t,std::size_t>;
void multiway_merge(
std::initializer_list<std::size_t> range_sizes,
std::function<bool(const range_type&, const range_type&)> compare_func,
std::function<void(const range_type&)> copy_func)
{
// lambda compare function for priority queue of ranges
auto queue_less = [&](const range_type& range1, const range_type& range2) {
// reverse comparison order of range1 and range2 here,
// because we require the smallest element to be on top
return compare_func(range2, range1);
};
// create priority queue from all non-empty ranges
std::priority_queue<
range_type, std::vector<range_type>,
decltype(queue_less)> queue{ queue_less };
for (std::uint32_t range_id = 0; range_id < range_sizes.size(); ++range_id) {
if (std::data(range_sizes)[range_id] > 0) {
queue.emplace(range_id, 0);
}
}
// merge ranges until priority queue is empty
while (!queue.empty()) {
range_type top_range = queue.top();
queue.pop();
copy_func(top_range);
if (++top_range.second != std::data(range_sizes)[top_range.first]) {
// re-insert non-empty range
queue.push(top_range);
}
}
}
int main() {
std::vector<int> keys_1 = { 1, 2, 3, 4 };
std::vector<int> values_1 = { 11, 12, 13, 14 };
std::vector<int> keys_2 = { 3, 4, 5, 6, 7 };
std::vector<int> values_2 = { 23, 24, 25, 26, 27 };
std::vector<int> merged_keys;
std::vector<int> merged_values;
multiway_merge(
{ keys_1.size(), keys_2.size() },
[&](const range_type& left, const range_type& right) {
if (left == right) return false;
switch (left.first) {
case 0:
assert(right.first == 1);
return keys_1[left.second] < keys_2[right.second];
case 1:
assert(right.first == 0);
return keys_2[left.second] < keys_1[right.second];
}
return false;
},
[&](const range_type& range) {
switch (range.first) {
case 0:
merged_keys.push_back(keys_1[range.second]);
merged_values.push_back(values_1[range.second]);
break;
case 1:
merged_keys.push_back(keys_2[range.second]);
merged_values.push_back(values_2[range.second]);
break;
}
});
// copy result to stdout
std::cout << "keys: ";
std::copy(
merged_keys.cbegin(), merged_keys.cend(),
std::ostream_iterator<int>(std::cout, " "));
std::cout << "\nvalues: ";
std::copy(
merged_values.cbegin(), merged_values.cend(),
std::ostream_iterator<int>(std::cout, " "));
std::cout << "\n";
}
该算法的时间复杂度为 O(n log(k)),空间复杂度为 O(k),其中 n 是所有范围的总大小,k 是范围的数量。
所有输入范围的大小都需要作为初始值设定项列表传递。该示例仅传递示例中的两个输入范围。将示例扩展到两个以上的范围很简单。
你将必须实现一个适合你的确切情况下,有这么大的数组,多威胁可能不是那么好,如果你能负担得起分配一个完整或接近完整的数组副本,你可以做的一个优化是使用大页面,并确保你正在访问的内存没有分页(如果你打算满负荷运行,没有交换是不理想的)。
这个简单的低内存示例工作得很好,很难击败顺序 I/O,它的主要瓶颈是使用 realloc,当使用的值从 arrs
替换到 ret
时,每个step_size
都会有多个 realloc
,但只有一个是昂贵的,
ret.reserve()
可能会消耗“大量”时间,因为缩短缓冲区始终可用,但扩展缓冲区可能不可用,并且操作系统可能需要进行多次内存移动。
#include <vector>
#include <chrono>
#include <stdio.h>
template<typename Pair, typename bool REVERSED = true>
std::vector<Pair> multi_merge_lm(std::vector<std::vector<Pair>>& arrs, float step){
size_t final_size = 0, max, i;
for (i = 0; i < arrs.size(); i++){
final_size += arrs[i].size();
}
float original = (float)final_size;
size_t step_size = (size_t)((float)(final_size) * step);
printf("Merge of %zi (%zi bytes) with %zi step size \n",
final_size, sizeof(Pair), step_size
);
printf("Merge operation size %.*f mb + %.*f mb \n",
3, ((float)(sizeof(Pair) * (float)final_size) / 1000000),
3, ((float)(sizeof(Pair) * (float)final_size * step) / 1000000)
);
std::vector<Pair> ret;
while (final_size --> 0){
for (max = 0, i = 0; i < arrs.size(); i++){
// select the next biggest item from all the arrays
if (arrs[i].back().first > arrs[max].back().first){
max = i;
}
}
// This does not actualy resize the vector
// unless the capacity is too small
ret.push_back(arrs[max].back());
arrs[max].pop_back();
// This check could be extracted of the while
// with a unroll and sort to little
for (i = 0; i < arrs.size(); i++){
if (arrs[i].empty()){
arrs[i] = arrs.back();
arrs.pop_back();
break;
}
}
if (ret.size() == ret.capacity()) {
// Remove the used memory from the arrs and
// realloc more to the ret
for (std::vector<Pair>& chunk : arrs){
chunk.shrink_to_fit();
}
ret.reserve(ret.size() + step_size);
// Dont move this to the while loop, it will slow down
// the execution, leave it just for debugging
printf("\rProgress %i%c / Merge size %zi",
(int)((1 - ((float)final_size / original) ) * 100),
'%', ret.size()
);
}
}
printf("\r%*c\r", 40, ' ');
ret.shrink_to_fit();
arrs.clear();
if (REVERSED){
std::reverse(ret.begin(), ret.end());
}
return ret;
}
int main(void) {
typedef std::pair<uint64_t, uint64_t> Pair;
int inc = 1;
int increment = 100000;
int test_size = 40000000;
float step_size = 0.05f;
auto arrs = std::vector<std::vector<Pair>>(5);
for (auto& chunk : arrs){
// makes the arrays big and asymmetric and adds
// some data to check if it works
chunk.resize(test_size + increment * inc++);
for (int i = 0; i < chunk.size(); i++){
chunk[i] = std::make_pair(i, i * -1);
}
}
printf("Generation done \n");
auto start = std::chrono::steady_clock::now();
auto merged = multi_merge_lm<Pair>(arrs, step_size);
auto end = std::chrono::steady_clock::now();
printf("Time taken: %lfs \n",
(std::chrono::duration<double>(end - start)).count()
);
for (size_t i = 1; i < merged.size(); i++){
if (merged[i - 1] > merged[i]){
printf("Miss placed at index: %zi \n", i - 1);
}
}
merged.clear();
return 0;
}
Merge of 201500000 (16 bytes) with 10075000 step size
Merge operation size 3224.000 mb + 161.200 mb
Time taken: 166.197639s
通过一个分析器(在我的例子中是ANDuProf)运行这个测试表明,调整大小是非常昂贵的,你把< code>step_size做得越大,它就变得越有效。
这次重新运行是 0.5 倍,它快 ~2 倍,但现在该函数消耗的内存比以前多 10 倍,您应该记住,此值不是通用的,它们可能会根据您运行的硬件而变化,但比例不会改变那么多。
Merge of 201500000 (16 bytes) with 100750000 step size
Merge operation size 3224.000 mb + 1612.000 mb
Time taken: 72.062857s
你不应该忘记的另外两件事是,< code>std::vector是动态的,它的实际大小可能更大,而且< code>O2并不能对堆内存访问进行太多的优化,如果你不能保证它的安全性,那么指令只能等待。