C++分离线程内存访问错误

zfycwa2u  于 2023-07-01  发布在  其他
关注(0)|答案(2)|浏览(128)

我正在尝试在C++中实现协作函数超时取消,遵循this answer中的建议。想法很简单:如果在单独线程函数中执行超出了其允许的时间限制,则其终止而不写入共享资源。因为它的线程将不再被使用,所以它得到detached
代码如下:

#include <cassert>
#include <chrono>
#include <condition_variable>
#include <future>
#include <memory>
#include <thread>

class Inner
{
      public:
    Inner()
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(150));
    };
    ~Inner(){};
};

static std::unique_ptr<std::runtime_error>
func(std::stop_token stoken, std::shared_ptr<Inner> obj)
{
    try {
        std::shared_ptr<Inner> new_obj(new Inner());
        if (!stoken.stop_requested()) {
            obj.swap(new_obj);
        }
        return std::unique_ptr<std::runtime_error>(nullptr);
    } catch (const std::runtime_error &err) {
        return std::unique_ptr<std::runtime_error>(new std::runtime_error(err.what()));
    }
}

class Wrapper
{
      private:
    std::shared_ptr<Inner> obj = nullptr;

      public:
    Wrapper(){};
    ~Wrapper(){};

    void
    cancellable_function(std::chrono::duration<uint32_t, std::micro> timeout)
    {
        std::packaged_task<std::unique_ptr<std::runtime_error>(std::stop_token, std::shared_ptr<Inner>)> task(
            func);

        auto         result = task.get_future();
        std::jthread thr(std::ref(task), obj);

        if (result.wait_for(timeout) == std::future_status::ready) {
            auto error = result.get();
            if (error != nullptr)
                throw *error;
        } else {
            thr.request_stop();
#ifdef DETACH
            thr.detach();
#else
            thr.join();
#endif
            throw std::runtime_error("motor creation timeout");
        }
    }
};

int
main(void)
{
    auto cls = std::shared_ptr<Wrapper>(new Wrapper());
    try {
        cls->cancellable_function(std::chrono::milliseconds(100));
        assert(false);
    } catch (const std::exception &) {
        assert(true);
    }

    // wait for detached thread to finish
    std::this_thread::sleep_for(std::chrono::seconds(1));

    return 0;
}

在这段代码中,Inner对象的构造超时。当它这样做时,线程thr通过stoken接收停止信号并离开而不修改共享资源obj
当使用join而不是detach构建时,此代码不会引发vargrind错误:

g++ -Wall -Wextra -pedantic -Werror -std=gnu++20 cancel_function.cxx -o cancel_function
valgrind -q --tool=memcheck --leak-check=full --error-exitcode=69 --track-origins=yes --show-reachable=yes ./cancel_function

但是,带有detach的版本显示了大量的内存访问错误(由于问题大小限制,无法插入所有错误):

