文章22 | 阅读 8456 | 点赞0
CyclicBarrier 字面意思是可循环(Cyclic)的屏障(Barrier),它要做的事情是让一组线程到达屏障时被阻塞,直到最后一个线程也到达屏障,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier 有两个构造方法,我们先来看第一个:
CyclicBarrier(int parties)
参数 parties
表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达屏障,然后当前线程被阻塞,测试代码如下:
public class TestCyclicBarrier {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(() -> {
try {
c.await();
System.out.println(1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
try {
c.await();
System.out.println(2);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
由于子线程和主线程的调度是由 CPU 决定的,所以两个线程都有可能先执行,多次执行代码后会控制台会有两种不同的输出:
// 第一种输出
1
2
// 第二种输出
2
1
如果把上面代码中的 new CyclicBarrier(2)
改为 new CyclicBarrier(3)
,则主线程和子线程会永远等待,因为没有第三个线程执行 await 方法,所以前两个线程会一直在屏障处等待。
接下来我们来看第二个构造器:
CyclicBarrier(int parties, Runnable barrierAction)
这个构造器用于在线程到达屏障时,优先执行 barrierAction,在 barrierAction 执行完之前等待在屏障处的线程是不会往下执行的,执行 barrierAction 这个操作由最后一个到达屏障的线程执行,但这并不代表最后一个到达屏障的线程会比其他线程先执行完,参考下图:
测试代码如下:
public class TestCyclicBarrier {
static CyclicBarrier c = new CyclicBarrier(2, () -> {
System.out.println(Thread.currentThread().getName());
});
public static void main(String[] args) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
c.await();
System.out.println("我是子线程");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
c.await();
System.out.println("我是主线程");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
控制台输出如下(多次执行结果会不同):
Thread-0
我是子线程
我是主线程
CyclicBarrier 可用于多线程计算数据,最后合并计算结果的场景。例如,一个 Excel 保存了用户所有银行流水,每个 sheet 保存了一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后再通过 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水,代码如下图所示:
public class BankWaterService implements Runnable {
// 假设只有 4 个 sheet
private CyclicBarrier c = new CyclicBarrier(4, this);
private Executor executor = Executors.newFixedThreadPool(4);
// 保存每个 sheet 计算出的日均银行流水结果
private ConcurrentHashMap<String, Integer> count = new ConcurrentHashMap<String, Integer>();
@Override
public void run() {
int result = 0;
for(Map.Entry<String, Integer> sheet : count.entrySet()) {
result += sheet.getValue();
}
System.out.println(result);
}
private void count() {
for (int i = 0; i < 4; i++) {
executor.execute(() -> {
try {
// 忽略日均银流计算过程,直接输入计算结果
count.put(Thread.currentThread().getName(), 100);
System.out.println("线程 " + Thread.currentThread().getName() + " 计算完成");
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
public static void main(String[] args) {
BankWaterService bankWaterService = new BankWaterService();
bankWaterService.count();
}
}
控制台输出如下:
线程 pool-1-thread-1 计算完成
线程 pool-1-thread-3 计算完成
线程 pool-1-thread-4 计算完成
线程 pool-1-thread-2 计算完成
400
它们之间最大的区别在于:CountDownLatch 只能拦截一轮,而 CyclicBarrier 可以实现循环拦截。
此外,CountDownLatch 还提供了 reset()、getNumberWaiting()、isBroken() 等比较有用的方法。
推荐一篇非常不错的文章:深入理解CyclicBarrier原理
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_41685207/article/details/111768583
内容来源于网络,如有侵权,请联系作者删除!