Java线程(十一):Fork/Join-Java并行计算框架

x33g5p2x  于2021-03-13 发布在 Java  
字(2.3k)|赞(0)|评价(0)|浏览(564)

并行计算在处处都有大数据的今天已经不是一个新鲜的词汇了,现在已经有单机多核甚至多机集群并行计算,注意,这里说的是并行,而不是并发。严格的将,并行是指系统内有多个任务同时执行,而并发是指系统内有多个任务同时存在,不同的任务按时间分片的方式切换执行,由于切换的时间很短,给人的感觉好像是在同时执行。 
Java在JDK7之后加入了并行计算的框架Fork/Join,可以解决我们系统中大数据计算的性能问题。Fork/Join采用的是分治法,Fork是将一个大任务拆分成若干个子任务,子任务分别去计算,而Join是获取到子任务的计算结果,然后合并,这个是递归的过程。子任务被分配到不同的核上执行时,效率最高。伪代码如下:

  1. Result solve(Problem problem) {
  2. if (problem is small)
  3. directly solve problem
  4. else {
  5. split problem into independent parts
  6. fork new subtasks to solve each part
  7. join all subtasks
  8. compose result from subresults
  9. }
  10. }

Fork/Join框架的核心类是ForkJoinPool,它能够接收一个ForkJoinTask,并得到计算结果。ForkJoinTask有两个子类,RecursiveTask(有返回值)和RecursiveAction(无返回结果),我们自己定义任务时,只需选择这两个类继承即可。类图如下: 
 
下面来看一个实例:计算一个超大数组所有元素的和。代码如下:

  1. import java.util.Arrays;
  2. import java.util.Random;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.ForkJoinPool;
  5. import java.util.concurrent.RecursiveTask;
  6. /**
  7. * @author: shuang.gao Date: 2015/7/14 Time: 8:16
  8. */
  9. public class SumTask extends RecursiveTask<Integer> {
  10. private static final long serialVersionUID = -6196480027075657316L;
  11. private static final int THRESHOLD = 500000;
  12. private long[] array;
  13. private int low;
  14. private int high;
  15. public SumTask(long[] array, int low, int high) {
  16. this.array = array;
  17. this.low = low;
  18. this.high = high;
  19. }
  20. @Override
  21. protected Integer compute() {
  22. int sum = 0;
  23. if (high - low <= THRESHOLD) {
  24. // 小于阈值则直接计算
  25. for (int i = low; i < high; i++) {
  26. sum += array[i];
  27. }
  28. } else {
  29. // 1\. 一个大任务分割成两个子任务
  30. int mid = (low + high) >>> 1;
  31. SumTask left = new SumTask(array, low, mid);
  32. SumTask right = new SumTask(array, mid + 1, high);
  33. // 2\. 分别计算
  34. left.fork();
  35. right.fork();
  36. // 3\. 合并结果
  37. sum = left.join() + right.join();
  38. }
  39. return sum;
  40. }
  41. public static void main(String[] args) throws ExecutionException, InterruptedException {
  42. long[] array = genArray(1000000);
  43. System.out.println(Arrays.toString(array));
  44. // 1\. 创建任务
  45. SumTask sumTask = new SumTask(array, 0, array.length - 1);
  46. long begin = System.currentTimeMillis();
  47. // 2\. 创建线程池
  48. ForkJoinPool forkJoinPool = new ForkJoinPool();
  49. // 3\. 提交任务到线程池
  50. forkJoinPool.submit(sumTask);
  51. // 4\. 获取结果
  52. Integer result = sumTask.get();
  53. long end = System.currentTimeMillis();
  54. System.out.println(String.format("结果 %s 耗时 %sms", result, end - begin));
  55. }
  56. private static long[] genArray(int size) {
  57. long[] array = new long[size];
  58. for (int i = 0; i < size; i++) {
  59. array[i] = new Random().nextLong();
  60. }
  61. return array;
  62. }
  63. }

我们通过调整阈值(THRESHOLD),可以发现耗时是不一样的。实际应用中,如果需要分割的任务大小是固定的,可以经过测试,得到最佳阈值;如果大小不是固定的,就需要设计一个可伸缩的算法,来动态计算出阈值。如果子任务很多,效率并不一定会高。 
未完待续。。。

相关文章