c++ 随机boost::interprocess_exception::library_error on boost::interprocess::message_queue

xkftehaa  于 12个月前  发布在  其他
关注(0)|答案(1)|浏览(184)

我已经使用消息队列为IPC创建了一个模板类。
我在无限while循环中运行我的程序(称为主循环)。
我通过以太网从各种子系统(传感器)收集数据,并使用消息队列将接收到的数据传递给适当的进程(它们是多个不同的进程,可以充当数据接收器,每个进程都有自己的消息队列)。
我刚刚运行了程序,没有执行任何活动。这是唯一运行的程序,每次运行前我都会重新启动操作系统。
程序只是在while循环中运行,其中所有标志都设置为false;因此程序只是运行一个空循环。
我随机得到boost::interprocess_exception::library_error。由于没有任何活动,我预期不应该出现任何错误。
我注释掉了以太网相关的代码,但仍然得到同样的错误。
我在下面这条语句处得到错误:

// Call site where the exception is reported: the node reads from its inbound
// queue. A primary node reads the secondary-to-primary queue; a secondary node
// reads the primary-to-secondary queue. The same buffer, length and priority
// variables are used in both branches.
if (primaryNode == true)
{
    this->mSecondaryToPrimaryMessageQueue->receive(
        &receiveData,
        sizeof(receiveData),
        receiveLength,
        priority
    );
}
else
{
    this->mPrimaryToSecondaryMessageQueue->receive(
        &receiveData,
        sizeof(receiveData),
        receiveLength,
        priority
    );
}

字符串
我尝试将primaryNode设置为true或false。我得到了相同的错误。
代码:

ipc.hpp

#pragma once

#include <atomic>
#include <cstdint>
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <type_traits>
#include <variant>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>

/// @brief Two-way IPC endpoint built on a pair of Boost.Interprocess message queues.
///
/// An endpoint sends values of type T1 and receives values of type T2. After
/// open() succeeds, a listener thread (see listen()) copies every received
/// message into an in-process lock-free queue and then invokes the registered
/// callback; receive() hands those buffered messages to the caller.
///
/// @tparam T1 Specifies the data-type that has to be sent
/// @tparam T2 Specifies the data-type that will be received
/// @tparam primaryNode True when this endpoint is the primary node: it sends on the
///         primary-to-secondary queue and listens on the secondary-to-primary queue.
///         A secondary node (false) uses the two queues the other way round.
template<typename T1, typename T2, bool primaryNode>
class Ipc
{
    // send() and listen() hand the address and sizeof() of a T1/T2 object to the
    // message queue, i.e. messages travel as raw bytes. That is only meaningful
    // for trivially copyable types, so reject anything else at compile time.
    static_assert(std::is_trivially_copyable_v<T1>, "Ipc: T1 must be trivially copyable");
    static_assert(std::is_trivially_copyable_v<T2>, "Ipc: T2 must be trivially copyable");

private:
    /// Number of messages each message queue (and the local receive queue) can hold.
    /// Declared constexpr (implicitly inline since C++17) so that binding it to a
    /// reference, as std::make_unique's forwarding parameters do, does not require
    /// a separate out-of-class definition.
    static constexpr std::uint8_t MAX_MESSAGE_DEPTH = 5;

    using callback_t = std::function<void(void)>;
    callback_t mCallback; ///< Invoked by the listener thread after a message was received

    std::unique_ptr<boost::interprocess::message_queue> mPrimaryToSecondaryMessageQueue;
    std::unique_ptr<boost::interprocess::message_queue> mSecondaryToPrimaryMessageQueue;

    std::string         mPrimaryToSecondaryMessageQueueName;
    std::string         mSecondaryToPrimaryMessageQueueName;

    std::thread         mReceiveThread;              ///< Runs listen(); started by open()
    std::atomic_bool    mExitReceiveThread{ false }; ///< Set by close() to ask listen() to stop
    /// Buffer between the listener thread (producer) and receive() (consumer).
    boost::lockfree::spsc_queue<T2, boost::lockfree::capacity<MAX_MESSAGE_DEPTH>> mReceiveDataQueue;

    void listen(void);
public:
    Ipc() = default;

