c++ Boost::asio异步调用无法正常工作

4nkexdtk  于 2023-05-20  发布在  其他
关注(0)|答案(1)|浏览(165)

当我运行以下测试时,

TEST_CASE("DummyNetwork_Basic") {
    // start a network at port 3000
    DummyNetwork network3000 = DummyNetwork(3000);
    network3000.listenForIncomingConnections();

    // start a network at port 3001
    DummyNetwork network3001 = DummyNetwork(3001);
    network3001.listenForIncomingConnections();

    // network 3001 connects to network 3000
    network3001.connect(3000);

    // sleep to give time for connection
    std::this_thread::sleep_for(std::chrono::seconds(50));

    // network 3000 sends message to network 3001
    network3000.send(3001, NetworkMessage{1, "hi"});

    // network 3001 calls a blocking function to receive messages
    NetworkMessage receivedOnN2_m1 = network3001.receiveMessageWithTypes(std::vector<int>{1}, 3000);

    bool result1 = receivedOnN2_m1.encodedData_ == "hi";
            CHECK(result1);

    // network 3001 sends messages to network 3000
    network3001.send(3000, NetworkMessage{2, "hello"});
    network3001.send(3000, NetworkMessage{1, "hii"});

    // network 3000 attempts to receive messages via blocking read
    NetworkMessage receivedOnN1_m1 = network3000.receiveMessageWithTypes(std::vector<int>{2}, 3001);
    NetworkMessage receivedOnN1_m2 = network3000.receiveMessageWithTypes(std::vector<int>{1}, 3001);

    // clean up
    network3000.shutdown();
    network3001.shutdown();

    bool result2 = receivedOnN1_m1.encodedData_ == "hello";
    bool result3 = receivedOnN1_m2.encodedData_ == "hii";
            CHECK(result2);
            CHECK(result3);

    if (result1 && result2 && result3) {
        std::cout << "DummyNetwork_Basic Passed" << std::endl;
    } else {
        std::cout << "DummyNetwork_Basic Passed" << std::endl;
    }

}

实际上,我看到的唯一输出(稍后会有意义)是

DummyNetwork3000: Created
DummyNetwork3000: Listening at port3000
DummyNetwork3001: Created
DummyNetwork3001: Listening at port3001
*thread sleeps to give time for connection*
DummyNetwork3000: Can't send. Not connected to port3001

下面是DummyNetwork的头文件。

#pragma once
#include <boost/asio.hpp>
#include <boost/unordered_map.hpp>
#include <vector>
#include "NetworkMessage.hpp"

class DMessageReceival {
public:
    DMessageReceival(NetworkMessage message, std::string sender);
    NetworkMessage message_;
    std::string sender_;
};

class DConnection {
public:
    std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
    std::string otherPort_;
    std::string otherName_;
    std::atomic<bool> disconnectRequested_{false};

    DConnection(std::shared_ptr<boost::asio::ip::tcp::socket> socket, std::string otherPort, std::string otherName);
};

class DummyNetwork {
public:
    DummyNetwork(int port);
    void listenForIncomingConnections();
    void send(int port, const NetworkMessage& msg);
    void connect(int port);
    void disconnect(int port);
    NetworkMessage receiveMessageWithTypes(const std::vector<int>& types, int port = 0);
    void shutdown();

private:
    int port_;
    boost::asio::io_service ioService_;
    boost::unordered_map<std::string, std::shared_ptr<DConnection>> connections_;
    boost::unordered_map<int, std::vector<DMessageReceival>> messages_;

    std::atomic<bool> isListening_{false};
    std::atomic<bool> shutdownRequested_{false};
    std::condition_variable messagesConditionVariable_;
    std::mutex connectionsMutex_;
    std::mutex messagesMutex_;

    void printDebugMessage(std::string input);
    bool isConnected(int name);
    void handleAccept(std::shared_ptr<boost::asio::ip::tcp::socket> socket, const boost::system::error_code& error);
    void handleHandshakeRead(std::shared_ptr<boost::asio::ip::tcp::socket> socket, std::string buffer, const boost::system::error_code& error, std::size_t bytes_read);
    void handleRead(std::shared_ptr<DConnection> connection, std::string buffer, const boost::system::error_code& error, std::size_t bytes_read);
    void handleWrite(const boost::system::error_code& error, const NetworkMessage& msg, std::string other_name);
    void handleConnect(const boost::system::error_code& error, std::shared_ptr<boost::asio::ip::tcp::socket> socket, std::string name);
    void handleHandshakeWrite(const boost::system::error_code& error, std::shared_ptr<DConnection> conn);
};

这里是实现

#include "../include/DummyNetwork.hpp"
#include <iostream>
#include <functional>
#include <boost/bind.hpp>


