c++ 无锁动态调整阵列实现的问题

z9gpfhce  于 2023-08-09  发布在  其他
关注(0)|答案(1)|浏览(139)

我一直在按照这个paper的指令实现一个无锁数组。目标是创建一个线程安全的、使用无锁操作动态调整大小的数组。
对于小负载,该实现似乎工作得很好。然而,当我尝试同时从两个线程插入2e5元素时,我注意到以下错误:
1.
Process finished with exit code 139 (interrupted by signal 11: SIGSEGV).我使用调试器所能发现的就是我的描述符以某种方式结束为null。
2.

ds(86913,0x16dde7000) malloc: Heap corruption detected, free list is damaged at 0x60000192c000
*** Incorrect guard value: 4329210352
ds(86913,0x16de73000) malloc: *** error for object 0x60000192c000: pointer being freed was not allocated
ds(86913,0x16de73000) malloc: *** set a breakpoint in malloc_error_break to debug

字符串
下面是我的实现:

#ifndef DS_LOCKFREEARRAY_H
#define DS_LOCKFREEARRAY_H

#include <algorithm>
#include <cstddef>

namespace fast_ds {

    namespace internal {
        template<class T>
        class write_descriptor {
        public:
            T old_value_;
            T new_value_;
            size_t location_;
            bool completed_;

            write_descriptor() : location_(0), completed_(true) {}

            explicit write_descriptor(T oldValue, T newValue, size_t location, bool completed) : old_value_(oldValue),
                                                                                                 new_value_(newValue),
                                                                                                 location_(location),
                                                                                                 completed_(completed) {}
        };

        template<class T>
        class v_descriptor {
        public:
            size_t size_ = 0;
            write_descriptor<T> writeDescriptor_;

            v_descriptor() = default;

            explicit v_descriptor(size_t size, write_descriptor<T> w_descriptor) : size_(size),
                                                                                   writeDescriptor_(w_descriptor) {}
        };
    }

    template<class T>
    class LockFreeArray {
    private:
        static constexpr size_t kNumberOfBuckets = 32;
        static constexpr size_t kFirstBucketCapacity = 2;

        std::atomic<std::atomic<T> *> *data_;
        std::shared_ptr<internal::v_descriptor<T>> descriptor_ = std::make_shared<internal::v_descriptor<T>>();

        void CompleteWrite(internal::write_descriptor<T> write_descriptor) {
            if (!write_descriptor.completed_) {
                std::atomic<T> *memoryLocation = At(write_descriptor.location_);
                memoryLocation->compare_exchange_weak(write_descriptor.old_value_,
                                                      write_descriptor.new_value_);

                write_descriptor.completed_ = true;
            }
        }

        inline int HighestBitSet(unsigned int x) const {
            return 31 - __builtin_clz(x);
        }

        void AllocBucket(int bucket) {
            int bucketSize = 1 << (bucket + HighestBitSet(kFirstBucketCapacity));
            std::atomic<T> *currentBucket = data_[bucket].load();
            auto newBucket = new std::atomic<T>[bucketSize];

            if (!data_[bucket].compare_exchange_weak(currentBucket, newBucket)) {
                delete[] newBucket;
            }
        }

    public:
        explicit LockFreeArray() {
            data_ = new std::atomic<std::atomic<T> *>[kNumberOfBuckets];
            data_[0].store(new std::atomic<T>[kFirstBucketCapacity]);
        }

        LockFreeArray(const LockFreeArray &other) = delete;

        size_t Size() noexcept {
            auto current_descriptor = descriptor_;
            auto current_size = current_descriptor->size_;

            if (!descriptor_->writeDescriptor_.completed_) {
                current_size--;
            }

            return current_size;
        }

        std::atomic<T> *At(size_t index) const {
            auto pos = index + kFirstBucketCapacity;
            auto hibit = HighestBitSet(pos);
            auto index_in_bucket = (pos ^ (1 << hibit));
            return &data_[hibit - HighestBitSet(kFirstBucketCapacity)][index_in_bucket];
        }

        void PushBack(const T &value) {
            internal::v_descriptor<T> *current_descriptor;
            std::shared_ptr<internal::v_descriptor<T>> new_descriptor;
            std::shared_ptr<internal::v_descriptor<T>> current_descriptor_ptr = descriptor_;

            do {
                current_descriptor = descriptor_.get();
                if (current_descriptor == nullptr) {
                    continue;
                }
                current_descriptor_ptr = descriptor_;
                int current_size = current_descriptor->size_;
                internal::write_descriptor<T> wDescriptor = current_descriptor->writeDescriptor_;
                CompleteWrite(wDescriptor);

                if (int bucket = HighestBitSet(current_size + kFirstBucketCapacity) -
                                 HighestBitSet(kFirstBucketCapacity); data_[bucket].load() == 0) {
                    AllocBucket(bucket);
                }

                auto oldValue = At(current_size)->load();
                auto write_op = internal::write_descriptor<T>(oldValue, value,
                                                              current_size,
                                                              false);

                new_descriptor = std::make_shared<internal::v_descriptor<T>>(
                        internal::v_descriptor(current_size + 1, write_op));
            } while (!std::atomic_compare_exchange_strong(&descriptor_, &current_descriptor_ptr, new_descriptor));
            CompleteWrite(new_descriptor->writeDescriptor_);
        }
    };
}

#endif


这里是主文件:

#include <iostream>
#include <thread>
#include "include/vectors/LockFreeArray.h"

using namespace fast_ds;
using namespace std;

const int N = 100000;

auto v = LockFreeArray<int>();

void f1() {
    for(int i = 1; i <= N; ++i) {
        v.PushBack(i);
    }
}

void f2() {
    for(int i = 1 + N; i <= 2 * N; ++i) {
        v.PushBack(i);
    }
}

int main() {
    std::thread thread1(f1);
    std::thread thread2(f2);

    thread1.join();
    thread2.join();

    cout << v.Size() << endl;
}


附加说明:
1.我使用appleclang17编译M1上的代码。
1.我试着使用std::experimental::atomic_shared_ptr<T>atomic<shared_ptr<T>>(来自clang20),但它们似乎没有被这个编译器或gnu实现。
1.我使用原始指针来使操作尽可能快,但我考虑切换到智能指针。
我愿意听取改进我的执行工作的建议。

qcbq4gxm

qcbq4gxm1#

你的问题是这条线。

auto oldValue = At(current_size)->load();

字符串
current_size0时,At()将返回什么?
数组的大小永远不是该数组的有效索引。
使用-fsanitize=addressreveals this mistake编译代码。

相关问题