java并发系列-Java8 Parallel Streams中的自定义线程池

1 介绍

Java8引入了流的概念,一种对数据执行批量操作的有效方式。并行流用在支持并发的场景中。

以多线程为代价,这些流能够提高处理效率。

这篇文章介绍一下流API最大的限制之一以及怎样使用自定义的线程池实例处理并行流工作,也可以通过这个来实现。

2 并行流(Parallel Stream)

首先展示一个小例子。在任一集合类型上调用parallelStream方法,返回一个并行流。

1
2
3
4
5
6
7
@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
List<Long> aList = new ArrayList<>();
Stream<Long> parallelStream = aList.parallelStream();

assertTrue(parallelStream.isParallel());
}

在这个流中的默认处理方式是使用ForkJoinPool.commonPool(),这个公共线程池供整个应用共享。

3 自定义线程池

当处理流时,我们可以传递了一个自定义线程池。

下面的例子让并行流使用自定义线程池计算1到10000000的总和:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {

long firstNum = 1;
long lastNum = 1000000;

List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());

ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();

assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}

首先,使用ForkJoinPool构造方法创建一个并发等级为4的实例。有些试验要求为不同环境指定最优的等级,不过也可以简单地根据自己的cpu核数来指定。

接着,处理并行流的内容。调用reduce求和。

这个例子没有展现出使用自定义线程池的所有用处,但是在某些场景好处显而易见。比如公共线程池在执行长任务,或者公共线程池被用于应用的其它组件。

4 总结

这篇文档,简要地介绍了如何使用自定义线程池处理并行流。在正确的环境下,使用合适的并行等级,能很好地提高处理能力。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接