任务拆分计算利器ForkJoin框架

1. 摘要

从JDK1.7开始,引入了一种新的Fork/Join线程池框架,它可以吧一个大任务拆分成多个小任务并行执行,最后汇总执行结果。

比如:当前要计算一个数组的和,最简单的办法就是用一个循环在一个线程中完成,但是当数组特别大的时候,这种执行效率比较差,例如下面的示例代码

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
long sum = 0;
long[] arr = new long[1024];
//省略赋值
for (long l : arr) {
sum += l;
}
System.out.println("结果" + sum);
}

还有一种办法就是将数组拆分,比如拆分成4个部分,用4个线程并行执行,分别激素啊,最后进行汇总,这样执行效率回显著提升。

如果拆分之后的部分还是很大,可以继续拆,知道满足最小颗粒度,再进行计算,这个过程可以反复”裂变” 成一系列小任务,这个就是Fork/Join的工作原理。

Fork/Join采用的是分而治之的基本思想,就是讲一个复杂的任务,按照规定的阈值划分成多个简单的小任务,然后将这些小任务的执行结果再进行汇总返回。得到最终的执行结果,

2. ForkJoin用法

计算2000个数字组成的数组的和

  1. 创建任务类,进行切分任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    public class SumTask extends RecursiveTask<Long> {

    /**
    * 最小任务数组最大容量
    */
    private static final int THRESHOLD = 500;

    private long[] array;
    private int start;

    private int end;

    public SumTask(long[] array, int start, int end) {
    this.array = array;
    this.start = start;
    this.end = end;
    }

    @Override
    protected Long compute() {
    //检查任务是否足够小,如果任务足够小,直接计算
    if (end - start <= THRESHOLD) {
    long sum = 0;
    for (int i = start; i < end; i++) {
    sum += this.array[i];
    }
    return sum;
    }
    //任务太大,一分为二
    int middle = (end + start) / 2;
    //拆分执行
    SumTask left = new SumTask(this.array, start, middle);
    left.fork();
    SumTask right = new SumTask(this.array, middle, end);
    right.fork();
    System.out.println("left数组区间:" + start + "," + middle +";right数组区间:" + middle + "," + end);
    //汇总结果
    return left.join() + right.join();

    }
    }
  2. 计算

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class ForkJoinDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
    long sum = 0;
    //创建数组并赋值
    long[] arr = new long[2000];
    for (int i = 0; i < 2000; i++) {
    arr[i] = i;
    }
    for (long l : arr) {
    sum += l;
    }
    System.out.println("for循环汇总计算:" + sum+"; 耗时:"+stopWatch.getTotalTimeMillis());


    //fork/join汇总计算的值
    ForkJoinPool joinPool = new ForkJoinPool();
    ForkJoinTask<Long> taskFuture = joinPool.submit(new SumTask(arr, 0, arr.length));
    stopWatch.stop();
    System.out.println("fork/join汇总计算的值:"+ taskFuture.get());
    }
    }
  3. 打印结果

    1
    2
    3
    4
    5
    for循环汇总计算:1999000
    left数组区间:0,1000;right数组区间:1000,2000
    left数组区间:0,500;right数组区间:500,1000
    left数组区间:1000,1500;right数组区间:1500,2000
    fork/join汇总计算的值:1999000

    因为最小任务数组容量设置为500,所以Fork/Join对数组进行了三次拆分:

    • 0~2000拆分成0~10001000~2000
    • 0~1000拆分成0~500500~1000
    • 1000~2000拆分成1000~15001000~2000

    当数组量越大的时候,采用Fork/Join这种方式来计算,执行效率优势非常明显

3. Fork/Join框架的原理

从上面的用例可以看出,fork/Join框架使用包含两个核心类ForkJoinPoolForkJoinTask,它们之间的分工如下:

  • ForkJoinPool:是一个负责执行任务的线程池,内部使用了一个无限队列来保存需要的执行的任务,而执行任务的线程数量则是通过构造函数传入,如果没有传入注定的线程数量,则默认使用当前计算机可用的CPU核心量
  • ForkJoinTask:是一个负责任务的拆分和合并计算结果的抽象类,通过它可以完成将大任务分解成多个小任务计算,最后将各个任务执行结果进行汇总处理

