当我运行以下测试时,
TEST_CASE("DummyNetwork_Basic") {
// start a network at port 3000
DummyNetwork network3000 = DummyNetwork(3000);
network3000.listenForIncomingConnections();
// start a network at port 3001
DummyNetwork network3001 = DummyNetwork(3001);
network3001.listenForIncomingConnections();
// network 3001 connects to network 3000
network3001.connect(3000);
// sleep to give time for connection
std::this_thread::sleep_for(std::chrono::seconds(50));
// network 3000 sends message to network 3001
network3000.send(3001, NetworkMessage{1, "hi"});
// network 3001 calls a blocking function to receive messages
NetworkMessage receivedOnN2_m1 = network3001.receiveMessageWithTypes(std::vector<int>{1}, 3000);
bool result1 = receivedOnN2_m1.encodedData_ == "hi";
CHECK(result1);
// network 3001 sends messages to network 3000
network3001.send(3000, NetworkMessage{2, "hello"});
network3001.send(3000, NetworkMessage{1, "hii"});
// network 3000 attempts to receive messages via blocking read
NetworkMessage receivedOnN1_m1 = network3000.receiveMessageWithTypes(std::vector<int>{2}, 3001);
NetworkMessage receivedOnN1_m2 = network3000.receiveMessageWithTypes(std::vector<int>{1}, 3001);
// clean up
network3000.shutdown();
network3001.shutdown();
bool result2 = receivedOnN1_m1.encodedData_ == "hello";
bool result3 = receivedOnN1_m2.encodedData_ == "hii";
CHECK(result2);
CHECK(result3);
if (result1 && result2 && result3) {
std::cout << "DummyNetwork_Basic Passed" << std::endl;
} else {
std::cout << "DummyNetwork_Basic Passed" << std::endl;
}
}
实际上,我看到的唯一输出(稍后会有意义)是
DummyNetwork3000: Created
DummyNetwork3000: Listening at port3000
DummyNetwork3001: Created
DummyNetwork3001: Listening at port3001
*thread sleeps to give time for connection*
DummyNetwork3000: Can't send. Not connected to port3001
下面是DummyNetwork的头文件。
#pragma once
#include <boost/asio.hpp>
#include <boost/unordered_map.hpp>
#include <vector>
#include "NetworkMessage.hpp"
class DMessageReceival {
public:
DMessageReceival(NetworkMessage message, std::string sender);
NetworkMessage message_;
std::string sender_;
};
class DConnection {
public:
std::shared_ptr<boost::asio::ip::tcp::socket> socket_;
std::string otherPort_;
std::string otherName_;
std::atomic<bool> disconnectRequested_{false};
DConnection(std::shared_ptr<boost::asio::ip::tcp::socket> socket, std::string otherPort, std::string otherName);
};
class DummyNetwork {
public:
DummyNetwork(int port);
void listenForIncomingConnections();
void send(int port, const NetworkMessage& msg);
void connect(int port);
void disconnect(int port);
NetworkMessage receiveMessageWithTypes(const std::vector<int>& types, int port = 0);
void shutdown();
private:
int port_;
boost::asio::io_service ioService_;
boost::unordered_map<std::string, std::shared_ptr<DConnection>> connections_;
boost::unordered_map<int, std::vector<DMessageReceival>> messages_;
std::atomic<bool> isListening_{false};
std::atomic<bool> shutdownRequested_{false};
std::condition_variable messagesConditionVariable_;
std::mutex connectionsMutex_;
std::mutex messagesMutex_;
void printDebugMessage(std::string input);
bool isConnected(int name);
void handleAccept(std::shared_ptr<boost::asio::ip::tcp::socket> socket, const boost::system::error_code& error);
void handleHandshakeRead(std::shared_ptr<boost::asio::ip::tcp::socket> socket, std::string buffer, const boost::system::error_code& error, std::size_t bytes_read);
void handleRead(std::shared_ptr<DConnection> connection, std::string buffer, const boost::system::error_code& error, std::size_t bytes_read);
void handleWrite(const boost::system::error_code& error, const NetworkMessage& msg, std::string other_name);
void handleConnect(const boost::system::error_code& error, std::shared_ptr<boost::asio::ip::tcp::socket> socket, std::string name);
void handleHandshakeWrite(const boost::system::error_code& error, std::shared_ptr<DConnection> conn);
};
这里是实现
#include "../include/DummyNetwork.hpp"
#include <iostream>
#include <functional>
#include <boost/bind.hpp>
DMessageReceival::DMessageReceival(NetworkMessage message, std::string sender) {
message_ = message;
sender_ = sender;
}
DConnection::DConnection(std::shared_ptr<boost::asio::ip::tcp::socket> socket, std::string otherPort, std::string otherName) {
socket_ = socket;
otherPort_ = otherPort;
otherName_ = otherName;
}
void DummyNetwork::printDebugMessage(std::string input) {
std::cout << "DummyNetwork" << std::to_string(port_) << ": " << input << std::endl;
}
DummyNetwork::DummyNetwork(int port) : port_(port), ioService_() {
boost::asio::io_service::work work(ioService_);
std::thread ioServiceThread([this]() {
ioService_.run();
});
ioServiceThread.detach();
printDebugMessage("Created");
}
void DummyNetwork::listenForIncomingConnections() {
if (!isListening_.exchange(true)) {
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), port_);
boost::asio::ip::tcp::acceptor acceptor(ioService_, endpoint);
std::shared_ptr<boost::asio::ip::tcp::socket> socket(new boost::asio::ip::tcp::socket(ioService_));
printDebugMessage("Listening at port" + std::to_string(port_));
acceptor.async_accept(*socket, boost::bind(&DummyNetwork::handleAccept, this, socket,
boost::asio::placeholders::error));
}
}
bool DummyNetwork::isConnected(int name) {
std::lock_guard<std::mutex> lock(connectionsMutex_);
const auto it = connections_.find(std::to_string(name));
if (it != connections_.end()) {
return true;
}
return false;
}
void DummyNetwork::handleHandshakeRead(std::shared_ptr<boost::asio::ip::tcp::socket> socket, std::string buffer, const boost::system::error_code& error, std::size_t bytes_read) {
NetworkMessage msg = NetworkMessage::decode(buffer);
if (msg.type_ == 0 && !isConnected(std::stoi(msg.encodedData_))) {
std::lock_guard<std::mutex> lock(connectionsMutex_);
std::string other_port = std::to_string(socket->remote_endpoint().port());
std::shared_ptr<DConnection> conn = std::make_shared<DConnection>(socket, other_port, msg.encodedData_);
connections_[conn->otherName_] = conn;
printDebugMessage("Resolved port" + conn->otherPort_ + "to be port" + conn->otherName_);
char new_buffer[10000];
socket->async_read_some(boost::asio::buffer(new_buffer),
boost::bind(&DummyNetwork::handleRead, this, conn, new_buffer,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
void DummyNetwork::handleRead(std::shared_ptr<DConnection> connection, std::string buffer, const boost::system::error_code& error, std::size_t bytes_read) {
NetworkMessage msg = NetworkMessage::decode(buffer);
if (msg.type_ != 0 && !shutdownRequested_ && !connection->disconnectRequested_ && connection->socket_->is_open()) {
std::lock_guard<std::mutex> lock(messagesMutex_);
messages_[msg.type_].push_back(DMessageReceival(msg, connection->otherName_));
messages_[0].push_back(DMessageReceival(msg, connection->otherName_));
printDebugMessage("Received " + msg.encodedData_ + " from " + connection->otherName_);
messagesConditionVariable_.notify_all();
char new_buffer[10000];
connection->socket_->async_read_some(boost::asio::buffer(new_buffer),
boost::bind(&DummyNetwork::handleRead, this, connection, new_buffer,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
void DummyNetwork::handleAccept(std::shared_ptr<boost::asio::ip::tcp::socket> socket,
const boost::system::error_code& error) {
isListening_ = false;
printDebugMessage("Accepted connection to port" + socket->remote_endpoint().port());
char buffer[10000];
socket->async_read_some(boost::asio::buffer(buffer),
boost::bind(&DummyNetwork::handleHandshakeRead, this, socket, buffer,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
listenForIncomingConnections();
}
void DummyNetwork::send(int port, const NetworkMessage& msg) {
if (!shutdownRequested_ && !isConnected(port)) {
printDebugMessage("Can't send. Not connected to port" + std::to_string(port));
}
std::lock_guard<std::mutex> lock(connectionsMutex_);
std::shared_ptr<DConnection> conn = connections_[std::to_string(port)];
std::shared_ptr<boost::asio::ip::tcp::socket> socket = conn->socket_;
socket->async_write_some(boost::asio::buffer(msg.encode(),msg.encode().length()),
boost::bind(&DummyNetwork::handleWrite, this,boost::asio::placeholders::error, msg, conn->otherName_));
printDebugMessage("Requested to write msg " + msg.encodedData_ + " to port" + std::to_string(port));
}
void DummyNetwork::handleWrite(const boost::system::error_code& error, const NetworkMessage& msg, std::string other_name) {
printDebugMessage("Wrote " + msg.encodedData_ + " to " + other_name);
}
void DummyNetwork::handleConnect(const boost::system::error_code& error, std::shared_ptr<boost::asio::ip::tcp::socket> socket, std::string name) {
if (!shutdownRequested_) {
printDebugMessage("Can't connect. Shutdown");
}
std::lock_guard<std::mutex> lock(connectionsMutex_);
std::shared_ptr<DConnection> conn = std::make_shared<DConnection>(socket, name, name);
connections_[conn->otherName_] = conn;
NetworkMessage msg = NetworkMessage{0, std::to_string(port_)};
socket->async_write_some(boost::asio::buffer(msg.encode(),msg.encode().length()),
boost::bind(&DummyNetwork::handleHandshakeWrite, this,boost::asio::placeholders::error, conn));
}
void DummyNetwork::handleHandshakeWrite(const boost::system::error_code& error, std::shared_ptr<DConnection> conn) {
printDebugMessage("Connected to port" + conn->otherName_);
}
void DummyNetwork::connect(int name) {
if (isConnected(name)) {
return;
}
boost::asio::ip::tcp::resolver resolver(ioService_);
auto new_socket = std::make_shared<boost::asio::ip::tcp::socket>(ioService_);
boost::asio::ip::tcp::resolver::query query("127.0.0.1", std::to_string(name));
boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
new_socket->async_connect(endpoint,
boost::bind(&DummyNetwork::handleConnect, this,
boost::asio::placeholders::error, new_socket, std::to_string(name)));
}
void DummyNetwork::disconnect(int port) {
if (isConnected(port)) {
std::lock_guard<std::mutex> lock(connectionsMutex_);
const auto it = connections_.find(std::to_string(port));
if (it != connections_.end()) {
std::shared_ptr<DConnection> conn = it->second;
conn->disconnectRequested_ = true;
conn->socket_->close();
printDebugMessage("Closed connection to " + conn->otherName_);
}
connections_.erase(std::to_string(port));
}
}
NetworkMessage DummyNetwork::receiveMessageWithTypes(const std::vector<int>& types, int port) {
std::unique_lock<std::mutex> lock(messagesMutex_);
while (port == 0 || isConnected(port)) {
if (port == 0) {
if (messages_[0].size() > 0) {
DMessageReceival mr = messages_[0][0];
messages_[0].erase(messages_[0].begin());
for (auto it = messages_[mr.message_.type_].begin();
it != messages_[mr.message_.type_].end(); it++) {
if ((it->message_.encodedData_ ==
mr.message_.encodedData_) &&
(it->sender_ == mr.sender_)) {
messages_[mr.message_.type_].erase(it);
lock.unlock();
return mr.message_;
}
}
} else {
messagesConditionVariable_.wait(lock);
}
} else {
for (auto type: types) {
if (messages_[type].size() > 0) {
DMessageReceival mr = messages_[type][0];
messages_[type].erase(messages_[type].begin());
for (auto it = messages_[0].begin();
it != messages_[0].end(); it++) {
if ((it->message_.encodedData_ ==
mr.message_.encodedData_) &&
(it->sender_ == mr.sender_)) {
messages_[0].erase(it);
lock.unlock();
return mr.message_;
}
}
}
}
messagesConditionVariable_.wait(lock);
}
}
return {};
}
void DummyNetwork::shutdown() {
shutdownRequested_ = true;
std::lock_guard<std::mutex> lock(connectionsMutex_);
for (const auto& [key, value] : connections_) {
value->disconnectRequested_ = true;
value->socket_->close();
printDebugMessage("Closed connection to " + value->otherName_);
}
connections_.clear();
}
我怀疑问题出在我对boost库的使用上,特别是我对io_service对象的使用,要么a)没有正确地将工作发布到它的队列中,要么b)没有正确地执行队列中的内容。
1条答案
按热度按时间m2xkgtsf1#
这是一个很大的代码。
只有几件事立即脱颖而出:
没道理啊你要把一个整数加到一个字符串中。如果你启用了编译器警告,你就会知道你什么时候写了这样的boo-boos。你在代码的不同部分正确地(尽管重复)了这一点。
大得多:
这也说不通。
new_buffer
是局部变量。根据定义,您将无法在handleRead
中访问它。此外,它将在封闭作用域的末尾停止存在。你有两次这样的错误:剩下的代码是。。非常非常复杂
isListening_
。它的价值似乎从来没有用处。error_code
。这是一个不必要的混乱配方DummyNetwork
上进行所有操作。这意味着它的责任过重。它充当监听器**,**充当(多个)连接。在这方面,我们必须问为什么你甚至有一个单独的DConnection
类型摆在首位。receiveMessagesWithTypes
闻起来像是20世纪80年代的select/poll循环。你现在用的是Asio,何必呢?messageConditionVariable_
,messagesMutex_
)。这似乎也不匹配异步IO使用Asio好asio::async_connect
buffer
的内容解释为有效的nul终止字符串,忽略bytes_read
shutdown
中有一个数据竞争,你在connectionsMutex
下调用close()
。但是,connection
可能被其他不使用相同锁的异步完成处理程序使用。使用strands或者认识到每个“网络”示例都有一个单线程,并将其与之接近,可能会更好。另外,考虑shutdown(...)
或cancel()
,而不是close()
。async_write_some
,它不能保证发送整个缓冲区。首选asio::async_write
。type_
在成员和encode()
结果中必然是重复的-我希望不是按值返回,因为这意味着在asio::buffer(msg.encode(), msg.encode().length())
中会有更多的生命周期错误导致Undefined Behaviour)。asio::buffer(msg.encode())
isConnected
是一个活泼的接口:根据定义,一旦connectionMutex_
被释放,返回的值就变得陈旧。一个更有用的接口可能会接受一个unique_lock
或假设调用者持有锁。send
仅检查shutdownRequested
iff!isConnected(port)
。这似乎是错误的这里有一个固定的尝试:当然,现在我们也注意到,
msg
在async_write
完成之前并没有生命周期保证。事实上,在每一次调用send
时,你都在传递一个临时的NetworkMessage
。唉。又是一辈子的错误。send
完成之前阻止下一个send
发生:这可能违反documented at
asio::async_write
的要求:async_write_some
函数来实现,称为组合操作。程序必须确保流在此操作完成之前不执行其他写操作(例如async_write、流的async_write_some
函数或任何其他执行写的组合操作)。我知道你的代码中有
async_write_some
,但这本身就是一个错误。您可能需要wellknown outbound queue patternconnection
,一些绑定到other_name
,一些甚至绑定到原始tcp::socket
示例。所有这些可能都应该是DConnection
上的成员,并使用shared_from_this
。async_read_some
,它有和async_write_some
相同的问题:不能保证它读取“完整的消息”。您可能需要在协议中添加帧,以便明确地知道消息何时完成。否则,您将不正确地解析部分消息,这会造成足够的混乱。但是,您也会将尾随数据解释为一条新消息,这将再次导致UB,因为它没有以类型开始,或者实际上可能不够长,甚至不足以包含这样的头数据。也许如果你描述一下你想实现什么,我就能帮助你展示我是如何实现的。与此同时,这里是我的中途审查纳入了许多上述想法:
(Un)live On Coliru
请注意,我无法测试逻辑,因为逻辑从一开始就不清楚,并且提供的实现无法工作(因此我没有比较)。提供这些代码只是为了说明我在此过程中提出的一些要点。
更新
完成替代实施
Live On Coliru
打印