rust Future生成器闭包上的错误:捕获的变量无法转义`FnMut`闭包体

4urapxun  于 2024-01-08  发布在  其他
关注(0)|答案(2)|浏览(123)

我想创建一个简单的WebSocket服务器。我想处理传入的消息并发送响应,但我得到一个错误:

error: captured variable cannot escape `FnMut` closure body
  --> src\main.rs:32:27
   |
32 |       incoming.for_each(|m| async {
   |  _________________________-_^
   | |                         |
   | |                         inferred to be a `FnMut` closure
33 | |         match m {
34 | |             // Error here...
35 | |             Ok(message) => do_something(message, db, &mut outgoing).await,
36 | |             Err(e) => panic!(e)
37 | |         }
38 | |     }).await;
   | |_____^ returns a reference to a captured variable which escapes the closure body
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape

字符串
这在Stack Overflow上有几个点击,但我没有看到我的代码中有任何变量正在转义的地方。Bloc块不会并发运行,所以我没有看到任何问题。此外,我觉得我正在做一些非常简单的事情:我得到了一个类型,它允许我将数据发送回客户端,但是当在blog块中使用对它的引用时,它给出了一个编译错误。这个错误只在我使用outgoingdb变量的时候发生。
这是我的代码(错误在handle_connection函数中):

main.rs

use tokio::net::{TcpListener, TcpStream};
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{StreamExt, SinkExt};
use tungstenite::Message;
use tokio_tungstenite::WebSocketStream;

struct DatabaseConnection;

#[tokio::main]
async fn main() -> Result<(), ()> {
    listen("127.0.0.1:3012", Arc::new(DatabaseConnection)).await
}

async fn listen(address: &str, db: Arc<DatabaseConnection>) -> Result<(), ()> {
    let try_socket = TcpListener::bind(address).await;
    let mut listener = try_socket.expect("Failed to bind on address");

    while let Ok((stream, addr)) = listener.accept().await {
        tokio::spawn(handle_connection(stream, addr, db.clone()));
    }

    Ok(())
}

async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, db: Arc<DatabaseConnection>) {
    let db = &*db;
    let ws_stream = tokio_tungstenite::accept_async(raw_stream).await.unwrap();

    let (mut outgoing, incoming) = ws_stream.split();

    // Adding 'move' does also not work
    incoming.for_each(|m| async {
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e)
        }
    }).await;
}

async fn do_something(message: Message, db: &DatabaseConnection, outgoing: &mut futures_util::stream::SplitSink<WebSocketStream<TcpStream>, Message>) {
    // Do something...

    // Send some message
    let _ = outgoing.send(Message::Text("yay".to_string())).await;
}

货物清单

[dependencies]
futures = "0.3.*"
futures-channel = "0.3.*"
futures-util = "0.3.*"
tokio = { version = "0.2.*", features = [ "full" ] }
tokio-tungstenite = "0.10.*"
tungstenite = "0.10.*"


当使用async move时,我得到以下错误:

验证码

incoming.for_each(|m| async move {
    let x = &mut outgoing;
    let b = db;
}).await;

错误

error[E0507]: cannot move out of `outgoing`, a captured variable in an `FnMut` closure
  --> src\main.rs:33:38
   |
31 |       let (mut outgoing, incoming) = ws_stream.split();
   |            ------------ captured outer variable
32 | 
33 |       incoming.for_each(|m| async move {
   |  ______________________________________^
34 | |         let x = &mut outgoing;
   | |                      --------
   | |                      |
   | |                      move occurs because `outgoing` has type `futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio::net::tcp::stream::TcpStream>, tungstenite::protocol::message::Message>`, which does not implement the `Copy` trait
   | |                      move occurs due to use in generator
35 | |         let b = db;
36 | |     }).await;
   | |_____^ move out of `outgoing` occurs here

kxe2p93d

kxe2p93d1#

FnMut是一个匿名结构体,由于FnMut捕获了&mut outgoing,因此它成为该匿名结构体内部的字段,并且该字段将在每次调用FnMut时使用,它可以被多次调用。(通过返回或移动到另一个作用域等)您的程序将无法使用该字段进行进一步的调用,出于安全考虑,Rust Spider不允许您这样做(对于您的两种情况)。
在你的例子中,我们可以使用它作为每次调用的参数,而不是捕获&mut outgoing,这样我们就可以保留outgoing的所有权。你可以通过使用futures-rs中的fold来做到这一点:

incoming
    .fold(outgoing, |mut outgoing, m| async move {
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e),
        }

        outgoing
    })
    .await;

字符串
这可能看起来有点棘手,但它确实完成了这项工作,我们使用常量累加器(outgoing),它将用作FnMut的参数。
Playground(感谢@所罗门Ucko创建可复制的示例)

另请参阅:

rryofs0p

rryofs0p2#

另一种方法似乎对我来说是用Mutex Package 它:

let outgoing = Mutex::new(outgoing);
incoming
    .foreach(|m| async move {
        outgoing = outgoing.lock().unwrap();
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e),
        }
    })
    .await;

字符串

相关问题