java并发系列-ExecutorService-等待线程结束

1 介绍

ExecutorService框架使得在多线程中处理任务变得简单。文中会举一些等待线程完成执行的例子。

同时会展示怎样优雅地关闭ExecutorService并等待已经运行的线程结束执行。

2 Executor关闭后

当使用Executor时,通过调用shutdown()或者shutdownNow()方法关闭。不过,它不会等待所有线程都执行完才关闭。

等待现存的线程完成执行再关闭可以通过调用awaitTermination()方法实现。

下面的例子阻塞当前线程直到所有任务都执行完成或者超过指定的超时时间:

1
2
3
4
5
6
7
8
9
10
11
public void awaitTerminationAfterShutdown(ExecutorService threadPool) {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
} catch (InterruptedException ex) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}

3 使用CountDownLatch

下面,看看另外一种解决上面问题的办法-使用CountDownLatch通知任务完成。

我们可以初始化一个值用于调用await()方法的线程在这个值递减到0后会被通知。

例如,如果我们需要当前线程等待另外N个线程完成执行。我们可以用N初始化latch:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService WORKER_THREAD_POOL 
= Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// ...
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

4 使用invokeAll()

第一个我们用于运行线程的方法是invokeAll()。在所有任务执行结束或者超过超时时间后,方法会返回Future对象列表。

同时注意返回的Future对象列表顺序和提供的Callable对象列表顺序一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);

List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));

long startProcessingTime = System.currentTimeMillis();
List<Future<String>> futures = WORKER_THREAD_POOL.invokeAll(callables);

awaitTerminationAfterShutdown(WORKER_THREAD_POOL);

long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;

assertTrue(totalProcessingTime >= 3000);

String firstThreadResponse = futures.get(0).get();

assertTrue("fast thread".equals(firstThreadResponse));

String secondThreadResponse = futures.get(1).get();
assertTrue("slow thread".equals(secondThreadResponse));

5 使用ExecutorCompletionService

另一种运行多线程的方法是使用ExecutorCompletionService。使用传入的ExecutorService执行任务。

invokeAll()的一个区别是返回的Future结果列表顺序与执行任务的返回顺序一样。ExecutorCompletionService使用队列按任务执行完成顺序保存结果,而invokeAll()会返回和给定的任务列表顺序一样的结果列表:

1
2
3
4
5
6
7
8
9
10
CompletionService<String> service
= new ExecutorCompletionService<>(WORKER_THREAD_POOL);

List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));

for (Callable<String> callable : callables) {
service.submit(callable);
}

使用take()方法可以访问结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
long startProcessingTime = System.currentTimeMillis();

Future<String> future = service.take();
String firstThreadResponse = future.get();
long totalProcessingTime
= System.currentTimeMillis() - startProcessingTime;

assertTrue("First response should be from the fast thread",
"fast thread".equals(firstThreadResponse));
assertTrue(totalProcessingTime >= 100
&& totalProcessingTime < 1000);
LOG.debug("Thread finished after: " + totalProcessingTime
+ " milliseconds");

future = service.take();
String secondThreadResponse = future.get();
totalProcessingTime
= System.currentTimeMillis() - startProcessingTime;

assertTrue(
"Last response should be from the slow thread",
"slow thread".equals(secondThreadResponse));
assertTrue(
totalProcessingTime >= 3000
&& totalProcessingTime < 4000);
LOG.debug("Thread finished after: " + totalProcessingTime
+ " milliseconds");

awaitTerminationAfterShutdown(WORKER_THREAD_POOL);

6 总结

根据使用场景,有多种方式实现等待线程完成执行。

CountDownLatch可以用于当我们想要一组操作被线程完成以后要通知其他线程时。

ExecutorCompletionService可以用于当我们需要尽快拿到任务结果时,而其他方法只能等到运行的任务执行完以后才能拿到。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接