我的问题是回调是线程安全的吗?对于可能访问和潜在修改共享数据的多个回调,添加互斥锁是一个好的实践吗?
我正在测试运行两个线程的行为,这两个线程带有两个访问和修改共享数据的回调函数。
我发现了一个类似的帖子here,但在我的简单实验中,我期待着发生一些类似竞态条件的事情,但它没有发生。
我编写了一个发布者节点,它可以在两个线程中发布两条String
消息,每条消息将以不同的频率发布一秒钟。
#include <boost/thread.hpp>
#include <ros/ros.h>
#include <std_msgs/String.h>
int main(int argc, char **argv)
{
ros::init(argc, argv, "AB_topic_pub");
ros::NodeHandle nh;
ros::Publisher pub_a = nh.advertise<std_msgs::String>("msg_a", 200);
ros::Publisher pub_b = nh.advertise<std_msgs::String>("msg_b", 200);
int COUNT_A = 0, COUNT_B = 0;
ros::Duration(2.0).sleep();
boost::thread pub_thread_a(
[&]()
{
ROS_INFO("Publishing A for one second.");
ros::Time start_time = ros::Time::now();
ros::Rate freq(1000);
while (ros::Time::now() - start_time < ros::Duration(1.0)) {
std_msgs::String MSG_A;
MSG_A.data = "A" + std::to_string(COUNT_A);
pub_a.publish(MSG_A);
freq.sleep();
COUNT_A++;
}
}
);
boost::thread pub_thread_b(
[&]()
{
ROS_INFO("Publishing B for one second.");
ros::Time start_time = ros::Time::now();
ros::Rate freq(200);
while (ros::Time::now() - start_time < ros::Duration(1.0)) {
std_msgs::String MSG_B;
MSG_B.data = "B" + std::to_string(COUNT_B);
pub_a.publish(MSG_B);
freq.sleep();
COUNT_B++;
}
}
);
pub_thread_a.join();
pub_thread_b.join();
std::cout << "A COUNT: " << COUNT_A << std::endl;
std::cout << "B COUNT: " << COUNT_B << std::endl;
}
我写了一个有两个回调队列的订阅者节点。在下面的代码中,我想计算回调总共被调用了多少次。我定义了两个变量COUNT_WO_LOCK
,没有互斥锁保护,COUNT_W_LOCK
有互斥锁保护。
#include <iostream>
#include <boost/function.hpp>
#include <boost/atomic/atomic.hpp>
#include <boost/thread.hpp>
#include <ros/callback_queue.h>
#include <ros/ros.h>
#include <std_msgs/String.h>
int main(int argc, char **argv)
{
ros::init(argc, argv, "callback_lock_test");
ros::NodeHandle nh_a;
ros::NodeHandle nh_b;
ros::CallbackQueue cq_a, cq_b;
nh_a.setCallbackQueue(&cq_a);
nh_b.setCallbackQueue(&cq_b);
int COUNT_WO_LOCK;
COUNT_WO_LOCK = 0;
int COUNT_W_LOCK;
COUNT_W_LOCK = 0;
boost::mutex LOCK;
boost::function<void(const std_msgs::String::ConstPtr &msg)> cb_a =
[&](const std_msgs::StringConstPtr &msg)
{
LOCK.lock();
COUNT_W_LOCK++;
LOCK.unlock();
COUNT_WO_LOCK++;
ROS_INFO("[Thread ID: %s] I am A, heard: [%s]", boost::lexical_cast<std::string>(boost::this_thread::get_id()).c_str() , msg->data.c_str());
};
boost::function<void(const std_msgs::String::ConstPtr &msg)> cb_b =
[&](const std_msgs::StringConstPtr &msg)
{
LOCK.lock();
COUNT_W_LOCK++;
LOCK.unlock();
COUNT_WO_LOCK++;
ROS_WARN("\t\t[Thread ID: %s] I am B, heard: [%s]", boost::lexical_cast<std::string>(boost::this_thread::get_id()).c_str() , msg->data.c_str());
};
// A pub for one second at 1000Hz
ros::Subscriber sub_a = nh_a.subscribe<std_msgs::String>("msg_a", 200,
cb_a, ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay(true));
// B pub for one second at 200Hz
ros::Subscriber sub_b = nh_b.subscribe<std_msgs::String>("msg_b", 200,
cb_b, ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay(true));
boost::thread spin_thread_a(
[&]()
{
ROS_ERROR("\t\t\t\t\t [Spinner Thead ID: %s]", boost::lexical_cast<std::string>(boost::this_thread::get_id()).c_str());
while (!sub_a.getNumPublishers()) { }
while (ros::ok())
cq_a.callAvailable(ros::WallDuration());
}
);
boost::thread spin_thread_b(
[&]()
{
ROS_ERROR("\t\t\t\t\t [Spinner Thead ID: %s]", boost::lexical_cast<std::string>(boost::this_thread::get_id()).c_str());
while (!sub_b.getNumPublishers()) { }
while (ros::ok())
cq_b.callAvailable(ros::WallDuration());
}
);
spin_thread_a.join();
spin_thread_b.join();
if (ros::isShuttingDown()) {
std::cout << "LOCKED COUNT: " << COUNT_W_LOCK << std::endl;
std::cout << "UNLOCKED COUNT: " << COUNT_WO_LOCK << std::endl;
}
}
我先启动订阅者节点,然后启动发布者节点,我期待COUNT_W_LOCK
是1200
,COUNT_WO_LOCK
是<1200
,但实际上它们都是1200
。
1条答案
按热度按时间doinxwow1#
正如在评论中提到的,回调在ROS中是线程安全的。我认为问题在于你所描述的与你所期望的可能有点不同。简短的回答是,对于一个回调,即使你一次发布了多条消息,在给定的时间内也只会有一次回调的执行;也就是说,两个线程不会同时执行回调。这是因为后端ROS用户使用队列。因此,当回调执行时,最后一条消息会从队列中弹出。
请注意,在线程返回之前 * 访问 * 数据存在潜在的线程安全问题。这不是您的代码所做的事情,但如果主线程在回调的同时编辑或访问数据,则可能会出现问题。