    /// Opens (or creates) both message queues derived from @p queueName and starts the listener thread.
    bool open(const std::string& queueName);
    /// Requests the listener thread to stop and removes both message queues.
    bool close(void);
    /// Sends @p data on this node's outbound queue with the given priority.
    bool send(const T1& data, std::uint32_t priority = 10);
    /// Returns the oldest buffered message, or std::nullopt when none is buffered.
    std::optional<T2> receive(void);
    /// Registers the function the listener thread calls after each received message.
    bool register_callback(callback_t callback_implementation);
    /// Reports whether receive() would currently return a message.
    bool isDataAvailableInReceiveDataQueue(void) const;
};

/// @brief Receive loop executed by the listener thread.
///
/// Reads one message at a time from this node's inbound message queue, stores
/// it in mReceiveDataQueue and then invokes the registered callback (if one is
/// set). mExitReceiveThread is checked before every read. Exceptions are
/// logged to std::cout and the loop continues.
template<typename T1, typename T2, bool primaryNode>
inline void Ipc<T1, T2, primaryNode>::listen(void)
{
    using queue_t = boost::interprocess::message_queue;

    // A primary node listens on the secondary-to-primary queue, a secondary
    // node on the primary-to-secondary queue.
    queue_t& inboundQueue = primaryNode ? *this->mSecondaryToPrimaryMessageQueue
                                        : *this->mPrimaryToSecondaryMessageQueue;

    while (this->mExitReceiveThread.load() == false)
    {
        try
        {
            T2                  receiveData{};      //Zero-initialised buffer to store received data
            queue_t::size_type  receiveLength = 0;  //Must be the queue's own size_type to bind to receive()
            unsigned int        priority = 0;

            inboundQueue.receive(&receiveData, sizeof(receiveData), receiveLength, priority);

            if (receiveLength != sizeof(receiveData))
            {
                //A message of a different size cannot be a complete T2; do not hand it on
                std::cout << "Inside Listen: unexpected message size " << receiveLength << std::endl;
                continue;
            }

            if (this->mReceiveDataQueue.push(receiveData) == false)
            {
                //Local queue is full: the message is lost, but still notify the
                //consumer below so it gets a chance to drain the queue
                std::cout << "Inside Listen: receive queue full, message dropped" << std::endl;
            }

            if (this->mCallback) //Calling an empty std::function would throw std::bad_function_call
            {
                this->mCallback();
            }
        }
        catch (const std::exception& ex)
        {
            std::cout << "Inside Listen Exception\n";
            std::cout << ex.what() << std::endl;
        }
    }
}

