How to detect when a client disconnects from a WebSocket in Rust rocket_ws

iecba09b  posted 11 months ago in Other

From the rocket_ws documentation (https://api.rocket.rs/v0.5/rocket_ws/), I know that I can establish a WebSocket connection with a client using this code:

#[get("/echo?channel")]
fn echo_channel(ws: ws::WebSocket) -> ws::Channel<'static> {
    use rocket::futures::{SinkExt, StreamExt};

    ws.channel(move |mut stream| Box::pin(async move {
        while let Some(message) = stream.next().await {
            let _ = stream.send(message?).await;
        }

        Ok(())
    }))
}
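But how do I detect that the connection has been closed and the client has disconnected?
That example only shows the use case of reading messages with stream.next(). What if I don't expect any messages from the client and only want to send it new values periodically, using let mut interval = interval(Duration::from_secs(10)); (with something like let _ = stream.send(ws::Message::Text(json!(reading).to_string())).await;)?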

zzoitvuj (answer #1)

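To detect when a client disconnects from the WebSocket, you can listen for the Close message sent by the client. Note that this only covers a graceful close: if the connection drops without a Close frame, stream.next() yields an error or None instead.
The code looks like this: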

// One branch of a tokio::select! inside a loop
Some(Ok(message)) = stream.next() => {
    match message {
        ws::Message::Close(close_frame) => {
            // Handle Close message
            println!("Received Close message: {:?}", close_frame);
            let close_frame = ws::frame::CloseFrame {
                code: ws::frame::CloseCode::Normal,
                reason: "Client disconnected".to_string().into(),
            };
            let _ = stream.close(Some(close_frame)).await;
            break;
        }
        // Other message types are ignored in this fragment
        _ => {}
    }
}
So the complete code for handling WebSockets with rocket_ws in Rust looks like this:

// Imports assumed by this snippet (not shown in the original answer).
// json! could also come from rocket::serde::json with Rocket's "json" feature.
use rocket::tokio::{self, time::{interval, Duration}};
use rocket_ws as ws;
use serde_json::json;

#[get("/ws")]
pub fn echo_channel(ws: ws::WebSocket) -> ws::Channel<'static> {
    use rocket::futures::{SinkExt, StreamExt};

    ws.channel(move |mut stream: ws::stream::DuplexStream| {
        Box::pin(async move {
            let mut interval = interval(Duration::from_secs(10));

            // Run the loop directly in the channel future (no tokio::spawn or
            // ctrl_c wait), so the handler returns as soon as the client is gone.
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Send message every 10 seconds
                        // (get_latest_readings is the asker's own function)
                        let reading = get_latest_readings().await.unwrap();
                        let text = json!(reading).to_string();
                        if stream.send(ws::Message::Text(text)).await.is_err() {
                            // A failed send also means the client is gone
                            println!("Send failed, connection closed");
                            break;
                        }
                    }
                    message = stream.next() => {
                        // None or Err: the connection dropped without a Close frame
                        let message = match message {
                            Some(Ok(message)) => message,
                            _ => {
                                println!("Connection closed");
                                break;
                            }
                        };
                        match message {
                            ws::Message::Text(text) => {
                                // Handle Text message
                                println!("Received Text message: {}", text);
                            }
                            ws::Message::Binary(data) => {
                                // Handle Binary message
                                println!("Received Binary message: {:?}", data);
                            }
                            ws::Message::Close(close_frame) => {
                                // Handle Close message: the client closed gracefully
                                println!("Received Close message: {:?}", close_frame);
                                let close_frame = ws::frame::CloseFrame {
                                    code: ws::frame::CloseCode::Normal,
                                    reason: "Client disconnected".to_string().into(),
                                };
                                let _ = stream.close(Some(close_frame)).await;
                                break;
                            }
                            ws::Message::Ping(ping_data) => {
                                // Handle Ping message
                                println!("Received Ping message: {:?}", ping_data);
                            }
                            ws::Message::Pong(pong_data) => {
                                // Handle Pong message
                                println!("Received Pong message: {:?}", pong_data);
                            }
                            other => {
                                println!("Received other message: {:?}", other);
                            }
                        }
                    }
                }
            }

            Ok(())
        })
    })
}
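
To make the disconnect cases easier to see in isolation, here is a minimal sketch that uses only the standard library. The `Frame` and `Outcome` types and both functions are hypothetical stand-ins introduced for illustration; they are not part of rocket_ws. The `Option<Result<..>>` argument models what `stream.next().await` can yield: a graceful Close frame, an error, or the end of the stream.

```rust
// Hypothetical stand-in for the message kinds a WebSocket stream can yield.
// This is NOT the rocket_ws Message type, just a simplified model of it.
#[derive(Debug, Clone, PartialEq)]
enum Frame {
    Text(String),
    Binary(Vec<u8>),
    Ping,
    Pong,
    Close,
}

// Hypothetical result of inspecting one value read from the stream.
#[derive(Debug, PartialEq)]
enum Outcome {
    KeepGoing,      // a normal frame arrived, keep looping
    ClientClosed,   // the client sent a Close frame (graceful disconnect)
    ConnectionLost, // the read failed or the stream ended (abrupt disconnect)
}

// Classify one read result; the argument mirrors the shape of
// `stream.next().await`, with the error type simplified to a String.
fn classify(incoming: Option<Result<Frame, String>>) -> Outcome {
    match incoming {
        Some(Ok(Frame::Close)) => Outcome::ClientClosed,
        Some(Ok(_)) => Outcome::KeepGoing,
        Some(Err(_)) | None => Outcome::ConnectionLost,
    }
}

// Consume read results until a disconnect is seen. Returns how many
// ordinary frames were handled first, and why the loop stopped.
fn run_until_disconnect<I>(events: I) -> (usize, Outcome)
where
    I: IntoIterator<Item = Result<Frame, String>>,
{
    let mut events = events.into_iter();
    let mut handled = 0;
    loop {
        match classify(events.next()) {
            Outcome::KeepGoing => handled += 1,
            end => return (handled, end),
        }
    }
}
```

In a real handler the same three-way split would sit in the branch that awaits `stream.next()`, with both `ClientClosed` and `ConnectionLost` leading to a `break` out of the loop.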
