rust lockfree ringbuffer无法在释放模式下工作

5lhxktic  于 2022-11-12  发布在  其他
关注(0)|答案(2)|浏览(190)

我实现了一个无锁的环形缓冲区,然后我测试调试是好的,但在释放模式下,它不能总是工作。

use std::path::Display;
use std::sync::Arc;

# [derive(Debug)]

pub struct RingBuffer<T, const m_size: usize> {
    idx_head: usize,
    idx_tail: usize,
    m_data: [T; m_size],
}

pub trait Queue<T> {
    fn new_empty() -> Self;
    fn push(&mut self, value: T) -> bool;
    fn pop(&mut self) -> Option<&T>;
    fn is_full(&self) -> bool;
    fn is_empty(&self) -> bool;
}

impl<T, const Size: usize> Queue<T> for RingBuffer<T, Size>
{
    fn new_empty() -> Self {
        RingBuffer::<T, Size> {
            idx_head: 0,
            idx_tail: 0,
            m_data: array_init::array_init(|_| {
                unsafe {
                    std::mem::zeroed()
                }
            }),
        }
    }

    fn push(&mut self, value: T) -> bool {
        let mut head = self.idx_head + 1;
        if head == Size {
            head = 0;
        }
        if head == self.idx_tail {
            return false;
        }
        self.m_data[self.idx_head] = value;
        self.idx_head = head;
        return true;
    }

    fn pop(&mut self) -> Option<&T> {
        let mut tail = self.idx_tail;
        if self.idx_head == tail {
            return None;
        }
        let res = &self.m_data[tail];
        tail += 1;
        if tail == Size {
            tail = 0;
        }
        self.idx_tail = tail;
        return Some(res);
    }

    fn is_full(&self) -> bool {
        self.idx_tail == (self.idx_head + 1) % Size
    }

    fn is_empty(&self) -> bool {
        self.idx_head == self.idx_tail
    }
}

pub struct SharedRingBuffer<T, const m_size: usize> {
    pub ringbuffer: Arc<RingBuffer<T, m_size>>,
}

impl<T, const Size: usize> Clone for SharedRingBuffer<T, Size> {
    fn clone(&self) -> Self {
        Self {
            ringbuffer: self.ringbuffer.clone(),
        }
    }
}
impl<T, const Size: usize, > Queue<T> for SharedRingBuffer<T, Size> {
    fn new_empty() -> Self {
        Self {
            ringbuffer: Arc::new(RingBuffer::<T, Size>::new_empty()),
        }
    }

    fn push(&mut self, value: T) -> bool {
        unsafe {
            (*Arc::get_mut_unchecked(&mut self.ringbuffer)).push(value)
        }
    }

    fn pop(&mut self) -> Option<&T> {
        unsafe {
            (*Arc::get_mut_unchecked(&mut self.ringbuffer)).pop()
        }
    }

    fn is_full(&self) -> bool {
        self.ringbuffer.is_full()
    }

    fn is_empty(&self) -> bool {
        self.ringbuffer.is_empty()
    }
}
////////////////////// for test//////////////////////////
fn test_speed1() {
    let mut q: SharedRingBuffer<i32, 8> = SharedRingBuffer::new_empty();
    let mut t0 = std::time::SystemTime::now();
    let t = {
        let mut q = q.clone();
        std::thread::spawn(move || {
            loop {
                let t = match q.pop() {
                    None => {
                        // std::thread::sleep(Duration::from_millis(10));
                        continue;
                    }
                    Some(res) => res
                };
                if *t == -1 {
                    break;
                }
                std::thread::sleep(Duration::from_millis(1));
            }
            let now = std::time::SystemTime::now();
            println!("res: {}", now.duration_since(t0).unwrap().as_millis());
        })
    };
    for i in 0..99 {
        loop {
            if q.push(i) {
                // std::thread::sleep(Duration::from_millis(10));
                break;
            }
        }
    }
    q.push(-1);
    t.join().unwrap();
}

当i为q.push和q.pop方法添加std::thread::sleep(Duration::from_millis(10))时,它工作得很好。

