rust 如何在流返回错误时向流添加将来返回

lhcgjxsq  于 2023-03-30  发布在  其他
关注(0)|答案(1)|浏览(129)

由于某些原因,我必须在我的项目中使用futures/时雄0.1。现在我想要一个无限循环来为很多不同的手表做一个长轮询请求。
我将使用响应数据来发出一个新的请求,无论发生什么错误,如网络问题,超时,http响应错误等。
但是现在当一个错误发生时,流不会进入for_each分支,它会进入map_err,然后退出。当它进入map_err和runtime时,如何添加一个future back。block_on继续轮询?

[dependencies]
tokio = "0.1"
futures= "0.1"
hyper = { version = "0.12.20" }

我的演示:

// #![deny(warnings)]
extern crate hyper;
extern crate pretty_env_logger;

use cipher::crypto_common::Output;
use futures::future::join_all;
use futures::{future, Async};
use std::env;
use std::fmt::Error;
use std::io::{self, Write};
use std::ops::Add;
use std::pin::Pin;
use std::time::{Duration, Instant};

use hyper::client::HttpConnector;
use hyper::rt::{self, Future, Stream};
use hyper::Client;
use tokio::runtime::current_thread;
use tokio::runtime::current_thread::Runtime;

fn main() {
    poll_long_poll_request();
}

#[derive(Debug)]
pub enum SomeError {
    Http(hyper::Error),
    Json(serde_json::Error),
}

impl From<hyper::Error> for SomeError {
    fn from(err: hyper::Error) -> SomeError {
        SomeError::Http(err)
    }
}

impl From<serde_json::Error> for SomeError {
    fn from(err: serde_json::Error) -> SomeError {
        SomeError::Json(err)
    }
}

fn long_poll_request(
    url: hyper::Uri,
    client: Client<HttpConnector>,
) -> impl Future<Item = Result<Vec<String>, SomeError>, Error = SomeError> {
    // let client = Client::new();

    client
        // Fetch the url...
        .get(url)
        // And then, if we get a response back...
        .and_then(|res| {
            println!("Response: {}", res.status());
            println!("Headers: {:#?}", res.headers());

            res.into_body().concat2()
        })
        .from_err::<SomeError>()
        .and_then(move |(body)| {
            println!("body: {:?}", body);
            let kvs: Vec<String> = serde_json::from_slice(&body)?;
            Ok(kvs)
        })
        .map(|res| Ok(res))
        // If there was an error, let the user know...
        .map_err(|err| {
            eprintln!("Error {:?}", err);
            err
        })
}

fn call_long_poll_request(
    url: hyper::Uri,
    client: Client<HttpConnector>,
) -> impl Future<Item = Result<Vec<String>, SomeError>, Error = SomeError> {
    long_poll_request(url, client)
}

fn poll_long_poll_request() {
    let urls = vec!["http://www.baidu.com/", "http://www.google.com/"];
    let mut runtime = Runtime::new().unwrap();
    let client = Client::new();

    let mut futures = Vec::new();
    for url in urls {
        let url = url.parse::<hyper::Uri>().unwrap();
        let future = call_long_poll_request(url.clone(), client.clone());
        futures.push(future);
    }

    let stream = futures::stream::futures_unordered(futures);

    runtime
        .block_on(
            stream
                .for_each(|response_body| {
                    match response_body {
                        Ok(response_body) => {
                            println!("Long poll request response body: {:?}", response_body);
                            let url = "http://www.baidu.com/";
                            let url = url.parse::<hyper::Uri>().unwrap();
                            let future = call_long_poll_request(url, client.clone());
                            futures::future::Either::A(future.map(|_| ()))
                        }
                        Err(e) => {
                            let url = "http://www.baidu.com/";
                            let url = url.parse::<hyper::Uri>().unwrap();
                            println!("Long poll request error: {:?}", e);
                            let future = call_long_poll_request(url, client.clone());
                            futures::future::Either::B(future.map(|_| ()))
                        }
                    }
                    // futures::future::Either::B(futures::future::ok(()))
                })
                .map_err(|e| {
                    println!("Error: {:?}", e);
                    // i want to make a new future : let future = call_long_poll_request(url, client.clone());
                    // and then stream.push(future), so some request can continue to loop
                }),
        )
        .unwrap();
}
bybem2ql

bybem2ql1#

尝试将call_long_poll_request更改为:

fn call_long_poll_request(
    url: hyper::Uri,
    client: Client<HttpConnector>,
) -> Box<dyn Future<Item = Result<Vec<String>, SomeError>, Error = SomeError>> {
    Box::new(long_poll_request(url, client).or_else(|e| {
        println!("Error: {:?}", e);
        call_long_poll_request(url, client)
    }));
}

实际上,由于使用链表堆栈进行递归,这可能会逐渐增加延迟和内存使用;这可能会更好:

fn call_long_poll_request(
    url: hyper::Uri,
    client: Client<HttpConnector>,
) -> Box<dyn Future<Item = Result<Vec<String>, SomeError>, Error = SomeError>> {
    loop_fn(
        (),
        |_| {
            long_poll_request(url, client).then(|result| {
                match result {
                    Ok(data) => Ok(Loop::Break(data)),
                    Err(e) => {
                        println!("Error: {:?}", e);
                        Ok(Loop::Continue(()))
                    },
                }
            })
        }
    )
}

相关问题