如何在rust中触发异步回调

mwg9r5ms  于 2022-11-30  发布在  其他
关注(0)|答案(1)|浏览(344)

我试图在Rust中实现一个状态机,但是在一个spawn线程中触发状态机的回调时遇到了一些问题。
这是我的StateMachine结构。状态是一个通用的T,因为我想在许多不同的场景中使用它,我使用Vec来存储所有注册到这个StateMachine中的回调。
一开始,我没有使用lifetime 'a,但这样会遇到一些lifetime问题,所以我通过以下建议添加了lifetime 'a:Idiomatic callbacks in Rust

  1. pub struct StateMachine<'a, T> where T:Clone+Eq+'a {
  2. state: RwLock<T>,
  3. listeners2: Vec<Arc<Mutex<ListenerCallback<'a, T>>>>,
  4. }
  5. pub type ListenerCallback<'a, T> = dyn FnMut(T) -> Result<()> + Send + Sync + 'a ;

当状态更改时,StateMachine将激发所有回调,如下所示。

  1. pub async fn try_set(&mut self, new_state:T) -> Result<()> {
  2. if (block_on(self.state.read()).deref().eq(&new_state)) {
  3. return Ok(())
  4. }
  5. // todo change the state
  6. // fire every listener in spawn
  7. let mut fire_results = vec![];
  8. for listener in &mut self.listeners2 {
  9. let state = new_state.clone();
  10. let fire_listener = listener.clone();
  11. fire_results.push(tokio::spawn(async move {
  12. let mut guard = fire_listener.lock().unwrap();
  13. guard.deref_mut()(state);
  14. }));
  15. }
  16. // if fire result return Err, return it
  17. for fire_result in fire_results {
  18. fire_result.await?;
  19. }
  20. Ok(())
  21. }

但会导致编译错误。

  1. error[E0521]: borrowed data escapes outside of associated function
  2. --> src/taf/taf-core/src/execution/state_machine.rs:54:33
  3. |
  4. 15 | impl<'a,T> StateMachine<'a,T> where T:Clone+Eq+Send {
  5. | -- lifetime `'a` defined here
  6. ...
  7. 34 | pub async fn try_set(&mut self, new_state:T) -> Result<()> {
  8. | --------- `self` is a reference that is only valid in the associated function body
  9. ...
  10. 54 | let fire_listener = listener.clone();
  11. | ^^^^^^^^^^^^^^^^
  12. | |
  13. | `self` escapes the associated function body here
  14. | argument requires that `'a` must outlive `'static`

##########################################################

完整的代码与大量的业务逻辑结合在一起,所以我重写了如下两个演示,问题是相同的。第一个演示同步触发回调,它工作,第二个演示尝试异步触发回调,它遇到同样的问题:self在此处转义关联的函数体。
第一个演示(有效):

  1. use std::alloc::alloc;
  2. use std::ops::DerefMut;
  3. use std::sync::{Arc, Mutex, RwLock};
  4. use anyhow::Result;
  5. use dashmap::DashMap;
  6. struct StateMachine<'a,T> where T:Clone+Eq+'a {
  7. state: T,
  8. listeners: Vec<Box<Callback<'a, T>>>,
  9. }
  10. type Callback<'a, T> = dyn FnMut(T) -> Result<()> + Send + Sync + 'a;
  11. impl<'a, T> StateMachine<'a,T> where T:Clone+Eq+'a {
  12. pub fn new(init_state: T) -> Self {
  13. StateMachine {
  14. state: init_state,
  15. listeners: vec![]
  16. }
  17. }
  18. pub fn add_listener(&mut self, listener: Box<Callback<'a, T>>) -> Result<()> {
  19. self.listeners.push(listener);
  20. Ok(())
  21. }
  22. pub fn set(&mut self, new_state: T) -> Result<()> {
  23. self.state = new_state.clone();
  24. for listener in &mut self.listeners {
  25. listener(new_state.clone());
  26. }
  27. Ok(())
  28. }
  29. }
  30. #[derive(Clone, Eq, PartialEq, Hash)]
  31. enum ExeState {
  32. Waiting,
  33. Running,
  34. Finished,
  35. Failed,
  36. }
  37. struct Execution<'a> {
  38. exec_id: String,
  39. pub state_machine: StateMachine<'a, ExeState>,
  40. }
  41. struct ExecManager<'a> {
  42. all_jobs: Arc<RwLock<DashMap<String, Execution<'a>>>>,
  43. finished_jobs: Arc<RwLock<Vec<String>>>,
  44. }
  45. impl<'a> ExecManager<'a> {
  46. pub fn new() -> Self {
  47. ExecManager {
  48. all_jobs: Arc::new(RwLock::new(DashMap::new())),
  49. finished_jobs: Arc::new(RwLock::new(vec![]))
  50. }
  51. }
  52. fn add_job(&mut self, job_id: String) {
  53. let mut execution = Execution {
  54. exec_id: job_id.clone(),
  55. state_machine: StateMachine::new(ExeState::Waiting)
  56. };
  57. // add listener
  58. let callback_finished_jobs = self.finished_jobs.clone();
  59. let callback_job_id = job_id.clone();
  60. execution.state_machine.add_listener( Box::new(move |new_state| {
  61. println!("listener fired!, job_id {}", callback_job_id.clone());
  62. if new_state == ExeState::Finished || new_state == ExeState::Failed {
  63. let mut guard = callback_finished_jobs.write().unwrap();
  64. guard.deref_mut().push(callback_job_id.clone());
  65. }
  66. Ok(())
  67. }));
  68. let mut guard = self.all_jobs.write().unwrap();
  69. guard.deref_mut().insert(job_id, execution);
  70. }
  71. fn mock_exec(&mut self, job_id: String) {
  72. let mut guard = self.all_jobs.write().unwrap();
  73. let mut exec = guard.deref_mut().get_mut(&job_id).unwrap();
  74. exec.state_machine.set(ExeState::Finished);
  75. }
  76. }
  77. #[test]
  78. fn test() {
  79. let mut manager = ExecManager::new();
  80. manager.add_job(String::from("job_id1"));
  81. manager.add_job(String::from("job_id2"));
  82. manager.mock_exec(String::from("job_id1"));
  83. manager.mock_exec(String::from("job_id2"));
  84. }