g++ -D DETACH -Wall -Wextra -pedantic -Werror -std=gnu++20 cancel_function.cxx -o cancel_function
valgrind -q --tool=memcheck --leak-check=full --error-exitcode=69 --track-origins=yes --show-reachable=yes ./cancel_function
==95200== Thread 2:
==95200== Invalid write of size 8
==95200==    at 0x110BEA: std::_Tuple_impl<0ul, std::runtime_error*, std::default_delete<std::runtime_error> >::_Tuple_impl(std::_Tuple_impl<0ul, std::runtime_error*, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x110C12: std::tuple<std::runtime_error*, std::default_delete<std::runtime_error> >::tuple(std::tuple<std::runtime_error*, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x110C41: std::__uniq_ptr_impl<std::runtime_error, std::default_delete<std::runtime_error> >::__uniq_ptr_impl(std::__uniq_ptr_impl<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x10F58A: std::__uniq_ptr_data<std::runtime_error, std::default_delete<std::runtime_error>, true, true>::__uniq_ptr_data(std::__uniq_ptr_data<std::runtime_error, std::default_delete<std::runtime_error>, true, true>&&) (in /tmp/cancel_function)
==95200==    by 0x10F5B0: std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >::unique_ptr(std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x1157E5: std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >::_M_set(std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x115484: std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >::operator()() const (in /tmp/cancel_function)
==95200==    by 0x115271: std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter> std::__invoke_impl<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >&>(std::__invoke_other, std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >&) (in /tmp/cancel_function)
...

我不明白,为什么分离的线程在被取消后访问内存。如何在不使用join的情况下修复此代码?

nzk0hqpo

nzk0hqpo1#

问题是task变量只作为std::jthread的引用传入,因此当您分离它时,函数结束,变量将超出范围,底层对象将被删除。然而,一旦函数最终在分离线程中结束,它仍然会尝试访问作为引用传入的task变量,但底层对象已经被删除。
你可以做的是将packaged_task移动到std::jthread中,这样它只会在实际线程结束时被销毁:

std::jthread thr(std::move(task), obj);

然后分离将起作用。

**然而:**您的代码中存在一些没有多大意义的问题:

  • 您正在使用std::unique_ptr<std::runtime_error>,为什么?如果你想 Package 异常,已经有std::exception_ptr可以使用。另外,为什么要显式地使用它呢?std::future可以很好地处理异常,为什么这里要显式处理?
  • .swap()不做你想做的事:它将用local函数变量交换local函数参数-由于两者都是局部变量,这实际上没有影响...当然,你可以交换共享指针的内容(std::swap(*obj, *new_obj);),但这是不建议的,因为:
  • 您的中止检查有一个固有的竞态条件。如果在你启动的线程中,检查注意到它没有被中止,那么主线程在那个时候中止了,然后交换发生了。那可不好

我认为一种更简洁(也更简单)的方法是简单地依赖于std::future来返回您想要创建的值,然后使用该值。如果发生错误,std::future也会将任何异常转发给您。这会让你的代码更简单:

static std::shared_ptr<Inner>
func(std::stop_token token)
{
#if THROW_EXCEPTION
    throw std::runtime_error("Some error happened");
#endif
    std::shared_ptr<Inner> new_obj(new Inner());
    if (token.stop_requested())
        throw std::runtime_error("Aborted");
    return new_obj;
}

class Wrapper
{
      private:
    std::shared_ptr<Inner> obj = nullptr;

      public:
    Wrapper(){};
    ~Wrapper(){};

    void
    cancellable_function(std::chrono::duration<uint32_t, std::micro> timeout)
    {
        std::packaged_task<std::shared_ptr<Inner>(std::stop_token)> task(func);

        auto result = task.get_future();
        std::jthread thr(std::move(task));

        if (result.wait_for(timeout) == std::future_status::ready) {
            // result.get() will throw if there was an exception
            // in the code
            obj = result.get();
        } else {
            thr.request_stop();
#ifdef DETACH
            thr.detach();
#else
            thr.join();
#endif
            throw std::runtime_error("motor creation timeout");
        }
    }
};

然而,如果你不能在中间真正中断操作,只能在最后检查是否使用了stop_token,为什么要为这个复杂性而烦恼呢?使用std::async

static std::shared_ptr<Inner>
func()
{
#if THROW_EXCEPTION
    throw std::runtime_error("Some error happened");
#endif
    std::shared_ptr<Inner> new_obj(new Inner());
    return new_obj;
}

class Wrapper
{
      private:
    std::shared_ptr<Inner> obj = nullptr;

      public:
    Wrapper(){};
    ~Wrapper(){};

    void
    cancellable_function(std::chrono::duration<uint32_t, std::micro> timeout)
    {
        auto result = std::async(func);
        if (result.wait_for(timeout) == std::future_status::ready) {
            // result.get() will throw if there was an exception
            // in the code
            obj = result.get();
        } else {
            throw std::runtime_error("motor creation timeout");
        }
    }
};

当然,如果你在你的例程中有可能的取消点,可以定期检查,那么使用jthread + packaged_task变体可能更好(因为据我所知std::async没有任何取消支持)。

0kjbasz6

0kjbasz62#

(In原则)永远不要使用detach,总是确保线程可以在关机时协作地停止它们的工作。然后与这些线程的停止同步。例如std::thread::join(),std::jthread::~jthread()(C++20),或者std::future/std::async的组合

相关问题