/// @brief Opens (or creates) both message queues and starts the listener thread.
///
/// The queues are named queueName + "_out" (primary to secondary) and
/// queueName + "_in" (secondary to primary). Each queue is sized for the type
/// that actually travels on it: a primary node sends T1 on the "_out" queue and
/// receives T2 on the "_in" queue, a secondary node does the opposite.
///
/// @param queueName Base name shared by both nodes of the connection.
/// @return true when both queues are open and the listener thread was started;
///         false when the endpoint is already open or an exception was thrown
///         (the exception message is printed to std::cout).
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::open(const std::string& queueName)
{
    using queue_t = boost::interprocess::message_queue;

    if (this->mReceiveThread.joinable())
    {
        //Assigning a new thread to a joinable std::thread calls std::terminate
        std::cout << "Ipc::open: listener thread is already running" << std::endl;
        return false;
    }

    try
    {
        //Queue names do not depend on the role; both nodes must derive the same names
        this->mPrimaryToSecondaryMessageQueueName = queueName + "_out";
        this->mSecondaryToPrimaryMessageQueueName = queueName + "_in";

        //Size every queue for the message type that is sent on it by this node's
        //role, so that both ends of a connection agree on the message size
        const queue_t::size_type primaryToSecondarySize = primaryNode ? sizeof(T1) : sizeof(T2);
        const queue_t::size_type secondaryToPrimarySize = primaryNode ? sizeof(T2) : sizeof(T1);
        //Pass the depth by value so the class constant is not bound to a reference
        const queue_t::size_type depth = MAX_MESSAGE_DEPTH;

        //Open-Create message queue to send data from primaryNode node to secondary node
        this->mPrimaryToSecondaryMessageQueue = std::make_unique<queue_t>(
            boost::interprocess::open_or_create,
            this->mPrimaryToSecondaryMessageQueueName.c_str(),
            queue_t::size_type{ depth },
            queue_t::size_type{ primaryToSecondarySize }
        );

        //Open-Create message queue to send data from secondary node to primaryNode node
        this->mSecondaryToPrimaryMessageQueue = std::make_unique<queue_t>(
            boost::interprocess::open_or_create,
            this->mSecondaryToPrimaryMessageQueueName.c_str(),
            queue_t::size_type{ depth },
            queue_t::size_type{ secondaryToPrimarySize }
        );

        //Start Listener Thread
        this->mReceiveThread = std::thread(&Ipc::listen, this);
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}

/// @brief Asks the listener thread to stop and removes both message queues.
///
/// Only sets mExitReceiveThread; the listener thread is not joined here, and it
/// evaluates the flag only between two receive calls.
///
/// @return true when the requests were issued, false when an exception was
///         thrown (its message is printed to std::cout).
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::close(void)
{
    try
    {
        this->mExitReceiveThread.store(true); //Marked to close thread

        //Removal is best effort: the return values are deliberately ignored,
        //e.g. the peer node may already have removed the queues
        (void)boost::interprocess::message_queue::remove(this->mPrimaryToSecondaryMessageQueueName.c_str());//Delete Primary to Secondary Message Queue
        (void)boost::interprocess::message_queue::remove(this->mSecondaryToPrimaryMessageQueueName.c_str());//Delete Secondary to Primary Message Queue

        return true; //Previously missing: flowing off the end of a bool function is undefined behaviour
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}

/// @brief Sends one message on this node's outbound message queue.
///
/// A primary node sends on the primary-to-secondary queue, a secondary node on
/// the secondary-to-primary queue.
///
/// @param data     Message to send; its bytes are copied into the queue.
/// @param priority Priority handed to the message queue.
/// @return true when the message was handed to the queue; false when the queue
///         has not been opened or the queue operation threw (the exception
///         message is printed to std::cout).
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::send(const T1& data, std::uint32_t priority)
{
    const std::unique_ptr<boost::interprocess::message_queue>& outboundQueue =
        primaryNode ? this->mPrimaryToSecondaryMessageQueue   //Send message on Primary to Secondary Queue
                    : this->mSecondaryToPrimaryMessageQueue;  //Send message on Secondary to Primary Queue

    if (!outboundQueue)
    {
        //open() has not succeeded yet; dereferencing the empty pointer would be
        //undefined behaviour that the try/catch below cannot intercept
        std::cout << "Ipc::send: message queue is not open" << std::endl;
        return false;
    }

    try
    {
        outboundQueue->send(&data, sizeof(data), priority);
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}

/// @brief Takes the oldest buffered message out of the local receive queue.
/// @return The message, or std::nullopt when no message is buffered.
template<typename T1, typename T2, bool primaryNode>
inline std::optional<T2> Ipc<T1, T2, primaryNode>::receive(void)
{
    //Nothing buffered: report "no value"
    if (this->mReceiveDataQueue.read_available() == 0)
    {
        return std::nullopt;
    }

    //Copy the first element, then discard it from the queue
    std::optional<T2> oldest{ this->mReceiveDataQueue.front() };
    this->mReceiveDataQueue.pop();
    return oldest;
}

/// @brief Stores the function that is invoked after a message has been received.
/// @param callbackImplementation Callable taking no arguments and returning nothing.
/// @return true when the callback was stored, false when storing it threw
///         (the exception message is printed to std::cerr).
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::register_callback(callback_t callbackImplementation)
{
    bool stored = false;
    try
    {
        this->mCallback = callbackImplementation;
        stored = true;
    }
    catch (const std::exception& ex)
    {
        std::cerr << ex.what() << '\n';
    }
    return stored;
}

/// @brief Reports whether at least one received message is waiting in the local receive queue.
/// @return true when a message can be read, false otherwise.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::isDataAvailableInReceiveDataQueue(void) const
{
    const auto pendingMessages = this->mReceiveDataQueue.read_available();
    return pendingMessages > 0;
}

main.cpp

#include <ipc.hpp>
#include <iostream>

//P1 stands for Process 1
//P2 stands for Process 2
/// Message layout used as the first (sent) type of the Ipc instance below.
struct P1ToP2 { float a; int b; };

/// Message layout used as the second (received) type of the Ipc instance below.
struct P2ToP1 { int a; int b; };

Ipc<P1ToP2, P2ToP1, false> ipc1; //Global IPC object

/// Callback registered with ipc1: prints the fields of one buffered message,
/// if there is one.
void message_queue_data_received(void)
{
    //Nothing buffered: nothing to print
    if (ipc1.isDataAvailableInReceiveDataQueue() == false)
    {
        return;
    }

    auto message = ipc1.receive();
    if (message.has_value() == false)
    {
        return;
    }

    std::cout << "a : " << message->a << "\tb : " << message->b << std::endl;
}

/// Registers the receive callback, opens the "ipc1" queues and then runs the
/// main loop, which forwards data over IPC whenever the ethernet flag is set.
int main(int argc, char *argv[])
{
    bool dataReceivedOnEthernet = false;

    ipc1.register_callback(message_queue_data_received);
    //ipc1 is a global object; "this" does not exist in a free function
    if (ipc1.open("ipc1") == false)
    {
        //Without open queues every later send() would fail, so stop here
        std::cerr << "Failed to open IPC queues for ipc1" << std::endl;
        return 1;
    }

    while(true)
    {
        if(dataReceivedOnEthernet == true) //Flag set by other thread
        {
            P1ToP2 p;
            p.a = 10.23f; //Some Data received over ethernet (float literal: no double-to-float narrowing)
            p.b = 10; //Some Data received over ethernet
            ipc1.send(p); //Send data over IPC
        }
        //Other Code
    }
}


误差

boost::interprocess_exception::library_error

zzwlnbp8

zzwlnbp81#

为什么各进程使用不同的消息类型,却又默认假定它们的大小相同(并且都是平凡类型、标准布局等)?你是不是把各种类型和队列搞混了?看起来是这样。
如果你能给事物起个好名字会很有帮助。
我将按消息类型分隔队列。按角色命名它们:

// Requests are sent by client to server.
// Responses are sent by server to client.
struct Request { int a, b; };
struct Response { double a; int b; };
// Both endpoints use the same message types; only the boolean role flag
// (third template argument) differs between client and server.
using ClientIpc = Ipc<Request, Response, true>;
using ServerIpc = Ipc<Request, Response, false>;

字符串
然后,通过定义Channel类型:

// Message priority type used by receive() and send().
using Prio = uint32_t;
// One named message queue carrying messages of a single type T.
// NOTE: IsClient and MAX_DEPTH are not declared in this excerpt; they come
// from the enclosing Ipc class template.
template <typename T> struct Channel {
    Channel(std::string name);
    ~Channel();

    // Returns a received message together with its priority.
    std::tuple<T, Prio> receive(std::stop_token token);
    // Sends msg with the given priority; the bool result reports success.
    bool send(T const& msg, Prio prio, std::stop_token token);

  private:
    using Queue = boost::interprocess::message_queue;
    std::string name_;
    // Initialised from open(), which reads name_; name_ is declared first.
    Queue       queue_ = open();

    // A client only opens an existing queue; otherwise the queue is opened or
    // created with room for MAX_DEPTH messages of sizeof(T) bytes each.
    Queue open() {
        if constexpr (IsClient) {
            return {boost::interprocess::open_only, name_.c_str()};
        } else {
            return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
        }
    }
};


现在我们可以简单地说:

// One channel per message type: requests and responses use separate queues.
Channel<Request>  reqQ;
Channel<Response> resQ;


构造函数如下:

// Constructs both channels (queueName + "_req" / "_res") and starts the
// listener thread on listen(). The function-try-block logs any std::exception
// thrown while the members are initialised and rethrows it.
Ipc(std::string const& queueName) try
    : reqQ(queueName + "_req")
    , resQ(queueName + "_res")
    , mThread(bind(&Ipc::listen, this, std::placeholders::_1)) {
} catch (std::exception const& ex) {
    std::cerr << "Ipc: " << ex.what() << std::endl;
    throw;
}


侦听器将接收到的消息排队。类型取决于客户端/服务器模式:

// Type of the messages this endpoint receives: a client receives Response,
// a server receives Request.
using Incoming = std::conditional_t<IsClient, Response, Request>;
// Buffer filled by the listener with received messages; holds up to MAX_DEPTH entries.
boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;


请注意我是如何选择更安全的jthread和停止令牌来协调线程退出的:

// Listener thread and a stop token obtained from it; the token is what the
// send() overloads pass on to the channels.
std::jthread      mThread;
std::stop_token   mToken = mThread.get_stop_token();

// Listener loop: until a stop is requested, receive one message from the
// inbound channel (responses for a client, requests for a server), push it
// into mInbox and invoke the callback if one is registered. Exceptions are
// logged to std::cerr and the loop continues.
void listen(std::stop_token token) {
    while (!token.stop_requested()) {
        try {
            if constexpr (IsClient)
                mInbox.push(get<0>(resQ.receive(token)));
            else
                mInbox.push(get<0>(reqQ.receive(token)));

            // An unset callback is skipped rather than called.
            if (mCallback)
                mCallback();
        } catch (std::exception const& ex) {
            std::cerr << "Listen: " << ex.what() << std::endl;
        }
    }
}


外部操作看起来更简单,如:

// Blocks until the listener thread has finished.
void wait() { mThread.join(); }

// Requests the listener to stop. The join is skipped when close() is called
// from the listener thread itself (e.g. from inside the callback).
void close() {
    mThread.request_stop();
    if (std::this_thread::get_id() != mThread.get_id())
        wait();
}

// The overload selects the channel: requests go to reqQ, responses to resQ.
bool send(Request  const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }

// Takes one message out of mInbox; returns an empty optional when there is none.
std::optional<Incoming> consume() {
    if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
        return val;
    return {};
}

// Stores the callback that listen() invokes after each received message.
void register_callback(callback_t cb) { mCallback = cb; }
// True when mInbox holds at least one message.
bool haveMessage() const { return mInbox.read_available(); }

客户端/服务器示例

让我们定义上面的服务器:它通过发送 a 的平方根和 b/2 来响应 Request:

// Server side: answers every request with {sqrt(a), b / 2}; a request with
// a == -42 and b == -42 is treated as the command to shut down.
void server() {
    ServerIpc ipc(IPC_NAME);

    // Invoked by the Ipc listener after a message was received.
    auto handler = [&ipc] {
        assert(ipc.haveMessage());
        if (std::optional<Request> req = ipc.consume()) {
            auto [a, b] = *req;
            std::cout << "server received request a:" << a << " b:" << b << std::endl;

            if (a == -42 && b == -42) {
                std::cout << " -> server handling close request" << std::endl;
                ipc.close();
            } else {
                // send response
                ipc.send(Response{sqrt(a), b / 2});
            }
        }
    };

    ipc.register_callback(handler);
    // Block until the listener thread ends (after the close request above).
    ipc.wait();
}


这就是全部。注意我们如何添加一个机制来告诉服务器客户端希望它退出(因为服务器拥有资源)。客户端可能看起来像这样:

// Client side: sends requests at random during 100 iterations of 10ms each,
// prints every response, then sends the close command {-42, -42} and closes.
void client() {
    ClientIpc ipc(IPC_NAME);

    // Invoked by the Ipc listener after a message was received.
    auto handler = [&ipc] {
        assert(ipc.haveMessage());
        if (std::optional<Response> res = ipc.consume()) {
            auto [a, b] = *res;
            std::cout << "client received response a:" << a << " b:" << b << std::endl;
        }
    };

    ipc.register_callback(handler);

    for (int i = 0; i < 100; ++i) {
        if (rand() % 30 == 0)            // Flag set by other thread
            ipc.send(Request{i, 2 * i}); // Send request

        std::this_thread::sleep_for(10ms);
    }

    std::cout << "Client sending close command" << std::endl;
    ipc.send(Request{-42, -42});

    std::cout << "Closing" << std::endl;
    ipc.close();
}


它所做的就是在大约1秒内(100次循环,每次休眠10ms)发送一些请求,并记录收到的响应。然后它告诉服务器退出,然后关闭。只有服务器会删除队列。
一个简单的 main 函数,用于在客户端和服务器之间切换:

// Runs the server when any command-line argument equals "server",
// otherwise runs the client.
int main(int argc, char** argv) {
    if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
        server();
    else
        client();

    std::cout << "Bye" << std::endl;
}

现场演示

Live¹ On Coliru

  • 文件test.h
#pragma once

 #include <boost/interprocess/ipc/message_queue.hpp>
 #include <boost/lockfree/spsc_queue.hpp>
 #include <iostream>
 #include <optional>
 #include <thread>
 using namespace std::chrono_literals;

 /// Two-way IPC endpoint: requests travel from client to server on one message
 /// queue, responses travel back on a second one. A listener thread stores every
 /// incoming message in an inbox and then invokes the registered callback.
 ///
 /// @tparam Request   Message type sent by the client.
 /// @tparam Response  Message type sent by the server.
 /// @tparam IsClient  true for the client endpoint, false for the server endpoint
 ///                   (the server creates the queues and removes them again).
 template <typename Request, typename Response, bool IsClient> class Ipc {
   private:
     /// Capacity, in messages, of each message queue and of the inbox.
     static constexpr uint8_t MAX_DEPTH = 5;

     using callback_t = std::function<void()>;
     callback_t mCallback;

     using Prio = uint32_t;

     /// One named message queue carrying messages of a single type T.
     template <typename T> struct Channel {
         explicit Channel(std::string name) : name_(std::move(name)) {
             // The queue may already exist; make sure it was sized for T.
             assert(queue_.get_max_msg_size() == sizeof(T));
         }

         ~Channel() {
             // Only the server removes the queue.
             if constexpr (!IsClient) {
                 std::cerr << "Server cleaning up " << name_ << std::endl;
                 Queue::remove(name_.c_str());
             }
         }

         /// Waits for a message in 50ms slices so that a stop request is noticed.
         /// @return the message and its priority
         /// @throws std::runtime_error when a stop was requested before a message arrived
         std::tuple<T, Prio> receive(std::stop_token token) {
             size_t len  = 0;
             Prio   prio = 0;
             T      msg{};

             while (!token.stop_requested()) {
                 auto deadline = std::chrono::steady_clock::now() + 50ms;
                 if (queue_.timed_receive(&msg, sizeof(msg), len, prio, deadline)) {
                     assert(len == sizeof(T));
                     return {std::move(msg), prio};
                 }
             }

             throw std::runtime_error("stop requested");
         }

         /// Tries to send in 50ms slices until it succeeds or a stop is requested.
         /// @return true when the message was queued, false when a stop was requested first
         bool send(T const& msg, Prio prio, std::stop_token token) {
             while (!token.stop_requested()) {
                 auto deadline = std::chrono::steady_clock::now() + 50ms;
                 if (queue_.timed_send(&msg, sizeof(msg), prio, deadline))
                     return true;
             }
             return false;
         }

       private:
         using Queue = boost::interprocess::message_queue;
         std::string name_;
         Queue       queue_ = open(); // open() reads name_, which is declared (and initialised) first

         /// The client only opens an existing queue; the server opens or creates it.
         Queue open() {
             if constexpr (IsClient) {
                 return {boost::interprocess::open_only, name_.c_str()};
             } else {
                 return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
             }
         }
     };

     Channel<Request>  reqQ;
     Channel<Response> resQ;

     /// Type of the messages this endpoint receives.
     using Incoming = std::conditional_t<IsClient, Response, Request>;
     boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;

     // The thread must be declared after everything listen() touches (channels,
     // inbox, callback): members are constructed in declaration order and
     // destroyed in reverse, so the listener starts only once those members
     // exist and is stopped and joined before they are destroyed.
     std::jthread      mThread;
     std::stop_token   mToken = mThread.get_stop_token();

     /// Listener loop: receive, store in the inbox, notify the callback.
     void listen(std::stop_token token) {
         while (!token.stop_requested()) {
             try {
                 bool stored = false;
                 if constexpr (IsClient)
                     stored = mInbox.push(get<0>(resQ.receive(token)));
                 else
                     stored = mInbox.push(get<0>(reqQ.receive(token)));

                 // A full inbox means the message is lost; say so instead of
                 // dropping it silently. The callback still runs so that the
                 // consumer gets a chance to drain the inbox.
                 if (!stored)
                     std::cerr << "Listen: inbox full, message dropped" << std::endl;

                 if (mCallback)
                     mCallback();
             } catch (std::exception const& ex) {
                 std::cerr << "Listen: " << ex.what() << std::endl;
             }
         }
     }

   public:
     /// Opens both channels (queueName + "_req" / "_res") and starts the listener.
     /// Any std::exception thrown while doing so is logged and rethrown.
     Ipc(std::string const& queueName) try
         : reqQ(queueName + "_req")
         , resQ(queueName + "_res")
         , mThread([this](std::stop_token token) { listen(std::move(token)); }) {
     } catch (std::exception const& ex) {
         std::cerr << "Ipc: " << ex.what() << std::endl;
         throw;
     }

     /// Blocks until the listener thread has finished. Does nothing when the
     /// thread was already joined (joining twice would throw std::system_error).
     void wait() {
         if (mThread.joinable())
             mThread.join();
     }

     /// Requests the listener to stop; joins it unless called from the listener
     /// thread itself (e.g. from inside the callback).
     void close() {
         mThread.request_stop();
         if (std::this_thread::get_id() != mThread.get_id())
             wait();
     }

     /// The overload selects the channel: requests go to reqQ, responses to resQ.
     bool send(Request  const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
     bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }

     /// Takes one message out of the inbox; empty optional when there is none.
     std::optional<Incoming> consume() {
         if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
             return val;
         return {};
     }

     /// Stores the callback the listener invokes after each received message.
     void register_callback(callback_t cb) { mCallback = std::move(cb); }
     /// True when the inbox holds at least one message.
     bool haveMessage() const { return mInbox.read_available(); }
 };

  • 文件test.cpp
#include "test.h"
 #include <cmath>
 #include <set>

 /// Message sent by the client to the server.
 struct Request {
     int a;
     int b;
 };

 /// Message sent by the server back to the client.
 struct Response {
     double a;
     int    b;
 };
 // Same message types on both sides; the third argument (IsClient) selects the role.
 using ClientIpc = Ipc<Request, Response, true>;
 using ServerIpc = Ipc<Request, Response, false>;

 static std::string IPC_NAME = "so_demo_ipc";

 /// Server side: replies to each request with {sqrt(a), b / 2}; the request
 /// {-42, -42} is the command to shut down. Returns once the listener has ended.
 void server() {
     ServerIpc ipc(IPC_NAME);

     ipc.register_callback([&ipc] {
         assert(ipc.haveMessage());

         std::optional<Request> req = ipc.consume();
         if (!req)
             return;

         auto [a, b] = *req;
         std::cout << "server received request a:" << a << " b:" << b << std::endl;

         const bool closeRequested = (a == -42 && b == -42);
         if (!closeRequested) {
             // send response
             ipc.send(Response{sqrt(a), b / 2});
             return;
         }

         std::cout << " -> server handling close request" << std::endl;
         ipc.close();
     });

     ipc.wait();
 }

 /// Client side: prints every response it receives, sends requests at random
 /// during 100 iterations of 10ms each, then sends the close command and closes.
 void client() {
     ClientIpc ipc(IPC_NAME);

     ipc.register_callback([&ipc] {
         assert(ipc.haveMessage());

         std::optional<Response> res = ipc.consume();
         if (!res)
             return;

         auto [a, b] = *res;
         std::cout << "client received response a:" << a << " b:" << b << std::endl;
     });

     int i = 0;
     while (i < 100) {
         const bool shouldSend = (rand() % 30 == 0); // Flag set by other thread
         if (shouldSend) {
             ipc.send(Request{i, 2 * i}); // Send request
         }

         std::this_thread::sleep_for(10ms);
         ++i;
     }

     std::cout << "Client sending close command" << std::endl;
     ipc.send(Request{-42, -42});

     std::cout << "Closing" << std::endl;
     ipc.close();
 }

 /// Runs the server when any command-line argument equals "server",
 /// otherwise runs the client.
 int main(int argc, char** argv) {
     bool runServer = false;
     for (int i = 1; i < argc && !runServer; ++i)
         runServer = (std::string_view(argv[i]) == "server");

     if (runServer) {
         server();
     } else {
         client();
     }

     std::cout << "Bye" << std::endl;
 }


在本地进行现场演示:
x1c 0d1x的数据
遗憾的是,在线编译器不允许共享内存访问

相关问题