我只使用原子编写了Barrier的以下实现:
use std::sync::atomic::{AtomicUsize, Ordering};
pub struct Barrier {
pub done: AtomicUsize,
pub tids: usize,
}
impl Barrier {
pub fn new(tids: usize) -> Barrier {
Barrier {
done: AtomicUsize::new(0),
tids,
}
}
pub fn wait(&self) {
let done = self.done.fetch_add(1, Ordering::SeqCst);
if done + 1 == self.tids {
self.done.store(0, Ordering::SeqCst);
} else {
while self.done.load(Ordering::SeqCst) != 0 {}
}
}
}
它不能像预期的那样工作。例如,
// inside threads loop
barrier.wait();
println!("a");
barrier.wait();
println!("b");
直觉上,它应该可以工作,因为一旦.wait()
被调用,它就会挂起在while
循环上,在所有线程都调用了.wait()
之后,它就会从循环中脱离出来,并为下一个.wait()
重置计数器。相反,它最终会挂起。下面是一个用法示例:
fn main() {
println!("Hello, world!");
let barrier = &Barrier::new(10);
std::thread::scope(|s| {
for tid in 0 .. 10 {
s.spawn(move || {
loop {
barrier.wait();
println!("{} a", tid);
barrier.wait();
println!("{} b", tid);
}
});
}
});
}
2条答案
按热度按时间zlwx9yxi1#
问题在于,两个连续屏障之间存在竞争条件:
1
,完全错过了屏障释放。如果你确信你总是在使用相同的线程,你可以通过使用两个计数器来解决这个问题,并在它们之间来回切换。这样所有的线程要么等待第一个,要么等待第二个。但是一个线程没有办法绕过其他线程,因为它必须通过第二个计数器来再次阻塞第一个线程。而第二个线程只有在第一个线程中没有剩余线程时才会解除阻塞。
这一条似乎行得通:
另一种方法是使用迭代计数器。
出于与两个
done
计数器之间的翻转工作相同的原因,两次迭代的迭代计数器(=布尔值)应该足够了。这一条也适用于我:
**重要提示:**这只在线程总是相同的情况下有效。如果不同的线程使用这个屏障,那么就需要有一个更大的迭代计数器。
thigvfpy2#
在Rust中使用原子实现Barrier存在几个问题。
首先,Barrier结构体中的done计数器永远不会重置为0。这意味着,一旦done计数器达到tids的值,它将永远不会重置,并且wait()函数将始终挂起while循环。
第二,当线程从wait()函数释放时,done计数器不会递减,这意味着done计数器会不断增加,最终溢出并导致未定义的行为。
第三,done计数器不受任何同步机制(如互斥锁)的保护,因此多个线程有可能并发修改done计数器,从而导致争用条件和不可预测的行为。
总的来说,Barrier的这种实现是不正确的,不应该使用。相反,一个更好的方法是使用互斥体来保护done计数器,并在所有线程都到达barrier后正确地重置它。