DMessageReceival::DMessageReceival(NetworkMessage message, std::string sender) {
    message_ = message;
    sender_ = sender;
}

DConnection::DConnection(std::shared_ptr<boost::asio::ip::tcp::socket> socket, std::string otherPort, std::string otherName) {
    socket_ = socket;
    otherPort_ = otherPort;
    otherName_ = otherName;
}

void DummyNetwork::printDebugMessage(std::string input) {
    std::cout << "DummyNetwork" << std::to_string(port_) << ": " << input << std::endl;
}

DummyNetwork::DummyNetwork(int port) : port_(port), ioService_() {
    boost::asio::io_service::work work(ioService_);
    std::thread ioServiceThread([this]() {
        ioService_.run();
    });
    ioServiceThread.detach();
    printDebugMessage("Created");
}

void DummyNetwork::listenForIncomingConnections() {
    if (!isListening_.exchange(true)) {
        boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), port_);
        boost::asio::ip::tcp::acceptor acceptor(ioService_, endpoint);
        std::shared_ptr<boost::asio::ip::tcp::socket> socket(new boost::asio::ip::tcp::socket(ioService_));
        printDebugMessage("Listening at port" + std::to_string(port_));
        acceptor.async_accept(*socket, boost::bind(&DummyNetwork::handleAccept, this, socket,
                                                       boost::asio::placeholders::error));
    }
}

bool DummyNetwork::isConnected(int name) {
    std::lock_guard<std::mutex> lock(connectionsMutex_);
    const auto it = connections_.find(std::to_string(name));
    if (it != connections_.end()) {
        return true;
    }
    return false;
}

void DummyNetwork::handleHandshakeRead(std::shared_ptr<boost::asio::ip::tcp::socket> socket, std::string buffer, const boost::system::error_code& error, std::size_t bytes_read) {
    NetworkMessage msg = NetworkMessage::decode(buffer);
    if (msg.type_ == 0 && !isConnected(std::stoi(msg.encodedData_))) {
        std::lock_guard<std::mutex> lock(connectionsMutex_);
        std::string other_port = std::to_string(socket->remote_endpoint().port());
        std::shared_ptr<DConnection> conn = std::make_shared<DConnection>(socket, other_port, msg.encodedData_);
        connections_[conn->otherName_] = conn;
        printDebugMessage("Resolved port" + conn->otherPort_ + "to be port" + conn->otherName_);
        char new_buffer[10000];
        socket->async_read_some(boost::asio::buffer(new_buffer),
                                boost::bind(&DummyNetwork::handleRead, this, conn, new_buffer,
                                            boost::asio::placeholders::error,
                                            boost::asio::placeholders::bytes_transferred));
    }
}

void DummyNetwork::handleRead(std::shared_ptr<DConnection> connection, std::string buffer, const boost::system::error_code& error, std::size_t bytes_read) {
    NetworkMessage msg = NetworkMessage::decode(buffer);
    if (msg.type_ != 0 && !shutdownRequested_ && !connection->disconnectRequested_ && connection->socket_->is_open()) {
        std::lock_guard<std::mutex> lock(messagesMutex_);
        messages_[msg.type_].push_back(DMessageReceival(msg, connection->otherName_));
        messages_[0].push_back(DMessageReceival(msg, connection->otherName_));
        printDebugMessage("Received " + msg.encodedData_ + " from " + connection->otherName_);
        messagesConditionVariable_.notify_all();
        char new_buffer[10000];
        connection->socket_->async_read_some(boost::asio::buffer(new_buffer),
                                boost::bind(&DummyNetwork::handleRead, this, connection, new_buffer,
                                            boost::asio::placeholders::error,
                                            boost::asio::placeholders::bytes_transferred));
    }
}

void DummyNetwork::handleAccept(std::shared_ptr<boost::asio::ip::tcp::socket> socket,
                                const boost::system::error_code& error) {
    isListening_ = false;
    printDebugMessage("Accepted connection to port" + socket->remote_endpoint().port());
    char buffer[10000];
    socket->async_read_some(boost::asio::buffer(buffer),
                            boost::bind(&DummyNetwork::handleHandshakeRead, this, socket, buffer,
                                        boost::asio::placeholders::error,
                                        boost::asio::placeholders::bytes_transferred));
    listenForIncomingConnections();
}

void DummyNetwork::send(int port, const NetworkMessage& msg) {
    if (!shutdownRequested_ && !isConnected(port)) {
        printDebugMessage("Can't send. Not connected to port" + std::to_string(port));
    }
    std::lock_guard<std::mutex> lock(connectionsMutex_);
    std::shared_ptr<DConnection> conn = connections_[std::to_string(port)];
    std::shared_ptr<boost::asio::ip::tcp::socket> socket = conn->socket_;
    socket->async_write_some(boost::asio::buffer(msg.encode(),msg.encode().length()),
                             boost::bind(&DummyNetwork::handleWrite, this,boost::asio::placeholders::error, msg, conn->otherName_));
    printDebugMessage("Requested to write msg " + msg.encodedData_ + " to port" + std::to_string(port));
}

