最近又在重读CyclicBarrier源码,并进行了深入分析,重点源码也自己跟过并做了一些注释,仅供大家参考。
CyclicBarrier:回环栅栏(有人也称之为循环屏障),通过他可以让一组线程等待至某个状态(屏障点)之后再全部同时执行,同时他还有一个特点,所有线程都被释放了以后,CyclicBarrier还可以被重用。
废话不多说,一切以实践为主,以下是我写的一个例子,供大家理解。
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, ()->{System.out.println("人满发车了");});
System.out.println("一车3人,车满发车");
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(()->{
System.out.println("第"+ finalI +"个人上车了");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
再来看一下执行结果:
从结果可以看出来,车上每上3个人,就会人满发车,最后第9个人上车了,但是由于不满3个人,所以线程一直没发车,直到等到3个人满时才会发车。
1. 先看下CyclicBarrier的结构有哪些方法和属性
//栅栏代
private static class Generation {
//是否要打破
boolean broken = false;
}
//资源独占锁
private final ReentrantLock lock = new ReentrantLock();
//条件等待队列
private final Condition trip = lock.newCondition();
//拦截的线程数
private final int parties;
//到达屏障点(换代前),执行的方法
private final Runnable barrierCommand;
//栅栏代
private Generation generation = new Generation();
//线程计数器
private int count;
CyclicBarrier提供了2个构造方法。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
//标识屏障拦截的线程数量,相当于一个副本,作用是在达到屏障后进行重置count操作
this.parties = parties;
//线程计数器
this.count = parties;
//达到屏障点后执行的方法
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
2. 接着我们分析一下CyclicBarrier的await()方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException
//实际用的是一把独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
//线程是否中断,如果中断的话,需要打破栅栏,并抛出中断异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//到达屏障点时,执行需要处理的业务
if (command != null)
command.run();
ranAction = true;
//执行下一代屏障,在这儿会将条件等待队列的元素转到同步队列,并将之前阻塞的线程唤醒,
nextGeneration();
return 0;
} finally {
//是否要打破栅栏代
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
//在线程计数器count不等于0时,会走到Condition.await()
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//添加到条件等待队列中
Node node = addConditionWaiter();
//释放当前拿到的ReentrantLock锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断是否在同步队列上
while (!isOnSyncQueue(node)) {
//不在的话,就阻塞当前队列
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/** * Adds a new waiter to wait queue. * @return its new wait node */
private Node addConditionWaiter() {
//第一次进来时,lastWaiter为Null
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//如果最后一个节点不为空而且不是条件等待状态
if (t != null && t.waitStatus != Node.CONDITION) {
//剔除已取消的等待队列,并重新给t复制lastWaiter
unlinkCancelledWaiters();
t = lastWaiter;
}
//新创建一个thread当前线程的Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//lastWaiter为空的话,新增firstWaiter为当前节点;lastWaiter不为空的话,新增t.nextWaiter为当前节点;
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
//将lastWaiter指向当前节点
lastWaiter = node;
return node;
}
/** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */
final int fullyRelease(Node node) {
boolean failed = true;
try {
//拿到当前的锁状态标识state
int savedState = getState();
//释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
//出队操作,由于条件等待队列是个单向链表,所以只需要将t.nextWaiter=null即可完成出队操作
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
/** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */
private void nextGeneration() {
// signal completion of last generation
//唤醒所有队列
trip.signalAll();
// set up next generation
//重置计数器
count = parties;
//新创建一个屏障
generation = new Generation();
}
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */
public final void signalAll() {
//判断是否为当前独占线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */
private void doSignalAll(Node first) {
//直接清空条件等待队列,设计精髓所在
lastWaiter = firstWaiter = null;
do {
//保存first的下一个节点
Node next = first.nextWaiter;
//first出单项链表
first.nextWaiter = null;
//转换到同步队列Sync并依次唤醒
transferForSignal(first);
//first指向下一个节点
first = next;
} while (first != null);
}
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */
final boolean transferForSignal(Node node) {
/* * If cannot change waitStatus, the node has been cancelled. */
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
//当前节点入队同步队列Sync,并返回node的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
//CAS将node的前驱节点p的waitStatus修改为待唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */
private Node enq(final Node node) {
//用for(;;)必须保证入队成功
for (;;) {
//t指向队尾
Node t = tail;
//如果队尾为空,说明队列为空
if (t == null) { // Must initialize
//CAS操作将head指向一个空Node,并将tail也指向空Node
if (compareAndSetHead(new Node()))
tail = head;
} else {
//如果队尾不为空,则将当前节点的前驱指向队尾
node.prev = t;
//CAS操作将tail指向当前节点
if (compareAndSetTail(t, node)) {
//由于是个双向链表,将t的后继节点指向当前节点
t.next = node;
return t;
}
}
}
}
await()源码也跟着走了一遍,以下是我根据源码画了一些关键节点的流程图,便于理解。
3. 再来看一下我们的reset()方法,可以重置我们的计数器,并释放所有线程
/** * Resets the barrier to its initial state. If any parties are * currently waiting at the barrier, they will return with a * {@link BrokenBarrierException}. Note that resets <em>after</em> * a breakage has occurred for other reasons can be complicated to * carry out; threads need to re-synchronize in some other way, * and choose one to perform the reset. It may be preferable to * instead create a new barrier for subsequent use. */
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//打破栅栏
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
/** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */
private void breakBarrier() {
//将是否打破标识置为true
generation.broken = true;
//重置计数器
count = parties;
//重置后,会唤醒之前阻塞的所有的线程
trip.signalAll();
}
到此为止,CyclicBarrier的两个重要的方法已分析完,其他的一些方法相对简单,就不在这儿分析了,大家有兴趣的可以去看下源码,也可以私信我进行探讨。
我们来总结一下:
说到底,底层为CyclicBarrier是基于ReentrantLock和Condition实现的。
CountDownLatch也可以实现一组线程等待至某个状态之后再全部同时执行的需求。
那到底CyclicBarrier与CountDownLatch有啥区别呢?
CountDownLatch链接:暂无,后续补上。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_25667339/article/details/121453302
内容来源于网络,如有侵权,请联系作者删除!