我想创建一个简单的WebSocket服务器。我想处理传入的消息并发送响应,但我得到一个错误:
error: captured variable cannot escape `FnMut` closure body
--> src\main.rs:32:27
|
32 | incoming.for_each(|m| async {
| _________________________-_^
| | |
| | inferred to be a `FnMut` closure
33 | | match m {
34 | | // Error here...
35 | | Ok(message) => do_something(message, db, &mut outgoing).await,
36 | | Err(e) => panic!(e)
37 | | }
38 | | }).await;
| |_____^ returns a reference to a captured variable which escapes the closure body
|
= note: `FnMut` closures only have access to their captured variables while they are executing...
= note: ...therefore, they cannot allow references to captured variables to escape
字符串
这在Stack Overflow上有几个点击,但我没有看到我的代码中有任何变量正在转义的地方。Bloc块不会并发运行,所以我没有看到任何问题。此外,我觉得我正在做一些非常简单的事情:我得到了一个类型,它允许我将数据发送回客户端,但是当在blog块中使用对它的引用时,它给出了一个编译错误。这个错误只在我使用outgoing
或db
变量的时候发生。
这是我的代码(错误在handle_connection
函数中):
main.rs
use tokio::net::{TcpListener, TcpStream};
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{StreamExt, SinkExt};
use tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
struct DatabaseConnection;
#[tokio::main]
async fn main() -> Result<(), ()> {
listen("127.0.0.1:3012", Arc::new(DatabaseConnection)).await
}
async fn listen(address: &str, db: Arc<DatabaseConnection>) -> Result<(), ()> {
let try_socket = TcpListener::bind(address).await;
let mut listener = try_socket.expect("Failed to bind on address");
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection(stream, addr, db.clone()));
}
Ok(())
}
async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, db: Arc<DatabaseConnection>) {
let db = &*db;
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await.unwrap();
let (mut outgoing, incoming) = ws_stream.split();
// Adding 'move' does also not work
incoming.for_each(|m| async {
match m {
// Error here...
Ok(message) => do_something(message, db, &mut outgoing).await,
Err(e) => panic!(e)
}
}).await;
}
async fn do_something(message: Message, db: &DatabaseConnection, outgoing: &mut futures_util::stream::SplitSink<WebSocketStream<TcpStream>, Message>) {
// Do something...
// Send some message
let _ = outgoing.send(Message::Text("yay".to_string())).await;
}
型
货物清单
[dependencies]
futures = "0.3.*"
futures-channel = "0.3.*"
futures-util = "0.3.*"
tokio = { version = "0.2.*", features = [ "full" ] }
tokio-tungstenite = "0.10.*"
tungstenite = "0.10.*"
型
当使用async move
时,我得到以下错误:
验证码
incoming.for_each(|m| async move {
let x = &mut outgoing;
let b = db;
}).await;
型
错误
error[E0507]: cannot move out of `outgoing`, a captured variable in an `FnMut` closure
--> src\main.rs:33:38
|
31 | let (mut outgoing, incoming) = ws_stream.split();
| ------------ captured outer variable
32 |
33 | incoming.for_each(|m| async move {
| ______________________________________^
34 | | let x = &mut outgoing;
| | --------
| | |
| | move occurs because `outgoing` has type `futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio::net::tcp::stream::TcpStream>, tungstenite::protocol::message::Message>`, which does not implement the `Copy` trait
| | move occurs due to use in generator
35 | | let b = db;
36 | | }).await;
| |_____^ move out of `outgoing` occurs here
型
2条答案
按热度按时间kxe2p93d1#
FnMut
是一个匿名结构体,由于FnMut
捕获了&mut outgoing
,因此它成为该匿名结构体内部的字段,并且该字段将在每次调用FnMut
时使用,它可以被多次调用。(通过返回或移动到另一个作用域等)您的程序将无法使用该字段进行进一步的调用,出于安全考虑,Rust Spider不允许您这样做(对于您的两种情况)。在你的例子中,我们可以使用它作为每次调用的参数,而不是捕获
&mut outgoing
,这样我们就可以保留outgoing
的所有权。你可以通过使用futures-rs中的fold来做到这一点:字符串
这可能看起来有点棘手,但它确实完成了这项工作,我们使用常量累加器(
outgoing
),它将用作FnMut
的参数。Playground(感谢@所罗门Ucko创建可复制的示例)
另请参阅:
FnMut
closure, which is a captor at the same time的rryofs0p2#
另一种方法似乎对我来说是用Mutex Package 它:
字符串