void DummyNetwork::handleWrite(const boost::system::error_code& error, const NetworkMessage& msg, std::string other_name) {
    printDebugMessage("Wrote " + msg.encodedData_ + " to " + other_name);
}

void DummyNetwork::handleConnect(const boost::system::error_code& error, std::shared_ptr<boost::asio::ip::tcp::socket> socket, std::string name) {
    if (!shutdownRequested_) {
        printDebugMessage("Can't connect. Shutdown");
    }
    std::lock_guard<std::mutex> lock(connectionsMutex_);
    std::shared_ptr<DConnection> conn = std::make_shared<DConnection>(socket, name, name);
    connections_[conn->otherName_] = conn;
    NetworkMessage msg = NetworkMessage{0, std::to_string(port_)};
    socket->async_write_some(boost::asio::buffer(msg.encode(),msg.encode().length()),
                             boost::bind(&DummyNetwork::handleHandshakeWrite, this,boost::asio::placeholders::error, conn));
}

void DummyNetwork::handleHandshakeWrite(const boost::system::error_code& error, std::shared_ptr<DConnection> conn) {
    printDebugMessage("Connected to port" + conn->otherName_);
}

void DummyNetwork::connect(int name) {
    if (isConnected(name)) {
        return;
    }
    boost::asio::ip::tcp::resolver resolver(ioService_);
    auto new_socket = std::make_shared<boost::asio::ip::tcp::socket>(ioService_);
    boost::asio::ip::tcp::resolver::query query("127.0.0.1", std::to_string(name));
    boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
    boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
    new_socket->async_connect(endpoint,
                          boost::bind(&DummyNetwork::handleConnect, this,
                                      boost::asio::placeholders::error, new_socket, std::to_string(name)));
}

void DummyNetwork::disconnect(int port) {
    if (isConnected(port)) {
        std::lock_guard<std::mutex> lock(connectionsMutex_);
        const auto it = connections_.find(std::to_string(port));
        if (it != connections_.end()) {
            std::shared_ptr<DConnection> conn = it->second;
            conn->disconnectRequested_ = true;
            conn->socket_->close();
            printDebugMessage("Closed connection to " + conn->otherName_);
        }
        connections_.erase(std::to_string(port));
    }
}

NetworkMessage DummyNetwork::receiveMessageWithTypes(const std::vector<int>& types, int port) {
    std::unique_lock<std::mutex> lock(messagesMutex_);
    while (port == 0 || isConnected(port)) {
        if (port == 0) {
            if (messages_[0].size() > 0) {
                DMessageReceival mr = messages_[0][0];
                messages_[0].erase(messages_[0].begin());
                for (auto it = messages_[mr.message_.type_].begin();
                     it != messages_[mr.message_.type_].end(); it++) {
                    if ((it->message_.encodedData_ ==
                         mr.message_.encodedData_) &&
                        (it->sender_ == mr.sender_)) {
                        messages_[mr.message_.type_].erase(it);
                        lock.unlock();
                        return mr.message_;
                    }
                }
            } else {
                messagesConditionVariable_.wait(lock);
            }
        } else {
            for (auto type: types) {
                if (messages_[type].size() > 0) {
                    DMessageReceival mr = messages_[type][0];
                    messages_[type].erase(messages_[type].begin());
                    for (auto it = messages_[0].begin();
                         it != messages_[0].end(); it++) {
                        if ((it->message_.encodedData_ ==
                             mr.message_.encodedData_) &&
                            (it->sender_ == mr.sender_)) {
                            messages_[0].erase(it);
                            lock.unlock();
                            return mr.message_;
                        }
                    }
                }
            }
            messagesConditionVariable_.wait(lock);
        }
    }
    return {};
}

void DummyNetwork::shutdown() {
    shutdownRequested_ = true;
    std::lock_guard<std::mutex> lock(connectionsMutex_);
    for (const auto& [key, value] : connections_) {
        value->disconnectRequested_ = true;
        value->socket_->close();
        printDebugMessage("Closed connection to " + value->otherName_);
    }
    connections_.clear();
}

我怀疑问题出在我对boost库的使用上,特别是我对io_service对象的使用,要么a)没有正确地将工作发布到它的队列中,要么b)没有正确地执行队列中的内容。

m2xkgtsf

m2xkgtsf1#

这是一个很大的代码。
只有几件事立即脱颖而出:

printDebugMessage("Accepted connection to port" + socket->remote_endpoint().port());

