c++ ROS1回调线程行为:访问共享数据的两个回调

sr4lhrrt  于 2023-02-01  发布在  其他
关注(0)|答案(1)|浏览(257)

我的问题是回调是线程安全的吗?对于可能访问和潜在修改共享数据的多个回调,添加互斥锁是一个好的实践吗?
我正在测试运行两个线程的行为,这两个线程带有两个访问和修改共享数据的回调函数。
我发现了一个类似的帖子here,但在我的简单实验中,我期待着发生一些类似竞态条件的事情,但它没有发生。
我编写了一个发布者节点,它可以在两个线程中发布两条String消息,每条消息将以不同的频率发布一秒钟。

#include <boost/thread.hpp>
#include <ros/ros.h>
#include <std_msgs/String.h>

int main(int argc, char **argv)
{
  ros::init(argc, argv, "AB_topic_pub");

  ros::NodeHandle nh;

  ros::Publisher pub_a = nh.advertise<std_msgs::String>("msg_a", 200);
  ros::Publisher pub_b = nh.advertise<std_msgs::String>("msg_b", 200);

  int COUNT_A = 0, COUNT_B = 0;

  ros::Duration(2.0).sleep();

  boost::thread pub_thread_a(
    [&]()
    {
      ROS_INFO("Publishing A for one second.");
      ros::Time start_time = ros::Time::now();
      ros::Rate freq(1000);
      while (ros::Time::now() - start_time < ros::Duration(1.0)) {
        std_msgs::String MSG_A;
        MSG_A.data = "A" + std::to_string(COUNT_A);
        pub_a.publish(MSG_A);
        freq.sleep();
        COUNT_A++;
      }
    }
  );
  boost::thread pub_thread_b(
    [&]()
    {
      ROS_INFO("Publishing B for one second.");
      ros::Time start_time = ros::Time::now();
      ros::Rate freq(200);
      while (ros::Time::now() - start_time < ros::Duration(1.0)) {
        std_msgs::String MSG_B;
        MSG_B.data = "B" + std::to_string(COUNT_B);
        pub_a.publish(MSG_B);
        freq.sleep();
        COUNT_B++;
      }
    }
  );

  pub_thread_a.join();
  pub_thread_b.join();

  std::cout << "A COUNT: " << COUNT_A << std::endl;
  std::cout << "B COUNT: " << COUNT_B << std::endl;
}

我写了一个有两个回调队列的订阅者节点。在下面的代码中,我想计算回调总共被调用了多少次。我定义了两个变量COUNT_WO_LOCK,没有互斥锁保护,COUNT_W_LOCK有互斥锁保护。

#include <iostream>

#include <boost/function.hpp>
#include <boost/atomic/atomic.hpp>
#include <boost/thread.hpp>
#include <ros/callback_queue.h>
#include <ros/ros.h>
#include <std_msgs/String.h>

int main(int argc, char **argv)
{
  ros::init(argc, argv, "callback_lock_test");

  ros::NodeHandle nh_a;
  ros::NodeHandle nh_b;

  ros::CallbackQueue cq_a, cq_b;

  nh_a.setCallbackQueue(&cq_a);
  nh_b.setCallbackQueue(&cq_b);

  int COUNT_WO_LOCK;
  COUNT_WO_LOCK = 0;
  int COUNT_W_LOCK;
  COUNT_W_LOCK = 0;
  boost::mutex LOCK;

  boost::function<void(const std_msgs::String::ConstPtr &msg)> cb_a = 
      [&](const std_msgs::StringConstPtr &msg)
      {
        LOCK.lock();
        COUNT_W_LOCK++;
        LOCK.unlock();
        COUNT_WO_LOCK++;

        ROS_INFO("[Thread ID: %s] I am A, heard: [%s]", boost::lexical_cast<std::string>(boost::this_thread::get_id()).c_str() , msg->data.c_str());
      };
  boost::function<void(const std_msgs::String::ConstPtr &msg)> cb_b = 
      [&](const std_msgs::StringConstPtr &msg)
      {
        LOCK.lock();
        COUNT_W_LOCK++;
        LOCK.unlock();
        COUNT_WO_LOCK++;

        ROS_WARN("\t\t[Thread ID: %s] I am B, heard: [%s]", boost::lexical_cast<std::string>(boost::this_thread::get_id()).c_str() , msg->data.c_str());
      };

  

  // A pub for one second at 1000Hz
  ros::Subscriber sub_a = nh_a.subscribe<std_msgs::String>("msg_a", 200, 
      cb_a, ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay(true));
  // B pub for one second at 200Hz
  ros::Subscriber sub_b = nh_b.subscribe<std_msgs::String>("msg_b", 200, 
      cb_b, ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay(true));
  
  boost::thread spin_thread_a(
      [&]()
      {
        ROS_ERROR("\t\t\t\t\t [Spinner Thead ID: %s]", boost::lexical_cast<std::string>(boost::this_thread::get_id()).c_str());
        while (!sub_a.getNumPublishers()) { }
        while (ros::ok())
          cq_a.callAvailable(ros::WallDuration());
      }
  );
  boost::thread spin_thread_b(
      [&]()
      {
        ROS_ERROR("\t\t\t\t\t [Spinner Thead ID: %s]", boost::lexical_cast<std::string>(boost::this_thread::get_id()).c_str());
        while (!sub_b.getNumPublishers()) { }
        while (ros::ok())
          cq_b.callAvailable(ros::WallDuration());
      }
  );

  spin_thread_a.join();
  spin_thread_b.join();

  if (ros::isShuttingDown()) {
    std::cout << "LOCKED COUNT: " << COUNT_W_LOCK << std::endl;
    std::cout << "UNLOCKED COUNT: " << COUNT_WO_LOCK << std::endl;
  }
}

我先启动订阅者节点,然后启动发布者节点,我期待COUNT_W_LOCK1200COUNT_WO_LOCK<1200,但实际上它们都是1200

doinxwow

doinxwow1#

正如在评论中提到的,回调在ROS中是线程安全的。我认为问题在于你所描述的与你所期望的可能有点不同。简短的回答是,对于一个回调,即使你一次发布了多条消息,在给定的时间内也只会有一次回调的执行;也就是说,两个线程不会同时执行回调。这是因为后端ROS用户使用队列。因此,当回调执行时,最后一条消息会从队列中弹出。
请注意,在线程返回之前 * 访问 * 数据存在潜在的线程安全问题。这不是您的代码所做的事情,但如果主线程在回调的同时编辑或访问数据,则可能会出现问题。

相关问题