我正在处理一个项目,我想在不同的线程中从两个数据源传输数据,但遇到以下错误:
error[E0521]: borrowed data escapes outside of method
--> client_runner/src/lib.rs:37:23
|
15 | pub async fn run(&mut self) {
| ---------
| |
| `self` is a reference that is only valid in the method body
| let's call the lifetime of this reference `'1`
...
37 | for client in self.clients.iter() {
| ^^^^^^^^^^^^^^^^^^^
| |
| `self` escapes the method body here
| argument requires that `'1` must outlive `'static`
字符串
据我所知,这与clients
是self
的成员这一事实有关,并且在这里被借用用于方法run
。我真的不知道该怎么做才能正确。我尝试过使用for循环来迭代向量的大小,然后通过self.clients[idx].stream()
访问客户端,但当我尝试将其作为时雄启动时,这不起作用任务。如果我没有在时雄任务中启动它们,每个客户端都运行得很好,但现在它们是同步运行的。
我知道Rust的OOP能力有局限性,但我是一个来自Java,C++和一些python的程序员,所以OOP是我所知道的。如果有人能提供一种“Rusty”的方式来完成这项任务,这对所有权模型更有意义,我愿意接受。
我对生 rust 很陌生,所以如果我做得不对或者我可以做得更好,请告诉我。我把这当作一个从错误中学习的机会。
代码如下:
crate main_project
main.rs
use client_runner::*;
use data_sources::{
DataSource,
source1_interface::Source1Client,
source2_interface::Source2Client,
};
#[tokio::main(flavor = "multi_thread")]
async fn main() {
// Create the vector of clients
let client_list: Vec<Box<dyn DataSource + Send + Sync>> = vec! [
Box::new(Source1Client::new()),
Box::new(Source2Client::new()),
];
// Take in a vector of clients to launch in their own threads
let mut client_runner = ClientRunner::new(client_list);
client_runner.run().await;
}
型
crate client_runner
lib.rs
use data_sources::DataSource;
use std:thread;
pub struct ClientRunner {
clients: Vec<Box<dyn DataSource + Send + Sync>>,
}
impl ClientRunner {
/// Creates a new Client Runner given a list of clients that implement the DataSource trait
pub fn new(client_list: Vec<Box<dyn DataSource + Send + Sync>>) -> Self {
ClientRunner {clients: client_list}
}
pub async fn run(&mut self) {
println!("Main Thread: {:?}", thread::current().id());
// Single call works but is not multithreaded
// self.clients[0].stream().await;
// Error here
for client in self.clients.iter() {
// Launch worker threads to:
// Stream the data
tokio::task::spawn(async move {
client.stream().await;
});
}
}
}
型
crate data_sources
lib.rs
use async_trait::async_trait;
#[async_trait]
pub trait DataSource {
asyn fn stream(&self);
}
pub mod source1_interface;
pub mod source2_interface;
型
source1_interface.rs
pub struct Source1Client{
...
}
unsafe impl Send for Source1Client {}
unsafe impl Sync for Source1Client {}
#[async_trait]
impl DataSource for Source1Client {
async fn stream(&self) {
println!("Streaming Source1 Data on thread: {:?}", thread::current().id());
}
}
impl Source1Client {
pub fn new() -> Self {
...
Source1Client {
...
}
}
...
}
型
source2_interface.rs
pub struct Source2Client{
...
}
unsafe impl Send for Source2Client {}
unsafe impl Sync for Source2Client {}
#[async_trait]
impl DataSource for Source2Client {
async fn stream(&self) {
println!("Streaming Source2 Data on thread: {:?}", thread::current().id());
}
}
impl Source2Client {
pub fn new() -> Self {
...
Source2Client {
...
}
}
...
}
型
更新-可能的解决方案
我发现我可以使用Arc
智能指针代替Box
智能指针。根据Arc
文档:https://doc.rust-lang.org/std/sync/struct.Arc.html#://text ='Arc'%20stands%20for%20'Atomically,while%20increasing%20a%20reference%20count。
类型Arc<T>
提供了在堆中分配的类型T
的值的共享所有权。在Arc
上复制克隆会产生一个新的Arc
示例,它指向堆上与源Arc
相同的分配,同时增加引用计数。
这让我避开了借用数据逃逸的问题,因为它是共享的。
我邀请任何人提出一个更“生 rust ”的解决方案。请参阅下面的代码:
crate main_project
main.rs
...
#[tokio::main(flavor = "multi_thread")]
async fn main() {
// Create the vector of clients
let client_list: Vec<Arc<dyn DataSource + Send + Sync>> = vec! [
Arc::new(Source1Client::new()),
Arc::new(Source2Client::new()),
];
...
}
型
crate client_runner
lib.rs
...
use std::sync::Arc;
pub struct ClientRunner {
clients: Vec<Arc<dyn DataSource + Send + Sync>>,
}
impl ClientRunner {
pub fn new(client_list: Vec<Arc<dyn DataSource + Send + Sync>>) -> Self {
ClientRunner { clients: client_list }
}
pub async fn run(&mut self) {
println!("Main Thread: {:?}", thread::current().id());
let mut worker_threads = Vec::new();
for idx in 0..self.clients.len() {
// Launch worker threads to:
// Stream the data
let moved_client = Arc::clone(&self.clients[idx]);
let task_handle = tokio::task::spawn(async move {
moved_client.stream().await;
});
worker_threads.push(task_handle);
}
// Join the threads
for thread in worker_threads {
tokio::join!(thread);
}
}
}
型
1条答案
按热度按时间46scxncf1#
这里有两个正交的问题。
首先,这个错误与OOP、trait等无关;它的发生是因为对
self.clients
元素的引用被一个闭包捕获,而这个闭包本身因为tokio::task::spawn_blocking
而转义了这个方法,因为tokio::task::spawn_blocking
的参数有一个'static
的边界。下面是一个没有trait对象的最小复制:字符串
变频器输出:
型
正如您在更新中所指出的,使用
Arc<T>
overBox<T>
是可行的,因为Arc<T>
可以被克隆并通过值移动到闭包中。关于你的问题中的OOP方面,这个特定的用例--使用trait对象来抽象不同的trait实现--是完全符合Rust习惯的。Rust中最大的“缺失”的OOP特性是继承,据我所知,这不是你问题中的一个因素。1
关于trait方法分派,在可能的情况下,使用sum类型(Rust
enum
s)通常优于trait对象,因为它不需要堆分配。2在您的情况下使用枚举看起来像:型
代码的其余部分看起来和你已经拥有的几乎一样。
1 Rust Book有一节介绍了Rust如何通过其他机制实现继承的目标。
2使用
enum
手动调度方法调用还允许编译器更积极地内联并以更少的开销调用方法,尽管这样做的好处微不足道,除非方法体执行得非常快(想想微秒),并且在一个紧密的循环中有许多这样的调用。