没道理啊你要把一个整数加到一个字符串中。如果你启用了编译器警告,你就会知道你什么时候写了这样的boo-boos。你在代码的不同部分正确地(尽管重复)了这一点。
大得多:

char new_buffer[10000];
connection->socket_->async_read_some(asio::buffer(new_buffer),
                                     boost::bind(&DummyNetwork::handleRead, this, connection,
                                                 new_buffer, ph::error, ph::bytes_transferred));

这也说不通。new_buffer是局部变量。根据定义,您将无法在handleRead中访问它。此外,它将在封闭作用域的末尾停止存在。你有两次这样的错误:

char buffer[10000];
socket->async_read_some( //
    asio::buffer(buffer),
    boost::bind(&DummyNetwork::handleHandshakeRead, this, socket, buffer, ph::error,
                ph::bytes_transferred));

剩下的代码是。。非常非常复杂

  • 我不知道你为什么更新isListening_。它的价值似乎从来没有用处。
  • 你基本上从来没有检查过任何error_code。这是一个不必要的混乱配方
  • 您可以在DummyNetwork上进行所有操作。这意味着它的责任过重。它充当监听器**,**充当(多个)连接。在这方面,我们必须问为什么你甚至有一个单独的DConnection类型摆在首位。
  • receiveMessagesWithTypes闻起来像是20世纪80年代的select/poll循环。你现在用的是Asio,何必呢?
  • 你有显式的线程同步(messageConditionVariable_messagesMutex_)。这似乎也不匹配异步IO使用Asio好
  • 解析端点,但不知何故只考虑第一个解析器结果;首选asio::async_connect
  • 将接收到的buffer的内容解释为有效的nul终止字符串,忽略bytes_read
  • 你在shutdown中有一个数据竞争,你在connectionsMutex下调用close()。但是,connection可能被其他不使用相同锁的异步完成处理程序使用。使用strands或者认识到每个“网络”示例都有一个单线程,并将其与之接近,可能会更好。另外,考虑shutdown(...)cancel(),而不是close()
  • 你使用async_write_some,它不能保证发送整个缓冲区。首选asio::async_write
  • 此外,更喜欢分散/聚集,而不是手动编码/解码的东西,你似乎正在进行(未显示)。至少它看起来是非常不一致的(type_在成员和encode()结果中必然是重复的-我希望不是按值返回,因为这意味着在asio::buffer(msg.encode(), msg.encode().length())中会有更多的生命周期错误导致Undefined Behaviour)。
  • 说到这一点,永远不要指定缓冲区的大小,如果这只意味着你可能会出错:asio::buffer(msg.encode())
  • isConnected是一个活泼的接口:根据定义,一旦connectionMutex_被释放,返回的值就变得陈旧。一个更有用的接口可能会接受一个unique_lock或假设调用者持有锁。
  • send仅检查shutdownRequested iff !isConnected(port)。这似乎是错误的这里有一个固定的尝试:
void DummyNetwork::send(int port, NetworkMessage const& msg) {
     if (shutdownRequested_)
         printDebugMessage("Can't send. Shutdown.");
     std::lock_guard<std::mutex>  lock(connectionsMutex_);

     if (auto conn = connections_[std::to_string(port)]) {
         asio::async_write(*conn->socket_, asio::buffer(msg.encode()),
                           boost::bind(&DummyNetwork::handleWrite, this, ph::error, msg, conn->otherName_));
         printDebugMessage("Requested to write msg " + msg.encodedData_ + " to port " + std::to_string(port));
     } else {
         printDebugMessage("Can't send. Not connected to port " + std::to_string(port));
     }
 }

当然,现在我们也注意到,msgasync_write完成之前并没有生命周期保证。事实上,在每一次调用send时,你都在传递一个临时的NetworkMessage。唉。又是一辈子的错误。

  • 您无法在上一个send完成之前阻止下一个send发生:
netB.send(portA, NetworkMessage{2, "hello"});
 netB.send(portA, NetworkMessage{1, "hii"});

这可能违反documented at asio::async_write的要求:

  • 此操作通过零次或多次调用流的async_write_some函数来实现,称为组合操作。程序必须确保流在此操作完成之前不执行其他写操作(例如async_write、流的async_write_some函数或任何其他执行写的组合操作)。

我知道你的代码中有async_write_some,但这本身就是一个错误。您可能需要wellknown outbound queue pattern

  • 您可以将一些完成处理程序绑定到connection,一些绑定到other_name,一些甚至绑定到原始tcp::socket示例。所有这些可能都应该是DConnection上的成员,并使用shared_from_this
  • 你有async_read_some,它有和async_write_some相同的问题:不能保证它读取“完整的消息”。您可能需要在协议中添加帧,以便明确地知道消息何时完成。否则,您将不正确地解析部分消息,这会造成足够的混乱。但是,您也会将尾随数据解释为一条新消息,这将再次导致UB,因为它没有以类型开始,或者实际上可能不够长,甚至不足以包含这样的头数据。

