在rust中将同步闭包移植到异步闭包时的生存期问题

4urapxun  于 2021-06-10  发布在  Redis
关注(0)|答案(0)|浏览(262)

我目前正在尝试将一些使用 redis crate 和 Redis 事务的 Rust 代码从同步代码移植到异步代码,并遇到了一些问题(目前与生存期相关,但根本原因可能在移植过程的其他地方)。
这两种情况下的代码都在 Cargo.toml 中使用以下依赖项:

[dependencies]
redis = "0.16"
tokio = { version = "0.2", features = ["full"] }

工作同步代码的最小示例如下所示:

use redis::{pipe, cmd, Pipeline, ToRedisArgs, RedisResult};

/// Synchronous example: opens a client for `redis://127.0.0.1/`, obtains a
/// connection and runs one `transaction` that sets the key `foo` to `1` and
/// reads it back, then prints the value.
///
/// # Errors
/// Returns the `RedisError` produced by opening the client, obtaining the
/// connection, or running the transaction.
fn main() -> RedisResult<()> {
    // `?` instead of `unwrap()`: this function already returns `RedisResult`,
    // so a failure here is propagated like every other error instead of
    // panicking.
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut sync_conn = client.get_connection()?;

    let key = "foo";

    // The closure receives the connection and the atomic pipeline created by
    // `transaction`; it queues SET (reply ignored) and GET, then runs the
    // pipeline on that connection.
    let (val,): (u32,) = transaction(&mut sync_conn, &[key], |conn, pipe| {
        pipe.set(key, 1)
            .ignore()
            .get(key)
            .query(conn)
    })?;
    println!("val: {}", val);

    Ok(())
}

/// Runs `func` in a WATCH-guarded retry loop on `con`.
///
/// Every iteration sends `WATCH` for `keys`, then calls `func` with the
/// connection and a freshly created atomic pipeline.
///
/// * `Ok(Some(value))` from `func` — `UNWATCH` is sent and `value` is returned.
/// * `Ok(None)` from `func` — the loop starts over with a new `WATCH`
///   (presumably this is how an aborted `EXEC` is reported; confirm against
///   the redis crate documentation).
///
/// # Errors
/// Any error from the `WATCH`/`UNWATCH` commands or from `func` is returned
/// immediately; no `UNWATCH` is sent on that path.
fn transaction<
    C: redis::ConnectionLike,
    K: ToRedisArgs,
    T,
    F: FnMut(&mut C, &mut Pipeline) -> RedisResult<Option<T>>,
>(
    con: &mut C,
    keys: &[K],
    mut func: F,
) -> RedisResult<T> {
    loop {
        // The reply type is pinned to `()` explicitly. Leaving it to inference
        // only works through the `!` -> `()` fallback triggered by `?`, which
        // newer compilers lint against and edition 2024 rejects.
        let _: () = cmd("WATCH").arg(keys).query(con)?;
        if let Some(response) = func(con, pipe().atomic())? {
            let _: () = cmd("UNWATCH").query(con)?;
            return Ok(response);
        }
    }
}

我尝试将此代码转换为异步代码,如下所示:

use redis::{AsyncCommands, pipe, cmd, Pipeline, ToRedisArgs, RedisResult, aio::ConnectionLike};
use std::future::Future;

#[tokio::main]
async fn main() -> RedisResult<()> { // async port of the synchronous example
    let client = redis::Client::open("redis://127.0.0.1/")?; // propagate instead of `unwrap()` panicking
    let mut async_conn = client.get_async_connection().await?;

    let key = "foo";

    let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
        pipe.set(key, 1)
            .ignore()
            .get(key)
            .query_async(conn).await
    }).await?;
    // The call above is the expression the quoted compiler errors point at
    // (src/main.rs 11:26 and 11:69). The `async` block uses the `conn` and
    // `pipe` references handed to the closure, so the future it returns
    // borrows from the closure's arguments.
    // Comments are deliberately kept off the lines above so that the call
    // stays on lines 11-16 of the snippet, matching that compiler output.
    println!("val: {}", val);

    Ok(())
}

/// Async variant of the WATCH-guarded transaction retry loop.
///
/// Every iteration awaits `WATCH` for `keys`, then calls `func` with the
/// connection and a freshly created atomic pipeline and awaits the future it
/// returns.
///
/// * `Ok(Some(value))` — `UNWATCH` is sent and `value` is returned.
/// * `Ok(None)` — the loop starts over with a new `WATCH`.
///
/// # Errors
/// Any error from the `WATCH`/`UNWATCH` commands or from the future returned
/// by `func` is returned immediately; no `UNWATCH` is sent on that path.
///
/// # Limitation
/// `Fut` is one fixed type chosen by the caller, so it cannot mention the
/// per-call lifetimes of the `&mut C` and `&mut Pipeline` arguments. A closure
/// whose returned future borrows those arguments therefore does not satisfy
/// this bound. NOTE(review): the usual workaround is a higher-ranked bound
/// returning a boxed future (`Pin<Box<dyn Future<...> + 'a>>`), which changes
/// this signature and every call site, so it is not applied here.
pub async fn async_transaction<
    C: ConnectionLike + Send,
    K: ToRedisArgs,
    T,
    F: FnMut(&mut C, &mut Pipeline) -> Fut,
    Fut: Future<Output = RedisResult<Option<T>>>,
