在Rust中有状态地处理数据的模式

a0x5cqrl  于 10个月前  发布在  其他
关注(0)|答案(1)|浏览(129)

考虑Rust中的以下完整示例:

use async_stream::stream;
use futures::{pin_mut, StreamExt};
use std::time::Duration;
use tokio::time::sleep;

pub struct Processor {
    pub state: i32,
}

impl Processor {
    pub fn process(&mut self, x: i32) -> i32 {
        self.state += 1;
        self.state * x
    }
}

#[tokio::main]
async fn main() {
    let mut p = Processor { state: 1 };

    let stream = stream! {
        for i in 1..=10 {
            sleep(Duration::from_millis(100)).await;
            yield i
        }
    };

    let s2 = stream.filter_map(move |m| async {
        match m {
            7 => None,
            _ => Some(p.process(m)),
        }
    });
    pin_mut!(s2);

    while let Some(v) = s2.next().await {
        println!("Next value {:?}", v);
    }
}

字符串
这种设计非常简单,它有一个有状态的处理适配器,用于接收项、更改状态并返回某个值。
但是,由于以下原因,此示例无法编译:

error: captured variable cannot escape `FnMut` closure body
  --> src/main.rs:28:41
   |
19 |       let mut p = Processor { state: 1 };
   |           ----- variable defined here
...
28 |       let s2 = stream.filter_map(move |m| async {
   |  _______________________________________-_^
   | |                                       |
   | |                                       inferred to be a `FnMut` closure
29 | |         match m {
30 | |             7 => None,
31 | |             _ => Some(p.process(m)),
   | |                       - variable captured here
32 | |         }
33 | |     });
   | |_____^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body


问题是,一方面,我们有一个保存状态的适配器,另一方面,程序的一个MVC部分(在本例中为filter_map)需要保存对该适配器的可变引用。
在某些情况下,可以分离出PECC部分,例如,首先应用同步map,然后将PECC filter_map链接到它,但我的问题是对于更一般的情况,PECC块 * 必须 * 持有一些可变引用。
在这种情况下,另一个逃避的办法是克隆处理器,但为了讨论起见,让我们假设这对性能不利。

2guxujil

2guxujil1#

StreamExt::filter_map()(和类似的方法)不能支持这一点,它们不是为此而设计的(issue #2464),我认为它们不能被重新设计来处理这一点。
您可以创建自己的流组合子来实现这一点,但即使这样也不是那么简单。目前不可能(AFAIK)有一个签名接受从其参数借用的所有Functions块。有关详细信息,请参阅Calling a generic async function with a (mutably) borrowed argument。一个可能的部分解决方案是将返回的future装箱并返回Pin<Box<dyn Future>>(以内存分配为代价),或者使用trait magic,但这只适用于functional函数,而不是返回functional块的闭包(至少,没有奇怪的签名和不稳定的特性,如type_alias_impl_traitclosure_lifetime_binder)。

相关问题