如何在异步东京运行时使用future::join \u all和多路复用redis

jk9hmnmh  于 2021-06-07  发布在  Redis
关注(0)|答案(1)|浏览(394)

我尝试在异步多路复用模式下使用rust redis客户机,使用tokio作为异步运行时,并动态加入未来数。
我成功地使用了 future::join3 在一个固定数量的未来,但我想多路复用更多的命令(具体的大小不应该知道在编译时,但即使这将是一个改进)。
这是使用 future::join3 ; 示例正确打印
Ok(Some("PONG")) Ok(Some("PONG")) Ok(Some("PONG")) Cargo.toml ```
[package]
name = "redis_sample"
version = "0.1.0"
authors = ["---"]
edition = "2018"

[dependencies]
redis = { version = "0.17.0", features = ["aio", "tokio-comp", "tokio-rt-core"] }
tokio = { version = "0.2.23", features = ["full"] }
futures = "0.3.8"
`src/main.rs`
use futures::future;
use redis::RedisResult;

[tokio::main]

async fn main() -> Result<(), Box> {
let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
let mut redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

let results: (RedisResult<Option<String>>, RedisResult<Option<String>>, RedisResult<Option<String>>) = future::join3(
    redis::cmd("PING").query_async(&mut redis_connection.clone()),
    redis::cmd("PING").query_async(&mut redis_connection.clone()),
    redis::cmd("PING").query_async(&mut redis_connection),
).await;

println!("{:?} {:?} {:?}", results.0, results.1, results.2);

Ok(())

}

现在我也想这么做,但是 `n` 命令(假设是10个,但理想情况下,我希望根据生产中的性能调整它)。这是我所能做到的,但我无法克服借贷规则;我尝试存储一些中介(redis `Cmd` 或未来本身),以延长他们的生命,但这有其他问题(与多个) `mut` 参考文献)。
这个 `Cargo.toml` 是一样的;这里是 `main.rs` ```
use futures::{future, Future};
use std::pin::Pin;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

# [tokio::main]

async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let mut commands: Vec<Pin<Box<dyn Future<Output = RedisResult<Option<String>>>>>> = vec![];
    for _ in 0..BATCH_SIZE {
        commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
    }
    let results = future::join_all(commands).await;

    println!("{:?}", results);

    Ok(())
}

我收到两个编译器警告( creates a temporary which is freed while still in use ),我不知道如何继续使用这个代码。我不是百分之百地喜欢使用pin,但是没有它我甚至无法储存期货。
完整编译器输出:

Compiling redis_sample v0.1.0 (/Users/gyfis/Documents/programming/rust/redis_sample)
error[E0716]: temporary value dropped while borrowed
  --> redis_sample/src/main.rs:14:32
   |
14 |         commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
   |                                ^^^^^^^^^^^^^^^^^^                                              - temporary value is freed at the end of this statement
   |                                |
   |                                creates a temporary which is freed while still in use
...
21 | }
   | - borrow might be used here, when `commands` is dropped and runs the `Drop` code for type `std::vec::Vec`
   |
   = note: consider using a `let` binding to create a longer lived value

error[E0716]: temporary value dropped while borrowed
  --> redis_sample/src/main.rs:14:69
   |
14 |         commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
   |                                                                     ^^^^^^^^^^^^^^^^^^^^^^^^   - temporary value is freed at the end of this statement
   |                                                                     |
   |                                                                     creates a temporary which is freed while still in use
...
21 | }
   | - borrow might be used here, when `commands` is dropped and runs the `Drop` code for type `std::vec::Vec`
   |
   = note: consider using a `let` binding to create a longer lived value

error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0716`.
error: could not compile `redis_sample`.

感谢您的帮助!

epfja78i

epfja78i1#

这应该管用,我只是延长了 redis_connection .

use futures::{future, Future};
use std::pin::Pin;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

# [tokio::main]

async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let mut commands: Vec<Pin<Box<dyn Future<Output = RedisResult<Option<String>>>>>> = vec![];
    for _ in 0..BATCH_SIZE {
        let mut redis_connection = redis_connection.clone();
        commands.push(Box::pin(async move {
            redis::cmd("PING").query_async(&mut redis_connection).await
        }));
    }
    let results = future::join_all(commands).await;

    println!("{:?}", results);

    Ok(())
}

由于您在一个函数体中,甚至不需要对未来进行装箱,因此类型推断可以完成所有工作:

use futures::future;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

# [tokio::main]

async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let mut commands = vec![];
    for _ in 0..BATCH_SIZE {
        let mut redis_connection = redis_connection.clone();
        commands.push(async move {
            redis::cmd("PING").query_async::<_, Option<String>>(&mut redis_connection).await
        });
    }
    let results = future::join_all(commands).await;

    println!("{:?}", results);

    Ok(())
}

相关问题