c++ Boost ASIO中断c_read_until不阅读背靠背消息的问题

ff29svar  于 11个月前  发布在  其他
关注(0)|答案(1)|浏览(155)

我正在使用boost.asio实现一个网络库。这是一个TCP类的读取函数。
我的一个测试案例有一个客户端连接到服务器,一旦握手完成,服务器立即用delimeter背靠背发送两条消息。客户端读取第一条消息,但不读取第二条。如果稍后发送第三条消息,它将拾取第二条丢失的消息。bytes变量仅为一条消息的长度。这里有什么令人不安的错误吗?
(我删除了不相关的代码)

boost::asio::streambuf _recBuf;
ssl::stream<ip::tcp::socket&> _stream;

void SomeClass::read(const String &delim) {
    boost::asio::async_read_until(_stream, _recBuf, delim,
    boost::asio::bind_executor(_readStrand,
    [this, delim, self = shared_from_this()] (const boost::system::error_code& ec, const std::size_t bytes) {
        if (!ec) {

            if (bytes <= delim.size()) {
                return;
            }

            String data(boost::asio::buffers_begin(_recBuf.data()), boost::asio::buffers_begin(_recBuf.data()) + (bytes - delim.size()));
            _recBuf.consume(bytes);

            read(delim);
        }
        else if (ec == system::errc::operation_canceled) {
            
        }
        else {
            
        }
    }));
}

字符串
我尝试使用commit()方法和istream来改变读取_recBuf的方式。
调用available()方法on _stream.lowest_layer()查看客户端上有多少字节可用于等待这两个背靠背的消息,在读取初始消息后返回0。
编辑
下面是write方法。也许这会导致问题?

void TCPSBase::write(const Message &msg) {
String data = msg.toJsonStr() + '^';
asio::async_write(_stream, asio::buffer(data),
asio::bind_executor(_writeStrand,
[this, data, self = shared_from_this()] (const boost::system::error_code& ec, const std::size_t bytes) {
    LogHandler lh(Logger::getInstance(), "TCPSBase::write::lambda");
    if (!ec) {
        lh.d("wrote " + std::to_string(bytes) + " bytes");
        lh.d(data);
    }
    else if (ec == system::errc::operation_canceled) {
        lh.d("canceled");
    }
    else {
        lh.e("disconnecting after error during write: " + ec.message());
        stop();
    }
}));


}

ycggw6v2

ycggw6v21#

调用available()方法on _stream.lowest_layer()查看有多少字节可用会导致错误的文件描述符异常。
这意味着套接字变得无效,例如,因为它被关闭。
为了检查问题的其余部分,我使用BARE最小代码使您的代码自包含:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <iostream>
namespace asio = boost::asio;
namespace ssl = asio::ssl;
using namespace std::chrono_literals;
using error_code = boost::system::error_code;
using asio::ip::tcp;

static ssl::context s_ctx{ssl::context::sslv23_server};

struct SomeClass : std::enable_shared_from_this<SomeClass> {
    SomeClass(tcp::socket&& s) : _stream(std::move(s), s_ctx) {}

    void start() {
        _stream.async_handshake(ssl::stream_base::server, [this, self = shared_from_this()](error_code ec) {
            std::cout << "Handshake: " << ec.message() << std::endl;
            if (!ec)
                read("\n");
        });
    }

  private:
    asio::streambuf                     _recBuf;
    ssl::stream<tcp::socket>            _stream;
    asio::strand<asio::any_io_executor> _readStrand{_stream.get_executor()};
    using String = std::string;

    void read(String const& delim) {
        asio::async_read_until(
            _stream, _recBuf, delim,
            bind_executor( //
                _readStrand, [this, delim, self = shared_from_this()](error_code const& ec, size_t bytes) {
                    if (!ec || (ec == asio::error::eof && bytes)) {
                        if (bytes <= delim.size()) {
                            return; // TODO review?!
                        }

                        String data(asio::buffers_begin(_recBuf.data()),
                                    asio::buffers_begin(_recBuf.data()) + (bytes - delim.size()));
                        _recBuf.consume(bytes);

                        std::cout << "Received:  " << quoted(data) << std::endl;
                        std::cout << "Available: " << _recBuf.size() << "/"
                                  << _stream.lowest_layer().available() << std::endl;

                        read(delim);
                    } else if (ec == asio::error::operation_aborted) {

                    } else {
                    }
                }));
    }
};

void accept_loop(tcp::acceptor& acc) {
    acc.async_accept(make_strand(acc.get_executor()), [&acc](error_code ec, tcp::socket s) {
        if (!ec) {
            std::cout << "Accepted from " << s.remote_endpoint() << std::endl;
            std::make_shared<SomeClass>(std::move(s))->start();
            accept_loop(acc);
        }
    });
}

int main() {
    s_ctx.set_default_verify_paths();
    s_ctx.use_private_key_file("server.pem", ssl::context::pem);
    s_ctx.use_certificate_file("server.pem", ssl::context::pem);

    asio::io_context ioc(1);
    tcp::acceptor acc(ioc, {{}, 6443});
    accept_loop(acc);

    ioc.run();
    ioc.run_for(30s);
}

字符串
使用server.pem和客户端

openssl s_client -connect 127.0.0.1:6443 <<< $'FIRST\nSECOND\nTHIRD\nFOURTH'


精确显示预期的

Accepted from 127.0.0.1:40198
Handshake: Success
Received:  "FIRST"
Available: 20/0
Received:  "SECOND"
Available: 13/0
Received:  "THIRD"
Available: 7/0
Received:  "FOURTH"
Available: 0/0


任何剩下的批评都将是

  • 不是特别有效的字符串提取
  • bind_executor,它比首先将套接字与串相关联要差(在我的代码中,我已经这样做了,所以会话类(SomeClass)中的整个串都是冗余的)
  • 事实上,你比较错误代码,特别是在不同的类别-这将不起作用。相反,比较asio::error::operation_aborted或实际上对应的error_condition,如果可用(阅读这里了解更多:测试特定的错误条件)
  • bytes <= delim.size()时,静默地中断读循环(以及可能的连接)。当客户端发送空消息时,这看起来像“刚刚挂断”。
  • 忘记处理部分成功(例如,在中断时)-在这种特殊情况下,您可能希望在截断消息之前检查中断的存在。(我的代码还没有做这部分)

相关问题