
x33g5p2x  于2021-11-22 转载在 其他  




// Demo: a barrier for 3 parties; the barrier action runs once each time the
// third thread of a group arrives.
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
    System.out.println("人满发车了");
});

// NOTE: 10 is not a multiple of 3, so after three full groups the last thread
// stays parked in await() and the JVM will not exit on its own.
for (int i = 0; i < 10; i++) {
    int finalI = i; // lambdas can only capture effectively final locals
    new Thread(() -> {
        System.out.println("第"+ finalI +"个人上车了");
        try {
            // Block until 3 threads have reached the barrier.
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can observe it.
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }).start();
}



1. 先看下CyclicBarrier的结构有哪些方法和属性

private static class Generation {
    boolean broken = false;

/** Lock taken by dowait and reset to guard the barrier's mutable state. */
private final ReentrantLock lock = new ReentrantLock();
/** Condition created from {@link #lock}; awaited on in dowait until the barrier trips. */
private final Condition trip = lock.newCondition();
/** Number of parties, fixed by the constructor; count is reset to this value. */
private final int parties;
/** Optional action passed to the constructor (may be null); read by dowait when the barrier trips. */
private final Runnable barrierCommand;
/** The current generation; replaced with a fresh instance by nextGeneration(). */
private Generation generation = new Generation();
/** Parties still awaited: starts at parties, is decremented in dowait, and is reset to parties on a new or broken generation. */
private int count;


/**
 * Creates a barrier that trips when {@code parties} threads are waiting on it.
 *
 * @param parties the number of threads that must call await() before the barrier trips
 * @param barrierAction the action stored as the barrier command, or {@code null} for none
 * @throws IllegalArgumentException if {@code parties} is less than 1
 */
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) {
        throw new IllegalArgumentException();
    }
    this.parties = parties;
    this.count = parties; // all parties are still outstanding
    this.barrierCommand = barrierAction;
}

/**
 * Creates a barrier for {@code parties} threads with no barrier action.
 *
 * @param parties the number of threads that must call await() before the barrier trips
 * @throws IllegalArgumentException if {@code parties} is less than 1
 */
public CyclicBarrier(int parties) {
    this(parties, null);
}

2. 接着我们分析一下CyclicBarrier的await()方法

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
   final ReentrantLock lock = this.lock;
   try {
       final Generation g = generation;

       if (g.broken)
           throw new BrokenBarrierException();
       if (Thread.interrupted()) {
           throw new InterruptedException();

       int index = --count;
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               final Runnable command = barrierCommand;
               if (command != null)
               ranAction = true;
               return 0;
           } finally {
               if (!ranAction)

       // loop until tripped, broken, interrupted, or timed out
       for (;;) {
           try {
               if (!timed)
               else if (nanos > 0L)
                   nanos = trip.awaitNanos(nanos);
           } catch (InterruptedException ie) {
               if (g == generation && ! g.broken) {
                   throw ie;
               } else {
                   // We're about to finish waiting even if we had not
                   // been interrupted, so this interrupt is deemed to
                   // "belong" to subsequent execution.

           if (g.broken)
               throw new BrokenBarrierException();

           if (g != generation)
               return index;

           if (timed && nanos <= 0L) {
               throw new TimeoutException();
   } finally {

/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
    if (interruptMode != 0)

/** * Adds a new waiter to wait queue. * @return its new wait node */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        t = lastWaiter;
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
        t.nextWaiter = node;
    lastWaiter = node;
    return node;

/** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;

/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
            trail = t;
        t = next;

/** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */
private void nextGeneration() {
    // signal completion of last generation
    // set up next generation 
    count = parties;
    generation = new Generation();

/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)

/** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        first = next;
    } while (first != null);

/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */
final boolean transferForSignal(Node node) {
    /* * If cannot change waitStatus, the node has been cancelled. */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    return true;

/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;


3. 再来看一下我们的reset()方法,可以重置我们的计数器,并释放所有线程

/** * Resets the barrier to its initial state. If any parties are * currently waiting at the barrier, they will return with a * {@link BrokenBarrierException}. Note that resets <em>after</em> * a breakage has occurred for other reasons can be complicated to * carry out; threads need to re-synchronize in some other way, * and choose one to perform the reset. It may be preferable to * instead create a new barrier for subsequent use. */
public void reset() {
    final ReentrantLock lock = this.lock;
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {

/** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */
private void breakBarrier() {
   generation.broken = true;
   count = parties;




  1. CountDownLatch的计数器只能使用一次;而CyclicBarrier在每次屏障被触发后会自动重置计数器,可以循环使用,也可以通过reset()手动重置。
  2. CountDownLatch阻塞的是调用await()的线程(通常是主线程),直到计数器减为0;CyclicBarrier阻塞的是调用await()的各个参与线程(通常是子线程),一般不会阻塞主线程。
  3. CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。CountDownLatch一般用于一个或多个线程等待其他线程执行完任务后再执行;CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
  4. CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
  5. CyclicBarrier是通过ReentrantLock的“独占锁”和Condition来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现。

