java并发系列-介绍Java中的线程池

1 简介

这篇文章讲Java中的线程池,包括标准java库对线程池的各种实现和Google的Guava库对线程池的实现。

2 线程池

在Java中线程和系统级线程对应,是操作系统的资源。如果随意创建线程,可能导致系统资源很快被用光。

操作系统也负责线程间上下文切换,来模拟并发。简单来说,发起的线程越多,每个线程执行任务分配到的时间越少。

线程池模式帮助多线程应用节省系统资源,同时可以在预定义限制内实现并发。

当使用线程池的时候,并发代码以并发任务的形式实现,然后由线程池实例来执行。线程池实例控制多个可复用的线程执行这些任务。

线程池生命周期

应用通过线程池模式可以控制线程的数量。它的生命周期就是维护新来的任务入队和调度任务执行。

3 Java中的线程池

3.1 Executors, Executor and ExecutorService

Executors辅助类提供了创建预定义线程池实例的若干方法。如果没有特别的需求,我们使用这些类就可以。

ExecutorExecutorService接口获得Executors创建的不同线程池实现并使用它们。通常,我们应该在应用中使用这些接口,使得我们的代码和线程池的具体实现解耦。

Executor接口提供一个执行方法,用于执行实现Runnable接口的实例。

看看下面的例子,这个例子展示怎样通过ExecutorsAPI获得Executor实例,这个实例支持单线程池和一个无限队列(unbounded queue)串行执行任务。这里,它执行一个打印”HELLO WORLD”的任务。这个任务以lamdba(Java 8特性)的形式提交(这个lamdba被推断为Runnable接口实例)。

1
2
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));

ExecutorService接口提供很多方法控制任务执行过程和管理这个service的终止。

使用这个接口,不仅可以执行任务,同时获得返回的Future实例后,可以控制任务执行。

下面的例子,创建了一个ExecutorService, 提交一个任务后,通过Future实例的get方法等待任务执行完成后获得返回结果。

1
2
3
4
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> "Hello World");
// some operations
String result = future.get();

实际上,在具体场景中,并不需要马上调用future.get(),只在我们实际需要计算结果的时候调用就可以。

submit方法有多个重载方法,可以传递RunnableCallable函数接口实现的实例,也可以传递lambda形式的传参(Java8)。

Runnable接口不抛出异常和没有返回值。而Callable接口或许更方便,它会抛出异常和返回结果。

最后,让编译器推断传入的lambda是Callable类型的方法就是让lambad返回结果就行。

更多的关于使用ExecutorService和futures的例子,可以查看A Guide to the Java ExecutorService

3.2 ThreadPoolExecutor

ThreadPoolExecutor是一个可扩展的线程池实现,提供很多参数和hook。

下面将讨论到的主要配置参数有:corePoolSizemaximumPoolSizekeepAliveTime

ThreadPoolExecutor的线程池会维护固定数量的核心线程,以及一些超额的线程,这些超额线程创建后,不需要时会被终止。corePoolSize参数用于配置线程池实例化核心线程的数据。当提交了很多任务,没有闲置的核心线程时,线程池会将核心线程的数量提高到maximumPoolSize

这些参数覆盖很多使用场景,其中最常见的配置会直接被封装成Executors的静态方法。

比如,newFixedThreadPool方法,它的corePoolSizemaximumPoolSize相等,keepAliveTime为0。也就是说线程池的线程数量总是固定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ThreadPoolExecutor executor = 
(ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});

assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());

上面例子中,实例化了一个固定线程数为2的ThreadPoolExecutor。如果同时提交的任务小于等于两个,那么它们可以立马被执行。否则,就会有一些任务被放到队列里等待执行。

我们创建三个Callable任务通过sleep(1000)模拟任务执行长。前两个任务会马上被执行,第三个任务会在队列中等待执行。然后马上调用getPoolSize()getQueue().size()方法来验证。

另外一个预配置的ThreadPoolExecutorExecutors.newCachedThreadPool()方法创建。它不会保留执行数量的线程,这个ThreadPoolExecutor实例的corePoolSize实际上为0,maximumPoolSizeInteger.MAX_VALUEkeepAliveTime为60s。