也许如果你描述一下你想实现什么,我就能帮助你展示我是如何实现的。与此同时,这里是我的中途审查纳入了许多上述想法:

(Un)live On Coliru

请注意,我无法测试逻辑,因为逻辑从一开始就不清楚,并且提供的实现无法工作(因此我没有比较)。提供这些代码只是为了说明我在此过程中提出的一些要点。

更新

完成替代实施

Live On Coliru

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/endian/arithmetic.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/key.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/signals2.hpp>
#include <deque>
#include <functional>
#include <iomanip>
#include <iostream>
#include <map>
#include <vector>

using namespace std::chrono_literals;

namespace { // diagnostics tracing helpers
    auto now = std::chrono::high_resolution_clock::now;

    static auto timestamp() {
        static auto start = now();
        return (now() - start) / 1.ms;
    }

    static std::atomic_int tid_gen = 0;
    thread_local int       tid     = tid_gen++;
    std::mutex             console_mx;

    void trace(auto const&... args) {
        std::lock_guard lk(console_mx);
        std::cout << "T:" << std::setw(2) << tid << std::right << std::setw(10) << timestamp() << "ms ";
        (std::cout << ... << args) << std::endl;
    }
} // namespace

namespace Network {
    namespace asio = boost::asio;
    namespace ph   = asio::placeholders;
    namespace bmi  = boost::multi_index;
    namespace s2   = boost::signals2;
    using duration = std::chrono::steady_clock::duration;
    using asio::ip::tcp;
    using boost::system::error_code;
#ifdef VERBOSE
    using mutex_type = std::recursive_mutex;        // recursive only to allow locking inside trace function
    using cv_type    = std::condition_variable_any; // _any to allow recursive mutex
#else
    using mutex_type = std::mutex;
    using cv_type    = std::condition_variable;
#endif

    struct timeout_exception : std::runtime_error{
        timeout_exception() : std::runtime_error("timeout_exception") {}
    };

    struct Message {
        using NetInt32 = boost::endian::big_int32_t;

        Message(int type = -1, std::string payload = {}) : type_(type), payload_(std::move(payload)) {
            prepare_payload();
        }

        int                type() const { return type_; }
        std::string const& payload() const { return payload_; }
        void               prepare_payload() { payloadsize_ = payload_.size(); }

        asio::mutable_buffer prepare_payload_buffer() {
            payload_.resize(payloadsize_);
            return asio::buffer(payload_);
        }

        auto buffers() const {
            return std::array{
                asio::buffer(&type_, sizeof(type_)),
                asio::buffer(&payloadsize_, sizeof(payloadsize_)),
                asio::buffer(payload_),
            };
        }

        auto mutable_headers() {
            return std::array{
                asio::buffer(&type_, sizeof(type_)),
                asio::buffer(&payloadsize_, sizeof(payloadsize_)),
            };
        }

      private:
        NetInt32    type_;
        NetInt32    payloadsize_;
        std::string payload_;
    };

    struct Receipt {
        size_t      ordinal;
        Message     message_;
        std::string sender_;

        int type() const { return message_.type(); }
    };

    class Session;
    using SessionPtr = std::shared_ptr<Session>;

    class Session : public std::enable_shared_from_this<Session> {
        int const id = []{ static int gen = 0; return ++gen; }();
        void trace(auto const&... args) const {
            ::trace("Session#", id, "\t", peerPort_, '/', quoted(peerName_), " ", args...);
        }

      public:
        Session(asio::any_io_executor ex) : socket_(ex) {}

        Session(tcp::socket s)
            : socket_(std::move(s))
            , peerPort_(socket_.remote_endpoint().port())
            , peerName_(std::to_string(peerPort_)) {}

        ~Session() { trace("Destructing"); }

        uint16_t           peerPort() const { return peerPort_; }
        std::string const& peerName() const { return peerName_; }

        s2::signal<void(SessionPtr const&)>                 when_established;
        s2::signal<void(SessionPtr const&, Message const&)> when_message;

      private:
#include <boost/asio/yield.hpp>
        template <typename Token> auto async_receive(Token&& token) {
            auto init = [this, coro = asio::coroutine{}](auto&& self, error_code ec = {},
                                                         size_t = {}) mutable {
                reenter(coro) {
                    yield asio::async_read(socket_, incoming_.mutable_headers(), std::move(self));

                    if (ec.failed())
                        return self.complete(ec, incoming_);

                    yield asio::async_read(socket_, incoming_.prepare_payload_buffer(), std::move(self));

                    trace("Received message payload ", quoted(incoming_.payload()), ", ", ec.message());
                    self.complete(ec, incoming_);
                }
            };
            return asio::async_compose<decltype(token), void(error_code, Message)>(init, token);
        }

