rust 无法编译:将try_buffer_unordered与StreamExtMap一起使用

fkvaft9z  于 2024-01-08  发布在  其他
关注(0)|答案(2)|浏览(89)

我尝试使用StreamExt和TryStreamExt如下:

use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
use futures::stream::{self, StreamExt, TryStreamExt};

async fn read_file(name: &str) -> io::Result<[u8; 10]> {
    let mut f = File::open(name).await?;
    let mut buffer = [0; 10];
   
    // read up to 10 bytes... yes I know this is not safe but...
    let n = f.read(&mut buffer[..]).await?;
    Ok(buffer)    
} 

#[tokio::main]
async fn main() -> io::Result<()> {
    let files = vec!["foo.txt", "bar.txt"];
    let headers = stream::iter(files)
        .map(|f| {
            tokio::spawn(
                read_file(f)
            )
        })
        .try_buffer_unordered(8)
        .try_collect()?;
    println!("Got results: {:?}", headers);
    Ok(())
}

字符串
链接到操场在这里:https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=3edf39926c3f046f89f2d925f1a93115
tryc_fn和tokio::spawn的结果是Result<Result<...., MyError>, JoinError>. try_buffer_unordered在Map后无法编译,我无法弄清楚。我得到:

Compiling playground v0.0.1 (/playground)
error[E0599]: no method named `try_buffer_unordered` found for struct `futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>` in the current scope
  --> src/main.rs:24:10
   |
24 |           .try_buffer_unordered(8)
   |            ^^^^^^^^^^^^^^^^^^^^ method not found in `futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>`
   | 
  ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.13/src/stream/stream/map.rs:12:1
   |
12 | / pin_project! {
13 | |     /// Stream for the [`map`](super::StreamExt::map) method.
14 | |     #[must_use = "streams do nothing unless polled"]
15 | |     pub struct Map<St, F> {
...  |
19 | |     }
20 | | }
   | | -
   | | |
   | |_doesn't satisfy `_: TryStreamExt`
   |   doesn't satisfy `_: TryStream`
   |
   = note: the method `try_buffer_unordered` exists but the following trait bounds were not satisfied:
           `futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>: TryStream`
           which is required by `futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>: TryStreamExt`
           `&futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>: TryStream`
           which is required by `&futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>: TryStreamExt`
           `&mut futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>: TryStream`
           which is required by `&mut futures::stream::Map<futures::stream::Iter<std::vec::IntoIter<&str>>, [closure@src/main.rs:19:14: 23:10]>: TryStreamExt`


我可以使用buffer_unordered(),但这样我就得到了一个带有双重嵌套Result的流,我希望使用try_buffer_unordered和try_collect().
(The另一种方法是类似于一系列的收集,试图摆脱结果,这是丑陋的,并使用额外的收集)

xv8emn3q

xv8emn3q1#

FWIW,我从时雄discord中得知,不能将try_buffer_unordered与从map返回的常规Future一起使用。try_*方法是为了在Future的创建可能不成功时使用的,换句话说,类似于Result<Future, _>

b09cbbtk

b09cbbtk2#

double ResultResult<Result<_, _>, _>)来自于将对read_file()的调用(返回Result) Package 在tokio::spawn()中,这将派生的Future的返回值 Package 在Result中。这可以通过查看this的返回类型来看到:

tokio::spawn(read_file("foo.txt")).await

字符串
Playground.
我不知道最终的目标是什么,但是一个选择是不将read_file()Future Package 在tokio::span()中。将那些Future Package 在tokio::spawn() will run each of those futures in the background immediately中,而不管传递给.buffer_unordered()的缓冲区大小。这可能不是我们想要的。
相反,让.buffer_unordered()处理并发。
该解决方案可能看起来像这样:

#[tokio::main]
async fn main() -> io::Result<()> {
    let files = vec!["foo.txt", "bar.txt"];
    let headers: Vec<_> = stream::iter(files)
        .map(read_file)
        .buffer_unordered(8)
        .try_collect()
        .await?;
    println!("Got results: {:?}", headers);
    Ok(())
}


Playground.
另一种方法是将Result<Result<_, _> _>Map到Result<_, _>,这需要将错误Map到单个类型。但是,这样做的结果是具有欺骗性的。即使最终返回单个错误,其他Future仍将运行(也可能出错)。您可以在输出here in the playground中看到。将该输出与not using tokio::spawn()时发生的情况进行比较。

相关问题