Java 多线程编程之八 Fork/Join 框架使用

大纲

前言

在 Java 中,Fork/Join 框架(分支合并框架)是一种用于并行处理任务的强大工具,特别适用于那些可以递归地分解成更小任务的场景(如下图所示)。Fork/Join 框架基于 “工作窃取” 算法,允许空闲的线程从那些繁忙的线程那里窃取任务,以提高 CPU 的使用效率和程序的执行性能。

Fork/Join 框架的介绍

Fork/Join 框架的定义

Fork/Join 框架的基本思想是 “分而治之”,将大任务拆分(Fork)为若干个小任务(拆到不可再拆为止),这些小任务可以并行执行,然后再将一个个小任务的执行结果合并(Join)起来得到最终结果。

Fork/Join 框架的工作流程

Fork/Join 框架的工作流程如下:

  • (1) 任务被分解成更小的子任务。
  • (2) 子任务提交到 ForkJoinPool 中执行。
  • (3) ForkJoinPool 中的工作线程(ForkJoinWorkerThread)负责执行子任务。
  • (4) 子任务的结果合并成最终结果。

Fork/Join 框架的工作窃取

工作窃取(Work-Stealing)是一种负载均衡算法,用于在多线程环境下高效地利用系统资源。其基本思想是:如果某个线程完成了它的任务并且变得空闲,而其他线程仍然有未完成的任务,那么空闲的线程会从其他忙碌的线程那里窃取任务来执行。这样可以避免某些线程闲置而其他线程繁忙的情况,从而提高整个系统的吞吐量。

工作窃取算法的实现

在 Fork/Join 框架中,每个线程都有一个双端队列(Deque)来存储需要执行的任务:

  • (1) 任务分解:当一个任务需要被执行时,它首先会被分解成若干子任务(通过调用 fork() 方法),这些子任务会被压入当前线程的任务队列的头部。
  • (2) 任务执行:线程会不断地从自己队列的尾部弹出任务并执行(通过调用 join() 方法)。如果一个线程完成了自己队列中的所有任务,它就会去 “窃取” 其他线程队列头部的任务来执行。
  • (3) 任务窃取:窃取任务时,线程通常会从其他线程的队列头部取任务,而非从尾部取任务。这种设计是工作窃取算法的一部分,它有助于最大限度地减少线程之间的锁竞争,可以提升并行任务处理的效率。

工作窃取算法的优点

  • (1) 负载均衡:通过让空闲线程窃取任务,工作窃取算法实现了任务的负载均衡,避免某些线程长时间空闲。
  • (2) 局部性:线程通常优先执行自己队列中的任务,这有助于提高数据局部性,减少缓存失效。
  • (3) 弹性和灵活性:线程数可以动态调整,系统可以自动适应当前的任务负载。

工作窃取算法的设计

为什么工作线程(ForkJoinWorkerThread)正常获取任务时会从工作队列(WorkQueue)的尾部取,而窃取任务时则从工作队列的头部取呢?这样的设计主要是出于以下几点考虑:

  • 减少锁争用:在 Fork/Join 框架中,工作线程会频繁地添加和取出任务。为了提高效率,队列通常是双端队列(Deque)。从尾部添加和取出任务通常不需要加锁,因为通常只有一个线程在操作自己的工作队列。然而,如果线程从队列头部取任务,可能会导致与其他窃取线程的竞争,从而增加锁争用和同步开销。
  • 任务执行的局部性:Fork/Join 框架鼓励对任务进行分解,并让产生任务的线程尽快执行这些任务。这种局部性策略通过让线程处理它自己刚生成的任务(即尾部的任务),这样可以更好地利用 CPU 缓存,从而提高执行效率。这是因为新生成的任务往往与生成它们的任务共享数据或资源,保持这些任务的局部性可以提高缓存命中率。
  • 工作窃取算法的有效性:工作窃取算法允许线程从其他线程的工作队列中窃取任务,以保持系统的负载均衡。窃取线程从头部窃取任务,工作线程从尾部取任务,这种方式可以最大化任务的处理并减少同步开销。因为通常窃取的任务是队列中最早生成的任务,这些任务可能更耗时,这也有助于保持负载的均衡。

Fork/Join 框架的类继承关系

Fork/Join 框架的类结构

