并发工具类之 CyclicBarrier

x33g5p2x  于2021-12-18 转载在 其他  
字(3.0k)|赞(0)|评价(0)|浏览(327)

CyclicBarrier 使用解析

CyclicBarrier 字面意思是可循环(Cyclic)的屏障(Barrier),它要做的事情是让一组线程到达屏障时被阻塞,直到最后一个线程也到达屏障,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier 有两个构造方法,我们先来看第一个:

CyclicBarrier(int parties)

参数 parties 表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达屏障,然后当前线程被阻塞,测试代码如下:

public class TestCyclicBarrier {
    static CyclicBarrier c = new CyclicBarrier(2);
    
    public static void main(String[] args) {
        new Thread(() -> {
            try {
                c.await();
                System.out.println(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();
        try {
            c.await();
            System.out.println(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

由于子线程和主线程的调度是由 CPU 决定的,所以两个线程都有可能先执行,多次执行代码后会控制台会有两种不同的输出:

// 第一种输出
1
2
// 第二种输出
2
1

如果把上面代码中的 new CyclicBarrier(2) 改为 new CyclicBarrier(3),则主线程和子线程会永远等待,因为没有第三个线程执行 await 方法,所以前两个线程会一直在屏障处等待。

接下来我们来看第二个构造器:

CyclicBarrier(int parties, Runnable barrierAction)

这个构造器用于在线程到达屏障时,优先执行 barrierAction,在 barrierAction 执行完之前等待在屏障处的线程是不会往下执行的,执行 barrierAction 这个操作由最后一个到达屏障的线程执行,但这并不代表最后一个到达屏障的线程会比其他线程先执行完,参考下图:

测试代码如下:

public class TestCyclicBarrier {
    static CyclicBarrier c = new CyclicBarrier(2, () -> {
        System.out.println(Thread.currentThread().getName());
    });

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                c.await();
                System.out.println("我是子线程");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();
        try {
            TimeUnit.SECONDS.sleep(1);
            c.await();
            System.out.println("我是主线程");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

控制台输出如下(多次执行结果会不同):

Thread-0
我是子线程
我是主线程

CyclicBarrier 应用场景

CyclicBarrier 可用于多线程计算数据,最后合并计算结果的场景。例如,一个 Excel 保存了用户所有银行流水,每个 sheet 保存了一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后再通过 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水,代码如下图所示:

public class BankWaterService implements Runnable {
    // 假设只有 4 个 sheet
    private CyclicBarrier c = new CyclicBarrier(4, this);
    private Executor executor = Executors.newFixedThreadPool(4);
    // 保存每个 sheet 计算出的日均银行流水结果
    private ConcurrentHashMap<String, Integer> count = new ConcurrentHashMap<String, Integer>();

    @Override
    public void run() {
        int result = 0;
        for(Map.Entry<String, Integer> sheet : count.entrySet()) {
            result += sheet.getValue();
        }
        System.out.println(result);
    }

    private void count() {
        for (int i = 0; i < 4; i++) {
            executor.execute(() -> {
                try {
                    // 忽略日均银流计算过程,直接输入计算结果
                    count.put(Thread.currentThread().getName(), 100);
                    System.out.println("线程 " + Thread.currentThread().getName() + " 计算完成");
                    c.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
    }

    public static void main(String[] args) {
        BankWaterService bankWaterService = new BankWaterService();
        bankWaterService.count();
    }
}

控制台输出如下:

线程 pool-1-thread-1 计算完成
线程 pool-1-thread-3 计算完成
线程 pool-1-thread-4 计算完成
线程 pool-1-thread-2 计算完成
400

CyclicBarrier VS CountDownLatch

它们之间最大的区别在于:CountDownLatch 只能拦截一轮,而 CyclicBarrier 可以实现循环拦截

此外,CountDownLatch 还提供了 reset()、getNumberWaiting()、isBroken() 等比较有用的方法。

推荐一篇非常不错的文章:深入理解CyclicBarrier原理

相关文章