java并发系列-介绍Java Phaser

1 总览

这篇文章中,我们将会看看java.util.concurrent包中的Phaser的结构。 这个结构和协调线程执行的CountDownLatch非常相似。和CountDownLatch相比,它有一些额外的功能。

Phaser是一个关卡,数量不定的线程在继续执行前会等在那里。而在CountDownLatch中线程数量不能动态配置,需要在创建实例的时候就配置好。

2 Phaser API

Phaser让我们可以在线程执行下一步前,自定义等待关卡的逻辑。

我们可以协调多个执行 phases,为各个编程 phase 复用Phaser实例。各个 phase 可以有不同数量的线程等待前进到另外一个 phase。我们后面会看看使用多个 phases 的例子。

线程要想加入上述场景,需要用register()方法将自己注册到Phaser实例中。注意这只是增加了 registerd parties 的数量,并且我们确认不了当前线程是否注册-我们必须通过子类化实现来支持。

通过调用arriveAndAwaitAdvance()线程通知它已经到达关卡,关卡实际上是一个阻塞方法。当 arrived parties 的数量等于 registerd parties 的数量时,程序会继续执行,phase 序号将会增加。我们可以通过调用getPhase()方法获取当前的 phase 序号。

当线程完成任务,我们应该调用arriveAndDeregister()方法提示当前线程在这个特定 phase 不需要再被考虑。

3 使用 Phaser API 实现逻辑

设想一下我们想要协调多个 phases 的行为。三个线程将处理第一 phase,然后两个线程处理第二 phase。

首先创建一个实现 Runnable 接口的 LongRunningAction 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class LongRunningAction implements Runnable {
private String threadName;
private Phaser ph;

LongRunningAction(String threadName, Phaser ph) {
this.threadName = threadName;
this.ph = ph;
ph.register();
}

@Override
public void run() {
ph.arriveAndAwaitAdvance();
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
ph.arriveAndDeregister();
}
}

我们的 action 类实例化后,使用register()方法将它注册到Phaser*实例中。这将会提高使用那个 *Phaser 的线程数量。

调用arriveAndAwaitAdvance()方法会使当前线程在 barrier 处等待。如上所述,当 arrived parties 的数量等于 registered parties 的数量时,执行会接着进行。

处理结束后,当前线程调用arriveAndDeregister()方法注销(deregister)自己。

在我们创建的测试用例中,首先启动三个LongRunningAction线程并阻塞在关卡处。下一步,在当前动作完成后,我们将创建另外两个的LongRunningAction线程执行下一个 phase 的处理。

当创建来自主线程的Phaser实例时,我们传递参数 1 。这等同于在当前线程中调用register()方法。我们为什么这样做,因为当我们创建三个 worker 线程时,主线程是协调者,所以Phaser需要这四个线程都注册。

1
2
3
4
ExecutorService executorService = Executors.newCachedThreadPool();
Phaser ph = new Phaser(1);

assertEquals(0, ph.getPhase());

初始化后的 phase 序号 等于 0 ;

Phaser 类有一个构造方法,可以传入父实例,对于大量的 parties 的场景很有用。这种场景会存在大量同步竞争开销,Phasers 的实例可能被创建,这样 sub-phasers 组可以共享父实例。

下面,我们启动三个LongRunningAction action 线程,它们会在关卡处等待,直到我们在主线程中调用arriveAndAwaitAdvance()方法。

记住我们初始化Phaser时传入了 1 并调用了三次 register()。现在,三个 action 线程已经到达关卡,所以需要再调用一次arriveAndAwaitAdvance(),在主线程中调用。

1
2
3
4
5
6
7
executorService.submit(new LongRunningAction("thread-1", ph));
executorService.submit(new LongRunningAction("thread-2", ph));
executorService.submit(new LongRunningAction("thread-3", ph));

ph.arriveAndAwaitAdvance();

assertEquals(1, ph.getPhase());

在上面的 phase 完成后,因为程序已经完成了第一步执行,所以getPhase() 将会返回 1。

接下来我们想要两个线程进行第二 phase 的处理。我们可以利用 Phaser 来实现动态配置等待在关卡上的线程数。我们启动两个新线程,它们会在主线程调用 arriveAndAwaitAdvance() 之后开始执行(和上面示例一样):

1
2
3
4
5
6
7
executorService.submit(new LongRunningAction("thread-4", ph));
executorService.submit(new LongRunningAction("thread-5", ph));
ph.arriveAndAwaitAdvance();

assertEquals(2, ph.getPhase());

ph.arriveAndDeregister();

上面处理完,getPhase() 会返回 phase 序号 2。当我们想结束程序运行,由于主线程还注册在Phaser中,我们需要调用arriveAndDeregister()方法。当注销操作让 registered parties 的数目变为 0 后,Phaser就会终止。所有对同步方法的调用将不再阻塞并立马返回。

返回程序将打印如下输出(打印内容的完整内容在代码库中):

我看到所有线程都等待执行到关卡打开。执行中的下个 phase 只在上一个 phase 完成后进行。

1
2
3
4
5
6
7
8
9
10
This is phase 0
This is phase 0
This is phase 0
Thread thread-2 before long running action
Thread thread-1 before long running action
Thread thread-3 before long running action
This is phase 1
This is phase 1
Thread thread-4 before long running action
Thread thread-5 before long running action

4 总结

这篇教程,我们学习了下来自 java.util.concurrentPhaser结构,并使用Phaser类实现了多 phases 的协同逻辑。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接