Fork/Join 框架主要包含两个核心部分:

  • ForkJoinPool:一个特殊的线程池,用于管理 ForkJoinTask 的执行。
  • ForkJoinTask:一个抽象类,表示可以并行执行的任务。ForkJoinTask 有两个子抽象类:
    • RecursiveAction:用于没有返回结果的任务(类似 Runnable),可以实现递归调用任务。
    • RecursiveTask:用于有返回结果的任务(类似 Callable),可以实现递归调用任务。

Fork/Join 框架的类关系

  • ForkJoinPool 是一个特殊的线程池,实现了 Executor 接口,其构造方法中常用参数的介绍如下:

    • parallelism:并行的线程数,也就是在没有额外任务提交的情况下,最大的并行级别。表示要维护的并行操作的目标数量,对于默认值(等于可用处理器的数量)可以通过 Runtime.getRuntime().availableProcessors() 获取到。
    • factory:生成 ForkJoinWorkerThread 线程的工厂类,此工厂在创建新线程时会使用到。
    • handler:处理未捕获的异常的处理器,它允许在任何 Worker 线程意外终止时进行处理。
    • asyncMode:此选项的默认值为 false,可以设置为 true,以便采用 “工作窃取(Work-Stealing)” 模式。在这种模式下,当线程的待执行任务队列为空时,该线程会尝试窃取其他线程的任务来执行。如果设置为 false,则会使用 “FIFO(先进先出)” 模式,这也被称为 “队列模式”。在 FIFO 模式下,任务是以提交的顺序执行的,也就是比较早提交的任务会被比较早地执行。通常情况下,为了获取更好的性能一般都使用 “工作窃取” 模式。
  • ForkJoinTask 本身是个抽象类,实现了 Future 和 Serializable 接口,核心方法有:

    • fork():安排异步执行此任务。
    • join():返回执行结果或者抛出一个异常(如果任务没有正常完成)。
    • invoke():开始执行任务,并等待任务完成。
    • isCompletedAbnormally():返回任务是否异常结束。
    • getException():返回任务抛出的异常。
    • cancel():取消任务。
  • RecursiveAction:这是没有返回结果的任务类,支持递归执行任务,使用它需要实现其抽象方法 compute() 来执行具体的任务,但不返回任何结果。

  • RecursiveTask:这是带有返回结果的任务类,支持递归执行任务,使用它需要实现其抽象方法 compute() 来执行具体的任务,会返回执行结果。

Fork/Join 框架的使用注意事项

  • 避免任务之间的相互依赖,以防止线程死锁。
  • 任务的分解粒度需要合理考虑,分解粒度过大或过小都会影响性能。
  • 合理配置 ForkJoinPool 的线程数量,以充分利用多核 CPU 的计算能力。

Fork/Join 框架与线程池的区别

  • Fork/Join 框架采用了 “工作窃取(Work-Stealing)” 算法,也就是当执行新的任务时,它可以将其拆分成更小的任务,并将小任务添加到线程自己的队列中。当线程执行完自己队列中的所有任务后,它就会去 “窃取” 其他线程队列尾部的任务并放在自己的队列中,然后再执行 “窃取” 得到的任务。
  • 相对于传统的线程池实现,Fork/Join 框架的优势体现在对其中包含的任务的处理方式上。在传统的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行(比如等待锁资源),那么该线程会处于等待状态;而在 ForF/Join 框架的实现中,如果某个子任务由于等待另外一个子任务的完成而无法继续运行,那么处理该子任务的线程会主动寻找其他尚未执行的子任务来执行。这种方式减少了线程的等待时间,提高了整体的性能。

Fork/Join 框架的使用案例

使用 Fork/Join 框架,首先需要创建一个 ForkJoinTask 任务,该类提供了在任务中执行 Fork 和 Join 的机制。在一般情况下,不需要直接继承 ForkJoinTask 类,只需要继承它的子类 RecursiveAction 或者 RecursiveTask 即可。

使用案例一

利用 Fork/Join 框架计算 1 + 2 + 3 + … + 100000000 的和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class ForkJoinTest {

public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinSumCalculate(0, 100000000L);
// 调用 invoke 方法将任务提交到线程池,会阻塞直到任务完成
Long sum = pool.invoke(task);
System.out.println("sum = " + sum);
}

}

