我已经使用消息队列为IPC创建了一个模板类。
我在无限while循环中运行我的程序(称为主循环)。
我通过以太网从各种子系统(传感器)收集数据,并使用消息队列将接收到的数据传递给适当的进程(它们是多个不同的进程,可以充当数据接收器,每个进程都有自己的消息队列)。
我刚刚运行了程序,没有执行任何活动。这是唯一运行的程序,每次运行前我都会重新启动操作系统。
程序只是在while循环中运行,其中所有标志都设置为false;因此程序只是运行一个空循环。
我随机得到boost::interprocess_exception::library_error
。因为他们没有活动,我希望他们应该没有错误。
我注解掉了以太网相关的代码,但仍然得到同样的错误。
我在声明中得到错误:
if (primaryNode == true)
{
this->mSecondaryToPrimaryMessageQueue->receive(
&receiveData,
sizeof(receiveData),
receiveLength,
priority
);
}
else
{
this->mPrimaryToSecondaryMessageQueue->receive(
&receiveData,
sizeof(receiveData),
receiveLength,
priority
);
}
字符串
我尝试将primaryNode设置为true或false。我得到了相同的错误。
代码:
ipc.hpp
#pragma once
#include <thread>
#include <string>
#include <atomic>
#include <memory>
#include <variant>
#include <optional>
#include <iostream>
#include <functional>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
/// @brief
/// @tparam T1 Specifies the data-type that has to be sent
/// @tparam T2 Specifies the data-type that has will be received
/// @tparam primaryNode Denotes if the RTP is the primaryNode owner/creater of the message queue
template<typename T1, typename T2, bool primaryNode>
class Ipc
{
private:
static const std::uint8_t MAX_MESSAGE_DEPTH = 5; //Specifies the number of messages will the message queue hold
using callback_t = std::function<void(void)>;
callback_t mCallback;
std::unique_ptr<boost::interprocess::message_queue> mPrimaryToSecondaryMessageQueue;
std::unique_ptr<boost::interprocess::message_queue> mSecondaryToPrimaryMessageQueue;
std::string mPrimaryToSecondaryMessageQueueName;
std::string mSecondaryToPrimaryMessageQueueName;
std::thread mReceiveThread;
std::atomic_bool mExitReceiveThread{ false };
boost::lockfree::spsc_queue<T2, boost::lockfree::capacity<MAX_MESSAGE_DEPTH>> mReceiveDataQueue;
void listen(void);
public:
Ipc() {}
bool open(const std::string& queueName);
bool close(void);
bool send(const T1& data, std::uint32_t priority = 10);
std::optional<T2> receive(void);
bool register_callback(callback_t callback_implementation);
bool isDataAvailableInReceiveDataQueue(void) const;
};
template<typename T1, typename T2, bool primaryNode>
inline void Ipc<T1, T2, primaryNode>::listen(void)
{
T2 receiveData;//Buffer to store received data
std::uint64_t receiveLength;
std::uint32_t priority;
while(this->mExitReceiveThread.load() == false)
{
try
{
std::memset(&receiveData, 0, sizeof(receiveData)); //Initialize buffer to 0
receiveLength = 0; //Initialize read length to 0
priority = 0; //Initialize priority to 0
if (primaryNode == true)
{
this->mSecondaryToPrimaryMessageQueue->receive(
&receiveData,
sizeof(receiveData),
receiveLength,
priority
);
}
else
{
this->mPrimaryToSecondaryMessageQueue->receive(
&receiveData,
sizeof(receiveData),
receiveLength,
priority
);
}
this->mReceiveDataQueue.push(receiveData);
this->mCallback();
}
catch (const std::exception& ex)
{
std::cout << "Inside Listen Exception\n";
std::cout << ex.what() << std::endl;
}
}
}
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::open(const std::string& queueName)
{
try
{
if(primaryNode == true)
{
this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
}
else
{
this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
}
//Open-Create message queue to send data from primaryNode node to secondary node
this->mPrimaryToSecondaryMessageQueue = std::make_unique<boost::interprocess::message_queue>(
boost::interprocess::open_or_create,
this->mPrimaryToSecondaryMessageQueueName.c_str(),
MAX_MESSAGE_DEPTH,
sizeof(T1)
);
//Open-Create message queue to send data from secondary node to primaryNode node
this->mSecondaryToPrimaryMessageQueue = std::make_unique<boost::interprocess::message_queue>(
boost::interprocess::open_or_create,
this->mSecondaryToPrimaryMessageQueueName.c_str(),
MAX_MESSAGE_DEPTH,
sizeof(T2)
);
//Start Listner Thread
this->mReceiveThread = std::thread(&Ipc::listen, this);
return true;
}
catch (const std::exception& ex)
{
std::cout << ex.what() << std::endl;
return false;
}
}
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::close(void)
{
try
{
this->mExitReceiveThread.store(true); //Marked to close thread
boost::interprocess::message_queue::remove(this->mPrimaryToSecondaryMessageQueueName.c_str());//Delete Primary to Secondary Message Queue
boost::interprocess::message_queue::remove(this->mSecondaryToPrimaryMessageQueueName.c_str());//Delete Secondary to Primary Message Queue
}
catch (const std::exception& ex)
{
std::cout << ex.what() << std::endl;
return false;
}
}
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::send(const T1& data, std::uint32_t priority)
{
try
{
if (primaryNode == true) //Send message on Primary to Secondary Queue
{
this->mPrimaryToSecondaryMessageQueue->send(&data, sizeof(data), priority);
}
else //Send message on Secondary to Primary Queue
{
this->mSecondaryToPrimaryMessageQueue->send(&data, sizeof(data), priority);
}
return true;
}
catch (const std::exception& ex)
{
std::cout << ex.what() << std::endl;
return false;
}
}
template<typename T1, typename T2, bool primaryNode>
inline std::optional<T2> Ipc<T1, T2, primaryNode>::receive(void)
{
std::optional<T2> data{ std::nullopt };
if (this->mReceiveDataQueue.read_available() > 0) //If data is avaiable, pop first element
{
data = this->mReceiveDataQueue.front();
this->mReceiveDataQueue.pop();
}
else
{
//data = std::nullopt; //Not needed
}
return data;
}
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::register_callback(callback_t callbackImplementation)
{
try
{
this->mCallback = callbackImplementation;
return true;
}
catch (const std::exception& ex)
{
std::cerr << ex.what() << '\n';
}
return false;
}
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::isDataAvailableInReceiveDataQueue(void) const
{
if (this->mReceiveDataQueue.read_available() > 0) //If data is avaiable
{
return true;
}
else
{
return false;
}
}
型
main.cpp
#include <ipc.hpp>
#include <iostream>
//P1 stands for Process 1
//P2 stands for Process 2
struct P1ToP2
{
float a;
int b;
};
struct P2ToP1
{
int a;
int b;
};
Ipc<P1ToP2, P2ToP1, false> ipc1; //Global IPC object
void message_queue_data_received(void)
{
if (ipc1.isDataAvailableInReceiveDataQueue() == true)
{
auto s = ipc1.receive();
if (s.has_value() == true)
{
std::cout << "a : " << s->a << "\tb : " << s->b << std::endl;
}
}
}
int main(int argc, char *argv[])
{
bool dataReceivedOnEthernet = false;
ipc1.register_callback(message_queue_data_received);
this->ipc1.open("ipc1");
while(true)
{
if(dataReceivedOnEthernet == true) //Flag set by other thread
{
P1ToP2 p;
p.a = 10.23; //Some Data received over ethernet
p.b = 10; //Some Data received over ethernet
ipc1.send(p); //Send data over IPC
}
//Other Code
}
}
型
误差
boost::interprocess_exception::library_error
型
1条答案
按热度按时间zzwlnbp81#
为什么进程使用不同的消息类型,同时默认它们的大小相同(以及琐碎和标准布局等)。你混淆了各种类型和队列吗?看起来是这样。
如果你能给事物起个好名字会很有帮助。
我将按消息类型分隔队列。按角色命名它们:
字符串
然后,通过定义Channel类型:
型
现在我们可以简单地说:
型
与建筑一样,
型
侦听器将接收到的消息排队。类型取决于客户端/服务器模式:
型
请注意我是如何选择更安全的
jthread
和停止令牌来协调线程退出的:型
外部操作看起来更简单,如:
型
客户端/服务器示例
让我们定义上面的服务器通过发送a和b/2的平方根来响应
Requests
:型
这就是全部。注意我们如何添加一个机制来告诉服务器客户端希望它退出(因为服务器拥有资源)。客户端可能看起来像这样:
型
它所做的就是发送一些请求,持续大约10秒,然后记录响应。然后它告诉服务器退出,然后关闭。只有服务器会删除队列。
一个简单的主切换客户端/服务器:
型
现场演示
Live¹ On Coliru
test.h
型
test.cpp
型
在本地进行现场演示:
x1c 0d1x的数据
遗憾的是,在线编译器不允许共享内存访问