java并发系列-Java 中的 CyclicBarrier

1 介绍

CyclicBarriers 是 Java 5 中引入的同步构造(synchronization constructs),java.util.concurrent package 的一部分。

这篇文章,我们探索一下在并发场景中如何使用该类。

2 Java 并发 - Synchronizers

java.util.concurrent 包含有若干个类,用于帮助管理线程之间的协作。其中部分包括:

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Exchanger
  • Semaphore
  • SynchronousQueue

这些类为线程之间的常用交互模式提供开箱即用的功能。如果我们有一组相互通信的线程并使用一种或者多种更常用的模式,如果我们有一组相互通信的线程,它们的交互模式与常用交互模式中的一种相似,我们只需简单地复用库里合适的类(也称为 Synchronizers*)而不是使用 locks 和 condition 对象以及 *synchronized 关键字再创造一个自定义方案。

接下来,研究下 CyclicBarrier

3 CyclicBarrier

CyclicBarrier 是一个同步器(Synchronizer),支持一组线程相互等待直到都到达共同的执行节点,也称为 barrier

CyclicBarriers are used in programs in which we have a fixed number of threads that must wait for each other to reach a common point before continuing execution.

Barrier 被叫做 cyclic,因为它在等待的线程释放后可以被复用。

4 用法

CyclicBarrier 的构造方法很简单。只需要传入一个整数,标记需要使用 barrier 实例调用 await() 方法的线程数,达到该线程数时通知指定数量的线程已到达公共执行节点:

1
public CyclicBarrier(int parties)

需要同步它们的执行的线程们也称为 parties 而调用 await() 方法就是我们注册到达 barrier point 的线程的方式。

这个调用是同步的并且线程调用该方法会挂起执行直到指定数量的线程在 barrier 上调用的了相同方法。指定数量的线程调用 await() 方法的场景称为 tripping the barrier

我们还可以传第二个参数给构造方法,这个参数是一个 Runnable 实例。它的逻辑会在最后一个线程到达 barrier 后执行。

1
public CyclicBarrier(int parties, Runnable barrierAction)

5 实现

下面场景展示 CyclicBarrier 如何使用。

这是固定数量的线程执行并保存相关结果到列表中的操作。当所有线程完成执行,他们中的一个(一般是最后一个 trips the barrier的)会开始处理从各个线程获取的数据。

让我们实现 action 发生的主类:

1
2
3
4
5
6
7
8
9
10
11
public class CyclicBarrierDemo {

private CyclicBarrier cyclicBarrier;
private List<List<Integer>> partialResults
= Collections.synchronizedList(new ArrayList<>());
private Random random = new Random();
private int NUM_PARTIAL_RESULTS;
private int NUM_WORKERS;

// ...
}

这个类非常清楚 - NUM_WORKERS是将要执行的线程数,NUM_PARTIAL_RESULTS是各个 worker 线程将要产生的结果数。

最后,我们有一个 partialResult 变量是列表类型,保存了各个 worker 线程处理结果。注意这个列表是 SynchronizedList,因为多个线程会同时放它里面写值,add() 方法对于普通的 ArrayList 不是线程安全的(thread-safe)。

现在,实现 worker 线程的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class CyclicBarrierDemo {

// ...

class NumberCruncherThread implements Runnable {

@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
List<Integer> partialResult = new ArrayList<>();

// Crunch some numbers and store the partial result
for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {
Integer num = random.nextInt(10);
System.out.println(thisThreadName
+ ": Crunching some numbers! Final result - " + num);
partialResult.add(num);
}

partialResults.add(partialResult);
try {
System.out.println(thisThreadName
+ " waiting for others to reach barrier.");
cyclicBarrier.await();
} catch (InterruptedException e) {
// ...
} catch (BrokenBarrierException e) {
// ...
}
}
}

}

然后,实现当 barrier 满足条件(即指定线程数调用了await(),也称为 barrier has been tripped)时运行的逻辑。

为了简单,我们仅仅将数字添加到结果列表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CyclicBarrierDemo {

// ...

class AggregatorThread implements Runnable {

@Override
public void run() {

String thisThreadName = Thread.currentThread().getName();

System.out.println(
thisThreadName + ": Computing sum of " + NUM_WORKERS
+ " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
int sum = 0;

for (List<Integer> threadResult : partialResults) {
System.out.print("Adding ");
for (Integer partialResult : threadResult) {
System.out.print(partialResult+" ");
sum += partialResult;
}
System.out.println();
}
System.out.println(thisThreadName + ": Final result = " + sum);
}
}
}

最后一步构造 CyclicBarrier 并使用 main() 方法启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CyclicBarrierDemo {

// Previous code

public void runSimulation(int numWorkers, int numberOfPartialResults) {
NUM_PARTIAL_RESULTS = numberOfPartialResults;
NUM_WORKERS = numWorkers;

cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());

System.out.println("Spawning " + NUM_WORKERS
+ " worker threads to compute "
+ NUM_PARTIAL_RESULTS + " partial results each");

for (int i = 0; i < NUM_WORKERS; i++) {
Thread worker = new Thread(new NumberCruncherThread());
worker.setName("Thread " + i);
worker.start();
}
}

public static void main(String[] args) {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.runSimulation(5, 3);
}
}

上述代码中,我们初始化 cyclic barrier,要求有 5 个线程,每个线程产生 3 个整数作为计算内容,并保存到同一个结果列表中。

一旦 barrier 条件满足,最后一个到达 barrier 的线程执行 AggregatorThread 中指定的逻辑,即 - 对各线程产生的整数求总和。

6 结果

这里是上面程序一次执行的输出 - 每次执行都可能产生不同的输出,因为线程会按不同顺序启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Spawning 5 worker threads to compute 3 partial results each
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 2
Thread 0: Crunching some numbers! Final result - 2
Thread 0 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 2
Thread 1: Crunching some numbers! Final result - 0
Thread 1: Crunching some numbers! Final result - 5
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 6
Thread 3: Crunching some numbers! Final result - 4
Thread 3: Crunching some numbers! Final result - 0
Thread 3 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 9
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 5
Thread 4 waiting for others to reach barrier.
Thread 4: Computing final sum of 5 workers, having 3 results each.
Adding 6 2 2
Adding 2 0 5
Adding 6 4 0
Adding 1 1 0
Adding 9 3 5
Thread 4: Final result = 46

如上所示,Thread 4 最后到达 barrier 并执行求和逻辑。实际场景中,线程的执行顺序并不重要。

7 总结

这篇文章中,我们看到 CyclicBarrier 在哪些场景中非常有用。

我们也实现了一个场景,该场景需要固定数量的线程在执行完上一步逻辑后,要先在固定节点等待,然后一起开始执行下一步逻辑。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接