ForkJoinPool
就是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork
意为分叉,Join
意为合并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool
看作一个单机版的Map/Reduce
,多个线程并行计算。
相比于ThreadPoolExecutor
,ForkJoinPool
可以更好地实现计算的负载均衡,提高资源利用率。
假设有5个任务,在ThreadPoolExecutor
中有5个线程并行执行,其中一个任务的计算量很大,其余4个任务的计算量很小,这会导致1个线程很忙,其他4个线程则处于空闲状态。
利用ForkJoinPool
,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而实现任务计算的负载均衡。
快排
快排有2个步骤:
- 利用数组的第1个元素把数组划分成两半,左边数组里面的元素小于或等于该元素,右边数组里面的元素比该元素大
- 对左右的两个子数组分别排序
左右两个子数组相互独立可以并行计算。利用ForkJoinPool
,代码如下:
package com.rubin.concurrent.forkjoinpool;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
public class ForkJoinPoolDemo2 {
static class SortTask extends RecursiveAction {
final long[] array;
final int lo;
final int hi;
public SortTask(long[] array) {
this.array = array;
this.lo = 0;
this.hi = array.length - 1;
}
public SortTask(long[] array, int lo, int hi) {
this.array = array;
this.lo = lo;
this.hi = hi;
}
private int partition(long[] array, int lo, int hi) {
long x = array[hi];
int i = lo - 1;
for (int j = lo; j < hi; j++) {
if (array[j] <= x) {
i++;
swap(array, i, j);
}
}
swap(array, i + 1, hi);
return i + 1;
}
private void swap(long[] array, int i, int j) {
if (i != j) {
long temp = array[i];
array[i] = array[j];
array[j] = temp;
}
}
@Override
protected void compute() {
if (lo < hi) {
// 找到分区的元素下标
int pivot = partition(array, lo, hi);
// 将数组分为两部分
SortTask left = new SortTask(array, lo, pivot - 1);
SortTask right = new SortTask(array, pivot + 1, hi);
left.fork();
right.fork();
left.join();
right.join();
}
}
}
public static void main(String[] args) throws InterruptedException {
long[] array = {5, 3, 7, 9, 2, 4, 1, 8, 10};
// 一个任务
ForkJoinTask sort = new SortTask(array);
// 一个pool
ForkJoinPool pool = new ForkJoinPool();
// ForkJoinPool开启多个线程,同时执行上面的子任务
pool.submit(sort);
// 结束ForkJoinPool
pool.shutdown();
// 等待结束Pool
pool.awaitTermination(10, TimeUnit.SECONDS);
System.out.println(Arrays.toString(array));
}
}
求1到n个数的和
package com.rubin.concurrent.forkjoinpool;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class ForkJoinPoolDemo1 {
static class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10;
private long start;
private long end;
public SumTask(long n) {
this(1, n);
}
public SumTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
// 如果计算的范围在threshold之内,则直接进行计算
if ((end - start) <= THRESHOLD) {
for (long l = start; l <= end; l++) {
sum += l;
}
} else {
// 否则找出起始和结束的中间值,分割任务
long mid = (start + end) >>> 1;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
left.fork();
right.fork();
// 收集子任务计算结果
sum = left.join() + right.join();
}
return sum;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
SumTask sum = new SumTask(10);
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> future = pool.submit(sum);
Long aLong = future.get();
System.out.println(aLong);
pool.shutdown();
}
}
上面的代码用到了RecursiveAction
和RecursiveTask
两个类,它们都继承自抽象类ForkJoinTask
,用到了其中关键的接口 fork()
、join()
。二者的区别是一个有返回值,一个没有返回值。
RecursiveAction/RecursiveTask
类继承关系:

我们可以使用现有的接口灵活的完成工作中的各种场景的解决方案。至于该框架的实现原理比线程池要复杂,但是思想都一样,都是多线程并发的执行任务,关闭逻辑也一致,上述示例中也有优雅关闭的写法。
如果后续有精力的话,会详细补充该工具的使用逻辑。
本文的内容就到这里了。欢迎小伙伴们积极留言交流~~~
文章评论