>(
    con: &mut C,
    keys: &[K],
    mut func: F,
) -> RedisResult<T> {
    loop {
        // The reply type is pinned to `()` explicitly. Leaving it to inference
        // only works through the `!` -> `()` fallback triggered by `?`, which
        // newer compilers lint against and edition 2024 rejects.
        let _: () = cmd("WATCH").arg(keys).query_async(con).await?;
        if let Some(response) = func(con, pipe().atomic()).await? {
            let _: () = cmd("UNWATCH").query_async(con).await?;
            return Ok(response);
        }
    }
}

主要的变化是使用异步连接、使用 async/await,以及用 query_async 代替 query。另一个主要的变化是,传递给 async_transaction 的闭包返回一个 Future,而不是直接返回结果。
但是,试图编译此文件会导致19个错误(它们大多是基于生命周期的):

error[E0486]: type of expression contains references that are not valid during the expression: `for<'_, '_> fn(&mut redis::aio::Connection, &[&str], [closure@src/main.rs:11:69: 16:6 key:&&str]) -> impl std::future::Future {async_transaction::<redis::aio::Connection, &str, (u32,), [closure@src/main.rs:11:69: 16:6 key:&&str], impl std::future::Future>}`
  --> src/main.rs:11:26
   |
11 |     let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |                          ^^^^^^^^^^^^^^^^^
   |
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `for<'_, '_> fn(&mut redis::aio::Connection, &[&str], [closure@src/main.rs:11:69: 16:6 key:&&str]) -> impl std::future::Future {async_transaction::<redis::aio::Connection, &str, (u32,), [closure@src/main.rs:11:69: 16:6 key:&&str], impl std::future::Future>}`
  --> src/main.rs:11:26
   |
11 |     let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |                          ^^^^^^^^^^^^^^^^^
   |
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `impl std::future::Future`
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |______^
   |
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `impl std::future::Future`
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |______^
   |
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `impl std::future::Future`
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `impl std::future::Future`
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `&mut impl std::future::Future`
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `&mut impl std::future::Future`
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `unsafe fn(&mut impl std::future::Future) -> std::pin::Pin<&mut impl std::future::Future> {std::pin::Pin::<&mut impl std::future::Future>::new_unchecked}`
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `unsafe fn(&mut impl std::future::Future) -> std::pin::Pin<&mut impl std::future::Future> {std::pin::Pin::<&mut impl std::future::Future>::new_unchecked}`
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `std::pin::Pin<&mut impl std::future::Future>`
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `std::pin::Pin<&mut impl std::future::Future>`
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `for<'r, 's, 't0> fn(std::pin::Pin<&'r mut impl std::future::Future>, &'s mut std::task::Context<'t0>) -> std::task::Poll<<impl std::future::Future as std::future::Future>::Output> {<impl std::future::Future as std::future::Future>::poll}`
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0486]: type of expression contains references that are not valid during the expression: `for<'r, 's, 't0> fn(std::pin::Pin<&'r mut impl std::future::Future>, &'s mut std::task::Context<'t0>) -> std::task::Poll<<impl std::future::Future as std::future::Future>::Output> {<impl std::future::Future as std::future::Future>::poll}`
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0481]: lifetime of function argument does not outlive the function call
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: the function argument is only valid for the anonymous lifetime #2 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0481]: lifetime of function argument does not outlive the function call
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: the function argument is only valid for the anonymous lifetime #3 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0488]: lifetime of variable does not enclose its declaration
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: the variable is only valid for the anonymous lifetime #2 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0488]: lifetime of variable does not enclose its declaration
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: the variable is only valid for the anonymous lifetime #3 defined on the body at 11:69
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #3 defined on the body at 11:69...
  --> src/main.rs:11:69
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  _____________________________________________________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |_____^
note: ...so that the type `impl std::future::Future` is not borrowed for too long
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
note: but, the lifetime must be valid for the call at 11:26...
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^
note: ...so that argument is valid for the call
  --> src/main.rs:11:26
   |
11 |       let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
   |  __________________________^
12 | |         pipe.set(key, 1)
13 | |             .ignore()
14 | |             .get(key)
15 | |             .query_async(conn).await
16 | |     }).await?;
   | |____________^

error: aborting due to 19 previous errors

For more information about this error, try `rustc --explain E0495`.
error: could not compile `redistest`.

To learn more, run the command again with --verbose.

我是否可以应用一些生命周期注解来解决这个问题,或者在这种情况下,我应该做些其他的事情来从同步代码转移到异步代码?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题