简介
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。
工作窃取算法
工作窃取算法(work-stealing)是指当一个队列所对应的线程先执行完队列中的所有任务后,从其他线程的队列里窃取一个任务来执行。为了减少竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
Fork/join的使用
ForkJoinTask
:我们要使用Fork/Join框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()
和join()
操作的机制,通常情况下我们不需要直接继承ForkJoinTask
类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
RecursiveAction
:用于没有返回结果的任务。
RecursiveTask
:用于有返回结果的任务。
ForkJoinPool
:ForkJoinTask
需要通过ForkJoinPool
来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
接下来我们看一个问题:如何充分利用多核 CPU 计算很大 List 中所有整数的和?
这里就可以用到Fork/join框架将求和任务分成许多子任务来完成,再将子任务的计算结果相加即可,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| public class ForkJoinLargeListSum { public static void main(String[] args) { int[] array = new int[100000000]; for(int i = 0; i < array.length; i++){ array[i] = i + 1; } ForkJoinPool forkJoinPool = new ForkJoinPool(); long begintime = System.currentTimeMillis(); CountSumTask task = new CountSumTask(100000, 0, array.length-1, array); Future<Long> future = forkJoinPool.submit(task); try { System.out.println("计算结果为:" + future.get()); long endtime=System.currentTimeMillis(); System.out.println("耗时:" + (endtime - begintime) + "ms"); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
static class CountSumTask extends RecursiveTask<Long> { private int threshold; private int hi, lo; private int[] array;
public CountSumTask(int threshold, int lo, int hi, int[] array) { this.threshold = threshold; this.hi = hi; this.lo = lo; this.array = array; }
@Override protected Long compute() { long sum = 0L;
boolean canCompute = (hi - lo) <= threshold; if(canCompute){ for(int i = lo; i <= hi; i++){ sum += array[i]; } } else{ int middle = lo + (hi - lo) / 2; CountSumTask leftTask = new CountSumTask(threshold, lo, middle, array); CountSumTask rightTask = new CountSumTask(threshold, middle+1, hi, array); leftTask.fork(); rightTask.fork(); long leftResult = leftTask.join(); long rightResult = rightTask.join(); sum = leftResult + rightResult; } return sum; } } }
|
此时输出:
1 2
| 计算结果为:5000000050000000 耗时:69ms
|
当我们调大阈值threshold
时,意味着分割任务的次数减少,直接计算的次数增多,此时计算的效率也有可能降低。例如,当把阈值增大为100000000时,输出结果为:
1 2
| 计算结果为:5000000050000000 耗时:110ms
|
参考资料