        template <typename Token> auto async_connect_handshake(std::string ourName, Token&& token) {
            auto init = [=, this, coro = asio::coroutine{}] //
                (auto&& self, error_code ec = {}, tcp::endpoint = {}) mutable {
                    reenter(coro) {
                        yield socket_.async_connect({{}, peerPort_}, std::move(self));
                        if (ec.failed())
                            return self.complete(ec);

                        trace("Connected to ", socket_.remote_endpoint(), " ", ec.message());

                        do_send(Message{0, ourName});

                        self.complete(ec);
                    }
                };
            return asio::async_compose<decltype(token), void(error_code)>(init, token);
        }
#include <boost/asio/unyield.hpp>

      public:
        void startIncoming() {
            // not posting to strand because startXXXX() is always called as first & only operation
            assert(socket_.is_open());
            async_receive(
                boost::bind(&Session::on_incoming_handshake, shared_from_this(), ph::error, boost::placeholders::_2));
        }

        void startOutgoing(uint16_t port, std::string ourName) {
            // not posting to strand because startXXXX() is always called as first & only operation
            assert(!socket_.is_open());
            peerPort_ = port;
            peerName_ = std::to_string(port);
            async_connect_handshake(ourName,
                                    boost::bind(&Session::on_establish, shared_from_this(), ph::error));
        }

        void send(Message msg) {
            post(socket_.get_executor(),
                 [this, self = shared_from_this(), m = std::move(msg)]() mutable { do_send(std::move(m)); });
        }

        void shutdown() {
            post(socket_.get_executor(), [this, self = shared_from_this()]() mutable { do_shutdown(); });
        }

      private:
        // assumed on strand
        void do_receive_message() {
            async_receive(
                boost::bind(&Session::on_message, shared_from_this(), ph::error, boost::placeholders::_2));
        }

        void do_send(Message msg) {
            outbox_.push_back(std::move(msg));
            if (outbox_.size() == 1)
                do_write_loop();
        }

        void do_shutdown() {
            socket_.cancel(); // maybe close()?
        }

        void do_write_loop() {
            if (outbox_.empty())
                return;

            asio::async_write( //
                socket_, outbox_.front().buffers(),
                [this, self = shared_from_this()](error_code ec, size_t n) {
                    trace("write loop: ", n, " bytes, ", ec.message());
                    if (ec.failed())
                        return do_shutdown();

                    outbox_.pop_front();
                    do_write_loop();
                });
        }

        void on_incoming_handshake(error_code ec, Message msg) {
            trace(__FUNCTION__, ": ", ec.message());
            if (ec.failed())
                return do_shutdown();

            if (msg.type() != 0) {
                trace(__FUNCTION__, ": Invalid handshake message");
                return do_shutdown();
            }

            peerName_ = msg.payload();
            trace("Handshake ", quoted(peerName_), " on port ", peerPort_);

            on_establish(ec);
        }

        void on_establish(error_code ec) {
            trace(__FUNCTION__, ": ", ec.message());
            if (ec.failed())
                return do_shutdown();

            when_established(shared_from_this());
            do_receive_message();
        }

        void on_message(error_code ec, Message msg) {
            trace(__FUNCTION__, ": ", ec.message());
            if (ec.failed())
                return do_shutdown();

            when_message(shared_from_this(), msg);

            do_receive_message();
        }

        tcp::socket         socket_;
        uint16_t            peerPort_ = 0xFFFF;
        std::string         peerName_ = "(none)";
        // message exchange
        Message             incoming_;
        std::deque<Message> outbox_;
    };

    class Listener : public std::enable_shared_from_this<Listener>{
        int const id = []{ static int gen = 0; return ++gen; }();
        void trace(auto const&... args) const {
            ::trace("Listener#", id, "\t", acceptor_.local_endpoint(), " ", args...);
        }

      public:
        Listener(asio::any_io_executor ex, uint16_t port) : acceptor_(ex, {{}, port}) {}

        void listen(std::function<void(SessionPtr const&)> when_established = {}) {
            trace("Listening at ", acceptor_.local_endpoint());
            doAccept(when_established);
        }

      private:
        void doAccept(std::function<void(SessionPtr const&)> when_established = {}) {
            acceptor_.async_accept( //
                make_strand(acceptor_.get_executor()),
                [=, this, self = shared_from_this()](error_code ec, tcp::socket s) {
                    trace("on_accept: ", ec.message());
                    if (ec.failed())
                        return;

                    auto sess = std::make_shared<Session>(std::move(s));
                    if (when_established)
                        sess->when_established.connect(when_established);
                    sess->startIncoming();
                    doAccept(when_established);
                });
        }
        tcp::acceptor acceptor_;
    };

