考虑Rust中的以下完整示例:
use async_stream::stream;
use futures::{pin_mut, StreamExt};
use std::time::Duration;
use tokio::time::sleep;
pub struct Processor {
pub state: i32,
}
impl Processor {
pub fn process(&mut self, x: i32) -> i32 {
self.state += 1;
self.state * x
}
}
#[tokio::main]
async fn main() {
let mut p = Processor { state: 1 };
let stream = stream! {
for i in 1..=10 {
sleep(Duration::from_millis(100)).await;
yield i
}
};
let s2 = stream.filter_map(move |m| async {
match m {
7 => None,
_ => Some(p.process(m)),
}
});
pin_mut!(s2);
while let Some(v) = s2.next().await {
println!("Next value {:?}", v);
}
}
字符串
这种设计非常简单,它有一个有状态的处理适配器,用于接收项、更改状态并返回某个值。
但是,由于以下原因,此示例无法编译:
error: captured variable cannot escape `FnMut` closure body
--> src/main.rs:28:41
|
19 | let mut p = Processor { state: 1 };
| ----- variable defined here
...
28 | let s2 = stream.filter_map(move |m| async {
| _______________________________________-_^
| | |
| | inferred to be a `FnMut` closure
29 | | match m {
30 | | 7 => None,
31 | | _ => Some(p.process(m)),
| | - variable captured here
32 | | }
33 | | });
| |_____^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
型
问题是,一方面,我们有一个保存状态的适配器,另一方面,程序的一个MVC部分(在本例中为filter_map
)需要保存对该适配器的可变引用。
在某些情况下,可以分离出PECC部分,例如,首先应用同步map
,然后将PECC filter_map
链接到它,但我的问题是对于更一般的情况,PECC块 * 必须 * 持有一些可变引用。
在这种情况下,另一个逃避的办法是克隆处理器,但为了讨论起见,让我们假设这对性能不利。
1条答案
按热度按时间2guxujil1#
StreamExt::filter_map()
(和类似的方法)不能支持这一点,它们不是为此而设计的(issue #2464),我认为它们不能被重新设计来处理这一点。您可以创建自己的流组合子来实现这一点,但即使这样也不是那么简单。目前不可能(AFAIK)有一个签名接受从其参数借用的所有Functions块。有关详细信息,请参阅Calling a generic async function with a (mutably) borrowed argument。一个可能的部分解决方案是将返回的future装箱并返回
Pin<Box<dyn Future>>
(以内存分配为代价),或者使用trait magic,但这只适用于functional函数,而不是返回functional块的闭包(至少,没有奇怪的签名和不稳定的特性,如type_alias_impl_trait
和closure_lifetime_binder
)。