rustc 1.67.0-nightly (95a3a7277 2022-10-31)
binary: rustc
commit-hash: 95a3a7277b44bbd2dd3485703d9a05f64652b60e
commit-date: 2022-10-31
host: x86_64-pc-windows-msvc
release: 1.67.0-nightly
LLVM version: 15.0.4

我希望RingBuffer能够很好地工作。等效代码为:

fn test_speed2() {
    let (send, recv) = channel::<i32>();
    let mut is_run = SharedValue::new(true);
    let mut t0 = std::time::SystemTime::now();
    let t = {
        let is_run = is_run.clone();
        std::thread::spawn(move || {
            loop {
                let t = match recv.recv() {
                    Err(e) => {
                        break;
                    }
                    Ok(res) => res
                };
                if t == -1 {
                    break;
                }
                std::thread::sleep(Duration::from_millis(1));
            }
            let now = std::time::SystemTime::now();
            // println!("res: {}", now.duration_since(t0).unwrap().as_millis());
        })
    };
    for i in 0..99 {
        send.send(i).unwrap();
    }
    send.send(-1).unwrap();
    t.join().unwrap();
}

我希望ringbuffer可以代替通道来实现两个线程之间的通信,因为ringbuffer是无锁的,而且速度更快。

dfddblmv

dfddblmv1#

您的代码通过Arc::get_mut_unchecked()同时创建两个对同一对象的可变引用,导致了未定义的行为。看起来这甚至是您的意图,但它公然违反了Rust的规则。即使使用unsafe,您也不能违反可变引用是 * 独占 * 的要求。
使用cargo miri运行代码会报告以下未定义的行为:

error: Undefined Behavior: Data race detected between Read on thread `<unnamed>` and Write on thread `main` at alloc1894+0x10
   --> bar/src/main.rs:45:12
    |
45  |         if self.idx_head == tail {
    |            ^^^^^^^^^^^^^ Data race detected between Read on thread `<unnamed>` and Write on thread `main` at alloc1894+0x10
    |
    = help: this indicates a bug in the program: it performed an invalid operation, and caused Undefined Behavior
    = help: see https://doc.rust-lang.org/nightly/reference/behavior-considered-undefined.html for further information
    = note: BACKTRACE:
    = note: inside `<RingBuffer<i32, 8> as Queue<i32>>::pop` at bar/src/main.rs:45:12
note: inside `<SharedRingBuffer<i32, 8> as Queue<i32>>::pop` at bar/src/main.rs:89:18
   --> bar/src/main.rs:89:18
    |
89  |         unsafe { (*Arc::get_mut_unchecked(&mut self.ringbuffer)).pop() }
    |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: inside closure at bar/src/main.rs:108:31
   --> bar/src/main.rs:108:31
    |
108 |                 let t = match q.pop() {
    |                               ^^^^^^^

你需要重新考虑你的设计。你可能需要一个类似这样的基础来保证在线程之间进行修改的安全性:

use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicUsize;

pub struct RingBuffer<T, const SIZE: usize> {
    idx_head: AtomicUsize,
    idx_tail: AtomicUsize,
    m_data: [UnsafeCell<MaybeUninit<T>>; SIZE],
}
xggvc2p6

xggvc2p62#

这实际上是由CPU缓存引起的,解决方法如下:

fn push(&mut self, value: T) -> bool {
    let mut head  = unsafe {
        std::ptr::read_volatile(&self.idx_head) + 1
    };
    let tail = unsafe {
        std::ptr::read_volatile(&self.idx_tail)
    };
    if head == Size {
        head = 0;
    }
    if head == tail {
        return false;
    }
    self.m_data[self.idx_head] = value;
    unsafe {
        std::ptr::write_volatile(&mut self.idx_head, head);
    }
    return true;
}

fn pop(&mut self) -> Option<&T> {
    let mut tail = unsafe {
        std::ptr::read_volatile(&self.idx_tail)
    };
    let head  = unsafe {
        std::ptr::read_volatile(&self.idx_head)
    };
    if head == tail {
        return None;
    }
    let res = &self.m_data[tail];
    tail += 1;
    if tail == Size {
        tail = 0;
    }
    unsafe {
        std::ptr::write_volatile(&mut self.idx_tail, tail);
    }
    return Some(res);
}

相关问题