c++ boost::asio p2p networking

mrwjdhj3  于 2023-03-25  发布在  其他
关注(0)|答案(1)|浏览(215)

我想使用 boost::asio 的 TCP 协议创建一个 p2p 网络。我先给 p2p 节点指定一个用于监听的端口;如果我再给出一个想要连接的端口,我希望从监听端口向该端口发起连接。(所有套接字都运行在本地主机 IP 上)
例如:

listening port - 10 (accepting clients)
connect to port: 20
creates a new socket: 10 -> 20

当我尝试使用函数do_connect时,它给出了错误:
“无法连接到127.0.0.1:10:试图对非套接字的对象执行操作”
我猜问题出在我从acceptor.get_executor()获取的套接字上。我如何才能获得侦听端口并将其连接到另一个端点?
main 函数:

// Body of main(): start a P2P node listening on a user-supplied port and,
// optionally, connect it to another node on 127.0.0.1.
// NOTE(review): none of the std::cin extractions below are checked for failure.
try
    {
        boost::asio::io_context io_context;
        int port;
        int option;
        // Ask for the local listening port.
        std::cout << "enter the port you are listening in: " << std::endl;
        std::cin >> port;
        tcp::endpoint endpoint(tcp::v4(), port);
        // The P2PNode constructor calls do_accept(), so the node starts
        // accepting as soon as it is constructed. The literal 1 has the same
        // value as the CONNECTING_NODE macro in P2PNode.h.
        P2PNode node(io_context, endpoint, 1);
        std::cout << "do u want to connect to some server? yes - 1 / no - any other";
        std::cin >> option;
        if (option == 1)
        {
            // `port` is reused here for the remote port to connect to.
            std::cout << "enter the port you will be connecting to: " << std::endl;
            std::cin >> port;
            tcp::endpoint remote_endpoint(boost::asio::ip::address::from_string("127.0.0.1"), port);
            node.do_connect(remote_endpoint);
        }
        // Run the event loop; the accept/connect completion handlers are
        // invoked from inside this call.
        io_context.run();
    }
    catch (const std::exception& e)
    {
        std::cout << "Exception was thrown in function: " << e.what() << std::endl;
    }
    catch (...)
    {
        std::cout << "Unknown exception in main !" << std::endl;
    }

P2PNode.h

#pragma once
#include <boost/asio.hpp>
#include <iostream>
#include <string>
#include <vector>
#include "Protocol.h"

// Node type tags; the value is stored in P2PNode::nodeType_.
#define CONNECTING_NODE 1
#define CLIENT_NODE 2

using boost::asio::ip::tcp;
// NOTE(review): a using-directive at header scope leaks `std` into every
// file that includes this header.
using namespace std;
class ProtocolMessage;

// A peer that accepts incoming TCP connections on one endpoint and can also
// open outgoing connections to other endpoints.
class P2PNode
{
public:
    // Binds the acceptor to `endpoint`, stores `nodeType` and starts accepting.
    P2PNode(boost::asio::io_context& io_context, const tcp::endpoint& endpoint, int nodeType);
    // Starts an asynchronous connect to `endpoint`.
    void do_connect(const tcp::endpoint& endpoint);
private:
    // Starts one asynchronous accept; the handler re-arms it.
    void do_accept();
    // Implementations of do_read/do_write are not shown in this excerpt.
    void do_read(tcp::socket& socket);
    void do_write(tcp::socket& socket, std::size_t length);
    tcp::acceptor acceptor_;
    // Outgoing message text; assigned in do_connect before do_write is called.
    std::string data_;
    // All accepted and connected sockets.
    // NOTE(review): references to elements are handed to do_read/do_write;
    // a std::vector may reallocate on emplace_back, which invalidates them.
    std::vector<tcp::socket> sockets_;
    // Constructed from the io_context; not otherwise used in the code shown.
    tcp::socket listeningSocket_;
    int nodeType_;
    ProtocolMessage GenerateResponse(ProtocolMessage message);
    std::string combineSocketsToString();
};

P2PNode.cpp

// Binds the acceptor to `endpoint`, records the node type and immediately
// starts the first asynchronous accept.
P2PNode::P2PNode(boost::asio::io_context& io_context, const tcp::endpoint& endpoint, int nodeType)
    : acceptor_(io_context, endpoint),
      listeningSocket_(io_context),
      nodeType_(nodeType)
{
    do_accept();
}

// Starts an asynchronous connect to `endpoint`; on success the socket is
// stored in sockets_ and a greeting is written to it.
void P2PNode::do_connect(const tcp::endpoint& endpoint)
{
    // NOTE(review): `socket` is a local object. It is destroyed when this
    // function returns, but the completion handler below captures it by
    // reference and runs later, so the handler sees a dangling reference.
    tcp::socket socket = tcp::socket(acceptor_.get_executor());
    std::cout << "executor: " << socket.get_executor() << std::endl;
    socket.async_connect(endpoint,
        [this, &socket, endpoint](boost::system::error_code ec)
        {
            std::cout << "------------do connect------------";
            if (!ec)
            {
                std::cout << "Connected to " << endpoint << std::endl;

                // Add the connected socket to the list of sockets
                // NOTE(review): moves from the (by now destroyed) local socket.
                sockets_.emplace_back(std::move(socket));

                // Send a message to the connected server
                std::string message = "Hello from node " + std::to_string(nodeType_);
                data_ = message;
                do_write(sockets_.back(), message.size());
            }
            else
            {
                std::cout << "Failed to connect to " << endpoint << ": " << ec.message() << std::endl;
            }
        });
}

