java并发系列-Java中的Fork/Join框架介绍

1 介绍

Java7引入了fork/join框架。它利用所有可用的处理器来加速并行处理,通过分而治之的方法完成。

实践中,这意味着框架先“forks”,递归地将任务分解成更小的独立子任务,直到它们足够简单,再异步进行处理。

然后,“join”部分开始,所有子任务的结果递归地合并处理成单个结果,或者如果任务的返回是void,程序简单地等待每个子任务执行完。

为了让并发执行更有效率,fork/join框架使用叫ForkJoinPool的线程池,它管理ForkJoinWorkerThread类型的线程。

2 ForkJoinPool

ForkJoinPool是框架核心,是ExecutorService的实现类,管理工作者线程并且提供获取线程池信息和性能的工具。

工作者线程一次处理一个任务,但是ForkJoinPool不会为每个子任务创建单独的线程。相反,每个线程池的线程都会有一个双向队列存储任务。

这个机制,用到了工作窃取算法,对于平衡线程的工作负载很重要。

2.1 工作窃取算法

简单来说,空闲线程会尝试从繁忙线程的双向队列窃取工作。

一个工作线程默认会从它的双向队列头部拿去任务。当自己的队列为空后,线程就会从另外一个忙线程的双向队列尾部获取线程或者从全局队列获取,因为这里保存有最多的工作。

因为线程会先去找存在大量工作的队列,所以这个方法极大降低了线程竞争任务的可能,也减少了线程寻找任务的次数。

2.2 ForkJoinPool实例化

Java8中,最便捷的访问ForkJoinPool实例的方式是使用静态方法 *commonPool()。和它的名字一样,它会返回一个通用线程池的引用,处理ForkJoinTask*的默认线程池。

根据Oracle文档,使用预定义的通用线程池会减少资源浪费,它不会为每个任务创建单独的线程池。

1
ForkJoinPool commonPool = ForkJoinPool.commonPool();

类似于上述操作,Java7中可以通过创建一个ForkJoinPool,并把它保存在一个工具类的静态字段中。

1
public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

现在,可以访问它了。

1
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

通过ForkJoinPool的构造方法,可以在创建线程池时指定并发级别,线程工厂和异常处理方法。例如上面代码,线程池的并发等级是2,即这个线程池使用两个处理器内核。

3 ForkJoinTask

ForkJoinTask是在ForkJoinPool中执行的任务的基本类型。在实际使用中,任务类需要继承它的两个子类之一。其中,RecursiveAction的任务没有返回值,RecursiveTask的任务有返回值。它们都有一个抽象方法compute()用于定义任务具体逻辑。

3.1 RecursiveAction - 一个例子

下面的例子,处理的工作量由String类型的变量workload表示。例子的任务没有实际意义,只是简单地将输入数据转成大写并打印日志。

为了演示这个框架的forking行为,例子里在workload的长度大于指定阈值时,用createSubtask()分解任务。

字符串递归地被分割成子字符串,并基于这些子字符串创建CustomRecursiveTask实例。

最后,方法返回一个List<CustomRecursiveAction类型的结果。

上面的列表通过invokeAll()方法提交给ForkJoinPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class CustomRecursiveAction extends RecursiveAction {

private String workload = "";
private static final int THRESHOLD = 4;

private static Logger logger =
Logger.getAnonymousLogger();

public CustomRecursiveAction(String workload) {
this.workload = workload;
}

@Override
protected void compute() {
if (workload.length() > THRESHOLD) {
ForkJoinTask.invokeAll(createSubtasks());
} else {
processing(workload);
}
}

private List<CustomRecursiveAction> createSubtasks() {
List<CustomRecursiveAction> subtasks = new ArrayList<>();

String partOne = workload.substring(0, workload.length() / 2);
String partTwo = workload.substring(workload.length() / 2, workload.length());

subtasks.add(new CustomRecursiveAction(partOne));
subtasks.add(new CustomRecursiveAction(partTwo));

return subtasks;
}

private void processing(String work) {
String result = work.toUpperCase();
logger.info("This result - (" + result + ") - was processed by "
+ Thread.currentThread().getName());
}
}

3.2 RecursiveTask

对于有返回值的任务,除了计算任务结果,这里的逻辑都比较简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class CustomRecursiveTask extends RecursiveTask<Integer> {
private int[] arr;

private static final int THRESHOLD = 20;

public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}

@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
} else {
return processing(arr);
}
}

private Collection<CustomRecursiveTask> createSubtasks() {
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, 0, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
return dividedTasks;
}

private Integer processing(int[] arr) {
return Arrays.stream(arr)
.filter(a -> a > 10 && a < 27)
.map(a -> a * 10)
.sum();
}
}

这个例子中,工作被保存在CustomRecursiveTask类的名为arr的列表类型的字段里。createSubtask()方法递归地将任务分解成小于阈值的子任务,然后使用invokeAll()提交这些子任务到线程池,最后返回Future类型的结果列表。

每个子任务调用join()触发执行。

代码使用了Java8的Stream APIsum()方法将子任务结果合并成最终结果。

4 提交任务给ForkJoinPool

提交任务给线程池,有以下一些方法。
submit()或者execute()方法(它们的用法一样):

1
2
forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

invoke()方法fork任务并且等待结果,不需要主动join:

1
int result = forkJoinPool.invoke(customRecursiveTask);

invokeAll()方法是最便捷的方式提交一系列ForkJoinTasks任务到ForkJoinPool。它将任务作为参数,fork它们并返回Future类型的对象列表,列表按结果产生先后顺序。

1
2
customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();

在这个RecursiveTask的例子中,使用了invokeAll()方法提交一系列子任务到线程池。使用fork()join()也可以完成相同的工作。

为了避免混淆,通常推荐使用invokeAll()方法提交多个任务到ForkJoinPool

5 总结

使用fork/join框架能够加速处理大量任务,不过要遵循一些准则:

  • 尽量少使用线程池 - 最好每个应用或者系统使用一个线程池
  • 使用默认通用线程池,如果没有特殊要求
  • 使用合适的阈值拆解ForkJoinTask
  • 避免在ForkJoinTask中做阻塞动作

本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接