c++ 使用boost::asio::async_write进行多次调用

lnlaulya  于 2023-10-20  发布在  其他
关注(0)|答案(1)|浏览(141)

我正在尝试boost asio,测试代码基于asio-wrapper
在这种情况下,有没有办法在不依赖std::this_thread::sleep_for的情况下防止数据集中(即多次写入的数据在接收端被合并成一次读取)?
这是测试代码:

#include<iostream>
#include <vector>
#include<thread>
#include<chrono>
#include "asio_wrapper.hpp"
    
    
/// Server-side handler that logs received data and closed connections to std::cout.
class WritableHandlerImplServer : public cppeng::tcp::WritableHandler {
public:
    /// Prints the bytes in `data`, interpreted as characters, after a "Server Rx: " prefix.
    /// `writable` is not used by this handler.
    void HandleCallback(std::vector<uint8_t> data,
                        std::shared_ptr<cppeng::tcp::Writable> writable) override {
        // Reinterpret the raw byte vector as text for printing.
        const std::string text(data.cbegin(), data.cend());
        std::cout << "Server Rx: " << text << std::endl;
    }

    /// Prints a notice that a connection was closed; `ptr` is not used by this handler.
    void NotifyClosed(std::shared_ptr<cppeng::tcp::Writable> ptr) override {
        std::cout << "Server: Connection closed!" << std::endl;
    }
};

/// Client-side handler that logs received data and closed connections to std::cout
/// and records that at least one chunk has been received.
class WritableHandlerImplClient : public cppeng::tcp::WritableHandler {
public:
    /// Prints the bytes in `data`, interpreted as characters, after a "Client Rx: " prefix,
    /// then sets `rx_flag_`. `writable` is not used by this handler.
    void HandleCallback(std::vector<uint8_t> data,
                        std::shared_ptr<cppeng::tcp::Writable> writable) override {
        // Reinterpret the raw byte vector as text for printing.
        const std::string text(data.cbegin(), data.cend());
        std::cout << "Client Rx: " << text << std::endl;

        rx_flag_ = true;
    }

    /// Prints a notice that the connection was closed; `ptr` is not used by this handler.
    void NotifyClosed(std::shared_ptr<cppeng::tcp::Writable> ptr) override {
        std::cout << "Client: Connection closed!" << std::endl;
    }

    // Set to true by HandleCallback and never reset by this class.
    // NOTE(review): plain bool — if the callback runs on a different thread than the
    // code reading this flag, it would need synchronization; confirm against the wrapper.
    bool rx_flag_ = false;
};

int main()
{
    // Test program: start a server and a client on port 3000, then send the
    // alphabet from the client in small chunks with a short pause between writes.
    try {
        WritableHandlerImplServer server_handler;
        WritableHandlerImplClient client_handler;

        cppeng::tcp::Server server(3000, server_handler);
        server.Start();
        cppeng::tcp::Client client("127.0.0.1", 3000, client_handler);
        client.Start(1000);

        // Pause inserted between consecutive writes (none after the last one).
        const auto pause = [] { std::this_thread::sleep_for(std::chrono::milliseconds(5)); };

        client.Write("abcd");
        pause();
        client.Write("efgh");
        pause();
        client.Write("ijkl");
        pause();
        client.Write("mnop");
        pause();
        client.Write("qrst");
        pause();
        client.Write("uvwx");
        pause();
        client.Write("yz");

        server.Stop();
        client.Stop();
    }
    catch (const std::exception& e) {
        std::cout << "exception :" << e.what() << "\n";
    }

    return EXIT_SUCCESS;
}

写/读函数:

// Starts one asynchronous read into read_buffer_. On success the received bytes are
// copied out, handed to writable_handler_, and the next read is started; on error the
// connection is closed.
void Connection::DoRead() {
    // The handler holds a shared_ptr so this object stays alive until it has run.
    auto keep_alive = shared_from_this();

    auto on_read = [this, keep_alive](std::error_code ec, std::size_t length) {
        if (ec) {
            Close();
            return;
        }
        // Only the first `length` bytes of the buffer belong to this read.
        std::vector<uint8_t> rx_data(read_buffer_.begin(), read_buffer_.begin() + length);
        writable_handler_.HandleCallback(std::move(rx_data), shared_from_this());
        DoRead();
    };

    // transfer_at_least(1): the read completes once at least one byte has arrived,
    // so a single callback may carry any number of bytes up to the buffer size.
    boost::asio::async_read(socket_,
                            boost::asio::buffer(read_buffer_.data(), read_buffer_.size()),
                            boost::asio::transfer_at_least(1),
                            std::move(on_read));
}

// Asynchronously writes the element at the front of write_queue_. When the write
// succeeds the element is removed and the next one (if any) is written; when the queue
// runs dry write_busy_ is cleared. On error the connection is closed.
void Connection::DoWrite() {
    // The handler holds a shared_ptr so this object stays alive until it has run.
    auto keep_alive = shared_from_this();

    // The front element is only popped inside the completion handler, after the write.
    auto& pending = write_queue_.front();

    boost::asio::async_write(
        socket_,
        boost::asio::buffer(pending.data(), pending.size()),
        [this, keep_alive](std::error_code ec, std::size_t /*bytes_written*/) {
            if (ec) {
                Close();
                return;
            }
            write_queue_.pop();
            if (write_queue_.empty()) {
                write_busy_ = false;
                return;
            }
            DoWrite();
        });
}

更新:
包装器的作者是这样实现写函数的:如果boost::asio的写操作正忙,第二条消息就会丢失!