void P2PNode::do_accept() {
    std::cout << "Start Listening" << std::endl;
    acceptor_.async_accept(
        [this](boost::system::error_code ec, tcp::socket socket) {
            if (!ec) {
                std::cout << "Connection established with " << socket.remote_endpoint() << std::endl;
                sockets_.emplace_back(std::move(socket));
                do_read(sockets_.back());
            }
            do_accept();
        });
}

我也试过从 'listeningSocket_' 获取执行器(executor),但同样不起作用。

ffvjumwh

ffvjumwh1#

do_connect有问题:

// The problematic version, reproduced to show the defect.
void P2PNode::do_connect(tcp::endpoint endpoint) {
    std::cout << "------------do connect------------";
    // Problem: `socket` is a local and is destroyed when do_connect returns.
    tcp::socket socket = tcp::socket(acceptor_.get_executor());
    std::cout << "executor: " << socket.get_executor() << std::endl;

    // Problem: the handler captures the local `socket` by reference, but it
    // runs only after this function has returned.
    socket.async_connect(endpoint, [this, &socket, endpoint](error_code ec) {
        if (!ec) {
            std::cout << "Connected to server " << socket.remote_endpoint() << std::endl;

            // Problem: moves from the dangling reference.
            sockets_.push_back(std::move(socket));

            data_ = "Hello from node " + std::to_string(nodeType_);
            do_write(sockets_.back(), data_.size());
        } else {
            std::cout << "Failed to connect to " << endpoint << ": " << ec.message() << std::endl;
        }
    });
}

局部变量 socket 在异步操作(async_connect)完成之前就离开了作用域并被销毁,这会导致该操作被取消。之后在处理函数中通过已经悬空的引用把它移动到 vector 中,属于未定义行为(Undefined Behavior,UB)。
你没有给出 do_read/do_write 的实现,但实际上使用 vector 几乎可以保证将来会出现更多的 UB:如果再加入一个连接,vector 可能会重新分配内存;当其中的套接字上还有正在进行的异步操作时,这就是 UB。我建议使用具有适当引用稳定性保证的容器,例如 std::list 或 std::deque。
假设选择std::list,这会更好:

// Starts an asynchronous connect to `endpoint`.
//
// The socket is constructed directly inside sockets_ before the operation is
// started, so it outlives this function. This relies on sockets_ being a
// container that keeps element references valid when more elements are added
// (std::list is assumed here).
void P2PNode::do_connect(tcp::endpoint endpoint) {
    std::cout << "------------do connect------------";

    auto& conn = sockets_.emplace_back(acceptor_.get_executor());
    std::cout << "executor: " << conn.get_executor() << std::endl;

    // Captures are spelled out: `[=]` capturing `this` implicitly is deprecated
    // in C++20, while the explicit list is valid in every standard version.
    conn.async_connect(endpoint, [this, endpoint, &conn](error_code ec) {
        if (!ec) {
            std::cout << "Connected to server " << conn.remote_endpoint() << std::endl;

            // data_ holds the outgoing message while do_write sends it.
            data_ = "Hello from node " + std::to_string(nodeType_);
            do_write(conn, data_.size());
        } else {
            std::cout << "Failed to connect to " << endpoint << ": " << ec.message() << std::endl;
        }
    });
}

查看在线演示:**Live On Coliru**

// #pragma once
#include <boost/asio.hpp>
#include <iostream>
#include <string>
#include <list>
// #include "Protocol.h"

using boost::asio::ip::tcp;
class ProtocolMessage;

// A peer that accepts incoming TCP connections and can open outgoing ones.
class P2PNode {
    using error_code = boost::system::error_code;

  public:
    // Node type tag, stored in nodeType_ and included in the greeting message.
    enum Type { CONNECTING_NODE = 1, CLIENT_NODE = 2 };
    // Binds the acceptor to `endpoint` and starts accepting.
    P2PNode(boost::asio::io_context& io_context, tcp::endpoint endpoint, Type nodeType);
    // Starts an asynchronous connect to `endpoint`.
    void do_connect(tcp::endpoint endpoint);

  private:
    // Starts one asynchronous accept; the handler re-arms it.
    void do_accept();
    // Empty stubs so that this listing compiles on its own.
    void do_read(tcp::socket& ) {}
    void do_write(tcp::socket&, size_t) {}

