我正在尝试在C++中实现协作函数超时取消,遵循this answer中的建议。想法很简单:如果在单独线程函数中执行超出了其允许的时间限制,则其终止而不写入共享资源。因为它的线程将不再被使用,所以它得到detached。
代码如下:
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <future>
#include <memory>
#include <thread>
class Inner
{
public:
Inner()
{
std::this_thread::sleep_for(std::chrono::milliseconds(150));
};
~Inner(){};
};
static std::unique_ptr<std::runtime_error>
func(std::stop_token stoken, std::shared_ptr<Inner> obj)
{
try {
std::shared_ptr<Inner> new_obj(new Inner());
if (!stoken.stop_requested()) {
obj.swap(new_obj);
}
return std::unique_ptr<std::runtime_error>(nullptr);
} catch (const std::runtime_error &err) {
return std::unique_ptr<std::runtime_error>(new std::runtime_error(err.what()));
}
}
class Wrapper
{
private:
std::shared_ptr<Inner> obj = nullptr;
public:
Wrapper(){};
~Wrapper(){};
void
cancellable_function(std::chrono::duration<uint32_t, std::micro> timeout)
{
std::packaged_task<std::unique_ptr<std::runtime_error>(std::stop_token, std::shared_ptr<Inner>)> task(
func);
auto result = task.get_future();
std::jthread thr(std::ref(task), obj);
if (result.wait_for(timeout) == std::future_status::ready) {
auto error = result.get();
if (error != nullptr)
throw *error;
} else {
thr.request_stop();
#ifdef DETACH
thr.detach();
#else
thr.join();
#endif
throw std::runtime_error("motor creation timeout");
}
}
};
int
main(void)
{
auto cls = std::shared_ptr<Wrapper>(new Wrapper());
try {
cls->cancellable_function(std::chrono::milliseconds(100));
assert(false);
} catch (const std::exception &) {
assert(true);
}
// wait for detached thread to finish
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
在这段代码中,Inner
对象的构造超时。当它这样做时,线程thr
通过stoken
接收停止信号并离开而不修改共享资源obj
。
当使用join
而不是detach
构建时,此代码不会引发vargrind错误:
g++ -Wall -Wextra -pedantic -Werror -std=gnu++20 cancel_function.cxx -o cancel_function
valgrind -q --tool=memcheck --leak-check=full --error-exitcode=69 --track-origins=yes --show-reachable=yes ./cancel_function
但是,带有detach
的版本显示了大量的内存访问错误(由于问题大小限制,无法插入所有错误):
g++ -D DETACH -Wall -Wextra -pedantic -Werror -std=gnu++20 cancel_function.cxx -o cancel_function
valgrind -q --tool=memcheck --leak-check=full --error-exitcode=69 --track-origins=yes --show-reachable=yes ./cancel_function
==95200== Thread 2:
==95200== Invalid write of size 8
==95200== at 0x110BEA: std::_Tuple_impl<0ul, std::runtime_error*, std::default_delete<std::runtime_error> >::_Tuple_impl(std::_Tuple_impl<0ul, std::runtime_error*, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200== by 0x110C12: std::tuple<std::runtime_error*, std::default_delete<std::runtime_error> >::tuple(std::tuple<std::runtime_error*, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200== by 0x110C41: std::__uniq_ptr_impl<std::runtime_error, std::default_delete<std::runtime_error> >::__uniq_ptr_impl(std::__uniq_ptr_impl<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200== by 0x10F58A: std::__uniq_ptr_data<std::runtime_error, std::default_delete<std::runtime_error>, true, true>::__uniq_ptr_data(std::__uniq_ptr_data<std::runtime_error, std::default_delete<std::runtime_error>, true, true>&&) (in /tmp/cancel_function)
==95200== by 0x10F5B0: std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >::unique_ptr(std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200== by 0x1157E5: std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >::_M_set(std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200== by 0x115484: std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >::operator()() const (in /tmp/cancel_function)
==95200== by 0x115271: std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter> std::__invoke_impl<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >&>(std::__invoke_other, std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >&) (in /tmp/cancel_function)
...
我不明白,为什么分离的线程在被取消后访问内存。如何在不使用join
的情况下修复此代码?
2条答案
按热度按时间nzk0hqpo1#
问题是
task
变量只作为std::jthread
的引用传入,因此当您分离它时,函数结束,变量将超出范围,底层对象将被删除。然而,一旦函数最终在分离线程中结束,它仍然会尝试访问作为引用传入的task
变量,但底层对象已经被删除。你可以做的是将
packaged_task
移动到std::jthread
中,这样它只会在实际线程结束时被销毁:然后分离将起作用。
**然而:**您的代码中存在一些没有多大意义的问题:
std::unique_ptr<std::runtime_error>
,为什么?如果你想 Package 异常,已经有std::exception_ptr
可以使用。另外,为什么要显式地使用它呢?std::future
可以很好地处理异常,为什么这里要显式处理?.swap()
不做你想做的事:它将用local函数变量交换local函数参数-由于两者都是局部变量,这实际上没有影响...当然,你可以交换共享指针的内容(std::swap(*obj, *new_obj);
),但这是不建议的,因为:我认为一种更简洁(也更简单)的方法是简单地依赖于
std::future
来返回您想要创建的值,然后使用该值。如果发生错误,std::future
也会将任何异常转发给您。这会让你的代码更简单:然而,如果你不能在中间真正中断操作,只能在最后检查是否使用了
stop_token
,为什么要为这个复杂性而烦恼呢?使用std::async
:当然,如果你在你的例程中有可能的取消点,可以定期检查,那么使用
jthread
+packaged_task
变体可能更好(因为据我所知std::async
没有任何取消支持)。0kjbasz62#
(In原则)永远不要使用detach,总是确保线程可以在关机时协作地停止它们的工作。然后与这些线程的停止同步。例如std::thread::join(),std::jthread::~jthread()(C++20),或者std::future/std::async的组合