我实现了一个无锁的环形缓冲区,然后我测试调试是好的,但在释放模式下,它不能总是工作。
use std::path::Display;
use std::sync::Arc;
# [derive(Debug)]
pub struct RingBuffer<T, const m_size: usize> {
idx_head: usize,
idx_tail: usize,
m_data: [T; m_size],
}
pub trait Queue<T> {
fn new_empty() -> Self;
fn push(&mut self, value: T) -> bool;
fn pop(&mut self) -> Option<&T>;
fn is_full(&self) -> bool;
fn is_empty(&self) -> bool;
}
impl<T, const Size: usize> Queue<T> for RingBuffer<T, Size>
{
fn new_empty() -> Self {
RingBuffer::<T, Size> {
idx_head: 0,
idx_tail: 0,
m_data: array_init::array_init(|_| {
unsafe {
std::mem::zeroed()
}
}),
}
}
fn push(&mut self, value: T) -> bool {
let mut head = self.idx_head + 1;
if head == Size {
head = 0;
}
if head == self.idx_tail {
return false;
}
self.m_data[self.idx_head] = value;
self.idx_head = head;
return true;
}
fn pop(&mut self) -> Option<&T> {
let mut tail = self.idx_tail;
if self.idx_head == tail {
return None;
}
let res = &self.m_data[tail];
tail += 1;
if tail == Size {
tail = 0;
}
self.idx_tail = tail;
return Some(res);
}
fn is_full(&self) -> bool {
self.idx_tail == (self.idx_head + 1) % Size
}
fn is_empty(&self) -> bool {
self.idx_head == self.idx_tail
}
}
pub struct SharedRingBuffer<T, const m_size: usize> {
pub ringbuffer: Arc<RingBuffer<T, m_size>>,
}
impl<T, const Size: usize> Clone for SharedRingBuffer<T, Size> {
fn clone(&self) -> Self {
Self {
ringbuffer: self.ringbuffer.clone(),
}
}
}
impl<T, const Size: usize, > Queue<T> for SharedRingBuffer<T, Size> {
fn new_empty() -> Self {
Self {
ringbuffer: Arc::new(RingBuffer::<T, Size>::new_empty()),
}
}
fn push(&mut self, value: T) -> bool {
unsafe {
(*Arc::get_mut_unchecked(&mut self.ringbuffer)).push(value)
}
}
fn pop(&mut self) -> Option<&T> {
unsafe {
(*Arc::get_mut_unchecked(&mut self.ringbuffer)).pop()
}
}
fn is_full(&self) -> bool {
self.ringbuffer.is_full()
}
fn is_empty(&self) -> bool {
self.ringbuffer.is_empty()
}
}
////////////////////// for test//////////////////////////
fn test_speed1() {
let mut q: SharedRingBuffer<i32, 8> = SharedRingBuffer::new_empty();
let mut t0 = std::time::SystemTime::now();
let t = {
let mut q = q.clone();
std::thread::spawn(move || {
loop {
let t = match q.pop() {
None => {
// std::thread::sleep(Duration::from_millis(10));
continue;
}
Some(res) => res
};
if *t == -1 {
break;
}
std::thread::sleep(Duration::from_millis(1));
}
let now = std::time::SystemTime::now();
println!("res: {}", now.duration_since(t0).unwrap().as_millis());
})
};
for i in 0..99 {
loop {
if q.push(i) {
// std::thread::sleep(Duration::from_millis(10));
break;
}
}
}
q.push(-1);
t.join().unwrap();
}
当i为q.push和q.pop方法添加std::thread::sleep(Duration::from_millis(10))
时,它工作得很好。
rustc 1.67.0-nightly (95a3a7277 2022-10-31)
binary: rustc
commit-hash: 95a3a7277b44bbd2dd3485703d9a05f64652b60e
commit-date: 2022-10-31
host: x86_64-pc-windows-msvc
release: 1.67.0-nightly
LLVM version: 15.0.4
我希望RingBuffer能够很好地工作。等效代码为:
fn test_speed2() {
let (send, recv) = channel::<i32>();
let mut is_run = SharedValue::new(true);
let mut t0 = std::time::SystemTime::now();
let t = {
let is_run = is_run.clone();
std::thread::spawn(move || {
loop {
let t = match recv.recv() {
Err(e) => {
break;
}
Ok(res) => res
};
if t == -1 {
break;
}
std::thread::sleep(Duration::from_millis(1));
}
let now = std::time::SystemTime::now();
// println!("res: {}", now.duration_since(t0).unwrap().as_millis());
})
};
for i in 0..99 {
send.send(i).unwrap();
}
send.send(-1).unwrap();
t.join().unwrap();
}
我希望ringbuffer可以代替通道来实现两个线程之间的通信,因为ringbuffer是无锁的,而且速度更快。
2条答案
按热度按时间dfddblmv1#
您的代码通过
Arc::get_mut_unchecked()
同时创建两个对同一对象的可变引用,导致了未定义的行为。看起来这甚至是您的意图,但它公然违反了Rust的规则。即使使用unsafe
,您也不能违反可变引用是 * 独占 * 的要求。使用
cargo miri
运行代码会报告以下未定义的行为:你需要重新考虑你的设计。你可能需要一个类似这样的基础来保证在线程之间进行修改的安全性:
xggvc2p62#
这实际上是由CPU缓存引起的,解决方法如下: