我目前正在尝试将一些使用redis板条箱和redis事务的生 rust 代码从同步代码移植到异步代码,并遇到一些问题(目前与生存期相关,但根本原因可能在端口的其他地方)。
这两种情况下的代码在中都使用以下依赖项 Cargo.toml
:
[dependencies]
redis = "0.16"
tokio = { version = "0.2", features = ["full"] }
工作同步代码的最小示例如下所示:
use redis::{pipe, cmd, Pipeline, ToRedisArgs, RedisResult};
fn main() -> RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut sync_conn = client.get_connection()?;
let key = "foo";
let (val,): (u32,) = transaction(&mut sync_conn, &[key], |conn, pipe| {
pipe.set(key, 1)
.ignore()
.get(key)
.query(conn)
})?;
println!("val: {}", val);
Ok(())
}
fn transaction<
C: redis::ConnectionLike,
K: ToRedisArgs,
T,
F: FnMut(&mut C, &mut Pipeline) -> RedisResult<Option<T>>,
>(
con: &mut C,
keys: &[K],
func: F,
) -> RedisResult<T> {
let mut func = func;
loop {
cmd("WATCH").arg(keys).query(con)?;
if let Some(response) = func(con, pipe().atomic())? {
cmd("UNWATCH").query(con)?;
return Ok(response);
}
}
}
我尝试将此代码转换为异步代码,如下所示:
use redis::{AsyncCommands, pipe, cmd, Pipeline, ToRedisArgs, RedisResult, aio::ConnectionLike};
use std::future::Future;
# [tokio::main]
async fn main() -> RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut async_conn = client.get_async_connection().await?;
let key = "foo";
let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
pipe.set(key, 1)
.ignore()
.get(key)
.query_async(conn).await
}).await?;
println!("val: {}", val);
Ok(())
}
pub async fn async_transaction<
C: ConnectionLike + Send,
K: ToRedisArgs,
T,
F: FnMut(&mut C, &mut Pipeline) -> Fut,
Fut: Future<Output = RedisResult<Option<T>>>,
>(
con: & mut C,
keys: &[K],
func: F,
) -> RedisResult<T> {
let mut func = func;
loop {
cmd("WATCH").arg(keys).query_async(con).await?;
if let Some(response) = func(con, pipe().atomic()).await? {
cmd("UNWATCH").query_async(con).await?;
return Ok(response);
}
}
}
主要的变化是使用异步连接、使用async/await和 query_async
而不是 query
. 另一个主要的变化是传递给 async_transaction
返回 Future
而不是直接的结果。
但是,试图编译此文件会导致19个错误(它们大多是基于生命周期的):
error[E0486]: type of expression contains references that are not valid during the expression: `for<'_, '_> fn(&mut redis::aio::Connection, &[&str], [closure@src/main.rs:11:69: 16:6 key:&&str]) -> impl std::future::Future {async_transaction::<redis::aio::Connection, &str, (u32,), [closure@src/main.rs:11:69: 16:6 key:&&str], impl std::future::Future>}`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| ^^^^^^^^^^^^^^^^^
|
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `for<'_, '_> fn(&mut redis::aio::Connection, &[&str], [closure@src/main.rs:11:69: 16:6 key:&&str]) -> impl std::future::Future {async_transaction::<redis::aio::Connection, &str, (u32,), [closure@src/main.rs:11:69: 16:6 key:&&str], impl std::future::Future>}`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| ^^^^^^^^^^^^^^^^^
|
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `impl std::future::Future`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |______^
|
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `impl std::future::Future`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |______^
|
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `impl std::future::Future`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `impl std::future::Future`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `&mut impl std::future::Future`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `&mut impl std::future::Future`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `unsafe fn(&mut impl std::future::Future) -> std::pin::Pin<&mut impl std::future::Future> {std::pin::Pin::<&mut impl std::future::Future>::new_unchecked}`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `unsafe fn(&mut impl std::future::Future) -> std::pin::Pin<&mut impl std::future::Future> {std::pin::Pin::<&mut impl std::future::Future>::new_unchecked}`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `std::pin::Pin<&mut impl std::future::Future>`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `std::pin::Pin<&mut impl std::future::Future>`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `for<'r, 's, 't0> fn(std::pin::Pin<&'r mut impl std::future::Future>, &'s mut std::task::Context<'t0>) -> std::task::Poll<<impl std::future::Future as std::future::Future>::Output> {<impl std::future::Future as std::future::Future>::poll}`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: type is only valid for the anonymous lifetime #2 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0486]: type of expression contains references that are not valid during the expression: `for<'r, 's, 't0> fn(std::pin::Pin<&'r mut impl std::future::Future>, &'s mut std::task::Context<'t0>) -> std::task::Poll<<impl std::future::Future as std::future::Future>::Output> {<impl std::future::Future as std::future::Future>::poll}`
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: type is only valid for the anonymous lifetime #3 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0481]: lifetime of function argument does not outlive the function call
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: the function argument is only valid for the anonymous lifetime #2 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0481]: lifetime of function argument does not outlive the function call
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: the function argument is only valid for the anonymous lifetime #3 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0488]: lifetime of variable does not enclose its declaration
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: the variable is only valid for the anonymous lifetime #2 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0488]: lifetime of variable does not enclose its declaration
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: the variable is only valid for the anonymous lifetime #3 defined on the body at 11:69
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
|
note: first, the lifetime cannot outlive the anonymous lifetime #3 defined on the body at 11:69...
--> src/main.rs:11:69
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| _____________________________________________________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |_____^
note: ...so that the type `impl std::future::Future` is not borrowed for too long
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
note: but, the lifetime must be valid for the call at 11:26...
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
note: ...so that argument is valid for the call
--> src/main.rs:11:26
|
11 | let (val,): (u32,) = async_transaction(&mut async_conn, &[key], |conn, pipe| async {
| __________________________^
12 | | pipe.set(key, 1)
13 | | .ignore()
14 | | .get(key)
15 | | .query_async(conn).await
16 | | }).await?;
| |____________^
error: aborting due to 19 previous errors
For more information about this error, try `rustc --explain E0495`.
error: could not compile `redistest`.
To learn more, run the command again with --verbose.
我是否可以应用一些生命周期注解来解决这个问题,或者在这种情况下,我应该做些其他的事情来从同步代码转移到异步代码?
暂无答案!
目前还没有任何答案,快来回答吧!