    tcp::acceptor          acceptor_;
    // std::list keeps references to existing sockets valid when new
    // connections are added.
    std::list<tcp::socket> sockets_;
    // Constructed from the io_context; not otherwise used in this listing.
    tcp::socket            listeningSocket_;
    // Outgoing message text; assigned in do_connect before do_write is called.
    std::string            data_;
    Type                   nodeType_;

    // Declared only; no definitions appear in this listing.
    ProtocolMessage GenerateResponse(ProtocolMessage const& message);
    std::string     combineSocketsToString();
};

P2PNode::P2PNode(boost::asio::io_context& io_context, tcp::endpoint endpoint, Type nodeType)
    : acceptor_(io_context, endpoint)
    , listeningSocket_(io_context)
    , nodeType_(nodeType) {
    do_accept();
}

// Starts an asynchronous connect to `endpoint`.
//
// The socket is constructed directly inside sockets_ (a std::list, so the
// element stays put when other connections are added) before the operation is
// started. On success a greeting is written to the peer; on failure the unused
// socket is removed from sockets_ again.
void P2PNode::do_connect(tcp::endpoint endpoint) {
    std::cout << "------------do connect------------";

    // Keep the list iterator so the handler can both use and erase the element.
    auto const slot = sockets_.emplace(sockets_.end(), acceptor_.get_executor());
    std::cout << "executor: " << slot->get_executor() << std::endl;

    // Explicit captures: valid in every standard version, unlike `[=, this]`
    // (C++20 only) or an implicit `this` capture via `[=]` (deprecated in C++20).
    slot->async_connect(endpoint, [this, endpoint, slot](error_code ec) {
        if (ec) {
            std::cout << "Failed to connect to " << endpoint << ": " << ec.message() << std::endl;
            // The connect has completed, so nothing refers to this socket any
            // more; drop it instead of leaving a dead entry in sockets_.
            sockets_.erase(slot);
            return;
        }

        tcp::socket& conn = *slot;
        std::cout << "Connected to server " << conn.remote_endpoint() << std::endl;

        // data_ holds the outgoing message while do_write sends it.
        data_ = "Hello from node " + std::to_string(nodeType_);
        do_write(conn, data_.size());
    });
}

void P2PNode::do_accept() {
    std::cout << "Start Listening" << std::endl;

    acceptor_.async_accept([this](error_code ec, tcp::socket socket) {
        if (!ec) {
            std::cout << "Connection accepted from " << socket.remote_endpoint() << std::endl;
            sockets_.push_back(std::move(socket));
            do_read(sockets_.back());
        }
        do_accept();
    });
}

int main(int argc, char** argv) try {
    uint16_t const port = argc > 1 ? atoi(argv[1]) : 8989;

    std::cout << "listening port: " << port << std::endl;
    boost::asio::io_context io_context;
    P2PNode node(io_context, {tcp::v4(), port}, P2PNode::CONNECTING_NODE);

    if (uint16_t option, port; //
        std::cout << "Do you want to connect? yes - 1 / no - any other" && std::cin >> option &&
        option == 1) {
        std::cout << "port to connect: " << std::endl;
        std::cin >> port;
        node.do_connect({{},port});
    }
    io_context.run();
} catch (std::exception const& e) {
    std::cout << "Exception was thrown in function: " << e.what() << std::endl;
} catch (...) {
    std::cout << "Unhandled exception" << std::endl;
}

使用两个实例进行测试:

# Build the example as C++20.
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp 
# First instance: listens on 7878; the stdin answer "other" means it makes no
# outgoing connection. The sleep presumably gives it time to start listening.
(./a.out 7878 <<< "other")& sleep 1
# Second instance: listens on 8989; stdin "1 7878" makes it connect to port 7878.
(./a.out 8989 <<< "1 7878")&

打印输出,如下所示:

绑定本地端口

这不会带来可观察到的差异,除非你使用 netstat、tcpview 等工具检查连接。
我会直接加上 bind 调用:

// Attempt to make the outgoing connection originate from the listening port:
// open the new socket with the acceptor's protocol, then bind it to the
// acceptor's own local endpoint before connecting.
// NOTE: in the run reported here, this bind fails with "Address already in use".
auto& conn = sockets_.emplace_back(acceptor_.get_executor());
conn.open(acceptor_.local_endpoint().protocol());
conn.bind(acceptor_.local_endpoint());

然而,这导致

Exception was thrown in function: bind: Address already in use [system:98 at /home/sehe/custom/superboost/boost/asio/detail/reactive_socket_service.hpp:161:5 in function 'boost::system::error_code boost::asio::detail::reactive_socket_service<Protocol>::bind(boost::asio::detail::reactive_socket_service<Protocol>::implementation_type&, const endpoint_type&, boost::system::error_code&)']

设置相关选项也不能解决这个问题:

void P2PNode::do_accept() {
    std::cout << "Start Listening" << std::endl;

    acceptor_.set_option(tcp::acceptor::reuse_address(true));
    acceptor_.async_accept([this](error_code ec, tcp::socket socket) {
         // etc.

我倾向于得出这样的结论:把套接字绑定到侦听器正在使用的地址这一做法不受支持(至少无法以可移植的方式实现)。

相关问题