C++: random boost::interprocess_exception::library_error on boost::interprocess::message_queue

Asked by xkftehaa on 2024-01-09 · 1 answer

I have created a template class for IPC using message queues.
I run my program in an infinite while loop (call it the main loop).
I collect data from various subsystems (sensors) over Ethernet and use message queues to pass the received data to the appropriate processes (there are several different processes acting as data sinks, each with its own message queue).
I just ran the program without performing any activity. It is the only program running, and I restart the OS before every run.
The program simply spins in the while loop with all flags set to false, so it effectively runs an empty loop.
I randomly get boost::interprocess_exception::library_error. Since there is no activity, I would expect no errors.
I commented out the Ethernet-related code and still get the same error.
The error occurs at this statement:

if (primaryNode == true)
{
    this->mSecondaryToPrimaryMessageQueue->receive(
        &receiveData,
        sizeof(receiveData),
        receiveLength,
        priority
    );
}
else
{
    this->mPrimaryToSecondaryMessageQueue->receive(
        &receiveData,
        sizeof(receiveData),
        receiveLength,
        priority
    );
}

I tried setting primaryNode to both true and false; I get the same error either way.
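One extra check that could narrow this down (a diagnostic sketch, not part of my code below; get_max_msg_size(), get_max_msg() and get_num_msg() are standard message_queue accessors) is to dump the queue attributes right after opening, because open_or_create silently reuses a leftover queue even if it was created with different parameters:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>

// Print the attributes of an already-opened queue, so that a mismatch
// with the sizeof() of the type being received into becomes visible
// before the first receive() throws.
void dump_queue_info(boost::interprocess::message_queue& mq, const char* name)
{
    std::cout << name
              << " max_msg_size=" << mq.get_max_msg_size() // bytes per message slot
              << " max_msg="      << mq.get_max_msg()      // queue depth
              << " num_msg="      << mq.get_num_msg()      // messages currently queued
              << std::endl;
}

If max_msg_size ever disagrees with the sizeof() of the type a process receives into, receive() is being handed a buffer the queue considers too small.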
Code:

ipc.hpp

#pragma once
#include <thread>
#include <string>
#include <atomic>
#include <memory>
#include <variant>
#include <optional>
#include <iostream>
#include <cstdint>
#include <cstring>
#include <functional>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

/// @brief
/// @tparam T1 Specifies the data-type that will be sent
/// @tparam T2 Specifies the data-type that will be received
/// @tparam primaryNode Denotes if the RTP is the primary owner/creator of the message queue
template<typename T1, typename T2, bool primaryNode>
class Ipc
{
private:
    static const std::uint8_t MAX_MESSAGE_DEPTH = 5; // Number of messages the message queue will hold
    using callback_t = std::function<void(void)>;
    callback_t mCallback;
    std::unique_ptr<boost::interprocess::message_queue> mPrimaryToSecondaryMessageQueue;
    std::unique_ptr<boost::interprocess::message_queue> mSecondaryToPrimaryMessageQueue;
    std::string mPrimaryToSecondaryMessageQueueName;
    std::string mSecondaryToPrimaryMessageQueueName;
    std::thread mReceiveThread;
    std::atomic_bool mExitReceiveThread{ false };
    boost::lockfree::spsc_queue<T2, boost::lockfree::capacity<MAX_MESSAGE_DEPTH>> mReceiveDataQueue;
    void listen(void);
public:
    Ipc() {}
    bool open(const std::string& queueName);
    bool close(void);
    bool send(const T1& data, std::uint32_t priority = 10);
    std::optional<T2> receive(void);
    bool register_callback(callback_t callback_implementation);
    bool isDataAvailableInReceiveDataQueue(void) const;
};

template<typename T1, typename T2, bool primaryNode>
inline void Ipc<T1, T2, primaryNode>::listen(void)
{
    T2 receiveData; // Buffer to store received data
    std::uint64_t receiveLength;
    std::uint32_t priority;
    while (this->mExitReceiveThread.load() == false)
    {
        try
        {
            std::memset(&receiveData, 0, sizeof(receiveData)); // Initialize buffer to 0
            receiveLength = 0; // Initialize read length to 0
            priority = 0;      // Initialize priority to 0
            if (primaryNode == true)
            {
                this->mSecondaryToPrimaryMessageQueue->receive(
                    &receiveData,
                    sizeof(receiveData),
                    receiveLength,
                    priority
                );
            }
            else
            {
                this->mPrimaryToSecondaryMessageQueue->receive(
                    &receiveData,
                    sizeof(receiveData),
                    receiveLength,
                    priority
                );
            }
            this->mReceiveDataQueue.push(receiveData);
            this->mCallback();
        }
        catch (const std::exception& ex)
        {
            std::cout << "Inside Listen Exception\n";
            std::cout << ex.what() << std::endl;
        }
    }
}

template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::open(const std::string& queueName)
{
    try
    {
        if (primaryNode == true)
        {
            this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
            this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
        }
        else
        {
            this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
            this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
        }
        // Open or create the message queue that carries data from the primary node to the secondary node
        this->mPrimaryToSecondaryMessageQueue = std::make_unique<boost::interprocess::message_queue>(
            boost::interprocess::open_or_create,
            this->mPrimaryToSecondaryMessageQueueName.c_str(),
            MAX_MESSAGE_DEPTH,
            sizeof(T1)
        );
        // Open or create the message queue that carries data from the secondary node to the primary node
        this->mSecondaryToPrimaryMessageQueue = std::make_unique<boost::interprocess::message_queue>(
            boost::interprocess::open_or_create,
            this->mSecondaryToPrimaryMessageQueueName.c_str(),
            MAX_MESSAGE_DEPTH,
            sizeof(T2)
        );
        // Start listener thread
        this->mReceiveThread = std::thread(&Ipc::listen, this);
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}

template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::close(void)
{
    try
    {
        this->mExitReceiveThread.store(true); // Mark the receive thread for exit
        boost::interprocess::message_queue::remove(this->mPrimaryToSecondaryMessageQueueName.c_str());  // Delete Primary-to-Secondary message queue
        boost::interprocess::message_queue::remove(this->mSecondaryToPrimaryMessageQueueName.c_str());  // Delete Secondary-to-Primary message queue
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}

template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::send(const T1& data, std::uint32_t priority)
{
    try
    {
        if (primaryNode == true) // Send message on the Primary-to-Secondary queue
        {
            this->mPrimaryToSecondaryMessageQueue->send(&data, sizeof(data), priority);
        }
        else // Send message on the Secondary-to-Primary queue
        {
            this->mSecondaryToPrimaryMessageQueue->send(&data, sizeof(data), priority);
        }
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}

template<typename T1, typename T2, bool primaryNode>
inline std::optional<T2> Ipc<T1, T2, primaryNode>::receive(void)
{
    std::optional<T2> data{ std::nullopt };
    if (this->mReceiveDataQueue.read_available() > 0) // If data is available, pop the first element
    {
        data = this->mReceiveDataQueue.front();
        this->mReceiveDataQueue.pop();
    }
    return data;
}

template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::register_callback(callback_t callbackImplementation)
{
    try
    {
        this->mCallback = callbackImplementation;
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cerr << ex.what() << '\n';
    }
    return false;
}

template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::isDataAvailableInReceiveDataQueue(void) const
{
    return this->mReceiveDataQueue.read_available() > 0; // True if data is available
}

main.cpp

#include <ipc.hpp>
#include <iostream>

// P1 stands for Process 1
// P2 stands for Process 2
struct P1ToP2
{
    float a;
    int b;
};
struct P2ToP1
{
    int a;
    int b;
};

Ipc<P1ToP2, P2ToP1, false> ipc1; // Global IPC object

void message_queue_data_received(void)
{
    if (ipc1.isDataAvailableInReceiveDataQueue() == true)
    {
        auto s = ipc1.receive();
        if (s.has_value() == true)
        {
            std::cout << "a : " << s->a << "\tb : " << s->b << std::endl;
        }
    }
}

int main(int argc, char* argv[])
{
    bool dataReceivedOnEthernet = false;
    ipc1.register_callback(message_queue_data_received);
    ipc1.open("ipc1");
    while (true)
    {
        if (dataReceivedOnEthernet == true) // Flag set by another thread
        {
            P1ToP2 p;
            p.a = 10.23; // Some data received over Ethernet
            p.b = 10;    // Some data received over Ethernet
            ipc1.send(p); // Send data over IPC
        }
        // Other code
    }
}


Error:

boost::interprocess_exception::library_error
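Since listen() catches std::exception, the log only shows the what() string. Catching boost::interprocess::interprocess_exception first also surfaces the library's numeric error code and the OS errno, which helps pin down where library_error comes from. A sketch (receive_one is a hypothetical stand-in for the receive call inside listen()):

#include <boost/interprocess/exceptions.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <cstddef>
#include <iostream>

// Hypothetical helper: performs one receive() and reports the detailed
// Boost error information instead of only what().
bool receive_one(boost::interprocess::message_queue& mq, void* buf, std::size_t bufSize)
{
    try
    {
        boost::interprocess::message_queue::size_type receivedLength = 0;
        unsigned int priority = 0;
        mq.receive(buf, bufSize, receivedLength, priority);
        return true;
    }
    catch (const boost::interprocess::interprocess_exception& ex)
    {
        // get_error_code() returns the boost::interprocess error_code_t;
        // get_native_error() returns the underlying OS error number.
        std::cout << "Listen exception: " << ex.what()
                  << " code=" << ex.get_error_code()
                  << " native=" << ex.get_native_error() << std::endl;
        return false;
    }
}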

Answer (zzwlnbp8):

Why do the processes use different message types, while silently assuming they have the same size (as well as being trivial and standard-layout, etc.)? Are you mixing up the various types and queues? It looks like it.
It helps a lot to give things good names.
I would separate the queues by message type, and name them by role:

// Requests are sent by client to server.
// Responses are sent by server to client.
struct Request { int a, b; };
struct Response { double a; int b; };

using ClientIpc = Ipc<Request, Response, true>;
using ServerIpc = Ipc<Request, Response, false>;
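Since message_queue moves raw bytes between processes, both message types must be safe to copy bytewise. A compile-time guard matching the "trivial and standard-layout" remark above (a sketch using standard type traits):

#include <type_traits>

// The queue copies messages as raw bytes across process boundaries,
// so the types must be trivially copyable (and ideally standard-layout).
static_assert(std::is_trivially_copyable_v<Request>);
static_assert(std::is_trivially_copyable_v<Response>);
static_assert(std::is_standard_layout_v<Request>);
static_assert(std::is_standard_layout_v<Response>);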

Then, define a Channel type:

using Prio = uint32_t;

template <typename T> struct Channel {
    Channel(std::string name);
    ~Channel();
    std::tuple<T, Prio> receive(std::stop_token token);
    bool send(T const& msg, Prio prio, std::stop_token token);

  private:
    using Queue = boost::interprocess::message_queue;
    std::string name_;
    Queue queue_ = open();

    Queue open() {
        if constexpr (IsClient) {
            return {boost::interprocess::open_only, name_.c_str()};
        } else {
            return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
        }
    }
};


Now we can simply write:

Channel<Request>  reqQ;
Channel<Response> resQ;


With construction looking like:

Ipc(std::string const& queueName) try
    : reqQ(queueName + "_req")
    , resQ(queueName + "_res")
    , mThread(bind(&Ipc::listen, this, std::placeholders::_1)) {
} catch (std::exception const& ex) {
    std::cerr << "Ipc: " << ex.what() << std::endl;
    throw;
}


The listener queues up received messages. Their type depends on the client/server mode:

using Incoming = std::conditional_t<IsClient, Response, Request>;
boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;


Note how I opted for the safer jthread with stop tokens to coordinate thread exit:

std::jthread mThread;
std::stop_token mToken = mThread.get_stop_token();

void listen(std::stop_token token) {
    while (!token.stop_requested()) {
        try {
            if constexpr (IsClient)
                mInbox.push(get<0>(resQ.receive(token)));
            else
                mInbox.push(get<0>(reqQ.receive(token)));

            if (mCallback)
                mCallback();
        } catch (std::exception const& ex) {
            std::cerr << "Listen: " << ex.what() << std::endl;
        }
    }
}


The external operations end up much simpler, like:

void wait() { mThread.join(); }
void close() {
    mThread.request_stop();
    if (std::this_thread::get_id() != mThread.get_id())
        wait();
}

bool send(Request const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }

std::optional<Incoming> consume() {
    if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
        return val;
    return {};
}

void register_callback(callback_t cb) { mCallback = cb; }
bool haveMessage() const { return mInbox.read_available(); }

Client/Server demo

Let's have the server above respond to Requests by sending the square root of a together with b/2:

void server() {
    ServerIpc ipc(IPC_NAME);

    auto handler = [&ipc] {
        assert(ipc.haveMessage());
        if (std::optional<Request> req = ipc.consume()) {
            auto [a, b] = *req;
            std::cout << "server received request a:" << a << " b:" << b << std::endl;

            if (a == -42 && b == -42) {
                std::cout << " -> server handling close request" << std::endl;
                ipc.close();
            } else {
                // send response
                ipc.send(Response{sqrt(a), b / 2});
            }
        }
    };

    ipc.register_callback(handler);
    ipc.wait();
}


That's all. Note how we added a mechanism to tell the server that the client wants it to exit (since the server owns the resources). The client might look like this:

void client() {
    ClientIpc ipc(IPC_NAME);

    auto handler = [&ipc] {
        assert(ipc.haveMessage());
        if (std::optional<Response> res = ipc.consume()) {
            auto [a, b] = *res;
            std::cout << "client received response a:" << a << " b:" << b << std::endl;
        }
    };
    ipc.register_callback(handler);

    for (int i = 0; i < 100; ++i) {
        if (rand() % 30 == 0)            // Flag set by other thread
            ipc.send(Request{i, 2 * i}); // Send request
        std::this_thread::sleep_for(10ms);
    }

    std::cout << "Client sending close command" << std::endl;
    ipc.send(Request{-42, -42});
    std::cout << "Closing" << std::endl;
    ipc.close();
}


All it does is send some requests over the course of about a second (100 iterations with a 10ms sleep), logging the responses as they arrive. Then it tells the server to exit, and closes. Only the server removes the queues.
A simple main switches between client and server:

int main(int argc, char** argv) {
    if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
        server();
    else
        client();
    std::cout << "Bye" << std::endl;
}

Live demo

Live¹ On Coliru (start one instance with the server argument and a second one without).

  • File test.h
#pragma once
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <cassert>
#include <functional>
#include <iostream>
#include <optional>
#include <stop_token>
#include <thread>
#include <tuple>
using namespace std::chrono_literals;

template <typename Request, typename Response, bool IsClient> class Ipc {
  private:
    static constexpr uint8_t MAX_DEPTH = 5;

    using callback_t = std::function<void()>;
    callback_t mCallback;

    using Prio = uint32_t;

    template <typename T> struct Channel {
        Channel(std::string name) : name_(std::move(name)) { //
            assert(queue_.get_max_msg_size() == sizeof(T));
        }
        ~Channel() {
            if (!IsClient) {
                std::cerr << "Server cleaning up " << name_ << std::endl;
                Queue::remove(name_.c_str());
            }
        }
        std::tuple<T, Prio> receive(std::stop_token token) {
            size_t len  = 0;
            Prio   prio = 0;
            T      msg{};
            while (!token.stop_requested()) {
                auto deadline = std::chrono::steady_clock::now() + 50ms;
                if (queue_.timed_receive(&msg, sizeof(msg), len, prio, deadline)) {
                    assert(len == sizeof(T));
                    return {std::move(msg), prio};
                }
            }
            throw std::runtime_error("stop requested");
        }
        bool send(T const& msg, Prio prio, std::stop_token token) {
            while (!token.stop_requested()) {
                auto deadline = std::chrono::steady_clock::now() + 50ms;
                if (queue_.timed_send(&msg, sizeof(msg), prio, deadline))
                    return true;
            }
            return false;
        }

      private:
        using Queue = boost::interprocess::message_queue;
        std::string name_;
        Queue queue_ = open();

        Queue open() {
            if constexpr (IsClient) {
                return {boost::interprocess::open_only, name_.c_str()};
            } else {
                return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
            }
        }
    };

    Channel<Request>  reqQ;
    Channel<Response> resQ;

    std::jthread mThread;
    std::stop_token mToken = mThread.get_stop_token();

    using Incoming = std::conditional_t<IsClient, Response, Request>;
    boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;

    void listen(std::stop_token token) {
        while (!token.stop_requested()) {
            try {
                if constexpr (IsClient)
                    mInbox.push(get<0>(resQ.receive(token)));
                else
                    mInbox.push(get<0>(reqQ.receive(token)));

                if (mCallback)
                    mCallback();
            } catch (std::exception const& ex) {
                std::cerr << "Listen: " << ex.what() << std::endl;
            }
        }
    }

  public:
    Ipc(std::string const& queueName) try
        : reqQ(queueName + "_req")
        , resQ(queueName + "_res")
        , mThread(bind(&Ipc::listen, this, std::placeholders::_1)) {
    } catch (std::exception const& ex) {
        std::cerr << "Ipc: " << ex.what() << std::endl;
        throw;
    }

    void wait() { mThread.join(); }
    void close() {
        mThread.request_stop();
        if (std::this_thread::get_id() != mThread.get_id())
            wait();
    }

    bool send(Request const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
    bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }

    std::optional<Incoming> consume() {
        if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
            return val;
        return {};
    }

    void register_callback(callback_t cb) { mCallback = cb; }
    bool haveMessage() const { return mInbox.read_available(); }
};

  • File test.cpp
#include "test.h"
#include <cmath>
#include <set>
#include <string_view>

// Requests are sent by client to server.
// Responses are sent by server to client.
struct Request { int a, b; };
struct Response { double a; int b; };

using ClientIpc = Ipc<Request, Response, true>;
using ServerIpc = Ipc<Request, Response, false>;

static std::string IPC_NAME = "so_demo_ipc";

void server() {
    ServerIpc ipc(IPC_NAME);

    auto handler = [&ipc] {
        assert(ipc.haveMessage());
        if (std::optional<Request> req = ipc.consume()) {
            auto [a, b] = *req;
            std::cout << "server received request a:" << a << " b:" << b << std::endl;

            if (a == -42 && b == -42) {
                std::cout << " -> server handling close request" << std::endl;
                ipc.close();
            } else {
                // send response
                ipc.send(Response{sqrt(a), b / 2});
            }
        }
    };

    ipc.register_callback(handler);
    ipc.wait();
}

void client() {
    ClientIpc ipc(IPC_NAME);

    auto handler = [&ipc] {
        assert(ipc.haveMessage());
        if (std::optional<Response> res = ipc.consume()) {
            auto [a, b] = *res;
            std::cout << "client received response a:" << a << " b:" << b << std::endl;
        }
    };
    ipc.register_callback(handler);

    for (int i = 0; i < 100; ++i) {
        if (rand() % 30 == 0)            // Flag set by other thread
            ipc.send(Request{i, 2 * i}); // Send request
        std::this_thread::sleep_for(10ms);
    }

    std::cout << "Client sending close command" << std::endl;
    ipc.send(Request{-42, -42});
    std::cout << "Closing" << std::endl;
    ipc.close();
}

int main(int argc, char** argv) {
    if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
        server();
    else
        client();
    std::cout << "Bye" << std::endl;
}


¹ Live demo run locally:
(recording of the local client/server run omitted)
Sadly, online compilers don't allow shared memory access.

