java并发系列-介绍Java中的ExecutorService类

1 简介

ExecutorService是由JDK提供的用于简化任务异步执行的框架。ExecutorService会自动创建线程池,并提供接口提交任务。

2 实例化ExecutorService

2.1 Executors类的工厂方法

创建ExecutorService最简单方式就是使用Executors类的工厂方法之一。
例如,下面的代码将会创建10个线程的线程池。

1
ExecutorService executor = Executors.newFixedThreadPool(10);

还有其他工厂方法创建一些满足具体场景的预定义ExecutorService。更多关于介绍查看Oracle’s official documentation

2.2 直接创建一个ExecutorService

因为ExecutorService*是接口类,所有创建一个它的实现类的实例就可以。在 *java.util.concurrent 包中有几个它实现类。

1
2
3
ExecutorService executorService = 
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());

你可能会注意到上述代码与工厂方法newSingleThreadExecutor()源码很相似。

3 给ExecutorService分配任务

ExecutorService可以执行RunnableCallable任务。下面代码使用lambda表达式定义任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};

Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};

List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);

将任务分配给ExecutorService有几个方法,包括execute()集成自Executor,还有submit(),invokeAny(),invokeAll()等。
execute()方法没有返回值,执行这个方法,得不到任务的执行结果也不能检查任务的状态。

1
executorService.execute(runnableTask);

submit()方法接收一个CallableRunnable任务,并返回Future类型的结果。

1
2
Future<String> future = 
executorService.submit(callableTask);

invokeAny()接收一个任务集合,批量执行,任意一个任务执行成功就返回。

1
String result = executorService.invokeAny(callableTasks);

invokeAll接收一个任务集合,批量执行,所有任务执行结果都放在Future对象中以列表的形式返回。

1
List<Future<String>> futures = executorService.invokeAll(callableTasks

在进一步讨论前,还需要讨论下两个内容:关闭ExecutorService和处理返回类型Future。

4 关闭ExecutorService

通常,当没有任务处理的时候,ExecutorService不会自动摧毁。它会一直等待新任务。

在某些情况下很有用。例如,应用中任务出现时间不规则或者任务的数量在编译阶段不知道。

另外,应用已经执行完,但是它不会停下来,因为处于等待状态的ExecutorService会导致JVM保持运行状态。

关闭ExecutorService有两个方法分别为shutdown()shutdownNow()

shutdownNow()方法会立即尝试销毁ExecutorService,它不能保证所有运行中的线程同时停止。这个方法会返回等待被处理的任务列表。供开发者决定如何处理。

1
List<Runnable> notExecutedTasks = executorService.shutDownNow();

一个好的关闭ExecutorService的方法(Oracle推荐)是使用这些方法时和awaitTermination()结合使用。使用awaitTermination()方法,ExecutorService首先会停止接收新任务,提供一定时间让所有任务完成。如果时间超出,执行会立马停止:

1
2
3
4
5
6
7
8
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}

5 Future接口类

submit()invokeAll() 方法返回一个Future类型的对象或者列表,通过Future对象可以拿到任务执行结果或者校验任务状态。

Future接口类提供了一个阻塞式的方法get()Callable任务会返回实际执行结果,Runnable任务则返回null。当任务在执行的时候调用get()会造成当前线程阻塞,直到任务执行完成可以拿到结果后才恢复执行。

1
2
3
4
5
6
7
Future<String> future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

如果调用get()方法要阻塞很久,那么应用性能就降级了。如果执行结果不重要,那么在调用时指定超时时间来避免这种问题:

1
String result = future.get(200, TimeUnit.MILLISECONDS);

如果执行时间比指定超时时间长,则会抛出TimeoutException

isDone()方法用来校验分配的任务是否执行完毕。

Future接口类也提供了结束任务执行的方法cancel()方法,以及校验任务是否结束的isCancelled()方法:

1
2
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();

6 ScheduledExecutorService接口类

ScheduledExecutorService执行任务可以设置预定义延迟和周期。实例化ScheduledExecutorService的最佳方式是使用Executors类的工厂方法。
下面代码片段,创建了单线程的ScheduledExecutorService

1
2
ScheduledExecutorService executorService = Executors
.newSingleThreadScheduledExecutor();

在固定延迟后调用任务,可以使用ScheduledExecutorServicescheduled()方法。有两个方法一个用于执行Runnable任务,一个用于执行Callable任务。

下面的代码在执行callableTask之前会延迟1秒:

1
2
Future<String> resultFuture = 
executorService.schedule(callableTask, 1, TimeUnit.SECONDS);

scheduleAtFixedRate()方法会在固定延迟后周期性执行一个任务。

下面的代码在延迟100毫秒后执行一个任务,并且每隔450毫秒再次执行相同任务。如果处理器需要比周期时间更长的事件来处理任务,ScheduledExecutorService将等待当前任务执行完成再开始下一个:

1
2
Future<String> resultFuture = service
.scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);

如果任务迭代执行中间需要固定长度的延迟时间,则要用到scheduleWithFixedDelay()。例如,下面的代码在上一个任务结束到下一个任务开始间有一个150毫秒的固定暂停。

1
service.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);

根据scheduleAtFixedRate()scheduleWithFixedDelay()方法声明,任务的执行周期会因为ExecutorService的终止或者任务抛出异常而终止。

7 ExecutorService vs. Fork/Join

Java7发布以后,许多开发者认为ExecutorService框架应该被fork/join框架代替。这个观点不太对。尽管fork/join框架用法更简单性能更佳,但是会控制并发执行的开发者人数也下降了。

ExecutorService是开发者可以控制线程的数量和任务的粒度。ExecutorService最佳使用场景是独立任务的处理,比如事务或者请求,根据一个线程执行一个任务的原则。

相比较,根据Oracle文档,fork/join框架设计成加速任务处理,递归地将工作分解成更小的单元。

8 总结

虽然ExecutorService比较简单,但是也有一些常见的问题。总结如下:

保持一个未用的ExecutorService处于执行状态:第四节做了详细解释
错误的线程池容量当使用固定长度的线程池时:判断需要多少线程才能有效地执行任务非常重要。线程池太大会导致不必要的浪费,创建的线程大多处于等待状态。太少会让应用看起来没有响应,因为任务需要等待很长时间才能分配到线程处理。
调用Future的get()方法在任务结束后:尝试获取已经结束的任务的执行结果会抛CancellationException
Future的get()方法导致不可预期的阻塞时间:应该指定超时时间来避免不可预期的等待。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接