    class Hub {
        int const id = []{ static int gen = 0; return ++gen; }();
        void trace(auto const&... args) const {
#ifdef VERBOSE
            auto nmsg = [&] { std::lock_guard lk(messages_mx_); return messages_.size(); }();
            auto nconn = [&] { std::lock_guard lk(connections_mx_); return connections_.size(); }();
            ::trace("Hub#", id, "\t", port_, " (messages_:", nmsg, ", connections_:", nconn, ") ", args...);
#else
            ::trace("Hub#", id, "\t", port_, " ", args...);
#endif
        }

        auto register_session() {
            return [this](SessionPtr sp) { // when_established
                if (!sp)
                    return;
                {
                    trace("Established connection to ", sp->peerPort(), " (", quoted(sp->peerName()), ")");
                    std::lock_guard lk(connections_mx_);
                    connections_.emplace(sp->peerName(), sp);
                }

                sp->when_established.disconnect_all_slots();

                sp->when_message.connect([this](SessionPtr const& sp, Message msg) {
                    trace("Received ", quoted(msg.payload()), " from ", sp->peerName());
                    std::lock_guard lk(messages_mx_);

                    messages_.push_back({message_id_sequence_++, msg, sp->peerName()});
                    messages_cv_.notify_all();
                });
            };
        }

      public:
        Hub(asio::any_io_executor ex, uint16_t port) : ex_(ex), port_(port) { trace("Created"); }

        void listen() {
            if (isListening_.exchange(true))
                return;

            std::make_shared<Listener>(ex_, port_)->listen(register_session());
        }

        void send(std::string peerName, Message msg) {
            if (shutdownRequested_) {
                trace("Can't send. Shutdown.");
                return;
            }

            if (auto conn = find_session(peerName))
                conn->send(std::move(msg));
            else
                trace("Can't send. No connection for ", quoted(peerName));
        }

        void connect(uint16_t port) {
            std::string name = std::to_string(port_);
            if (!isConnected(name)) {
                auto sess = std::make_shared<Session>(make_strand(ex_));
                sess->when_established.connect(register_session());
                sess->startOutgoing(port, std::move(name));
            }
        }

        // void disconnect(uint16_t port);
        Message receiveMessage(duration timeout) {
            std::unique_lock lk(messages_mx_);

            if (!messages_cv_.wait_for(lk, timeout, [&] { return !messages_.empty(); }))
                throw timeout_exception{};

            Receipt mr = std::move(messages_.front());
            messages_.pop_front();

            return std::move(mr.message_);
        }

        Message receiveMessageWithTypes(duration timeout, std::vector<int> const& types, uint16_t port) {
            std::string const name = std::to_string(port);
            if (port == 0)
                return receiveMessage(timeout);

            std::unique_lock lk(messages_mx_);

            auto& idx = messages_.get<perType>();

            do {
                for (auto t : types) {
                    if (auto r = idx.equal_range(t); r.first != r.second) {
                        Receipt mr = std::move(*r.first);
                        idx.erase(r.first);
                        return mr.message_;
                    }
                }
                if (std::cv_status::timeout == messages_cv_.wait_for(lk, timeout)) {
                    throw timeout_exception{};
                }
            } while (isConnected(name));

            throw std::runtime_error("Not connected");
        }

        void shutdown() {
            std::lock_guard lk(connections_mx_);
            for (auto const& [name, conn] : connections_) {
                conn->shutdown();
                trace("Closed connection to ", quoted(name));
            }
            connections_.clear();
        }

      private:
        asio::any_io_executor ex_;
        uint16_t              port_;
        std::unordered_map<std::string, SessionPtr> connections_;

        size_t message_id_sequence_ = 0;
        bmi::multi_index_container<
            Receipt,                          //
            bmi::indexed_by<bmi::sequenced<>, // general queue
                            bmi::ordered_unique<bmi::tag<struct perType>,
                                                bmi::key<&Receipt::type, &Receipt::ordinal>> // per-type
                            >>
            messages_;

        SessionPtr find_session(std::string const& name) const {
            std::lock_guard lk(connections_mx_);

            auto it = connections_.find(name);
            return it != connections_.end() ? it->second : nullptr;
        }

        std::atomic<bool>  isListening_{false};
        std::atomic<bool>  shutdownRequested_{false};
        mutable mutex_type connections_mx_;
        mutable mutex_type messages_mx_;
        cv_type            messages_cv_;

        bool isConnected(std::string const& name) { return !!find_session(name); }
    };

} // namespace Network