这些配置使得缓存线程池的大小没有限制,随着提交的任务数量变化。但是当这些线程不再需要时,他们会在60s后被清理掉。一个典型例子是应用存在大量短时任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ThreadPoolExecutor executor = 
(ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});

assertEquals(3, executor.getPoolSize());
assertEquals(0, executor.getQueue().size());

上述例子中,队列的大小总是为0,因为内部使用一个同步队列。在同步队列中,插入和移除总是同时发生,即这个队列实际上不会保留任何东西。

Executors.newSingleThreadExecutor()接口创建另一种典型的ThreadPoolExecutor,它包含一个线程。单线程执行器非常适合用于创建一个事件循环。它的corePoolSizemaximumPoolSize为1,keepAliveTime为0。

下面例子的任务是串行执行,所以任务执行完以后counter值为2。

1
2
3
4
5
6
7
8
9
AtomicInteger counter = new AtomicInteger();

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
counter.set(1);
});
executor.submit(() -> {
counter.compareAndSet(1, 2);
});

另外,ThreadPoolExecutor的配置在创建以后不可修改。所以创建的具体实例不能直接赋值给ThreadPoolExecutor

3.3 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor类继承了ThreadPoolExecutor类,实现了ScheduledExecutorService接口,以及添加了几个方法:

  • schedule方法在指定延迟时间后执行任务;
  • scheduleAtFixedRate方法在指定延迟时间后执行任务,随后重复执行指定时长。period参数是从任务开始计时执行时间,因此执行率是固定的。
  • scheduleWithFixedDelay方法跟scheduleAtFixedRate相似,会重复执行给定任务。不同之处是在上个任务结束到下个任务开始之间会有固定延迟;执行率会随任务的执行时间长短变化。

Executors.newScheduledThreadPool()方法用来创建一个ScheduledThreadPoolExecutorcorePoolSize需要指定,maximumPoolSize没有限制,keepAliveTime为0。下面的例子展示调度任务在500ms延迟后执行:

1
2
3
4
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);

接着,下面的代码展示在500ms延迟后执行任务,然后每隔100ms重复执行。代码中在调度任务后,等待CountDownLatch lock触发三次,然后调用Future.cancel()方法结束任务调度。

1
2
3
4
5
6
7
8
9
10
CountDownLatch lock = new CountDownLatch(3);

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
System.out.println("Hello World");
lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);

lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);

3.4 ForkJoinPool

ForkJoinPoolfork/join框架的核心部分,这个框架在Java7中被引入,用于解决递归算法中创建多任务产生的问题。如果使用简单的ThreadPoolExecutor,将很快耗光线程,这是因为每个任务或者子任务都需要一个线程来执行。

fork/join框架中,任何任务都能创建(fork)一些子任务并通过join方法等待它们完成。fork/join框架的优点在于它不会为每个任务或者子任务创建新线程,而是通过实现工作窃取算法(Work Stealing algorithm)来代替。框架更详尽的描述在这里

下面看看一个简单的例子,这个例子使用ForkJoinPool遍历树节点并计算所有节点的值的和。树节点的实现如下,包含一个整书和一个孩子节点集合:

1
2
3
4
5
6
7
8
9
10
11
static class TreeNode {

int value;

Set<TreeNode> children;

TreeNode(int value, TreeNode... children) {
this.value = value;
this.children = Sets.newHashSet(children);
}
}

现在如果想要并行计算树所有节点值的和,我们需要实现一个RecursiveTask接口。每个任务处理一个节点,并将它的值与它孩子节点的值相加。为了计算孩子节点值的总和,做了如下事情:

  • 孩子节点集合流式处理(stream)
  • 将每个元素映射为CountingTask(map)
  • 在每个子任务上调用fork方法
  • 在每个forked的任务上调用join方法收集结果
  • 通过Collectors.summingInt收集器对结果求和
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static class CountingTask extends RecursiveTask<Integer> {

private final TreeNode node;

public CountingTask(TreeNode node) {
this.node = node;
}

@Override
protected Integer compute() {
return node.value + node.children.stream()
.map(childNode -> new CountingTask(childNode).fork())
.collect(Collectors.summingInt(ForkJoinTask::join));
}
}

