java并发系列-介绍Java中CountDownLatch

1 介绍

这篇文章介绍 *CountDownLatch*类,并通过例子演示它的用法。

基本上,使用CountDownLatch会在其他线程完成给定的任务之前阻塞线程。

2 并发编程的用法

简单地讲,CountDownLatch有个counter字段,对它进行递减操作。我们可以用它来阻塞当前线程直到counter归零。

如果我们做一些并发处理,可以实例化一个CountDownLatch,让它的counter值等于工作者线程数。每个线程结束时,都调用countdown() 方法,这样可以一直阻塞当前线程直到工作者线程都执行完成。

3 等待线程池完成

首先,创建一个Worker类,里面有一个类型为CountDownLatch的变量,用于通知它完成执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Worker implements Runnable {
private List<String> outputScraper;
private CountDownLatch countDownLatch;

public Worker(List<String> outputScraper, CountDownLatch countDownLatch) {
this.outputScraper = outputScraper;
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
doSomeWork();
outputScraper.add("Counted down");
countDownLatch.countDown();
}
}

然后,创建一个单元测试,证明通过CountDownLatch等待Worker完成执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion()
throws InterruptedException {

List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
countDownLatch.await();
outputScraper.add("Latch released");

assertThat(outputScraper)
.containsExactly(
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Latch released"
);
}

“Latch released”永远最后输出。因为程序于依赖CounDownLatch释放。

如果没有调用await(),则不能保证线程的执行顺序,单元测试会失败。

4 线程池等待开始

如果采用上面的例子,这一次会设置上千个线程而不是5个,结果可能是前面的线程会在后面的线程还没调用start()的时候就已经处理完成。这样就很难重新并发的场景。

为了实现这个场景,我们通过不同的方式来使用CountdownLatch。与阻塞父线程等待子线程处理完成相关,我们阻塞各个子线程直到所有子线程都开始。

下面我们修改run()方法,在处理任务之前阻塞:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class WaitingWorker implements Runnable {

private List<String> outputScraper;
private CountDownLatch readyThreadCounter;
private CountDownLatch callingThreadBlocker;
private CountDownLatch completedThreadCounter;

public WaitingWorker(
List<String> outputScraper,
CountDownLatch readyThreadCounter,
CountDownLatch callingThreadBlocker,
CountDownLatch completedThreadCounter) {

this.outputScraper = outputScraper;
this.readyThreadCounter = readyThreadCounter;
this.callingThreadBlocker = callingThreadBlocker;
this.completedThreadCounter = completedThreadCounter;
}

@Override
public void run() {
readyThreadCounter.countDown();
try {
callingThreadBlocker.await();
doSomeWork();
outputScraper.add("Counted down");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
completedThreadCounter.countDown();
}
}
}

现在,修改单元测试。首先阻塞,直到所有Worker开始解除阻塞。然后结束前阻塞,直到所有Worker处理完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime()
throws InterruptedException {

List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch readyThreadCounter = new CountDownLatch(5);
CountDownLatch callingThreadBlocker = new CountDownLatch(1);
CountDownLatch completedThreadCounter = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new WaitingWorker(
outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
readyThreadCounter.await();
outputScraper.add("Workers ready");
callingThreadBlocker.countDown();
completedThreadCounter.await();
outputScraper.add("Workers complete");

assertThat(outputScraper)
.containsExactly(
"Workers ready",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Workers complete"
);
}

这种方式对于重现并发问题很有帮助,因为可以测试上千线程并发处理某个逻辑。

5 早点终止CountdownLatch

有时候,可能会出现WorkerCountDownLatch倒计时完之前就错误终止。这会导致CountDownLatch永远不会数到0,await()也永远不会终止:

1
2
3
4
5
6
7
8
@Override
public void run() {
if (true) {
throw new RuntimeException("Oh dear, I'm a BrokenWorker");
}
countDownLatch.countDown();
outputScraper.add("Counted down");
}

这里我们修改之前的单元测试使用BrokenWorker类,来展示await()永远阻塞的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck()
throws InterruptedException {

List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
countDownLatch.await();
}

当然这不是我们想要的,最好让应用继续执行而不是永远阻塞。
怎么解决这个问题呢,就是给await()指定超时时间:

1
2
boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
assertThat(completed).isFalse();

这样这个测试会超时,然后返回false

6 总结

这篇文章,我们展示如何使用CountDownLatch阻塞线程等待其他线程完成处理。

同时,也展示了怎样让线程并行执行来调试并发问题。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接