rust 忽略FuturesUnordered中的错误

aelbi1ox  于 2022-11-12  发布在  其他
关注(0)|答案(1)|浏览(125)

我有以下设置

use futures::{
    future,
    stream::{self, Stream, FuturesUnordered},
};
use tokio;

fn foo(futures: FuturesUnordered<impl futures::Future<Output = std::io::Result<impl std::fmt::Binary>>>) {}

fn bar(futures: FuturesUnordered<impl futures::Future<Output = impl std::fmt::Binary>>) {}

# [tokio::main]

async fn main() {
    let futures: FuturesUnordered<_> = (0..10).map(move |i| async move {
        let mut delay = core::time::Duration::from_secs(rand::Rng::gen_range(&mut rand::thread_rng(), 1..3));
        tokio::time::sleep(delay).await;
        Ok::<i32, std::io::Error>(i) // this line can't be changed
    }).collect();

    // this is ok
    foo(futures);

    // this will not compile
    bar(futures);
}

playground link
我希望能够用futures调用bar函数。假设我不能改变futures的初始化方式,我如何忽略流中的错误,只处理那些不是错误的元素?
这里有一个类似的SO问题:How can I remove or otherwise ignore errors when processing a stream?
但是答案使用了stream::iter_ok,我认为它已经过时了,还是什么?我希望下面的代码能起作用:

use futures::{
    future,
    stream::{self, Stream, FuturesUnordered},
    StreamExt,
};
use tokio;

fn foo(futures: FuturesUnordered<impl futures::Future<Output = std::io::Result<impl std::fmt::Binary>>>) {}

async fn bar(futures: FuturesUnordered<impl futures::Future<Output = impl std::fmt::Binary>>) {
    futures.for_each(|n| {
        async move {
            println!("Success on {:b}", n);
        }
    }).await
}

# [tokio::main]

async fn main() {
    let futures: FuturesUnordered<_> = (0..10).map(move |i| async move {
        let mut delay = core::time::Duration::from_secs(rand::Rng::gen_range(&mut rand::thread_rng(), 1..3));
        tokio::time::sleep(delay).await;
        Ok::<i32, std::io::Error>(i)
    }).collect();

    let futures = futures
        .then(|r| future::ok(iter_ok::<_, ()>(r)))
        .flatten();

    bar(futures).await;
}

playground link

qnakjoqk

qnakjoqk1#

您可以在另一个流的成功值上创建一个流,如下所示:

use futures::{
    stream::{self, Stream, FuturesUnordered},
    StreamExt,
};
use tokio;

async fn bar(futures: impl Stream<Item = impl std::fmt::Binary>) {
    futures.for_each(|n| {
        async move {
            println!("Success on {:b}", n);
        }
    }).await
}

# [tokio::main]

async fn main() {
    let futures: FuturesUnordered<_> = (0..10).map(move |i| async move {
        let delay = core::time::Duration::from_secs(rand::Rng::gen_range(&mut rand::thread_rng(), 1..3));
        tokio::time::sleep(delay).await;
        Ok::<i32, std::io::Error>(i)
    }).collect();

    let futures = futures
        .then(|r| async { stream::iter(r.into_iter()) })
        .flatten();

    bar(futures).await;
}

注意:由于.then()返回的类型包含闭包,因此不能命名,我们必须在bar()中更改futures的类型。

相关问题