rust 是否可以在旋转循环中调用`try_recv()`?

fdbelqdn  于 2023-08-05  发布在  其他
关注(0)|答案(1)|浏览(87)

我想按以下方式处理在tokio::mpsc::Receiver<T>上接收到的消息:

loop {
    match rx.try_recv() {
        Ok(msg) => {
            // add to buffer/queue but process all when buffer.len() = 100
            ...
        },
        Err(e) => {
            match e {
                TryRecvError::Empty => {
                    // if there are no more messages ready immediately, 
                    // drain the buffer/process all currently in queue
                    ...
                },
                _ => ()
            }
        },
    }
    // short sleep required here?
}

字符串
是否有必要插入一个小睡眠,以防止线程旋转(如果是这样,我应该如何确定持续时间)?是否对该方法进行了其他监督/改进?
编辑:这是在异步上下文中,在时雄运行时。
编辑2:人们建议使用阻塞recv,但我怎么能有同样的行为呢?特别是,我希望缓冲接收到的消息,直到我缓冲了100条消息,此时处理所有消息,如果接收机上没有新消息,则处理缓冲区中的消息。如何使用阻塞读取来实现这一点?

ctrmrzij

ctrmrzij1#

看起来你实际上想要的是对 * 第一个 * 项使用recv(),然后对后面的项使用try_recv(),直到你的缓冲区有100个项或者读取返回空。然后处理这些项,并返回到分块读取。举例来说:

const BATCH_SIZE: usize = 100;

let mut buffer = Vec::with_capacity(BATCH_SIZE);

loop {
    match rx.recv().await {
        Some(msg) => buffer.push(msg),
        None => break,
    };

    while buffer.len() < BATCH_SIZE {
        match rx.try_recv() {
            Ok(msg) => buffer.push(msg),

            // For both errors (Disconnected and Empty), the correct action
            // is to process the items.  If the error was Disconnected, on
            // the next iteration rx.recv().await will be None and we'll
            // break from the outer loop anyway.
            Err(_) => break,
        }
    }

    process_the_buffer(&buffer);
    buffer.clear();
}

字符串

相关问题