我尝试使用StreamExt和TryStreamExt如下:
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
use futures::stream::{self, StreamExt, TryStreamExt};
async fn read_file(name: &str) -> io::Result<[u8; 10]> {
let mut f = File::open(name).await?;
let mut buffer = [0; 10];
// read up to 10 bytes... yes I know this is not safe but...
let n = f.read(&mut buffer[..]).await?;
Ok(buffer)
}
#[tokio::main]
async fn main() -> io::Result<()> {
let files = vec!["foo.txt", "bar.txt"];
let headers = stream::iter(files)
.map(|f| {
tokio::spawn(
read_file(f)
)
})
.try_buffer_unordered(8)
.try_collect()?;
println!("Got results: {:?}", headers);
Ok(())
}
字符串
链接到操场在这里:https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=3edf39926c3f046f89f2d925f1a93115
tryc_fn和tokio::spawn
的结果是Result<Result<...., MyError>, JoinError>.
try_buffer_unordered在Map后无法编译,我无法弄清楚。我得到:
Compiling playground v0.0.1 (/playground)
error[E0599]: no method named `try_buffer_unordered` found for struct `futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>` in the current scope
--> src/main.rs:24:10
|
24 | .try_buffer_unordered(8)
| ^^^^^^^^^^^^^^^^^^^^ method not found in `futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.13/src/stream/stream/map.rs:12:1
|
12 | / pin_project! {
13 | | /// Stream for the [`map`](super::StreamExt::map) method.
14 | | #[must_use = "streams do nothing unless polled"]
15 | | pub struct Map<St, F> {
... |
19 | | }
20 | | }
| | -
| | |
| |_doesn't satisfy `_: TryStreamExt`
| doesn't satisfy `_: TryStream`
|
= note: the method `try_buffer_unordered` exists but the following trait bounds were not satisfied:
`futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>: TryStream`
which is required by `futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>: TryStreamExt`
`&futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>: TryStream`
which is required by `&futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>: TryStreamExt`
`&mut futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>: TryStream`
which is required by `&mut futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>: TryStreamExt`
型
我可以使用buffer_unordered(),但这样我就得到了一个带有双重嵌套Result的流,我希望使用try_buffer_unordered和try_collect().
(The另一种方法是类似于一系列的收集,试图摆脱结果,这是丑陋的,并使用额外的收集)
2条答案
按热度按时间xv8emn3q1#
FWIW,我从时雄discord中得知,不能将
try_buffer_unordered
与从map
返回的常规Future
一起使用。try_*
方法是为了在Future
的创建可能不成功时使用的,换句话说,类似于Result<Future, _>
。b09cbbtk2#
double
Result
(Result<Result<_, _>, _>
)来自于将对read_file()
的调用(返回Result
) Package 在tokio::spawn()
中,这将派生的Future
的返回值 Package 在Result
中。这可以通过查看this的返回类型来看到:字符串
(Playground.)
我不知道最终的目标是什么,但是一个选择是不将
read_file()
Future
Package 在tokio::span()
中。将那些Future
Package 在tokio::spawn()
will run each of those futures in the background immediately中,而不管传递给.buffer_unordered()
的缓冲区大小。这可能不是我们想要的。相反,让
.buffer_unordered()
处理并发。该解决方案可能看起来像这样:
型
(Playground.)
另一种方法是将
Result<Result<_, _> _>
Map到Result<_, _>
,这需要将错误Map到单个类型。但是,这样做的结果是具有欺骗性的。即使最终返回单个错误,其他Future
仍将运行(也可能出错)。您可以在输出here in the playground中看到。将该输出与not usingtokio::spawn()
时发生的情况进行比较。