// Appends a copy of `data` to write_queue_ and, if no write is currently in progress,
// marks the connection busy and starts writing.
//
// NOTE(review): write_queue_ and write_busy_ are also modified by the completion handler
// in DoWrite. If Write can be called from a different thread than the one running that
// handler, these accesses need synchronization (e.g. by posting this work to the
// socket's executor) — confirm against how the wrapper runs its I/O.
void Connection::Write(const std::string &data) {
    // Add the data to the queue.
    write_queue_.emplace(data.begin(), data.end());

    // A write chain is already running; it will pick up the new element itself.
    if (write_busy_) {
        return;
    }

    write_busy_ = true;
    DoWrite();
}
niknxzdl

niknxzdl1#

假设“数据集中”是指数据包不代表“消息”:是的。TCP是一种流协议。你得到一个逻辑流。数据流在线路上传输的方式无关紧要。
您需要使用应用程序协议来了解如何解析流。通常会对消息进行分帧(framing)。常用的选择是分隔符(例如,NUL字节)或带长度前缀的消息。
在你的例子中,看起来每条消息可能是4个八位字节,这意味着你可以使用transfer_exactly(4)。如果不是,我建议使用NUL分隔符(当然,要选择一个不会干扰实际消息内容的分隔符)。
看起来asio_wrapper没有提供实现这一点的方法。只有在分帧(framing)不重要时它才能帮到你,否则你必须自己做额外的缓冲和解析,这会使你的处理程序变得复杂。
我建议不要使用这个包装器:

Live On Coliru

#include <boost/asio.hpp>
#include <deque>
#include <iostream>

namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;

using Message = std::string;
using Handler = std::function<void(Message const& msg)>; // or perhaps an interface

struct Connection : std::enable_shared_from_this<Connection> {
    Connection(tcp::socket s, Handler cb) : s_(std::move(s)), cb_(std::move(cb)) {}

    void Start() { read_loop(); }

    void Write(Message msg) {
        post(s_.get_executor(),
             [this, self = shared_from_this(), msg = std::move(msg)]() mutable { do_write(std::move(msg)); });
    }

    void Stop() {
        post(s_.get_executor(), [this, self = shared_from_this()]() { s_.cancel(); });
    }

  private:
    tcp::socket s_;
    Handler     cb_;

    Message             incoming_;
    std::deque<Message> outbox_;

    void do_write(Message msg) {
        if (!msg.ends_with('\n'))
            msg += '\n';
        outbox_.push_back(std::move(msg));
        if (outbox_.size() == 1)
            write_loop();
    }

    void read_loop() {
        asio::async_read_until( //
            s_, asio::dynamic_buffer(incoming_), "\n",
            [this, self = shared_from_this()](error_code ec, size_t n) {
                if (!ec) {
                    if (cb_ && n)
                        cb_(incoming_.substr(0, n - 1)); // exclude '\n'
                    incoming_.erase(0, n);
                    read_loop();
                }
            });
    }

    void write_loop() {
        if (outbox_.empty())
            return;
        asio::async_write( //
            s_, asio::buffer(outbox_.front()), [this, self = shared_from_this()](error_code ec, size_t) {
                if (!ec) {
                    outbox_.pop_front();
                    write_loop();
                }
            });
    }
};

struct Server {
    Server(uint16_t port, Handler cb) : acc_(io_, {{}, port}), callback_(std::move(cb)) {}

    void Start() {
        acc_.listen();
        accept_loop();
    }

    void Stop() {
        io_.stop();
        // acc_.cancel();
        //  TODO perhaps keep a list of sessions and stop them?
        // io_.join();
    }

  private:
    asio::thread_pool io_{1};
    tcp::acceptor     acc_;
    Handler           callback_;

    void accept_loop() {
        acc_.async_accept(make_strand(acc_.get_executor()), [this](error_code ec, tcp::socket s) {
            if (!ec) {
                std::make_shared<Connection>(std::move(s), callback_)->Start();
                accept_loop();
            }
        });
    }
};

struct Client {
    Client(std::string host, uint16_t port, Handler cb)
        : spec_(host, std::to_string(port))
        , callback_(std::move(cb)) {}

    void Start() {
        assert(!conn_);
        tcp::socket socket_(io_);
        asio::connect(socket_, tcp::resolver(io_).resolve(spec_));
        conn_ = std::make_shared<Connection>(std::move(socket_), callback_);
        conn_->Start();
    }

    void Write(Message msg) const {
        assert(conn_);
        conn_->Write(std::move(msg));
    }

    void Stop() {
        if (conn_)
            conn_->Stop();
    }

  private:
    asio::thread_pool           io_{1};
    tcp::resolver::query        spec_;
    Handler                     callback_;
    std::shared_ptr<Connection> conn_;
};

int main() {
    // Demo: a local server and client on port 3000; the client sends seven messages
    // back-to-back and each received message is printed with a prefix.
    try {
        const auto print_server_rx = [](std::string const& msg) {
            std::cout << "Server Rx: " << msg << std::endl;
        };
        const auto print_client_rx = [](std::string const& msg) {
            std::cout << "Client Rx: " << msg << std::endl;
        };

        Server server(3000, print_server_rx);
        server.Start();

        Client client("127.0.0.1", 3000, print_client_rx);
        client.Start();

        // No pauses between writes: message boundaries come from the '\n' framing.
        char const* const messages[] = {"abcd", "efgh", "ijkl", "mnop", "qrst", "uvwx", "yz"};
        for (char const* text : messages)
            client.Write(text);

        client.Stop();

        // Don't stop the server before the last message has arrived.
        std::this_thread::sleep_for(5ms);
        server.Stop();
    } catch (std::exception const& e) {
        std::cout << "exception :" << e.what() << "\n";
    }
}

输出:

Server Rx: abcd
Server Rx: efgh
Server Rx: ijkl
Server Rx: mnop
Server Rx: qrst
Server Rx: uvwx
Server Rx: yz

相关问题