我有一个简单的客户端-服务器设置,可以传输JPG字节。当在本地运行时,它工作得很好。但是当在互联网上传输JPG时,它会被损坏,当解码时会有严重的视觉伪像。
客户端是一个Rust时雄应用程序。它使用来自摄像头的jpeg流,并将jpeg字节推送到TCP套接字。
// Async application to run on the edge (Raspberry Pi)
// Reads MJPEG HTTP stream from provided URL and sends it to the CWS server over TCP
#![warn(rust_2018_idioms)]
use std::convert::TryFrom;
use futures::StreamExt;
use tokio::signal;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::error::Error;
use std::path::PathBuf;
use structopt::StructOpt;
//DEBUG
use std::fs;
#[derive(StructOpt, Debug)]
#[structopt(name = "basic")]
struct Opt {
#[structopt(long = "stream", required(true))]
stream: String,
#[structopt(long = "server", required(true))]
server: String,
}
async fn acquire_tcp_connection(server: &str) -> Result<TcpStream, Box<dyn Error>> {
// "127.0.0.1:6142"
loop {
match TcpStream::connect(server).await {
Ok(stream) => {
println!("Connected to server");
return Ok(stream);
}
Err(e) => {
println!("Failed to connect to server: {}", e);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let opt = Opt::from_args();
let url = http::Uri::try_from(opt.stream).unwrap();
loop {
let mut tcp_conn = acquire_tcp_connection(&opt.server).await?;
// hyper client
let client = hyper::Client::new();
// Do the request
let res = client.get(url.clone()).await.unwrap();
// Check the status
if !res.status().is_success() {
eprintln!("HTTP request failed with status {}", res.status());
std::process::exit(1);
}
// https://docs.rs/mime/latest/mime/#what-is-mime
// Basically HTTP response content
let content_type: mime::Mime = res
.headers()
.get(http::header::CONTENT_TYPE)
.unwrap()
.to_str()
.unwrap()
.parse()
.unwrap();
assert_eq!(content_type.type_(), "multipart");
let boundary = content_type.get_param(mime::BOUNDARY).unwrap();
let stream = res.into_body();
// https://github.com/scottlamb/multipart-stream-rs
let mut stream = multipart_stream::parse(stream, boundary.as_str());
'outer: while let Some(p) = stream.next().await {
let p = p.unwrap();
// Split the jpeg bytes into chunks of 2048 bytes
for slice in p.body.chunks(2048) {
// DEBUG capture bytes to a file just for debugging
fs::write("tcp_debug.txt", &slice).expect("Unable to write file");
let tcp_result = tcp_conn.write(&slice).await;
match tcp_result {
Ok(_) => {
println!("Sent {} bytes", slice.len());
}
Err(e) => {
println!("Failed to send data: {}", e);
break 'outer;
}
}
}
}
}
Ok(())
}
服务器是一个Python应用程序,它接受TCP连接并对接收到的字节进行解码:
import asyncio
from threading import Thread
import cv2
import numpy as np
class SingleFrameReader:
"""
Simple API for reading a single frame from a video source
"""
def __init__(self, video_source, ..., tcp=False):
self._video_source = video_source
...
elif tcp:
self._stop_tcp_server = False
self._tcp_image = None
self._tcp_addr = video_source
self._tcp_thread = Thread(target=asyncio.run, args=(self._start_tcp_server(),)).start()
self.read = lambda: self._tcp_image
async def _start_tcp_server(self):
uri, port = self._tcp_addr.split(':')
port = int(port)
print(f"Starting TCP server on {uri}:{port}")
server = await asyncio.start_server(
self.handle_client, uri, port
)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
async def handle_client(self, reader, writer):
"""
Handle a client connection. Receive JPEGs from the client and have them ready to be ready
"""
client_addr = writer.get_extra_info('peername')
print(f"New connection from {client_addr}")
jpg_bytes = b''
while True:
if self._stop_tcp_server:
break
data = await reader.read(2**16)
#DEBUG
print(f"Received {len(data)} bytes from {client_addr}")
byes_file = open('tcp_bytes_server.txt', 'wb')
if not data:
print("Client disconnected")
break
#DEBUG the corruption issue
byes_file.write(data)
jpg_bytes += data
start_idx = jpg_bytes.find(b'\xff\xd8')
end_idx = jpg_bytes.find(b'\xff\xd9')
if start_idx != -1 and end_idx != -1:
nparr = np.frombuffer(jpg_bytes[start_idx:end_idx+2], np.uint8)
img_np = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img_np is None:
continue
self._tcp_image = img_np
jpg_bytes = jpg_bytes[end_idx+2:]
...
我试过发送不同大小的块,当一次发送整个图像时,图像伪影会更严重。
我捕获了从客户端(rust)发送到文件的字节和从服务器(python)接收到的字节。从服务器发送的字节比从rust发送的字节小约1.6KB,而从rust发送的字节小约2KB。并且第一个接收到的字节彼此不匹配。
同样,当在本地运行时,它工作得很顺利,你可以真实的看到摄像头流。当客户端和服务器被互联网分开时,字节似乎会被损坏。
1条答案
按热度按时间juzqafwq1#
要将字节写入您正在调用
AsyncWriteExt::write
的TCP套接字:并引用文档:
此函数将尝试写入
buf
的整个内容,但整个写入可能不会成功...如果返回值是
Ok(n)
,那么必须保证n〈= buf.len()。您将查看是否存在错误并记录字节数,但如果字节数小于缓冲区的长度,则不执行任何操作。
但是什么时候缓冲区不会被完全写入呢?通常情况下,当你的程序比网络快很多或者你的分片太大的时候就可能发生这种情况。看起来当你连接到本地主机的时候这不是问题,但是当你连接到远程系统的时候就有问题了。YMMV
如果你在用C语言编程,你需要做一个循环,前进一个指针等等。在Rust中你也可以做这些,但是调用
AsyncWriteExt::write_all
来为你做循环会更容易:注意,现在
tcp_result
是io::Result<()>
而不是io::Result<usize>
,因为现在不写入分片中的所有字节是错误的。同样要注意的是,当从TCP连接阅读数据时,即使使用Python,对等体也会有这个潜在的问题。这一行代码可能会执行部分读取,无论服务器是否写入了整个缓冲区:
但是这一行已经在连接数据并等待整个图像的循环中,所以它不会引起任何问题。