我有以下设置
use futures::{
future,
stream::{self, Stream, FuturesUnordered},
};
use tokio;
fn foo(futures: FuturesUnordered<impl futures::Future<Output = std::io::Result<impl std::fmt::Binary>>>) {}
fn bar(futures: FuturesUnordered<impl futures::Future<Output = impl std::fmt::Binary>>) {}
# [tokio::main]
async fn main() {
let futures: FuturesUnordered<_> = (0..10).map(move |i| async move {
let mut delay = core::time::Duration::from_secs(rand::Rng::gen_range(&mut rand::thread_rng(), 1..3));
tokio::time::sleep(delay).await;
Ok::<i32, std::io::Error>(i) // this line can't be changed
}).collect();
// this is ok
foo(futures);
// this will not compile
bar(futures);
}
playground link
我希望能够用futures
调用bar
函数。假设我不能改变futures
的初始化方式,我如何忽略流中的错误,只处理那些不是错误的元素?
这里有一个类似的SO问题:How can I remove or otherwise ignore errors when processing a stream?
但是答案使用了stream::iter_ok
,我认为它已经过时了,还是什么?我希望下面的代码能起作用:
use futures::{
future,
stream::{self, Stream, FuturesUnordered},
StreamExt,
};
use tokio;
fn foo(futures: FuturesUnordered<impl futures::Future<Output = std::io::Result<impl std::fmt::Binary>>>) {}
async fn bar(futures: FuturesUnordered<impl futures::Future<Output = impl std::fmt::Binary>>) {
futures.for_each(|n| {
async move {
println!("Success on {:b}", n);
}
}).await
}
# [tokio::main]
async fn main() {
let futures: FuturesUnordered<_> = (0..10).map(move |i| async move {
let mut delay = core::time::Duration::from_secs(rand::Rng::gen_range(&mut rand::thread_rng(), 1..3));
tokio::time::sleep(delay).await;
Ok::<i32, std::io::Error>(i)
}).collect();
let futures = futures
.then(|r| future::ok(iter_ok::<_, ()>(r)))
.flatten();
bar(futures).await;
}
1条答案
按热度按时间qnakjoqk1#
您可以在另一个流的成功值上创建一个流,如下所示:
注意:由于
.then()
返回的类型包含闭包,因此不能命名,我们必须在bar()
中更改futures
的类型。