rust 时雄接收方不能从通道读取,除非发送方任务的闭包中有任何“await”,tokio中的潜在错误或怪癖?

bejyjqdl  于 2023-04-12  发布在  其他
关注(0)|答案(2)|浏览(183)

验证码:

use std::thread;
use chrono::Utc;
use tokio::sync::watch::Receiver;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;

pub fn get_now_formatted() -> String {
    Utc::now().format("%Y-%m-%d %H:%M:%S.%3f").to_string()
}

#[tokio::main]
async fn main() {
    let mut rx = subscribe_orderbook().await;

    // simple thread that reads from the stream
    tokio::spawn(async move {
        loop {
            println!("{}: Trying to read..", get_now_formatted());
            rx.changed().await.expect("TODO: panic message");
            println!("DOESN'T READ PROPERLY UNLESS WE COMMENT OUT AWAIT LINE IN SENDER");
            println!("{}: Received: {:?}", get_now_formatted(), rx.borrow().clone());
        }
    });

    loop {
        sleep(Duration::from_millis(1000)).await;
    }
}

pub async fn subscribe_orderbook() -> Receiver<i32> {
    // this is where the issue will happen:
    let (channel_tx, channel_rx) = watch::channel(1);

    tokio::spawn( async move {
        let mut counter = 0;
        loop {
            counter += 1;
            let result = channel_tx.send(counter);
            if result.is_err() {
                eprintln!("{} Failed to send fair value to main thread: {}", get_now_formatted(), result.err().unwrap());
            }
            else {
                println!("{} SENT {:?}", get_now_formatted(), counter);
            }

            // NOTE: commenting out this pointless await fixes the problem!
            // sleep(Duration::from_millis(0)).await;
        }
    });
   channel_rx
}
  • 它只打印一个“已收到..”,尽管打印了许多“已发送”。
  • 我可以通过注解掉sleep(Duration::from_millis(0)).await行来修复它

从本质上讲,这只适用于发送者任务在其闭包中的某个位置有一个“await”的情况。
请注意,这是特定于发送方任务的。如果我创建一个永久循环的单独任务,它不会导致任何问题。这意味着,将以下内容添加到main函数不会导致问题:

// NOTE: Let's add a task that blocks forever... does not make any difference to either case
    tokio::spawn(async move {
        loop {
            thread::sleep(Duration::from_millis(1000));
        }
    });

为什么在sender中添加任何“await”都有效,简单地添加无意义的睡眠0毫秒行作为解决方案是“正确的”吗?
谢谢你
编辑:
看起来仅仅添加一个“await”是不够的,因为它引入了奇怪的延迟。

dtcbnfnu

dtcbnfnu1#

时雄任务不应该阻塞,否则你会遇到这样的情况。你应该使用tokio的sleep而不是std::thread::sleep

// thread::sleep(Duration::from_millis(1000));
sleep(Duration::from_millis(1000)).await;

如果你正在执行一个计算量很大或阻塞的任务,你应该使用spawn_blocking

tokio::task::spawn_blocking(|| expensive_task())
    .await
    .unwrap();

另一个有用的方法是插入对yield_now的调用。这是对零持续时间睡眠的直接替代,但如果可以使用其他解决方案,它们会更好。

for _ in 0..100 {
    tokio::task::yield_now().await;
    // Something that takes some time
    thread::sleep(Duration::from_millis(10));
}
cwxwcias

cwxwcias2#

正如我所怀疑的,问题在于发送者任务中缺少任何“等待”。
问题不在于thread::sleep的使用。删除那一行(这意味着我将永远循环),并不能解决问题。我已经编辑了我的问题,完全删除了那一行。

使用send通道的sender循环似乎必须在使用await时的某个时刻“给予控制权”。否则,没有其他任务可以从通道的接收方部分读取。

解决方案是:
1.使用时雄::task::spawn_blocking
1.注解掉我的代码中执行无意义的await的行:sleep(Duration::from_millis(0)).await;
然而,只有第一个解决方案似乎是正确的,第二个解决方案引入了奇怪的延迟。

相关问题