我想在2s过去之前和2s过去之后以不同的方式处理传入的WebSocket消息。
这很棘手,因为我们只有一个read
(显然不能被克隆),而且也不喜欢被传递给函数。
我想在处理消息和计时器时使用select!
,然后在计时器融合第一个select!
之后,在第2阶段再次使用select!
,将read
的可变借位传递给不同的处理函数。
原来由于固定,我根本无法将read
传递给函数。
use std::time::Duration;
use futures_util::{Stream, StreamExt};
use tokio_tungstenite::connect_async;
async fn wait_2_seconds() {
tokio::time::sleep(Duration::from_secs(2)).await;
}
async fn process_messages(read: &mut impl Stream) {
while let Some(m) = read.next().await {
let data = m.unwrap().into_data();
println!("{m:?}");
}
}
#[tokio::main]
async fn main() {
let url = url::Url::parse("wss://127.0.0.1:12345").unwrap();
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
// don't plan on sending anything to ws server so discard write half
let (_, read) = ws_stream.split();
tokio::select!{
_ = process_messages(&mut read) => {},
_ = wait_2_seconds() => {},
};
println!("phase 1 complete");
}
所以我不确定如何将read
传递(mut借用)给函数。
错误消息说考虑使用Box::pin
,但我意识到我甚至知道 * 如何 * 在这种情况下使用Box::pin
。我尝试将process_messages
参数类型更改为Box<Pin<&mut impl Stream>>
,并意识到我需要帮助。
2条答案
按热度按时间ssgvzors1#
只需将
read
固定在main()
中即可,您可以选择Box::pin()
it或更好的tokio::pin!()
it(或futures::pin_mut()
,甚至是每夜std::pin::pin!()
),还需要指定流的Item
类型,然后将Pin<&mut impl Stream<Item = ...>>
固定在process_messages()
中:vnjpjtjt2#
错误消息说考虑使用
Box::pin
,但后来我意识到我甚至知道如何在这种情况下使用Box::pin
。我试图将process_messages
参数类型更改为Box<Pin<&mut impl Stream>>
,并意识到我需要帮助。Box::pin
返回Pin<Box<_>>
,而不是Box<Pin<_>>
。还有一些小的调整和Box::pin
编译:Rustexplorer(编译,但在运行时死机,因为你不能绑定到套接字)。