For reasons outside my control, I have to use futures/tokio 0.1 in my project. I want an infinite loop that issues a long-polling request for each of many different watches.
I use the response data to issue the next request, no matter what error occurs: network problems, timeouts, HTTP error responses, and so on.
But right now, when an error occurs, the stream never reaches the for_each branch; it goes into map_err and then exits. When control reaches map_err, how can I add a future back into the stream so that runtime.block_on keeps polling?
[dependencies]
tokio = "0.1"
futures = "0.1"
hyper = "0.12.20"
serde_json = "1.0"
My demo:
// #![deny(warnings)]
extern crate futures;
extern crate hyper;
extern crate serde_json;
extern crate tokio;
use hyper::client::HttpConnector;
use hyper::rt::{Future, Stream};
use hyper::Client;
use tokio::runtime::current_thread::Runtime;
fn main() {
    poll_long_poll_request();
}
#[derive(Debug)]
pub enum SomeError {
    Http(hyper::Error),
    Json(serde_json::Error),
}
impl From<hyper::Error> for SomeError {
    fn from(err: hyper::Error) -> SomeError {
        SomeError::Http(err)
    }
}
impl From<serde_json::Error> for SomeError {
    fn from(err: serde_json::Error) -> SomeError {
        SomeError::Json(err)
    }
}
fn long_poll_request(
    url: hyper::Uri,
    client: Client<HttpConnector>,
) -> impl Future<Item = Result<Vec<String>, SomeError>, Error = SomeError> {
    // let client = Client::new();
    client
        // Fetch the url...
        .get(url)
        // And then, if we get a response back...
        .and_then(|res| {
            println!("Response: {}", res.status());
            println!("Headers: {:#?}", res.headers());
            res.into_body().concat2()
        })
        .from_err::<SomeError>()
        .and_then(move |body| {
            println!("body: {:?}", body);
            let kvs: Vec<String> = serde_json::from_slice(&body)?;
            Ok(kvs)
        })
        .map(|res| Ok(res))
        // If there was an error, let the user know...
        .map_err(|err| {
            eprintln!("Error {:?}", err);
            err
        })
}
fn call_long_poll_request(
    url: hyper::Uri,
    client: Client<HttpConnector>,
) -> impl Future<Item = Result<Vec<String>, SomeError>, Error = SomeError> {
    long_poll_request(url, client)
}
fn poll_long_poll_request() {
    let urls = vec!["http://www.baidu.com/", "http://www.google.com/"];
    let mut runtime = Runtime::new().unwrap();
    let client = Client::new();
    let mut futures = Vec::new();
    for url in urls {
        let url = url.parse::<hyper::Uri>().unwrap();
        let future = call_long_poll_request(url.clone(), client.clone());
        futures.push(future);
    }
    let stream = futures::stream::futures_unordered(futures);
    runtime
        .block_on(
            stream
                .for_each(|response_body| {
                    match response_body {
                        Ok(response_body) => {
                            println!("Long poll request response body: {:?}", response_body);
                            let url = "http://www.baidu.com/".parse::<hyper::Uri>().unwrap();
                            let future = call_long_poll_request(url, client.clone());
                            futures::future::Either::A(future.map(|_| ()))
                        }
                        Err(e) => {
                            println!("Long poll request error: {:?}", e);
                            let url = "http://www.baidu.com/".parse::<hyper::Uri>().unwrap();
                            let future = call_long_poll_request(url, client.clone());
                            futures::future::Either::B(future.map(|_| ()))
                        }
                    }
                    // futures::future::Either::B(futures::future::ok(()))
                })
                .map_err(|e| {
                    println!("Error: {:?}", e);
                    // I want to make a new future: let future = call_long_poll_request(url, client.clone());
                    // and then stream.push(future), so the request can continue to loop
                }),
        )
        .unwrap();
}
1 Answer
Try changing call_long_poll_request to restart itself after each response or error.

Actually, since that recurses by stacking one future on top of another (a linked-list stack of futures), it may gradually increase latency and memory usage; this would be better:
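The answer's code blocks did not survive extraction. A minimal sketch of what the second suggestion likely looks like, using `future::loop_fn` from futures 0.1, which re-enters the loop with constant memory instead of stacking recursive futures. It reuses `long_poll_request`, `SomeError`, and the hyper `Client` from the demo above; the exact body is an assumption, not the answer's original code:

```rust
// Hypothetical rewrite of call_long_poll_request using future::loop_fn.
// The loop state (url, client) is threaded through each iteration; we
// never Loop::Break, so this future polls forever and never errors out
// of the surrounding futures_unordered stream.
fn call_long_poll_request(
    url: hyper::Uri,
    client: Client<HttpConnector>,
) -> impl Future<Item = (), Error = SomeError> {
    futures::future::loop_fn((url, client), |(url, client)| {
        long_poll_request(url.clone(), client.clone()).then(move |result| {
            match result {
                // Item is Result<Vec<String>, SomeError> because of the
                // `.map(|res| Ok(res))` in long_poll_request.
                Ok(Ok(body)) => println!("Long poll request response body: {:?}", body),
                Ok(Err(e)) | Err(e) => println!("Long poll request error: {:?}", e),
            }
            // Success or failure, go around again with the same state.
            Ok(futures::future::Loop::Continue((url, client)))
        })
    })
}
```

With this version, errors are absorbed inside `.then`, so `poll_long_poll_request` can drop the `Either` dance in `for_each` entirely: each future in `futures_unordered` already loops on its own, and `block_on` keeps polling indefinitely.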