class ForkJoinSumCalculate extends RecursiveTask<Long> {

private long start;
private long end;
private static final long THURSHOLD = 100000L; // 阀值

public ForkJoinSumCalculate(long start, long end) {
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
long length = end - start;
// 当任务的大小小于或等于阈值时,直接执行计算,否则将任务一分为二,递归地创建子任务并进行计算
if (length <= THURSHOLD) {
// 执行小任务
long sum = 0L;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long middle = (start + end) / 2;

// 拆分大任务为小任务,并将小任务压入线程队列,等待异步执行
ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
left.fork();

// 拆分大任务为小任务,并将小任务压入线程队列,等待异步执行
ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1, end);
right.fork();

// 合并多个小任务的执行结果
return left.join() + right.join();
}
}

}

程序执行的结果:

1
sum = 5000000050000000

JDK 1.8 提供了并行流(Stream),下面代码的运行效果等价于上述代码,但运行效率更高。

1
2
3
4
5
6
7
8
public class StreamTest {

public static void main(String[] args) {
Long sum = (LongStream.rangeClosed(0L, 100000000L)).parallel().reduce(0L, Long::sum);
System.out.println("sum = " + sum);
}

}

程序执行的结果:

1
sum = 5000000050000000

Fork/Join 框架的工作原理

ForkJoinPool 剖析

  • ForkJoinPool 的组成:

    • ForkJoinPool 的核心组件是 ForkJoinWorkerThread 和 WorkQueue。其中 ForkJoinWorkerThread 代表了 ForkJoinPool 中的工作线程,而 WorkQueue 则用于存储线程内的待执行任务。
    • ForkJoinPool 内部有一个 ctl 的原子整型字段,用来控制和调度工作线程的数量。它包括了状态、工作线程的数量,以及提交任务数量等信息。
    • 任务会被直接提交到 ForkJoinPool 的工作队列(WorkQueue)中,每个工作线程(WorkThread)都有自己的工作队列,这些队列分别存储各自线程内的待执行任务,包括来自外部线程的提交任务。
  • ForkJoinPool 的执行过程:

    • 当一个 ForkJoinTask 被提交到 ForkJoinPool 时,线程池会首先检查提交任务的线程是否为 ForkJoinWorkerThread。如果是,任务会被直接放置在该线程的 WorkQueue 中;如果不是,线程池会为此任务选择一个 WorkQueue 并将任务放入其中。
    • 当 ForkJoinWorkerThread 执行时,它首先会尝试从自己的 WorkQueue 中取出任务来执行。如果队列为空,它会尝试去其他线程的 WorkQueue 中窃取任务来执行。
    • 当 ForkJoinTask 执行 Fork 操作(即进行任务分解)时,新拆分出来的任务会被放入当前 ForkJoinWorkerThread 的 WorkQueue 的尾部。
    • WorkQueue 是一个双端队列,它允许 ForkJoinWorkerThread 以 LIFO 的方式(即后进先出)从队列尾部获取任务来执行,也允许其他线程从队列头部窃取任务来执行。
    • 使用这种方式,ForkJoinPool 能够充分利用所有可用的处理器核心,并且能够将大任务适当地拆分成多个小任务来进行并行处理,从而最大化并行处理的效率。
  • ForkJoinPool 中的 workQueues 属性和 ForkJoinWorkerThread 中的 workQueue 属性关联关系:

    • ForkJoinPool:它管理一组 ForkJoinWorkerThread。每一个 ForkJoinWorkerThread 都有一个对应的 WorkQueue,用于存储待处理的任务。
    • ForkJoinPool 和 workQueues:这是 ForkJoinPool 内部的一个数据结构,通常表现为一个数组,负责存储与每个 ForkJoinWorkerThread 对应的 WorkQueue。在 ForkJoinWorkerThread 创建时,也会为其创建一个新的 WorkQueue,并添加到 workQueues 中。
    • ForkJoinWorkerThread 和 WorkQueue:每个 ForkJoinWorkerThread 一开始会尝试从与其关联的 WorkQueue 中获取任务执行。如果该 WorkQueue 已空,即没有可执行的任务,那么 ForkJoinWorkerThread 会尝试从其他 ForkJoinWorkerThread 的 WorkQueue 中窃取任务来执行。
    • 总的来说,ForkJoinPool 通过 workQueues 管理所有的 ForkJoinWorkerThread 及其对应的 WorkQueue,而 ForkJoinWorkerThread 则通过与其关联的 WorkQueue 来获取并执行任务。这是一种典型的工作窃取并发模型,它使得 ForkJoinPool 能够有效地并行处理共享任务负载,并提高处理效率。

参考资料