第二个演示:

  1. use std::alloc::alloc;
  2. use std::ops::DerefMut;
  3. use std::sync::{Arc, Mutex, RwLock};
  4. use anyhow::Result;
  5. use dashmap::DashMap;
  6. use petgraph::algo::astar;
  7. struct StateMachine<'a,T> where T:Clone+Eq+Send+'a {
  8. state: T,
  9. listeners: Vec<Arc<Mutex<Box<Callback<'a, T>>>>>,
  10. }
  11. type Callback<'a, T> = dyn FnMut(T) -> Result<()> + Send + Sync + 'a;
  12. impl<'a, T> StateMachine<'a,T> where T:Clone+Eq+Send+'a {
  13. pub fn new(init_state: T) -> Self {
  14. StateMachine {
  15. state: init_state,
  16. listeners: vec![]
  17. }
  18. }
  19. pub fn add_listener(&mut self, listener: Box<Callback<'a, T>>) -> Result<()> {
  20. self.listeners.push(Arc::new(Mutex::new(listener)));
  21. Ok(())
  22. }
  23. pub fn set(&mut self, new_state: T) -> Result<()> {
  24. self.state = new_state.clone();
  25. for listener in &mut self.listeners {
  26. let spawn_listener = listener.clone();
  27. tokio::spawn(async move {
  28. let mut guard = spawn_listener.lock().unwrap();
  29. guard.deref_mut()(new_state.clone());
  30. });
  31. }
  32. Ok(())
  33. }
  34. }
  35. #[derive(Clone, Eq, PartialEq, Hash)]
  36. enum ExeState {
  37. Waiting,
  38. Running,
  39. Finished,
  40. Failed,
  41. }
  42. struct Execution<'a> {
  43. exec_id: String,
  44. pub state_machine: StateMachine<'a, ExeState>,
  45. }
  46. struct ExecManager<'a> {
  47. all_jobs: Arc<RwLock<DashMap<String, Execution<'a>>>>,
  48. finished_jobs: Arc<RwLock<Vec<String>>>,
  49. }
  50. impl<'a> ExecManager<'a> {
  51. pub fn new() -> Self {
  52. ExecManager {
  53. all_jobs: Arc::new(RwLock::new(DashMap::new())),
  54. finished_jobs: Arc::new(RwLock::new(vec![]))
  55. }
  56. }
  57. fn add_job(&mut self, job_id: String) {
  58. let mut execution = Execution {
  59. exec_id: job_id.clone(),
  60. state_machine: StateMachine::new(ExeState::Waiting)
  61. };
  62. // add listener
  63. let callback_finished_jobs = self.finished_jobs.clone();
  64. let callback_job_id = job_id.clone();
  65. execution.state_machine.add_listener( Box::new(move |new_state| {
  66. println!("listener fired!, job_id {}", callback_job_id.clone());
  67. if new_state == ExeState::Finished || new_state == ExeState::Failed {
  68. let mut guard = callback_finished_jobs.write().unwrap();
  69. guard.deref_mut().push(callback_job_id.clone());
  70. }
  71. Ok(())
  72. }));
  73. let mut guard = self.all_jobs.write().unwrap();
  74. guard.deref_mut().insert(job_id, execution);
  75. }
  76. fn mock_exec(&mut self, job_id: String) {
  77. let mut guard = self.all_jobs.write().unwrap();
  78. let mut exec = guard.deref_mut().get_mut(&job_id).unwrap();
  79. exec.state_machine.set(ExeState::Finished);
  80. }
  81. }
  82. #[test]
  83. fn test() {
  84. let mut manager = ExecManager::new();
  85. manager.add_job(String::from("job_id1"));
  86. manager.add_job(String::from("job_id2"));
  87. manager.mock_exec(String::from("job_id1"));
  88. manager.mock_exec(String::from("job_id2"));
  89. }

