在Rust中使用原子实现Barrier有什么问题?

6l7fqoea  于 2022-12-04  发布在  其他
关注(0)|答案(2)|浏览(173)

我只使用原子编写了Barrier的以下实现:

use std::sync::atomic::{AtomicUsize, Ordering};

pub struct Barrier {
  pub done: AtomicUsize,
  pub tids: usize,
}

impl Barrier {
  pub fn new(tids: usize) -> Barrier {
    Barrier {
      done: AtomicUsize::new(0),
      tids,
    }
  }

  pub fn wait(&self) {
    let done = self.done.fetch_add(1, Ordering::SeqCst);
    if done + 1 == self.tids {
      self.done.store(0, Ordering::SeqCst);
    } else {
      while self.done.load(Ordering::SeqCst) != 0 {}
    }
  }
}

它不能像预期的那样工作。例如,

// inside threads loop
barrier.wait();
println!("a");
barrier.wait();
println!("b");

直觉上,它应该可以工作,因为一旦.wait()被调用,它就会挂起在while循环上,在所有线程都调用了.wait()之后,它就会从循环中脱离出来,并为下一个.wait()重置计数器。相反,它最终会挂起。下面是一个用法示例:

fn main() {
  println!("Hello, world!");

  let barrier = &Barrier::new(10);

  std::thread::scope(|s| {
    for tid in 0 .. 10 {
      s.spawn(move || {
        loop {
          barrier.wait();
          println!("{} a", tid);
          barrier.wait();
          println!("{} b", tid);
        }
      });
    }
  });
}
zlwx9yxi

zlwx9yxi1#

问题在于,两个连续屏障之间存在竞争条件:

  • 等待屏障时,线程可能会被取消调度。
  • 第二个线程(屏障等待的最后一个线程)进入屏障,释放它,运行它的下一个迭代,然后再次进入屏障
  • 第一个线程醒来并看到值1,完全错过了屏障释放。

如果你确信你总是在使用相同的线程,你可以通过使用两个计数器来解决这个问题,并在它们之间来回切换。这样所有的线程要么等待第一个,要么等待第二个。但是一个线程没有办法绕过其他线程,因为它必须通过第二个计数器来再次阻塞第一个线程。而第二个线程只有在第一个线程中没有剩余线程时才会解除阻塞。
这一条似乎行得通:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

pub struct Barrier {
    pub done: [AtomicUsize; 2],
    pub use_first_done: AtomicBool,
    pub tids: usize,
}

impl Barrier {
    pub fn new(tids: usize) -> Barrier {
        Barrier {
            done: [AtomicUsize::new(0), AtomicUsize::new(0)],
            use_first_done: AtomicBool::new(true),
            tids,
        }
    }

    pub fn wait(&self) {
        let done = if self.use_first_done.load(Ordering::SeqCst) {
            &self.done[0]
        } else {
            &self.done[1]
        };

        let num_done = done.fetch_add(1, Ordering::SeqCst) + 1;
        if num_done == self.tids {
            self.use_first_done.fetch_xor(true, Ordering::SeqCst);
            done.store(0, Ordering::SeqCst);
        } else {
            while done.load(Ordering::SeqCst) != 0 {}
        }
    }
}

fn main() {
    println!("Hello, world!");

    let barrier = &Barrier::new(10);

    std::thread::scope(|s| {
        for tid in 0..10 {
            s.spawn(move || loop {
                barrier.wait();
                println!("{} a", tid);
                barrier.wait();
                println!("{} b", tid);
            });
        }
    });
}

另一种方法是使用迭代计数器。
出于与两个done计数器之间的翻转工作相同的原因,两次迭代的迭代计数器(=布尔值)应该足够了。
这一条也适用于我:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

pub struct Barrier {
    pub done: AtomicUsize,
    pub iteration: AtomicBool,
    pub tids: usize,
}

impl Barrier {
    pub fn new(tids: usize) -> Barrier {
        Barrier {
            done: AtomicUsize::new(0),
            iteration: AtomicBool::new(false),
            tids,
        }
    }

    pub fn wait(&self) {
        let iteration = self.iteration.load(Ordering::SeqCst);
        let num_done = self.done.fetch_add(1, Ordering::SeqCst) + 1;
        if num_done == self.tids {
            self.done.store(0, Ordering::SeqCst);
            self.iteration.fetch_xor(true, Ordering::SeqCst);
        } else {
            while iteration == self.iteration.load(Ordering::SeqCst) {}
        }
    }
}

fn main() {
    println!("Hello, world!");

    let barrier = &Barrier::new(10);

    std::thread::scope(|s| {
        for tid in 0..10 {
            s.spawn(move || loop {
                barrier.wait();
                println!("{} a", tid);
                barrier.wait();
                println!("{} b", tid);
            });
        }
    });
}

**重要提示:**这只在线程总是相同的情况下有效。如果不同的线程使用这个屏障,那么就需要有一个更大的迭代计数器。

thigvfpy

thigvfpy2#

在Rust中使用原子实现Barrier存在几个问题。
首先,Barrier结构体中的done计数器永远不会重置为0。这意味着,一旦done计数器达到tids的值,它将永远不会重置,并且wait()函数将始终挂起while循环。
第二,当线程从wait()函数释放时,done计数器不会递减,这意味着done计数器会不断增加,最终溢出并导致未定义的行为。
第三,done计数器不受任何同步机制(如互斥锁)的保护,因此多个线程有可能并发修改done计数器,从而导致争用条件和不可预测的行为。
总的来说,Barrier的这种实现是不正确的,不应该使用。相反,一个更好的方法是使用互斥体来保护done计数器,并在所有线程都到达barrier后正确地重置它。

相关问题