#define CHECK(cond)                                                                                          \
    do {                                                                                                     \
        auto&& c = (cond);                                                                                   \
        trace("Check '" #cond "': ", std::boolalpha, c);                                                     \
    } while (false)

void DummyNetwork_Basic() {
    using namespace Network;
    asio::thread_pool ioc(1);

    constexpr uint16_t portA = 3000;
    constexpr uint16_t portB = 3001;

    Hub netA = Hub(ioc.get_executor(), portA);
    Hub netB = Hub(ioc.get_executor(), portB);

    netA.listen();
    netB.listen();

    // network portB connects to network portA
    netB.connect(portA);

    // sleep to give time for connection
    using namespace std::chrono_literals;
    std::this_thread::sleep_for(3s);

    // network portA sends message to network portB
    netA.send(std::to_string(portB), Message{1, "hi"});

    // network portB calls a blocking function to receive messages
    Message receivedOnN2_m1 = netB.receiveMessageWithTypes(1s, {1}, portA);

    bool result1 = receivedOnN2_m1.payload() == "hi";
    CHECK(result1);

    // network portB sends messages to network portA
    netB.send(std::to_string(portA), Message{2, "hello"});
    netB.send(std::to_string(portA), Message{1, "hii"});

    // network portA attempts to receive messages via blocking read
    Message receivedOnN1_m1 = netA.receiveMessageWithTypes(1s, {2}, portB);
    Message receivedOnN1_m2 = netA.receiveMessageWithTypes(1s, {1}, portB);

    // clean up
    netA.shutdown();
    netB.shutdown();

    bool result2 = receivedOnN1_m1.payload() == "hello";
    bool result3 = receivedOnN1_m2.payload() == "hii";
    CHECK(result2);
    CHECK(result3);

    if (result1 && result2 && result3) {
        trace("DummyNetwork_Basic Passed");
    } else {
        trace("DummyNetwork_Basic Passed");
    }
}

int main() { //
    DummyNetwork_Basic();
}

打印

g++ -std=c++2b -Ofast -Wall -pedantic main.cpp -pthread && ./a.out
T: 0  0.001854ms Hub#1  3000 Created
T: 0  0.076385ms Hub#2  3001 Created
T: 0  0.171003ms Listener#1 0.0.0.0:3000 Listening at 0.0.0.0:3000
T: 0  0.266008ms Listener#2 0.0.0.0:3001 Listening at 0.0.0.0:3001
T: 1   0.52161ms Listener#1 0.0.0.0:3000 on_accept: Success
T: 1  0.640365ms Session#1  3000/"3000" Connected to 127.0.0.1:3000 Success
T: 1  0.714466ms Session#1  3000/"3000" on_establish: Success
T: 1  0.730936ms Hub#2  3001 Established connection to 3000 ("3000")
T: 1  0.766172ms Session#1  3000/"3000" write loop: 12 bytes, Success
T: 1  0.792727ms Session#2  34546/"34546" Received message payload "3001", Success
T: 1  0.804801ms Session#2  34546/"34546" on_incoming_handshake: Success
T: 1  0.814638ms Session#2  34546/"3001" Handshake "3001" on port 34546
T: 1  0.825382ms Session#2  34546/"3001" on_establish: Success
T: 1  0.834955ms Hub#1  3000 Established connection to 34546 ("3001")
T: 1   3000.73ms Session#2  34546/"3001" write loop: 10 bytes, Success
T: 1   3000.82ms Session#1  3000/"3000" Received message payload "hi", Success
T: 1   3000.83ms Session#1  3000/"3000" on_message: Success
T: 1   3000.85ms Hub#2  3001 Received "hi" from 3000
T: 0   3000.89ms Check 'result1': true
T: 1   3000.98ms Session#1  3000/"3000" write loop: 13 bytes, Success
T: 1   3001.01ms Session#2  34546/"3001" Received message payload "hello", Success
T: 1   3001.02ms Session#2  34546/"3001" on_message: Success
T: 1   3001.03ms Hub#1  3000 Received "hello" from 3001
T: 1   3001.06ms Session#1  3000/"3000" write loop: 11 bytes, Success
T: 1   3001.08ms Session#2  34546/"3001" Received message payload "hii", Success
T: 1   3001.09ms Session#2  34546/"3001" on_message: Success
T: 1    3001.1ms Hub#1  3000 Received "hii" from 3001
T: 0   3001.13ms Hub#1  3000 Closed connection to "3001"
T: 0   3001.16ms Hub#2  3001 Closed connection to "3000"
T: 0   3001.17ms Check 'result2': true
T: 0   3001.19ms Check 'result3': true
T: 0    3001.2ms DummyNetwork_Basic Passed
T: 1   3001.22ms Session#2  34546/"3001" on_message: Operation canceled
T: 1   3001.24ms Session#2  34546/"3001" Destructing
T: 0   3001.35ms Session#1  3000/"3000" Destructing

相关问题