我想并发处理一个vec的项目。基本上,每个项目都涉及到一些I/O,但它们并不相互依赖。我可以使用futures::join_all
(或在我的情况下futures::try_join_all
)来实现这一点。
由于我不关心处理结果(除了try_join_all
情况下的错误),所以我最终并不想要单位为(Vec<()>
)的vec;这只是一个无用的分配,其中()
(或Result<(), Error>
)就足够了。futures
crate文档中提到,如果需要,可以直接使用FuturesUnordered
,所以我尝试了一下(playground):
use futures::{
stream::{FuturesUnordered, StreamExt},
Future,
};
fn main() {
tokio::spawn(async move {
let foos = [Foo, Foo, Foo];
// process_foos(&[]).await;
join_all_discard(foos.iter().map(|foo| process_foo(foo))).await;
});
}
async fn process_foo(foo: &Foo) {
// do something async with foo
}
async fn join_all_discard<I>(iter: I) -> ()
where
I: IntoIterator,
I::Item: Future<Output = ()>,
{
let mut stream: FuturesUnordered<_> = iter.into_iter().collect();
while let Some(()) = stream.next().await {}
}
错误是
error: higher-ranked lifetime error
--> src/lib.rs:9:5
|
9 | / tokio::spawn(async move {
10 | | let foos = [Foo, Foo, Foo];
11 | | join_all_discard(foos.iter().map(|foo| process_foo(foo))).await;
12 | | });
| |______^
|
= note: could not prove `[async block@src/lib.rs:9:18: 12:6]: std::marker::Send`
编译器错误仅在当前调用tokio::spawn
时才会出现(这有点道理,因为它可能需要将future发送到不同的线程)
使用join_all_discard(foos.iter().map(process_foo)).await
(不带闭包)消除了错误,使用futures::join_all
也是如此,但我自己的实现有缺陷。我迷路了。我怀疑有些东西与join_all_discard
上的泛型边界有关。
- P.S.为了解决真实的的问题,我写了
try_join_all_discard
,它表现出相同的错误,看起来像这样:*
async fn try_join_all_discard<I, E>(iter: I) -> Result<(), E>
where
I: IntoIterator,
I::Item: Future<Output = Result<(), E>>,
{
let mut stream: FuturesUnordered<_> = iter.into_iter().collect();
loop {
match stream.next().await {
Some(Ok(())) => continue,
Some(Err(e)) => break Err(e),
None => break Ok(()),
}
}
}
1条答案
按热度按时间a1o7rhls1#
像这样更改trait边界