第二个演示的编译错误:

  1. error[E0521]: borrowed data escapes outside of associated function
  2. --> generic/src/callback2.rs:34:34
  3. |
  4. 15 | impl<'a, T> StateMachine<'a,T> where T:Clone+Eq+Send+'a {
  5. | -- lifetime `'a` defined here
  6. ...
  7. 29 | pub fn set(&mut self, new_state: T) -> Result<()> {
  8. | --------- `self` is a reference that is only valid in the associated function body
  9. ...
  10. 34 | let spawn_listener = listener.clone();
  11. | ^^^^^^^^^^^^^^^^
  12. | |
  13. | `self` escapes the associated function body here
  14. | argument requires that `'a` must outlive `'static`
  15. |
  16. = note: requirement occurs because of the type `std::sync::Mutex<Box<dyn FnMut(T) -> Result<(), anyhow::Error> + Send + Sync>>`, which makes the generic argument `Box<dyn FnMut(T) -> Result<(), anyhow::Error> + Send + Sync>` invariant
  17. = note: the struct `std::sync::Mutex<T>` is invariant over the parameter `T`
  18. = help: see <https://doc.rust-lang.org/nomicon/subtyping.html> for more information about variance
disbfnqx

disbfnqx1#

使用tokio::spawn() * 衍生的任务不能使用借用的数据 *(在这里,是指生存期为'a的数据,无论它是什么)。这是因为当前没有(并且可能永远不会有)任何方法来保证借用的数据可靠地生存于衍生的任务之后。
您有两个选择:
1.在不产生通知的情况下触发通知。您可以将通知future放入FuturesUnordered以同时运行它们,但它们仍然必须在try_set()之前完成。
1.删除生存期参数;停止允许借用数据的回调。在必要的地方将'static放在您的dyn类型上。更改StateMachine的用户,以便他们不尝试使用借用的数据,而是在必要时使用Arc

  1. pub struct StateMachine<T> where T: Clone + Eq + 'static {
  2. state: RwLock<T>,
  3. listeners2: Vec<Arc<Mutex<ListenerCallback<T>>>>,
  4. }
  5. pub type ListenerCallback<T> = dyn FnMut(T) -> Result<()> + Send + Sync + 'static;

相关问题