JUC系列之CyclicBarrier详解

x33g5p2x  于2021-11-22 转载在 其他  
字(9.2k)|赞(0)|评价(0)|浏览(408)

最近又在重读CyclicBarrier源码,并进行了深入分析,重点源码也自己跟过并做了一些注释,仅供大家参考。

CyclicBarrier:回环栅栏(有人也称之为循环屏障),通过他可以让一组线程等待至某个状态(屏障点)之后再全部同时执行,同时他还有一个特点,所有线程都被释放了以后,CyclicBarrier还可以被重用。

废话不多说,一切以实践为主,以下是我写的一个例子,供大家理解。

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, ()->{System.out.println("人满发车了");});

System.out.println("一车3人,车满发车");
for (int i = 0; i < 10; i++) {
    int finalI = i;
    new Thread(()->{
        System.out.println("第"+ finalI +"个人上车了");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }).start();
}

再来看一下执行结果:

从结果可以看出来,车上每上3个人,就会人满发车,最后第9个人上车了,但是由于不满3个人,所以线程一直没发车,直到等到3个人满时才会发车。

1. 先看下CyclicBarrier的结构有哪些方法和属性

//栅栏代
private static class Generation {
	//是否要打破
    boolean broken = false;
}

//资源独占锁
private final ReentrantLock lock = new ReentrantLock();
//条件等待队列
private final Condition trip = lock.newCondition();
//拦截的线程数
private final int parties;
//到达屏障点(换代前),执行的方法
private final Runnable barrierCommand;
//栅栏代
private Generation generation = new Generation();
//线程计数器
private int count;

CyclicBarrier提供了2个构造方法。

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    //标识屏障拦截的线程数量,相当于一个副本,作用是在达到屏障后进行重置count操作
    this.parties = parties;
    //线程计数器
    this.count = parties;
    //达到屏障点后执行的方法
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

2. 接着我们分析一下CyclicBarrier的await()方法

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException
   //实际用的是一把独占锁 
   final ReentrantLock lock = this.lock;
   lock.lock();
   try {
       final Generation g = generation;

       if (g.broken)
           throw new BrokenBarrierException();
       //线程是否中断,如果中断的话,需要打破栅栏,并抛出中断异常 
       if (Thread.interrupted()) {
           breakBarrier();
           throw new InterruptedException();
       }

       int index = --count;
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               final Runnable command = barrierCommand;
               //到达屏障点时,执行需要处理的业务
               if (command != null)
                   command.run();
               ranAction = true;
               //执行下一代屏障,在这儿会将条件等待队列的元素转到同步队列,并将之前阻塞的线程唤醒,
               nextGeneration();
               return 0;
           } finally {
           		//是否要打破栅栏代
               if (!ranAction)
                   breakBarrier();
           }
       }

       // loop until tripped, broken, interrupted, or timed out
       for (;;) {
           try {
               if (!timed)
               	   //在线程计数器count不等于0时,会走到Condition.await()
                   trip.await();
               else if (nanos > 0L)
                   nanos = trip.awaitNanos(nanos);
           } catch (InterruptedException ie) {
               if (g == generation && ! g.broken) {
                   breakBarrier();
                   throw ie;
               } else {
                   // We're about to finish waiting even if we had not
                   // been interrupted, so this interrupt is deemed to
                   // "belong" to subsequent execution.
                   Thread.currentThread().interrupt();
               }
           }

           if (g.broken)
               throw new BrokenBarrierException();

           if (g != generation)
               return index;

           if (timed && nanos <= 0L) {
               breakBarrier();
               throw new TimeoutException();
           }
       }
   } finally {
       lock.unlock();
   }
}