正如上文所说,Fork/Join框架采用的是分而治之的思想,会讲一个大的任务进行分解,按照设定的阈值分解成多个小任务计算,最后将各个计算结果进行汇总。它的应用场景非常多,比如:大整数乘法、二分法搜索、大数组快速排序等。

3.1 ForkJoinPool

ForkJoinPoolFork/Join框架中负责任务执行的县城吃,核心构造方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
//默认无参构造方法
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
  • int parallelism: 取得是当前计算机可用的CPU数量

  • ForkJoinWorkerThreadFactory factory: 采用的是默认DefaultForkJoinWorkerThreadFactory类,其中ForkJoinWorkerThreadFork/Join框架中负责真正执行任务的线程

    1
    defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory();
  • boolean asyncMode: 参数设置的是false,也就是说存在队列的任务采用的是先进后出的工作方式

其次,也可以使用Executors工具类来创建ForkJoinPool,例如下面这种方式:

1
ExecutorService executorService = Executors.newWorkStealingPool();

ThreadPoolExecutor县城吃一样,ForkJoinPool也实现了ExecutorExecutorService接口,支持通过execute()submit()等方式提交任务

不过需要注意的地方是ForkJoinPool线程池和ThreadPoolExecutor线程池两个实现的原理是不一样的,最明显的区别在于:

  • ThreadPoolExecutor

    • 多个线程都共有一个阻塞任务队列
    • 线程无法向任务队列在添加一个任务并在等待该任务完成之后在继续执行
  • ForkJoinPool

    • 每一个线程都有自己的任务队列,当线程发现自己的队列里面没有任务,就会到别的线程的队列里面获取任务执行,这样设计的目的主要是充分利用线程实现并行计算的效果,减少线程之间的竞争
    • 它能够让其中的线程创建新的任务添加到队列中,并挂起当前的任务,此时线程继续从队列中选择子任务执行。

比如线程 A 负责处理队列 A 里面的任务,线程 B 负责处理队列 B 里面的任务,两者如果队列里面的任务数差不多,执行的时候互相不干扰,此时的计算性能是最佳的;假如线程 A 的任务执行完毕,发现线程 B 中的队列数还有一半没有执行,线程 A 会主动从线程 B 的队列里获取任务执行

Fork/Join框架中负责执行任务的线程ForkJoinWorkerThread部分源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ForkJoinWorkerThread extends Thread {
//所在线程池
final ForkJoinPool pool; // the pool this thread works in
//当前线程下的任务队列
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

/** An AccessControlContext supporting no privileges */
private static final AccessControlContext INNOCUOUS_ACC =
new AccessControlContext(
new ProtectionDomain[]{new ProtectionDomain(null, null)});

/**
* Creates a ForkJoinWorkerThread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
}

3.2 ForkJoinTask

ForkJoinTaskFork/Join框架中负责任务分解和合并计算的抽象类,它实现了Future接口,因此可以直接作为任务类提交到线程池中。

同时,它还包括了两个主要方法:fork()join(),分别表示任务的拆分与合并。

ForkJoinTask部分源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
//将任务推动到任务队列
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread) t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}

//等待任务执行结果
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
}

在JDK中,ForkJoinTask有三个常用的子类实现,分别如下:

  • RecursiveAction:用于没有返回结果的任务

  • RecursiveTask: 用于有返回结果的任务

  • CountedCompleter: 在任务执行完成后,触发自定义的钩子函数

ForkJoinTask其实利用了递归算法来实现任务的拆分,将拆分后的子任务提交到线程池的任务队列中进行执行,最后将各个拆分后的任务计算结果进行汇总,得到最终的任务结果

4. 总结

整体上,ForkJoinPool可以看成是对ThreadPoolExecutor线程池的一种补充,在工作线程中存放了任务队列,充分利用线程进行并行计算,进一步提升了线程的迸发执行性能。

通过ForkJoinPoolForkJoinTask搭配使用,将大计算任拆分成多个互补干扰的小任务,提交给线程池进行计算,最后将计算结果进行汇总处理,得到跟单线程执行一致的结果,当计算任务越大Fork/Join框架执行任务的效率越明显。

但并不是所有的任务都适合采用Fork/Join框架来处理,比如读写数据文件这种IO密集型的任务就不适合,因为磁盘IO、网络IO的操作特点就是等待,容易造成线程阻塞。

5. 参考

1.https://www.liaoxuefeng.com/wiki/1252599548343744/1306581226487842