我一直在按照这个paper的指令实现一个无锁数组。目标是创建一个线程安全的、使用无锁操作动态调整大小的数组。
对于小负载,该实现似乎工作得很好。然而,当我尝试同时从两个线程插入2e5元素时,我注意到以下错误:
1.Process finished with exit code 139 (interrupted by signal 11: SIGSEGV).
我使用调试器所能发现的就是我的描述符以某种方式结束为null。
2.
ds(86913,0x16dde7000) malloc: Heap corruption detected, free list is damaged at 0x60000192c000
*** Incorrect guard value: 4329210352
ds(86913,0x16de73000) malloc: *** error for object 0x60000192c000: pointer being freed was not allocated
ds(86913,0x16de73000) malloc: *** set a breakpoint in malloc_error_break to debug
字符串
下面是我的实现:
#ifndef DS_LOCKFREEARRAY_H
#define DS_LOCKFREEARRAY_H
#include <algorithm>
#include <cstddef>
namespace fast_ds {
namespace internal {
template<class T>
class write_descriptor {
public:
T old_value_;
T new_value_;
size_t location_;
bool completed_;
write_descriptor() : location_(0), completed_(true) {}
explicit write_descriptor(T oldValue, T newValue, size_t location, bool completed) : old_value_(oldValue),
new_value_(newValue),
location_(location),
completed_(completed) {}
};
template<class T>
class v_descriptor {
public:
size_t size_ = 0;
write_descriptor<T> writeDescriptor_;
v_descriptor() = default;
explicit v_descriptor(size_t size, write_descriptor<T> w_descriptor) : size_(size),
writeDescriptor_(w_descriptor) {}
};
}
template<class T>
class LockFreeArray {
private:
static constexpr size_t kNumberOfBuckets = 32;
static constexpr size_t kFirstBucketCapacity = 2;
std::atomic<std::atomic<T> *> *data_;
std::shared_ptr<internal::v_descriptor<T>> descriptor_ = std::make_shared<internal::v_descriptor<T>>();
void CompleteWrite(internal::write_descriptor<T> write_descriptor) {
if (!write_descriptor.completed_) {
std::atomic<T> *memoryLocation = At(write_descriptor.location_);
memoryLocation->compare_exchange_weak(write_descriptor.old_value_,
write_descriptor.new_value_);
write_descriptor.completed_ = true;
}
}
inline int HighestBitSet(unsigned int x) const {
return 31 - __builtin_clz(x);
}
void AllocBucket(int bucket) {
int bucketSize = 1 << (bucket + HighestBitSet(kFirstBucketCapacity));
std::atomic<T> *currentBucket = data_[bucket].load();
auto newBucket = new std::atomic<T>[bucketSize];
if (!data_[bucket].compare_exchange_weak(currentBucket, newBucket)) {
delete[] newBucket;
}
}
public:
explicit LockFreeArray() {
data_ = new std::atomic<std::atomic<T> *>[kNumberOfBuckets];
data_[0].store(new std::atomic<T>[kFirstBucketCapacity]);
}
LockFreeArray(const LockFreeArray &other) = delete;
size_t Size() noexcept {
auto current_descriptor = descriptor_;
auto current_size = current_descriptor->size_;
if (!descriptor_->writeDescriptor_.completed_) {
current_size--;
}
return current_size;
}
std::atomic<T> *At(size_t index) const {
auto pos = index + kFirstBucketCapacity;
auto hibit = HighestBitSet(pos);
auto index_in_bucket = (pos ^ (1 << hibit));
return &data_[hibit - HighestBitSet(kFirstBucketCapacity)][index_in_bucket];
}
void PushBack(const T &value) {
internal::v_descriptor<T> *current_descriptor;
std::shared_ptr<internal::v_descriptor<T>> new_descriptor;
std::shared_ptr<internal::v_descriptor<T>> current_descriptor_ptr = descriptor_;
do {
current_descriptor = descriptor_.get();
if (current_descriptor == nullptr) {
continue;
}
current_descriptor_ptr = descriptor_;
int current_size = current_descriptor->size_;
internal::write_descriptor<T> wDescriptor = current_descriptor->writeDescriptor_;
CompleteWrite(wDescriptor);
if (int bucket = HighestBitSet(current_size + kFirstBucketCapacity) -
HighestBitSet(kFirstBucketCapacity); data_[bucket].load() == 0) {
AllocBucket(bucket);
}
auto oldValue = At(current_size)->load();
auto write_op = internal::write_descriptor<T>(oldValue, value,
current_size,
false);
new_descriptor = std::make_shared<internal::v_descriptor<T>>(
internal::v_descriptor(current_size + 1, write_op));
} while (!std::atomic_compare_exchange_strong(&descriptor_, ¤t_descriptor_ptr, new_descriptor));
CompleteWrite(new_descriptor->writeDescriptor_);
}
};
}
#endif
型
这里是主文件:
#include <iostream>
#include <thread>
#include "include/vectors/LockFreeArray.h"
using namespace fast_ds;
using namespace std;
const int N = 100000;
auto v = LockFreeArray<int>();
void f1() {
for(int i = 1; i <= N; ++i) {
v.PushBack(i);
}
}
void f2() {
for(int i = 1 + N; i <= 2 * N; ++i) {
v.PushBack(i);
}
}
int main() {
std::thread thread1(f1);
std::thread thread2(f2);
thread1.join();
thread2.join();
cout << v.Size() << endl;
}
型
附加说明:
1.我使用appleclang17编译M1上的代码。
1.我试着使用std::experimental::atomic_shared_ptr<T>
和atomic<shared_ptr<T>>
(来自clang20),但它们似乎没有被这个编译器或gnu实现。
1.我使用原始指针来使操作尽可能快,但我考虑切换到智能指针。
我愿意听取改进我的执行工作的建议。
1条答案
按热度按时间qcbq4gxm1#
你的问题是这条线。
字符串
当
current_size
是0
时,At()
将返回什么?数组的大小永远不是该数组的有效索引。
使用
-fsanitize=address
reveals this mistake编译代码。