下面对具体的树计算的代码就比较简单:

1
2
3
4
5
6
TreeNode tree = new TreeNode(5,
new TreeNode(3), new TreeNode(2,
new TreeNode(2), new TreeNode(8)));

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));

4 Guava的线程池实现

Guava是一个流行的Google出品的工具类库。它有许多有用的并发类,包括若干简单易用的ExecutorService实现。这些实现类不能直接实例化或者子类化(subclassing 类似继承??),只能通过MoreExecutors辅助类创建它们的实例。

4.1 通过Maven添加Guava依赖

添加如下依赖到pom文件中来依赖Guava库。在Maven中央库可以找到最新版本:

1
2
3
4
5
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>

4.2 Direct Executor和Direct Executor Service

有时候因为情况不同,我们会想要在当前线程中或者线程池中执行任务。这时,我们更喜欢有一个Executor接口只需切换它的具体实现就能完成上述操作。虽然在当前线程中使用Executor或者ExecutorService执行任务也不难,但总归需要写一些模板代码(boilerplate)。

让人开心的是,Guava为我们提供了预定义的实例。

这里举个在同一线程中执行任务的例子。代码中任务通过睡眠500ms阻塞当前线程,当execute方法执行完成时,执行结果也能立即获得。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Executor executor = MoreExecutors.directExecutor();

AtomicBoolean executed = new AtomicBoolean();

executor.execute(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
executed.set(true);
});

assertTrue(executed.get());

上面说到的预定义实例,通过directExecutor()方法获得的实际上是一个静态单例(singleton),所以使用这个方法不用担心过度创建。

MoreExecutors.newDirectExecutorService()接口每次调用都会创建一个全新完整的executor service。

4.3 Exiting Executor Services

另外一个常见问题是当一个线程池还在跑它的任务时,停止虚拟机(VM)运行。即使存在结束机制,也不一定能保证executor service停止时,任务也能优化停止。这会导致JVM无限挂在那里,而任务还继续执行。

为了解决这个问题,Guava引入一系列有退出机制的executor service(exiting executor service)。它们基于守护线程(daemon threads),和JVM一起结束。

这些services通过Runtime.getRuntime().addShutdownHook()方法配置关闭钩子,防止虚拟机等待一定时间后放弃悬停任务(hung)直接终止。

接下来的例子,提交一个含有无限循环的任务,使用一个exiting executor service,并为虚拟机终止前为未结束任务执行等待100 milliseconds。如果没有exitingExecutorService, 这个任务将导致虚拟机一直挂着。

1
2
3
4
5
6
7
8
9
10
ThreadPoolExecutor executor = 
(ThreadPoolExecutor) Executors.newFixedThreadPool(5);
ExecutorService executorService =
MoreExecutors.getExitingExecutorService(executor,
100, TimeUnit.MILLISECONDS);

executorService.submit(() -> {
while (true) {
}
});

4.4 Listening Decorators

监听修饰器帮助我们在执行提交任务时,封装ExecutorService并接收返回对象ListenableFuture实例,而不是简单的Future实例。ListenableFuture接口类继承了Future并增加了一个方法addListener。这个方法会添加一个监听器,监听future完成。

我们基本不会直接使用ListenableFuture.addListener()方法 ,但是它对于Future工具类中的很多辅助方法很有用。例如,在Futures.allAsList()方法中所有future成功完成并合并后,去将若干ListenableFuture实例合并到一个ListenableFuture中:

1
2
3
4
5
6
7
8
9
10
11
12
13
ExecutorService executorService = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService =
MoreExecutors.listeningDecorator(executorService);

ListenableFuture<String> future1 =
listeningExecutorService.submit(() -> "Hello");
ListenableFuture<String> future2 =
listeningExecutorService.submit(() -> "World");

String greeting = Futures.allAsList(future1, future2).get()
.stream()
.collect(Collectors.joining(" "));
assertEquals("Hello World", greeting);

5 总结

这篇文章讨论了线程池模型以及它在标准JAVA库和Google’s Guava库中的实现。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接