/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //添加到条件等待队列中
    Node node = addConditionWaiter();
    //释放当前拿到的ReentrantLock锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //判断是否在同步队列上
    while (!isOnSyncQueue(node)) {
    	//不在的话,就阻塞当前队列
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

/** * Adds a new waiter to wait queue. * @return its new wait node */
private Node addConditionWaiter() {
	//第一次进来时,lastWaiter为Null
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    //如果最后一个节点不为空而且不是条件等待状态
    if (t != null && t.waitStatus != Node.CONDITION) {
    	//剔除已取消的等待队列,并重新给t复制lastWaiter
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //新创建一个thread当前线程的Node
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //lastWaiter为空的话,新增firstWaiter为当前节点;lastWaiter不为空的话,新增t.nextWaiter为当前节点;
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    //将lastWaiter指向当前节点
    lastWaiter = node;
    return node;
}

/** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
    	//拿到当前的锁状态标识state
        int savedState = getState();
        //释放锁
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
        	//出队操作,由于条件等待队列是个单向链表,所以只需要将t.nextWaiter=null即可完成出队操作
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

/** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */
private void nextGeneration() {
    // signal completion of last generation
    //唤醒所有队列
    trip.signalAll();
    // set up next generation 
    //重置计数器
    count = parties;
    //新创建一个屏障
    generation = new Generation();
}

/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */
public final void signalAll() {
	//判断是否为当前独占线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

/** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */
private void doSignalAll(Node first) {
	//直接清空条件等待队列,设计精髓所在
    lastWaiter = firstWaiter = null;
    do {
    	//保存first的下一个节点
        Node next = first.nextWaiter;
        //first出单项链表
        first.nextWaiter = null;
        //转换到同步队列Sync并依次唤醒
        transferForSignal(first);
        //first指向下一个节点
        first = next;
    } while (first != null);
}

/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */
final boolean transferForSignal(Node node) {
    /* * If cannot change waitStatus, the node has been cancelled. */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
    //当前节点入队同步队列Sync,并返回node的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    //CAS将node的前驱节点p的waitStatus修改为待唤醒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */
private Node enq(final Node node) {
	//用for(;;)必须保证入队成功
    for (;;) {
    	//t指向队尾
        Node t = tail;
        //如果队尾为空,说明队列为空
        if (t == null) { // Must initialize
        	//CAS操作将head指向一个空Node,并将tail也指向空Node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
        	//如果队尾不为空,则将当前节点的前驱指向队尾
            node.prev = t;
            //CAS操作将tail指向当前节点
            if (compareAndSetTail(t, node)) {
            	//由于是个双向链表,将t的后继节点指向当前节点
                t.next = node;
                return t;
            }
        }
    }
}

await()源码也跟着走了一遍,以下是我根据源码画了一些关键节点的流程图,便于理解。

3. 再来看一下我们的reset()方法,可以重置我们的计数器,并释放所有线程

/** * Resets the barrier to its initial state. If any parties are * currently waiting at the barrier, they will return with a * {@link BrokenBarrierException}. Note that resets <em>after</em> * a breakage has occurred for other reasons can be complicated to * carry out; threads need to re-synchronize in some other way, * and choose one to perform the reset. It may be preferable to * instead create a new barrier for subsequent use. */
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    	 //打破栅栏
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

/** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */
private void breakBarrier() {
   //将是否打破标识置为true
   generation.broken = true;
   //重置计数器
   count = parties;
   //重置后,会唤醒之前阻塞的所有的线程
   trip.signalAll();
}

到此为止,CyclicBarrier的两个重要的方法已分析完,其他的一些方法相对简单,就不在这儿分析了,大家有兴趣的可以去看下源码,也可以私信我进行探讨。

我们来总结一下:
说到底,底层为CyclicBarrier是基于ReentrantLock和Condition实现的。

CountDownLatch也可以实现一组线程等待至某个状态之后再全部同时执行的需求。
那到底CyclicBarrier与CountDownLatch有啥区别呢?

  1. CountDownLatch的计数器只能使用一次,而CyclicBarrier可以通过reset()使用多次.
  2. CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
  3. CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不 同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。 CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
  4. CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
  5. CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤 醒的,而CountDownLatch则是通过AQS的“共享锁”实现。

CountDownLatch链接:暂无,后续补上。

相关文章