验证码:
use std::thread;
use chrono::Utc;
use tokio::sync::watch::Receiver;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;
pub fn get_now_formatted() -> String {
Utc::now().format("%Y-%m-%d %H:%M:%S.%3f").to_string()
}
#[tokio::main]
async fn main() {
let mut rx = subscribe_orderbook().await;
// simple thread that reads from the stream
tokio::spawn(async move {
loop {
println!("{}: Trying to read..", get_now_formatted());
rx.changed().await.expect("TODO: panic message");
println!("DOESN'T READ PROPERLY UNLESS WE COMMENT OUT AWAIT LINE IN SENDER");
println!("{}: Received: {:?}", get_now_formatted(), rx.borrow().clone());
}
});
loop {
sleep(Duration::from_millis(1000)).await;
}
}
pub async fn subscribe_orderbook() -> Receiver<i32> {
// this is where the issue will happen:
let (channel_tx, channel_rx) = watch::channel(1);
tokio::spawn( async move {
let mut counter = 0;
loop {
counter += 1;
let result = channel_tx.send(counter);
if result.is_err() {
eprintln!("{} Failed to send fair value to main thread: {}", get_now_formatted(), result.err().unwrap());
}
else {
println!("{} SENT {:?}", get_now_formatted(), counter);
}
// NOTE: commenting out this pointless await fixes the problem!
// sleep(Duration::from_millis(0)).await;
}
});
channel_rx
}
- 它只打印一个“已收到..”,尽管打印了许多“已发送”。
- 我可以通过注解掉
sleep(Duration::from_millis(0)).await
行来修复它
从本质上讲,这只适用于发送者任务在其闭包中的某个位置有一个“await”的情况。
请注意,这是特定于发送方任务的。如果我创建一个永久循环的单独任务,它不会导致任何问题。这意味着,将以下内容添加到main函数不会导致问题:
// NOTE: Let's add a task that blocks forever... does not make any difference to either case
tokio::spawn(async move {
loop {
thread::sleep(Duration::from_millis(1000));
}
});
为什么在sender中添加任何“await”都有效,简单地添加无意义的睡眠0毫秒行作为解决方案是“正确的”吗?
谢谢你
编辑:
看起来仅仅添加一个“await”是不够的,因为它引入了奇怪的延迟。
2条答案
按热度按时间dtcbnfnu1#
时雄任务不应该阻塞,否则你会遇到这样的情况。你应该使用tokio的sleep而不是
std::thread::sleep
:如果你正在执行一个计算量很大或阻塞的任务,你应该使用
spawn_blocking
:另一个有用的方法是插入对
yield_now
的调用。这是对零持续时间睡眠的直接替代,但如果可以使用其他解决方案,它们会更好。cwxwcias2#
正如我所怀疑的,问题在于发送者任务中缺少任何“等待”。
问题不在于
thread::sleep
的使用。删除那一行(这意味着我将永远循环),并不能解决问题。我已经编辑了我的问题,完全删除了那一行。使用send通道的sender循环似乎必须在使用await时的某个时刻“给予控制权”。否则,没有其他任务可以从通道的接收方部分读取。
解决方案是:
1.使用时雄::task::spawn_blocking
1.注解掉我的代码中执行无意义的await的行:
sleep(Duration::from_millis(0)).await;
然而,只有第一个解决方案似乎是正确的,第二个解决方案引入了奇怪的延迟。