1 介绍

哲学家就餐难题是用于描述多线程环境同步议题和解释如何解决该问题的技术的经典难题之一。Dijkstra 第一个提出了该难题并将其用于表达计算机访问磁带机外围设备的场景中。

现在的难题表述由 Tony Hoare 给出,他发明了快排算法。这篇文章中,我们分析这个著名的难题并编程解决。

2 哲学家就餐难题

哲学家就餐难题

上图展示了这个难题。五个安静的哲学家(P1-P5)围坐在一张圆桌前,只做吃饭和思考两件事。

他们共享五个叉子(1-5),为了吃饭,一个哲学家需要两只手同时都握着叉子。吃完饭,他将两个叉子都放下。另一个哲学家可以拿取进行就餐。

The goal is to come up with a scheme/protocol that helps the philosophers achieve their goal of eating and thinking without getting starved to death.

目标是找到一个方案帮助哲学家在吃饭和思考时不会因为拿不到刀叉而活活饿死。

3 解决方案

第一个方案是让各个哲学家遵循下面的约定:

1
2
3
4
5
6
7
8
9
10
11
12
13
while(true) { 
// Initially, thinking about life, universe, and everything
think();

// Take a break from thinking, hungry now
pick_up_left_fork();
pick_up_right_fork();
eat();
put_down_right_fork();
put_down_left_fork();

// Not hungry anymore. Back to thinking!
}

如上面伪代码(pseudo code)所述,各个哲学家初始处于思考状态。在一定时间后,哲学家感到饥饿并渴望吃饭。

这时,他尝试拿取放于两边的叉子,一旦拿到这两个叉子,他就开始吃饭。吃饭一旦结束,他就放下叉子,他盘边的哲学家就可以拿取这些叉子。

4 实现

我们将哲学家定义成类,该类实现了 Runnable 接口,这样我们可以将他们作为线程执行。各个 Philosopher 类可以访问他左右两边的 fork:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Philosopher implements Runnable {

// The forks on either side of this Philosopher
private Object leftFork;
private Object rightFork;

public Philosopher(Object leftFork, Object rightFork) {
this.leftFork = leftFork;
this.rightFork = rightFork;
}

@Override
public void run() {
// Yet to populate this method
}

}

我们也构造了 Philosopher 采取行动的方法 - 吃饭,思考,或者准备吃饭时拿去叉子:

1
2
3
4
5
6
7
8
9
10
11
12
public class Philosopher implements Runnable {

// Member variables, standard constructor

private void doAction(String action) throws InterruptedException {
System.out.println(
Thread.currentThread().getName() + " " + action);
Thread.sleep(((int) (Math.random() * 100)));
}

// Rest of the methods written earlier
}

如上述代码所示,各个动作通过在调用的线程中等待随机时间来模拟,这样执行顺序就不会单单受限于时间。

现在,我们实现 Philosopher 的核心逻辑。

为了模拟获取叉子,我们要锁定叉子以免同时有两个哲学家拿到它。

为了实现这个效果,我们使用关键字 synchronized 获取 fork 对象的内部监视器同时也避免了其他线程执行了相同操作。Java 中关键字 synchronized 的介绍查看这里。我们接着实现 Philosopher 类中的 run() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Philosopher implements Runnable {

// Member variables, methods defined earlier

@Override
public void run() {
try {
while (true) {

// thinking
doAction(System.nanoTime() + ": Thinking");
synchronized (leftFork) {
doAction(
System.nanoTime()
+ ": Picked up left fork");
synchronized (rightFork) {
// eating
doAction(
System.nanoTime()
+ ": Picked up right fork - eating");

doAction(
System.nanoTime()
+ ": Put down right fork");
}

// Back to thinking
doAction(
System.nanoTime()
+ ": Put down left fork. Back to thinking");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}

这个方法准确实现了之前描述的方案:Philosopher 思考了一会儿然后决定吃饭。之后,他拿到他左右两侧的叉子并开始吃饭。吃完饭,他放下叉子。我们也在各个行为里添加了时间戳,这可以帮助我们更好理解事情发生的顺序。

为了将整个流程跑起来,我们写了一个客户端,创建了 5 个 Philosopher 线程并将他们跑起来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class DiningPhilosophers {

public static void main(String[] args) throws Exception {

Philosopher[] philosophers = new Philosopher[5];
Object[] forks = new Object[philosophers.length];

for (int i = 0; i < forks.length; i++) {
forks[i] = new Object();
}

for (int i = 0; i < philosophers.length; i++) {
Object leftFork = forks[i];
Object rightFork = forks[(i + 1) % forks.length];

philosophers[i] = new Philosopher(leftFork, rightFork);

Thread t
= new Thread(philosophers[i], "Philosopher " + (i + 1));
t.start();
}
}
}

我们将 fork 定义成 Java 对象,并生成哲学家所需数量的 forks。我们将哲学家尝试使用关键字 synchronized锁定的左右两边的叉子交给他们。

执行这个代码会产生如下类似的输出。你的输出很可能与下面不同,因为 sleep() 方法调用时间间隔会变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
Philosopher 1 8038014601251: Thinking
Philosopher 2 8038014828862: Thinking
Philosopher 3 8038015066722: Thinking
Philosopher 4 8038015284511: Thinking
Philosopher 5 8038015468564: Thinking
Philosopher 1 8038016857288: Picked up left fork
Philosopher 1 8038022332758: Picked up right fork - eating
Philosopher 3 8038028886069: Picked up left fork
Philosopher 4 8038063952219: Picked up left fork
Philosopher 1 8038067505168: Put down right fork
Philosopher 2 8038089505264: Picked up left fork
Philosopher 1 8038089505264: Put down left fork. Back to thinking
Philosopher 5 8038111040317: Picked up left fork

所有的哲学家刚开始都处于思考状态,Philosopher 1 开始拿起左右两边的叉子,然后吃饭接着放下叉子。之后,Philosopher 5 拿起叉子。

5 解决方案的问题:死锁

虽然上面的方案看起来没问题,但是可能会出现死锁。

A deadlock is a situation where the progress of a system is halted as each process is waiting to acquire a resource held by some other process.

死锁是由于各个线程互相等待其他线程的资源而导致系统无法进行下去的情况。

我们通过多次执行上述代码可以发现代码确实挂在那里进行不下去。下面是上述问题的一个类似输出:

1
2
3
4
5
6
7
8
9
10
Philosopher 1 8487540546530: Thinking
Philosopher 2 8487542012975: Thinking
Philosopher 3 8487543057508: Thinking
Philosopher 4 8487543318428: Thinking
Philosopher 5 8487544590144: Thinking
Philosopher 3 8487589069046: Picked up left fork
Philosopher 1 8487596641267: Picked up left fork
Philosopher 5 8487597646086: Picked up left fork
Philosopher 4 8487617680958: Picked up left fork
Philosopher 2 8487631148853: Picked up left fork

这个场景中,各个 Philosophers 拿到了他们左手边的叉子,但是拿不到他们右手边的叉子,因此右手边的叉子已经被旁边的哲学家拿走。这种场景称为循环等待(circular wait),是造成死锁的情况之一。

6 解决死锁

如上所见,死锁的主要原因是发生循环等待的情况,各个线程等待被其他线程持有的资源。因此,为了防止死锁,我们需要确保不会发生循环等待的情况。有好几种方式可以解决这个问题,下面描述了最简单的一种:

All Philosophers reach for their left fork first, except one who first reaches for his right fork.

所有的哲学家首先伸手去拿他们左手边的叉子,除了一个哲学家首先伸手去拿他右边的叉子。

我们稍微对之前的代码进行调整来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class DiningPhilosophers {

public static void main(String[] args) throws Exception {

final Philosopher[] philosophers = new Philosopher[5];
Object[] forks = new Object[philosophers.length];

for (int i = 0; i < forks.length; i++) {
forks[i] = new Object();
}

for (int i = 0; i < philosophers.length; i++) {
Object leftFork = forks[i];
Object rightFork = forks[(i + 1) % forks.length];

if (i == philosophers.length - 1) {

// The last philosopher picks up the right fork first
philosophers[i] = new Philosopher(rightFork, leftFork);
} else {
philosophers[i] = new Philosopher(leftFork, rightFork);
}

Thread t
= new Thread(philosophers[i], "Philosopher " + (i + 1));
t.start();
}
}
}

在上面代码的 17-19 行做了修改,让最后一个哲学家首先去拿右边的叉子而不是左边。这破坏了循环等待而产生不了死锁。

下面的输出展示了所有哲学家获得思考和吃饭的机会而不产生死锁的情况之一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Philosopher 1 88519839556188: Thinking
Philosopher 2 88519840186495: Thinking
Philosopher 3 88519840647695: Thinking
Philosopher 4 88519840870182: Thinking
Philosopher 5 88519840956443: Thinking
Philosopher 3 88519864404195: Picked up left fork
Philosopher 5 88519871990082: Picked up left fork
Philosopher 4 88519874059504: Picked up left fork
Philosopher 5 88519876989405: Picked up right fork - eating
Philosopher 2 88519935045524: Picked up left fork
Philosopher 5 88519951109805: Put down right fork
Philosopher 4 88519997119634: Picked up right fork - eating
Philosopher 5 88519997113229: Put down left fork. Back to thinking
Philosopher 5 88520011135846: Thinking
Philosopher 1 88520011129013: Picked up left fork
Philosopher 4 88520028194269: Put down right fork
Philosopher 4 88520057160194: Put down left fork. Back to thinking
Philosopher 3 88520067162257: Picked up right fork - eating
Philosopher 4 88520067158414: Thinking
Philosopher 3 88520160247801: Put down right fork
Philosopher 4 88520249049308: Picked up left fork
Philosopher 3 88520249119769: Put down left fork. Back to thinking

这可以通过执行多次来验证,系统再也不会发生之前发生的死锁的情况。

7 总结

这篇文章,我们研究了著名的哲学家就餐难题以及循环等待和死锁的概念。我们编写了一个简单的方案触发了死锁,然后通过简单的修改破坏了循环等待而避免了死锁。这只是一个开始,其实还有很多精巧的解决方案。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 介绍

Java 程序在操作中加入延迟或者暂停比较常见。这有助于任务调整节奏或者暂停执行直到另一个任务完成。

这篇教程描述 Java 中两种延迟的实现方式。

2 基于线程的方法

当 Java 程序运行时,它会在宿主机上创建一个进程。这个进程包含至少一个线程,即主线程,程序跑在里面。而且,Java 支持 multithreading,应用可以创建新的线程,与主线程并发执行,或者异步。

2.1 使用 Thread.sleep

Java 中快速和粗糙的方式去暂停是让当前线程 sleep 指定时间。这可以通过使用 Thread.sleep(milliseconds) 来做。

1
2
3
4
5
try {
Thread.sleep(secondsToSleep * 1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}

比较好的实践是将 sleep 封装在 try/catch 块中以防止另外一个线程中断 sleeping 的线程。这个案例中,我们捕获 InterruptedException 并显式地中断了当前线程,因此它后续可以被捕获和处理。这在多线程程序中更重要,但是在单线程程序中仍然不失为一个好的实践以免后续我们增加了其他线程。

2.2 使用 TimeUnit.sleep

为了更好的可读性,我们使用 TimeUnit.XXX.sleep(y)*,其中 *XXX 表示 sleep 的时间单位(SECONDS,MINUTES等等),y 是 sleep 的时间长度。下面是 TimeUnit 语法的使用例子:

1
2
3
4
5
try {
TimeUnit.SECONDS.sleep(secondsToSleep);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}

然而,这些基于线程的方法有一些缺点:

  • sleep 时间实际上不准确,特别是使用更小的时间单位的时候,比如 milliseconds 和 nanosecondes。
  • 当在循环中使用时,sleep 会在循环迭代中因此其他代码执行而略微偏移,因此执行时间在多次迭代后会不准确。

3 基于 ExecutorService 的方法

Java 提供了 ScheduledExecutorService 接口类,它是更加健壮和精确的方案。这个接口能够调度代码在指定延迟后执行一次或者按固定时间间隔执行。

每隔一定延迟执行一次代码,使用 schedule 方法:

1
2
3
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

executorService.schedule(Classname::someTask, delayInSeconds, TimeUnit.SECONDS);

Classname::someTask 部分用于指定在具体延迟后要执行的方法:

  • someTask 是要执行的方法的名字
  • Classname 是含有 someTask 方法的类名

每个固定时间间隔运行任务,使用 scheduleAtFixedRate 方法:

1
2
3
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

executorService.scheduleAtFixedRate(Classname::someTask, 0, delayInSeconds, TimeUnit.SECONDS);

这将会重复调用 someTask 方法,每次调用之间都会停顿 delayInSeconds

除了支持更多时间相关选项外,ScheduledExecutorService 方法避免了漂移问题,所以可以实现更加准确的时间间隔。

4 总结

这篇文章,我们讨论了 Java 程序中创建延迟的两个方法。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 概览

Java 支持多线程开箱即用。这意味着通过在分隔的工作者线程中并发执行字节码有能力改善应用的性能。

虽然多线程的功能非常强大,但是它也有成本。在多线程环境中,我们需要以线程安全的方式实现。这意味着不同的线程可以访问相同的资源而不用暴露错误的行文或者产生不可预测的结果。

这个编程方法就被称为线程安全。

这篇教程,我们将采用不同的方式来实现它。

2 无状态实现

大多数场景中,多线程应用的错误是线程间分享状态的错误方式引起。

因此,第一种方式是实现线程安全的方式是使用无状态实现。

为了更好地理解这种方式,我们来看看下面的一个简单的功能类,它有一个静态方法用于计算素数:

1
2
3
4
5
6
7
8
9
10
public class MathUtils {

public static BigInteger factorial(int number) {
BigInteger f = new BigInteger("1");
for (int i = 2; i <= number; i++) {
f = f.multiply(BigInteger.valueOf(i));
}
return f;
}
}

factorial() 方法是无状态确定性函数(deterministic function)。给定值,总是返回相同的结果。

该方法不依赖外部状态也不维护状态。因此,它被认为是线程安全并可以安全地被多个线程同时调用。

所有线程可以安全地调用 factorial() 方法并获得期望结果,而不会干扰其他线程并且该方法的输出不会改变。

3 不可变实现

如果我们需要在线程间共享状态,则我们可以让创建的类不可变来保证线程安全。

不可变性是非常强大的,与语言无关的概念,它在 Java 中实现非常简单。

简单地讲,一个类实例在创建后,当它内部状态不能修改时,就是不可变的。

Java 中 创建不可变类最简单的方式是将所有字段声明为 privatefinal 并且不提供 setters 方法:

1
2
3
4
5
6
7
8
9
10
11
public class MessageService {

private final String message;

public MessageService(String message) {
this.message = message;
}

// standard getter

}

MessageService 对象是不可变的,当它创建后状态是不可修改的。因此,它是线程安全的。

再者,如果 MessageService 是可变的,但是线程只能以只读方式访问它,那么也是线程安全的。

因此,不可变性仅仅是另一种实现线程安全的方式。

4 Thread-Local fields

在面向对象编程中(OOP),对象实际上需要通过字段维护状态并通过方法实现行为。

如果我们实际上需要维持状态,我们可以创建类时让它的字段是 thread-local 的,使得线程间不分享状态,来达到线程安全。

我们可以简单创建这样的类,它的字段在 Thread 子类中简单地定义成 private 的来实现 thread-local。

例如,一个 Thread 子类保存了一个整型列表:

1
2
3
4
5
6
7
8
9
public class ThreadA extends Thread {

private final List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);

@Override
public void run() {
numbers.forEach(System.out::println);
}
}

而另外一个保存了一个字符串列表:

1
2
3
4
5
6
7
8
9
public class ThreadB extends Thread {

private final List<String> letters = Arrays.asList("a", "b", "c", "d", "e", "f");

@Override
public void run() {
letters.forEach(System.out::println);
}
}

两个实现中的类都有他们自己的状态,但是没有和其它线程分享。因此,这些类是线程安全的。

相似地,我们可以创建 ThreadLocal 类型的字段。

例如,下面的 StateHolder 类:

1
2
3
4
5
6
public class StateHolder {

private final String state;

// standard constructors / getter
}

我们简单地将它创建成为 thread-local 变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ThreadState {

public static final ThreadLocal<StateHolder> statePerThread = new ThreadLocal<StateHolder>() {

@Override
protected StateHolder initialValue() {
return new StateHolder("active");
}
};

public static StateHolder getState() {
return statePerThread.get();
}
}

Thread-local 字段和普通的类字段很像,除了各个线程通过 setter/getter 访问到都是这个字段的单独初始化副本使得各个线程有它自己的状态。

5 同步集合

通过 collections framework 中的同步封装类集合,我们可以创建线程安全的 Collections。

示例如下:

1
2
3
4
5
Collection<Integer> syncCollection = Collections.synchronizedCollection(new ArrayList<>());
Thread thread1 = new Thread(() -> syncCollection.addAll(Arrays.asList(1, 2, 3, 4, 5, 6)));
Thread thread2 = new Thread(() -> syncCollection.addAll(Arrays.asList(7, 8, 9, 10, 11, 12)));
thread1.start();
thread2.start();

我们需要知道同步 Collections 在各个方法中使用内在锁(稍后看内在锁 intrinsic locking)。

这意味着这些方法同时只能由一个线程访问,当方法被第一个线程锁定时,其他线程会被阻塞。

因此,由于同步访问的潜在逻辑,同步过程会对性能有一定损耗。

6 并发集合

作为同步集合替代,我们可以使用并发集合创建线程安全集合。

Java 提供了 java.util.concurrent 包,它含有若干并发集合实现,比如 ConcurrentHashMap

1
2
3
4
Map<String,String> concurrentMap = new ConcurrentHashMap<>();
concurrentMap.put("1", "one");
concurrentMap.put("2", "two");
concurrentMap.put("3", "three");

不像他们对应的同步实现,并发集合通过将他们的数据分割成片来实现线程安全。在 ConcurrentHashMap 中,线程可以在不同的 map 片上获得锁,所以多个线程可以同时访问 Map

并发集合的性能比同步集合要高得多,因此并发线程访问的内在优势。

值得提到的是同步和并发集合只是保证集合自身线程安全而不是内容。

7 原子对象

通过使用原子类(atomic classes)也能实现线程安全,包括 AtomicIntegerAtomicLongAtomicBooleanAtomicReference

原子类支持我们执行原子操作,它们是线程安全的,而不需使用同步。一个原子操作在单个机器级别操作中执行。

为了理解这种问题的解决方式,我们来看看下面的 Counter 类:

1
2
3
4
5
6
7
8
9
10
11
12
public class Counter {

private int counter = 0;

public void incrementCounter() {
counter += 1;
}

public int getCounter() {
return counter;
}
}

假设在竞争条件下,两个线程同时访问 incrementCounter() 方法。

理论上,counter 字段的 final 值将会是 2 。但是我们对结果不确定,因为线程同时执行同一个代码块,增量操作不是原子的。

让我们通过使用 AtomicInteger 对象创建一个 Counter 类的线程安全的实现:

1
2
3
4
5
6
7
8
9
10
11
12
public class AtomicCounter {

private final AtomicInteger counter = new AtomicInteger();

public void incrementCounter() {
counter.incrementAndGet();
}

public int getCounter() {
return counter.get();
}
}

为什么这个是线程安全的,因为 ++ 增量操作不是一个动作,而 incrementAndGet 是原子的。

8 同步方法

上面的方法对于集合和原始类型表现很好,而我们也常常需要比这个范围更大的控制。

因此,另外一个常见的实现线程安全的方式是实现同步方法。

简单地讲,只有一个线程可以同时访问某个同步方法,而其他访问该方法的线程会被阻塞,直到第一个线程执行结束或者抛异常。

我们现在通过同步方法来实现线程安全的 incrementCounter() 方法。

1
2
3
public synchronized void incrementCounter() {
counter += 1;
}

同步方法要在方法前面的前面加上 synchronized 关键字。

因为在某个时刻只有一个线程可以访问和执行 incrementCounter() 的同步方法,接着,其他线程也按照同样的方式进行。这样执行就不会出现重叠的问题。

同步方法依赖内在锁或者监督器锁的使用。内在锁是一个隐含的内部实体关联一个特定的类实例。

多线程上下文中,monitor 术语是一个角色的引用,该角色实现相关对象的锁在一组具体的方法或者声明上实行排他性访问。

当一个线程调用同步方法,它要获得内在锁。在该线程执行完方法,释放掉锁,才允许其他线程获取锁并访问方法。

我们可以在实例方法,静态方法和声明(同步声明)上实现同步。

9 同步声明

有时候,同步整个方法可能范围太大,因此我们只需要保证方法的部分代码是线程安全的。、

为了演示这样的场景,我们重构 incrementCounter() 方法:

1
2
3
4
5
6
public void incrementCounter() {
// additional unsynced operations
synchronized(this) {
counter += 1;
}
}

这个小例子展示了怎样创建同步声明。假设该方法执行加法操作,没有做同步。我们通过将操作包在同步代码块中实现相关状态修改部分的同步。

和同步方法不同,同步声明必须指定 object 提供内在锁,通常使用 this 引用。

同步代价很昂贵,通过这种方式,我们可以只同步一个方法的相关部分代码。

10 Volatile Fields

同步方法或代码块可以方便处理线程间的变量可见性问题。尽快如此,通常类字段的值可能被 CPU 缓存。因此,对特定字段持续更新,即使他们是同步的,也可能被其他线程不可见。

为了防止这种场景,我们使用 volatile 修饰类字段:

1
2
3
4
5
6
7
public class Counter {

private volatile int counter;

// standard constructors / getter

}

通过 volatile 关键字,我们指示 JVM 和编译器将变量 counter 保存到主内存区。通过这种方式,我们确保 JVM 每次读 counter 变量的值,都是从主内存区读取,而不是 CPU 缓存。同样地,JVM 每次写 counter 变量,也会写到主内存区。

更多地,使用 volatile 变量也会使得对给定线程可见的所有线程也都从主内存区读取。

思考下下面的例子:

1
2
3
4
5
6
7
8
public class User {

private String name;
private volatile int age;

// standard constructors / getters

}

这个例子中,每次 JVM 写 age volatile 到主内存区时,它也会将 非 volatile 的 name 变量写到主内存区中。这保证了两个变量的最新值都保存在主内存区中,因此对变量的连续更新也被其他线程可见。

简单地讲,如果一个线程读取一个 volatile 变量的值,对该线程可见的所有变量也将从主内存区读取。

volatile 变量提供的这种扩展保证称为 full volatile visibility guarantee

11 Extrinsic Locking

我们可以稍微增强 Counter 类的线程安全实现通过使用外部监控器锁代替内在锁。

外部锁在多线程环境中也提供了对共享资源的协调访问,但是它使用外部实体实施对资源的排他性访问:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ExtrinsicLockCounter {

private int counter = 0;
private final Object lock = new Object();

public void incrementCounter() {
synchronized(lock) {
counter += 1;
}
}

// standard getter

}

我们使用一个普通的 Object 实例创建一个外部锁。这种实现更好,因此它在锁级别更安全。

使用内在锁,同步方法和代码块依赖于 this 引用,攻击者可以通过请求内在锁引发死锁并触发服务拒绝(DoS)的情况。

与内在锁不同,外部锁使用一个私有实体,不能被外部访问。这使得攻击者请求锁并引发死锁变得困难。

12 Reentrant Locks

Java 提供了一组 Lock 实现的增强集合,它们的行为比上面提供的内在锁更精巧。

使用内在锁,锁获取模型非常严苛:一个线程获取锁,然后执行方法或者代码块,最后释放锁,然后其他线程才能获取锁并访问方法。

没有潜在的机制检查入队的线程并对等待最久的线程给予优先访问的权利。

ReentrantLock 实例可以实现上面的情况,使得入队线程避免经历 resource starvation 的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ReentrantLockCounter {

private int counter;
private final ReentrantLock reLock = new ReentrantLock(true);

public void incrementCounter() {
reLock.lock();
try {
counter += 1;
} finally {
reLock.unlock();
}
}

// standard constructors / getter

}

ReentrantLock 构造方法接受一个可选 faimess boolean 参数。当设置为 true 时,多个线程尝试请求锁时, JVM 给予等待最久的线程优先获取锁的权利。

13 读写锁

另外一个用于线程安全的强大方法是使用 ReadWriteLock 实现。

ReadWriteLock 锁实际上使用一对相关锁,一个用于只读操作,另一个用于写操作。

于是,当没有线程写它的时候,所有线程都可以读取资源。进一步讲,写资源的线程将阻止其他线程读取它。

我们可以使用 ReadWriteLock 锁如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ReentrantReadWriteLockCounter {

private int counter;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();

public void incrementCounter() {
writeLock.lock();
try {
counter += 1;
} finally {
writeLock.unlock();
}
}

public int getCounter() {
readLock.lock();
try {
return counter;
} finally {
readLock.unlock();
}
}

// standard constructors

}

14 总结

这篇文章中,我们学习了 Java 中的线程安全并深入研究了不同方法怎样实现线程安全。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 介绍

这篇文章,我们通过比较 CyclicBarrierCountDownLatch 来理解它们之间的相同点和不同点。

2 这些是什么?

当与并发有关时,怎样概念化它们试图要完成的事情是个挑战。

最重要的一点是,CyclicBarrierCountDownLatch 都用于管理多线程应用。并且,它们都试图表达一个给定线程或者线程组应该怎样等待。

2.1 CountDownLatch

CountDownLatch 结构,一个线程会在 latch 上等待,直到其他线程在 latch 上倒数并到达 0。

我们可以把这想象成餐厅正在准备的菜肴。不管哪个厨师来做,服务员都要等待餐盘装满 n 份菜肴为止。如果餐盘要装 n 菜肴,任何厨师在他放一个菜肴在餐盘时都要在 latch 上 count down

2.2 CyclicBarrier

CyclicBarrier 是可重用的结构,一组线程一起等待直到所有线程都到达。都到达后,barrier 就会被打破并执行指定动作。

我们可以把这想象成一组朋友。每次他们计划在餐馆聚餐,他们都要商量一个碰头的地方。他们在那里等待,直到所有人都到了才会一起去餐馆。

2.3 更进一步阅读

更多细节,请阅读之前的教程 CountDownLatchCyclicBarrier

3. Tasks vs. Threads

让我们深入挖掘下这两个类语义上的不同。

如定义所述,CyclicBarrier 支持指定数目的线程互相等待, 而 CountDownLatch 支持一个或者多个线程等待指定数目的任务完成。

简单地讲,CyclicBarrier 维护指定数据的线程而 CountDownLatch 维护指定数目的任务。

在下面代码中,我们定义一个 CountDownLatch 以及计数为 2 。然后,我们在一个线程中调用 countDown() 两次:

1
2
3
4
5
6
7
8
9
CountDownLatch countDownLatch = new CountDownLatch(2);
Thread t = new Thread(() -> {
countDownLatch.countDown();
countDownLatch.countDown();
});
t.start();
countDownLatch.await();

assertEquals(0, countDownLatch.getCount());

一旦 latch 到达 0,对 await() 的调用会返回。

注意这里,我们使同一个线程对 count 减少了两次。

CyclicBarrier,在这一点上,完全不同。

和上面例子类似,我们创建一个 CyclicBarrier*,再次传入 count 为 2,并在同一线程中调用 *await() 两次:

1
2
3
4
5
6
7
8
9
10
11
12
13
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
Thread t = new Thread(() -> {
try {
cyclicBarrier.await();
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
// error handling
}
});
t.start();

assertEquals(1, cyclicBarrier.getNumberWaiting());
assertFalse(cyclicBarrier.isBroken());

这里的第一个区别是,等待的线程本身就是障碍(barrier)。

第二个区别是,第二个 await() 没有用。当个线程不能 count down 一个 barrier 两次。

事实上,因为 t 必须等待另外的线程调用 await() - 使 count 到达 2 - t 对 await() 的第二次调用实际上不会触发直到 barrier 被打破。

在我们的测试汇总,barrier 没有跨过去,因为我们只有一个线程等待而想要 barrier 被打破需要两个线程。这也可以通过 cyclicBarrier.isBroken() 方法证明,因为它返回 false

4. 可重用性

这两个类最显而易见的区别是可重用性。当 CyclicBarrier 中的 barrier 被打破,count 会重置为原始值。CountDownLatch 的 count 永远不会重置。

下面代码,我们定义一个 CountDownLatch 和 count 为 7,然后 20 个不同线程调用 count down:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CountDownLatch countDownLatch = new CountDownLatch(7);
ExecutorService es = Executors.newFixedThreadPool(20);
for (int i = 0; i < 20; i++) {
es.execute(() -> {
long prevValue = countDownLatch.getCount();
countDownLatch.countDown();
if (countDownLatch.getCount() != prevValue) {
outputScraper.add("Count Updated");
}
});
}
es.shutdown();

assertTrue(outputScraper.size() <= 7);

我们观察到即使 20 个不同线程调用 countDown(),count 到达 0 后也不会重置。

类似的,我们定义一个 CyclicBarrier 和 count 为 7,然后 20 个不同线程等待:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CyclicBarrier cyclicBarrier = new CyclicBarrier(7);

ExecutorService es = Executors.newFixedThreadPool(20);
for (int i = 0; i < 20; i++) {
es.execute(() -> {
try {
if (cyclicBarrier.getNumberWaiting() <= 0) {
outputScraper.add("Count Updated");
}
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
// error handling
}
});
}
es.shutdown();

assertTrue(outputScraper.size() > 7);

这里,我们观察到每次一个新线程运行,count就会减一,一旦它到达 0,就会重置为原始值。

5. 总结

总的来说, CountDownLatchCyclicBarrier 对于多线程同步都很有帮助。然而,他们提供的功能有根本性的不同。当使用它们处理任务时,要小心选择哪个更合适。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 概览

生成随机数是常见任务,所以 Java 提供了 java.util.Random 类。

然而,这个类在多线程环境中表现并不好。

简单地讲,Random 在多线程环境中性能差的原因在于竞争-多个线程共享同一个 Random 实例。

为了摆脱这个限制,Java 在 JDK 7 中引入了 java.util.concurrent.ThreadLocalRandom 类 - 用于在多线程环境中生成随机数。

下面让我们看看如何在实际应用中使用它。

2 ThreadLocalRandom 超越 Random

ThreadLocalRandom 结合了 ThreadLocalRandom 类,会被隔离在当前线程中。因此,通过简单地避免并发访问 Random 对象,在多线程环境中,它就可以获得更好的性能。

一个线程获取随机数不受其它线程影像,而 java.util.Random 提供的随机数是全局。

同时,和 Random 不同的是,ThreadLocalRandom 不支持设置种子(seed)。它覆盖了继承自RandomsetSeed(long seed) 方法,调用该方法时总是会抛 UnsupportedOperationException

现在我们做一些产生随机 intlongdouble 数的实践。

3 使用 ThreadLocalRandom 生成随机数

根据 Oracle 文档,我们仅需调用 ThreadLocalRandom.current() 方法,就可以拿到当前线程的 ThreadLocalRandom 实例。我们可以通过调用该实例提供的方法生成随机数。

生成随机 int 值,不指定范围:

1
int unboundedRandomValue = ThreadLocalRandom.current().nextInt());

下面,生成指定范围的随机 int 值,该值在给定的最小和最大值之间。

这里的例子生成一个介于 0 和 100 之间的随机 int 值:

1
int boundedRandomValue = ThreadLocalRandom.current().nextInt(0, 100);

注意,这里 0 下限是包含的,而 100 上限是不包含的。

我们也可以像上述例子一样调用 nextLong()nextDouble() 方法生成随机 long 值 和 double 值。

Java 8 添加了 nextGaussian() 方法生成正态分布值(normally-distributed),该值来自平均值(mean)为 0.0,标准差为 1.0 的生成序列。

Random 类一样,我们也能使用 doubles()ints()longs() 方法生成随机数流。

4 使用 JMH 比较 ThreadLocalRandomRandom

让我们看看怎样使用这两个类在多线程环境中生成随机数,然后使用 JMH 比较它们的性能。

首先,创建一个例子,所有的线程共享一个 Random 实例。这里,我们提交一个使用 Random 实例生成随机数的任务给 ExecutorService

1
2
3
4
5
6
7
8
9
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<Integer>> callables = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < 1000; i++) {
callables.add(() -> {
return random.nextInt();
});
}
executor.invokeAll(callables);

使用 JMH 基准分析法(benchmaking)检测上述代码的性能:

1
2
3
# Run complete. Total time: 00:00:36
Benchmark Mode Cnt Score Error Units
ThreadLocalRandomBenchMarker.randomValuesUsingRandom avgt 20 771.613 ± 222.220 us/op

类似地,现在我们使用 ThreadLocalRandom 代替 Random 实例,线程池中的各个线程使用一个 ThreadLocalRandom 实例:

1
2
3
4
5
6
7
8
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<Integer>> callables = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
callables.add(() -> {
return ThreadLocalRandom.current().nextInt();
});
}
executor.invokeAll(callables);

这里是使用 ThreadLocalRandom 的结果:

1
2
3
# Run complete. Total time: 00:00:36
Benchmark Mode Cnt Score Error Units
ThreadLocalRandomBenchMarker.randomValuesUsingThreadLocalRandom avgt 20 624.911 ± 113.268 us/op

最后,比较上面 RandomThreadLocalRandom 的 JMH 结果,使用 Random 生成 1000 个随机数的平均时间是 772 微秒,而使用 ThreadLocalRandom 大约为 625 微秒。

因此,我们认为 ThreadLocalRandom 在高并发环境中更高效。

关于 JMH 的更多信息,查看这里

5 总结

这篇文章解释了 java.util.Randomjava.util.concurrent.ThreadLocalRandom 的不同点。

我们也看到了在多线程环境中 ThreadLocalRandomRandom 更先进和性能更高,以及我们如何使用 ThreadLocalRandom 生成随机数。

ThreadLocalRandom 是 JDK 的一个简单添加的功能,但是在高并发应用中有显著作用。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 介绍

CyclicBarriers 是 Java 5 中引入的同步构造(synchronization constructs),java.util.concurrent package 的一部分。

这篇文章,我们探索一下在并发场景中如何使用该类。

2 Java 并发 - Synchronizers

java.util.concurrent 包含有若干个类,用于帮助管理线程之间的协作。其中部分包括:

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Exchanger
  • Semaphore
  • SynchronousQueue

这些类为线程之间的常用交互模式提供开箱即用的功能。如果我们有一组相互通信的线程并使用一种或者多种更常用的模式,如果我们有一组相互通信的线程,它们的交互模式与常用交互模式中的一种相似,我们只需简单地复用库里合适的类(也称为 Synchronizers*)而不是使用 locks 和 condition 对象以及 *synchronized 关键字再创造一个自定义方案。

接下来,研究下 CyclicBarrier

3 CyclicBarrier

CyclicBarrier 是一个同步器(Synchronizer),支持一组线程相互等待直到都到达共同的执行节点,也称为 barrier

CyclicBarriers are used in programs in which we have a fixed number of threads that must wait for each other to reach a common point before continuing execution.

Barrier 被叫做 cyclic,因为它在等待的线程释放后可以被复用。

4 用法

CyclicBarrier 的构造方法很简单。只需要传入一个整数,标记需要使用 barrier 实例调用 await() 方法的线程数,达到该线程数时通知指定数量的线程已到达公共执行节点:

1
public CyclicBarrier(int parties)

需要同步它们的执行的线程们也称为 parties 而调用 await() 方法就是我们注册到达 barrier point 的线程的方式。

这个调用是同步的并且线程调用该方法会挂起执行直到指定数量的线程在 barrier 上调用的了相同方法。指定数量的线程调用 await() 方法的场景称为 tripping the barrier

我们还可以传第二个参数给构造方法,这个参数是一个 Runnable 实例。它的逻辑会在最后一个线程到达 barrier 后执行。

1
public CyclicBarrier(int parties, Runnable barrierAction)

5 实现

下面场景展示 CyclicBarrier 如何使用。

这是固定数量的线程执行并保存相关结果到列表中的操作。当所有线程完成执行,他们中的一个(一般是最后一个 trips the barrier的)会开始处理从各个线程获取的数据。

让我们实现 action 发生的主类:

1
2
3
4
5
6
7
8
9
10
11
public class CyclicBarrierDemo {

private CyclicBarrier cyclicBarrier;
private List<List<Integer>> partialResults
= Collections.synchronizedList(new ArrayList<>());
private Random random = new Random();
private int NUM_PARTIAL_RESULTS;
private int NUM_WORKERS;

// ...
}

这个类非常清楚 - NUM_WORKERS是将要执行的线程数,NUM_PARTIAL_RESULTS是各个 worker 线程将要产生的结果数。

最后,我们有一个 partialResult 变量是列表类型,保存了各个 worker 线程处理结果。注意这个列表是 SynchronizedList,因为多个线程会同时放它里面写值,add() 方法对于普通的 ArrayList 不是线程安全的(thread-safe)。

现在,实现 worker 线程的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class CyclicBarrierDemo {

// ...

class NumberCruncherThread implements Runnable {

@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
List<Integer> partialResult = new ArrayList<>();

// Crunch some numbers and store the partial result
for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {
Integer num = random.nextInt(10);
System.out.println(thisThreadName
+ ": Crunching some numbers! Final result - " + num);
partialResult.add(num);
}

partialResults.add(partialResult);
try {
System.out.println(thisThreadName
+ " waiting for others to reach barrier.");
cyclicBarrier.await();
} catch (InterruptedException e) {
// ...
} catch (BrokenBarrierException e) {
// ...
}
}
}

}

然后,实现当 barrier 满足条件(即指定线程数调用了await(),也称为 barrier has been tripped)时运行的逻辑。

为了简单,我们仅仅将数字添加到结果列表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CyclicBarrierDemo {

// ...

class AggregatorThread implements Runnable {

@Override
public void run() {

String thisThreadName = Thread.currentThread().getName();

System.out.println(
thisThreadName + ": Computing sum of " + NUM_WORKERS
+ " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
int sum = 0;

for (List<Integer> threadResult : partialResults) {
System.out.print("Adding ");
for (Integer partialResult : threadResult) {
System.out.print(partialResult+" ");
sum += partialResult;
}
System.out.println();
}
System.out.println(thisThreadName + ": Final result = " + sum);
}
}
}

最后一步构造 CyclicBarrier 并使用 main() 方法启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CyclicBarrierDemo {

// Previous code

public void runSimulation(int numWorkers, int numberOfPartialResults) {
NUM_PARTIAL_RESULTS = numberOfPartialResults;
NUM_WORKERS = numWorkers;

cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());

System.out.println("Spawning " + NUM_WORKERS
+ " worker threads to compute "
+ NUM_PARTIAL_RESULTS + " partial results each");

for (int i = 0; i < NUM_WORKERS; i++) {
Thread worker = new Thread(new NumberCruncherThread());
worker.setName("Thread " + i);
worker.start();
}
}

public static void main(String[] args) {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.runSimulation(5, 3);
}
}

上述代码中,我们初始化 cyclic barrier,要求有 5 个线程,每个线程产生 3 个整数作为计算内容,并保存到同一个结果列表中。

一旦 barrier 条件满足,最后一个到达 barrier 的线程执行 AggregatorThread 中指定的逻辑,即 - 对各线程产生的整数求总和。

6 结果

这里是上面程序一次执行的输出 - 每次执行都可能产生不同的输出,因为线程会按不同顺序启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Spawning 5 worker threads to compute 3 partial results each
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 2
Thread 0: Crunching some numbers! Final result - 2
Thread 0 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 2
Thread 1: Crunching some numbers! Final result - 0
Thread 1: Crunching some numbers! Final result - 5
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 6
Thread 3: Crunching some numbers! Final result - 4
Thread 3: Crunching some numbers! Final result - 0
Thread 3 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 9
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 5
Thread 4 waiting for others to reach barrier.
Thread 4: Computing final sum of 5 workers, having 3 results each.
Adding 6 2 2
Adding 2 0 5
Adding 6 4 0
Adding 1 1 0
Adding 9 3 5
Thread 4: Final result = 46

如上所示,Thread 4 最后到达 barrier 并执行求和逻辑。实际场景中,线程的执行顺序并不重要。

7 总结

这篇文章中,我们看到 CyclicBarrier 在哪些场景中非常有用。

我们也实现了一个场景,该场景需要固定数量的线程在执行完上一步逻辑后,要先在固定节点等待,然后一起开始执行下一步逻辑。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


摘要

一个纯正的 P2P 版本电子现金支持在线支付会从一方直接发送到另一方,而不需要经过一个金融机构。数字签名解决了部分问题,但是如果还是需要一个可信的第三方来防止双花(double-spend)问题,那也就失去了主要利处。我们提出了一个使用 P2P 网络解决双花问题(double-spending problem)的方案。网络时间戳(timestamps)化交易(transactions)的方法是将它们散列(hash)到基于散列的工作量证明,不断增长的链(chain)上,形成一条记录,不可修改除非重做工作量证明。最长的链不仅作为目击事件顺序的证明,而且也证明它来自 CPU 算力最大的池。只要大量 CPU 算力控制在不攻击网络的节点里,它们将会产生最长的链而且比攻击者节点算得快。网络本身要求最小的结构。消息会尽量广泛传播,而节点可以随意离开或者加入网络,通过接收最长的工作量证明的链来确认它们离开网络时发生的事情。

译者注

双花问题:即一笔钱被花了两次或者两次以上,也叫”双重支付”。

1. 简介

互联网上的商业几乎完全依赖于金融机构作为可信赖的第三方来处理电子支付。虽然该系统对大多数交易(transactions)都能很好地工作,但它仍然存在基于信任的模型固有的缺陷。完全不可逆的交易实际上是不可能的,因为金融机构无法避免调解纠纷。调解成本增加了交易(transactions)成本,限制了最小实际交易(transactions)规模,并切断了小规模临时交易的可能性,而且在无法为不可逆服务进行不可逆支付的能力丧失方面存在更大的成本。有了逆转的可能性,对信任的需求就会扩大。商家必须提防他们的客户,麻烦他们提供除必须信息外的更多信息。一定比例的欺诈被通过不可避免。这些成本和支付不确定性可以通过使用物理货币在人身上避免,但是不存在这样的机制在没有可信第三方的情况下通过通信信道进行支付。

需要的是一个基于加密证明而不是信任的电子支付系统,允许任何两个自愿的当事方在不需要信任的第三方的情况下直接进行交易。在计算上无法逆转的交易将保护卖方免受欺诈,而常规的托管机制(escrow mechanisms)可以很容易地实施,以保护买方。这篇论文提出了一种双花问题的解决方案,利用 P2P 分布式时间戳服务器生成交易(transactions)按时间顺序排列的计算证明。只要诚实节点共同控制的 CPU 算力比任何攻击者节点的合作组多,系统就是安全的。

2. 交易 Transactions

我们定义了一种数字签名链的电子硬币。每个拥有者通过对上一次交易(transaction)和下个拥有者公钥的散列(hash)做数字签名并将它们添加到硬币的末端来转让硬币。收款人通过验证签名来验证链的拥有权。

译者理解:

私钥加密,公钥解密:一般被用于数字签名。数字签名是用于防篡改和防止假冒的,因为只有一人拥有私钥。甲方通过私钥对数据进行签名,乙方通过甲方的公钥验证签名,如果成功,说明确实是甲方发来的,并且数据没有被修改。

数字签名操作:先将上一次交易(transaction)和下个拥有者公钥进行散列(hash),然后用私钥对散列进行加密生成数字签名。

验证操作:收款人使用拥有者的公钥对数字签名进行解密,验证该签名是否来自拥有者。

Transaction

当然,问题是收款人不能证实所有拥有者都没有双花(double-spend)硬币。一个常见的解决方案是引入一个可信的中央机构或造币厂(mint),检查每笔交易是否存在双花。每笔交易后,必须将硬币退回造币厂(mint)发行新硬币,只有直接从造币厂发行的硬币才能相信没有双花(double-spend)。这个解决方案的问题在于,整个货币体系的命运取决于运营造币厂(mint)的公司,每笔交易都必须通过它们,就像银行一样。

我们需要一种方式让收款人知道先前的拥有者没有签名任何更早的交易(transactions)。

译者理解: 签名任何更早的交易指没有创建多个数字签名,即没有双花

就我们的目的而言,最早的交易才是最重要的,所以我们不关心后面交易试图双花的问题。确认一笔交易不存在的唯一方法是知道所有交易。在以造币厂为基础的模型中,造币厂知道所有的交易,并决定哪些交易最先到达。要在没有可信第三方的情况下实现这一点,交易必须公开宣布[1]并且我们需要一个系统,让参与者们对他们接收的交易顺序历史达成一致。

3. 时间戳服务器 Timestamp Server

我们提出的解决方案从时间戳服务器(timestamp server)开始。时间戳服务器的工作方式是对要加时间戳的项目块(a block of items)进行散列(hash),并广泛地发布该散列(hash),例如在报纸或全球新闻组网络(Usenet)帖子中[2-5]。时间戳证明数据必须在当时存在,才能进入到散列(hash)中。每个时间戳在其散列(hash)中包括上一个时间戳,形成一个链,每个附加的时间戳加固了其前面的时间戳。

timestamp-server

4. 工作量证明 Proof-of-Work

要在 P2P 的基础上实现分布式时间戳服务器,我们需要使用类似于 Adam Back’s Hashcash[6] 的工作量证明系统,而不是报纸或全球新闻组网络(Usenet)帖子。工作量证明涉及到寻找一个值,当被散列(hashed)时,例如使用 SHA-256,这个值的散列(hash)以若干个零位(ZERO bits)开始。所需的平均工作量随着指定的零位(ZERO bits)数增加而指数级增长,可以通过执行单个散列(hash)验证。

对于我们的时间戳网络,我们实现的工作量证明是通过递增 block 中的 nonce 直到找到一个值,该值使得 block 的散列(hash)的零位数符合要求。一旦消耗 CPU 算力满足了工作量证明,block 将不能修改除非重做工作量证明。因为新的 blocks 会链接在当前 block 后面,修改当前 block 的工作将包括重做它后面的所有块。

译者理解:比特币工作量证明具体实现是以 SHA256 算法计算一个目标哈希,使得这个哈希值符合前 N 位全是 0。

proof-of-work

工作量证明还解决了大多数决定中确定代表权的问题。如果大多数基于一个 IP 地址一票,那么任何能够分配许多IP的人都可以颠覆它。工作量证明本质上是一个 CPU 一个票。大多数决定由最长的链代表,最长的链意味着投入的工作量证明最大。如果 CPU 算力的大多数由诚实节点控制,那么最诚实的链将会增长最快并超过任何与之竞争的链。为了修改过去的 block,攻击者将不得不重做该 block 以及它后面的 blocks 的工作量证明而且要追上并超过最诚实的节点们的工作速度。我们稍后会讲到速度更慢的攻击者追上的可能性会随着后续块的增加指数级下降。

为了补偿随着时间过去硬件速度的提高和运行节点的兴趣变化,工作量证明的难度由一个以平均每小时产生的块数为目标的活动平均值来确定。如果它们增加太快,难度就会增加。

5. 网络 Network

运行该网络的步骤如下:

  1. 新交易向所有节点广播。
  2. 每个节点将新交易收集到 block 中。
  3. 每个节点为它的 block 计算工作量证明。
  4. 当一个节点计算得到工作量证明后,向所有节点广播 block。
  5. 节点接收那些含有的所有交易都有效并且没被花掉的 block。
  6. 节点通过在链中创造下个 block,使用接收的 block 的散列(hash)作为 privious hash 表达它们接收了 block。

节点始终将最长的链视为正确的链,并将继续努力扩展它。如果两个节点同时广播下一个块的不同版本,则某些节点可能首先接收这一个或另一个。在这种情况下,他们会在收到的第一个分支上工作,但会保留另一个分支,以防它变得更长。当下一个工作量证明被找到时一个分支会变得更长,它们的连接将不复存在;工作在另外一个分支上的节点将切换到更长的分支上。

新交易不一定需要到达所有节点。只要他们到达很多的节点,不久就会进到 block 中。Block 广播也容忍信息丢失。如果一个节点没有接收到 block,当它接收到下一个 block 并发现丢失了 block时,就会请求这个 block。

6. 激励

按照惯例,区块中的第一笔交易(transaction)是一种特殊交易(transaction),它产生(start)了一枚新硬币归 block 创建者所有。这增加了节点支持网络的动力,并提供了一种将硬币初始分配到流通中的方法,因为没有中央授权来发行它们。稳定增加一定数量的新硬币类似于黄金开采商(gold miner)消耗资源以增加黄金的流通量。在我们的场景中,花费的是 CPU 时间和电力。

激励措施也可以由交易费用(transaction fees)提供资金。如果交易的输出值小于其输入值,则差额是交易费用,该费用加到包含交易的 block 的激励值中。一旦预定数量的硬币进入流通,激励就可以完全过渡为交易费用,并且完全没有通货膨胀。

激励措施可能有助于鼓励节点保持诚实。如果一个贪婪的攻击者能够集合比所有诚实节点更多的 CPU 算力,他将面临选择使用它窃取他的付款来欺骗别人,还是使用它产生新的硬币。他应该会发现,遵守这些规则比从其他任何人那里骗得的钱要多得多,这些规则比其他任何人加起来都更有利于他,而不是去破坏系统和他拥有的财富的有效性。

译者理解

如何理解窃取他的付款来欺骗别人?

这就是 51% 攻击(majority attack)。根据第5节的工作量证明所讲可知,当拥有超过半数的算力时,不仅可以重做当前 block 和后面的 blocks,还能在重做完后创建新 block 赶超其它节点。使得它创建的链更长而获得全网认可。相当于可以篡改他之前的付款交易。

7. 回收磁盘空间

一旦硬币中的最新交易被埋在足够的 blocks 下,已支付的交易就可以丢弃,以节省磁盘空间。为了在不破坏 block 的散列(block’s hash)的情况下实现这一点,交易(transactions)在默克尔树(Merkle Tree)[7] [2] [5]中进行散列(hashed)处理,只有根包含在block 的散列(block’s hash)中。然后可以通过砍掉树的分支来压缩旧块。内部的散列(hash)不需要存储。

reclaim-disk-space

没有交易记录(transactions)的 block header 大约为 80 字节(bytes)。如果我们假设每10分钟产生一个 block,则每年 80 字节 * 6 * 24 * 365 = 4.2 MB。截止到2008年,销售的计算机系统通常带有 2GB RAM,而摩尔定律预测当前每年会增长 1.2 GB,即使必须将 block header 保存在内存中,存储也不成问题。

8. 简化支付验证

无需运行完整的网络节点就可以验证付款。用户只需要保留最长工作量证明链的 block header 的副本,在确信自己拥有最长的链之前,可以通过查询网络节点(network nodes)来获取该副本,以及获取默克尔分支(Merkle branch),该分支将交易(transaction)链接到已加上时间戳的 block 上。他本身无法检查交易(transaction),但是通过将其链接到链中的某个位置,他可以看到有网络节点已接受该交易,并且其之后添加的 blocks 也进一步确认网络已接受该交易。

译者理解

如何理解已加上时间戳的 block(the block it’s timestamped in)?

查看第三节 Timestamp Server

payment-verification

这样,只要诚实的节点控制网络,验证就是可靠的,但如果网络被攻击者控制,验证会更容易受到攻击。虽然网络节点(network nodes)可以自己验证交易(transaction),但只要攻击者能够继续控制网络,简化的验证方法就可能被攻击者捏造的交易(transaction)所欺骗。防止这种情况发生的一种策略是,接收来自网络节点检测到无效块时发出的警报,提示用户软件下载完整的 block 和被警告的交易(transaction)以确认是否不一致。经常收到付款的企业可能仍希望运行他们自己的节点,以获得更独立的安全性和更快的验证。

9. 合并和拆分价值

尽管可以分别处理硬币,但要对转账中的每一分钱都进行单独交易就会很不方便。为了允许价值被分割和合并,交易要包含多个输入和输出。通常情况下,会有一个单笔输入来自较大的上一笔交易(transaction)或多笔较小金额的组合输入,以及最多有两笔输出:一笔用于支付,另一笔将零钱(如果有)返回给发送方。

combine-splitting-value

应该注意的是,扇出(fan out),指一个交易依赖于多个交易,而这些交易又依赖于更多交易,在这里不是问题。因为永远不需要提取交易历史记录的完整独立副本。

译者理解

fan out:直译为扇出,也意为展开

10. 隐私

传统的银行模式通过限制相关方和受信任的第三方对信息的访问来达到一定程度的隐私。公布所有交易(transactions)的必要性排除了此方法,但仍可以通过在另一个地方中断信息流通来保持隐私:通过使公钥匿名。公众可以看到有人正在向其他人汇款,但没有信息将交易(transaction)和任何人联系在一起。这类似于证券交易所发布的信息级别,在该级别上,公开了单个交易的时间和规模,即”磁带”(tape),但没有告知参与方是谁。

译者理解

公钥匿名:使得我们不知道转账的发起方和接收方
tape:磁带;听到声音,却不能确定是谁的声音。

privacy

作为一个附加防火墙,每个交易(transaction)都应该使用新的密钥对,以防止它们指向同一所有者。在多输入交易(multi-input transactions)中,某些联系仍然不可避免,因为显然它们的输入都是由同一所有者拥有的。风险在于如果密钥的所有者暴露,该密钥所有者的其他交易也会暴露。

11. 计算

考虑攻击者试图生成一条替代诚实链的链且比诚实链延长速度更快的情况。即使做到这一点,也不意味着系统会被任意更改,比如凭空创造价值或获取不属于攻击者的钱。节点不会接受无效交易(transaction)的付款,诚实节点不会接受包含它们的 block。攻击者只能尝试更改他自己的交易(transaction),偷回他最近花掉的钱。

诚实链和攻击者链之间的竞争可以描述为二项随机漫步(Binomial Random Walk)。成功事件是诚实链延长一个 block,领先优势增加 1,失败事件是攻击者链延长一个 block,差距减少 1。

攻击者追赶上给定差距的概率类似于赌徒破产问题(Gambler’s Ruin problem)。假设信用能够无限透支的赌徒从欠钱开始赌钱,可以进行无数次尝试回本。我们可以计算出他能回本的概率,或者说攻击者链赶上诚实链的概率,如下所示[8]:

p = 诚实节点产生下一个 block 的概率

q = 攻击者产生下一个 block 的概率

qz = 攻击者落后 z 个 blocks 追上的概率


$$ q_{z}= \begin{pmatrix} 1&if p \leq q\\ \left (q/p \right )^{z}&\ if p>q \end{pmatrix} $$


假设 $p > q$,随着攻击者要追赶的 blocks 数不断增加,概率会呈指数下降。如果他在早期没有追赶成功,那么随着他落后地越来越多,他的机会也会变得越来越渺茫。

现在考虑下新交易的接收者需要等待多长时间才能充分确定发送者不能更改交易。我们假设发送者是攻击者,他想让接收者相信他已经支付过一段时间,然后再在一段时间后将交易切换为支付给自己。当这种情况发生时,接收者会收到警告消息,但发送者希望这已经为时已晚。

接收者生成一个新的密钥对,并在签名前不久才将公钥给发送者。这可以防止发送者提前准备一个区块链(a chain of blocks),通过不断地为该链生成 blocks,等到他伪造的链足够超前,然后在那一刻执行交易。一旦交易被发送,不诚实的发送者就开始偷偷准备包含其另一个版本的交易的并行链。

接收者会一直等待直到交易被添加到一个 block 中并且在其之后链接了 z 个 blocks。他不知道攻击者已经取得的确切进度(重做了多少 blocks),但是假设诚实的 block 花费的时间是每个 block 的平均预期时间,那么攻击者的潜在进度(重做了多少 blocks)将是一个泊松分布(Poisson distribution),期望值为:


$$ \lambda = z\frac{q}{p} $$


泊松分布

如果某事件以固定强度 $\lambda$,随机且独立地出现,该事件在单位时间内出现的次数(个数)可以看成是服从泊松分布。
参考 http://episte.math.ntu.edu.tw/articles/sm/sm_16_07_1/index.html

为了得到攻击者当前仍然可以追上的概率,我们将用表示攻击者取得的各个进度的泊松密度乘以他从该点可以追上的概率:


$$\sum_{k=0}^{\infty }\frac{\lambda ^{k}e^{-\lambda }}{k!}\cdot \begin{Bmatrix} \left ( q/p \right )^{\left ( z-k \right )} & if k\leqslant z\\ 1 & if k > z \end{Bmatrix}$$


重新调整避免对分布的无穷极数求和:


$$ 1-\sum_{k=0}^{\infty }\frac{\lambda ^{k}e^{-\lambda }}{k!} \left ( 1-\left ( q/p \right )^{\left ( z-k \right )} \right ) $$


译者注

用表示攻击者取得的各个进度的泊松密度乘以他从该点可以追上的概率,这句话我翻译不一定对。

转换成 C 语言代码…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <math.h>
double AttackerSuccessProbability(double q, int z)
{
double p = 1.0 - q;
double lambda = z * (q / p);
double sum = 1.0;
int i, k;
for (k = 0; k <= z; k++)
{
double poisson = exp(-lambda);
for (i = 1; i <= k; i++)
poisson *= lambda / i;
sum -= poisson * (1 - pow(q / p, z - k));
}
return sum;
}

运行一些结果,我们可以看到概率随 z 增加呈指数下降。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
q=0.1
z=0 P=1.0000000
z=1 P=0.2045873
z=2 P=0.0509779
z=3 P=0.0131722
z=4 P=0.0034552
z=5 P=0.0009137
z=6 P=0.0002428
z=7 P=0.0000647
z=8 P=0.0000173
z=9 P=0.0000046
z=10 P=0.0000012

q=0.3
z=0 P=1.0000000
z=5 P=0.1773523
z=10 P=0.0416605
z=15 P=0.0101008
z=20 P=0.0024804
z=25 P=0.0006132
z=30 P=0.0001522
z=35 P=0.0000379
z=40 P=0.0000095
z=45 P=0.0000024
z=50 P=0.0000006

求解 P 当小于 0.1%…

1
2
3
4
5
6
7
8
9
P < 0.001
q=0.10 z=5
q=0.15 z=8
q=0.20 z=11
q=0.25 z=15
q=0.30 z=24
q=0.35 z=41
q=0.40 z=89
q=0.45 z=340

12. 总结

我们提出了一种不依赖信任的电子交易系统。我们从常见的数字签名硬币框架开始,该框架提供了对所有权的强大控制,但是它并不完善因为没有防止双花问题(double-spending)。为了解决这个问题,我们提出了一种使用工作量证明的 P2P 网络来记录交易的公开历史,如果诚实节点控制了大多数 CPU 算力,则对于攻击者而言,更改记录很快会变得在计算上不现实。该网络非结构化的简单性非常健壮(robust)。节点在几乎没有协调的情况下同时工作。它们不需要识别,因为消息不会被路由到任何特定的地方,只需要尽最大努力地传递。节点可以随意离开和重新加入网络,接收工作量证明链来证实它们离开时都发生了事情。它们用自己的 CPU 算力投票,通过扩展有效 blocks 来表达对有效块的接受,拒绝处理无效块来表示拒绝。任何必要的规则和激励措施都可以通过这种共识机制(consensus mechanism)来执行。

参考文献

[1] W. Dai, “b-money,” http://www.weidai.com/bmoney.txt, 1998.

[2] H. Massias, X.S. Avila, and J.-J. Quisquater, “Design of a secure timestamping service with minimal trust requirements,” In 20th Symposium on Information Theory in the Benelux, May 1999.

[3] S. Haber, W.S. Stornetta, “How to time-stamp a digital document,” In Journal of Cryptology, vol 3, no 2, pages 99-111, 1991.

[4] D. Bayer, S. Haber, W.S. Stornetta, “Improving the efficiency and reliability of digital time-stamping,” In Sequences II: Methods in Communication, Security and Computer Science, pages 329-334, 1993.

[5] S. Haber, W.S. Stornetta, “Secure names for bit-strings,” In Proceedings of the 4th ACM Conference on Computer and Communications Security, pages 28-35, April 1997.

[6] A. Back, “Hashcash - a denial of service counter-measure,” http://www.hashcash.org/papers/hashcash.pdf, 2002.

[7] R.C. Merkle, “Protocols for public key cryptosystems,” In Proc. 1980 Symposium on Security and
Privacy, IEEE Computer Society, pages 122-133, April 1980.

[8] W. Feller, “An introduction to probability theory and its applications,” 1957.


论文相关信息

作者 SatoShi Nakamoto

邮箱 satoshin@gmx.com

网址 www.bitcoin.org


1 介绍

这篇文章介绍 CompletableFuture 类的功能和用例 - 作为 Java 8 并发 API 增强的一部分引入。

2 Java 中的异步计算

异步计算比较复杂。通常我们把计算想象成一系列步骤。但是在异步计算场景中,表示成回调的操作常常会分散在代码中或者互相嵌套。当我们处理错误的时候,如果是发生在其中的一个步骤中,就变得很难排查。

Java 5 中添加的 Future 接口用于接收异步计算的结果,但是它没有提供方法来合并计算或者处理可能的错误。

Java 8 中引入了 CompletableFuture 类。除了 Future 接口,它也实现了 CompletionStage 接口。该接口定义了契约用于异步计算步骤,可以与其他异步计算步骤合并。

同时,CompletableFuture 是一个积木(building block)和框架,包含 50 个不同的方法用于组合,合并,执行异步计算步骤和处理错误。

这样一个大型 API 会让人很烦,但是可喜的是几乎所有方法都有明确和独特的用例。

3 将 CompletableFuture 作为一个简单的Future使用

首先,CompletableFuture 类实现了 Future 接口,因此你可以将它当做 Future 实现来用,不过会有额外的完成逻辑。

例如,你可以用无参构造方法(no-arg constructor)创建这个类的实例表示某些 future 结果,把它分发给消费者并在某一时刻调用 complete 方法来完成它。消费者可以通过 get 方法阻塞当前线程等待结果返回。

下面的例子提供了一个方法,方法里创建了 CompletableFuture 的实例,然后拆分一些计算到另一个线程中处理并立即返回 Future

当计算完成,方法中通过调用 complete 并传入结果完成 Future

1
2
3
4
5
6
7
8
9
10
11
12
public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture
= new CompletableFuture<>();

Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});

return completableFuture;
}

为了拆分计算,我们使用 Executor API 详见文章 “Introduction to Thread Pools in Java”,不过创建和完成 CompletableFuture 的这个方法可以结合任何并发机制或者含有原始线程(raw thread)的 API 一起使用。

注意 calculateAsync 方法返回一个 Future 实例。

我们简单地调用这个方法,拿到 Future 实例,当准备好阻塞以获取结果时调用该实例的 get 方法。

另外注意 get 方法会抛一些检查型异常(checked exception),也就是 ExecutionException (发生在计算执行期间的异常) 和 InterruptedException (通知执行某个方法的线程被中断):

1
2
3
4
5
6
Future<String> completableFuture = calculateAsync();

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

如果你已经知道计算的结果,可以直接使用 completedFuture 静态方法并传入计算的结果作为参数。而 Futureget 方法将不会阻塞,会立马返回结果:

1
2
3
4
5
6
7
Future<String> completableFuture = 
CompletableFuture.completedFuture("Hello");

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

考虑另外一种场景,你想要结束 Future 的执行。

假设我们不需要将拿到结果和决定取消异步执行一起做。这样可以通过 Futurecancel 方法实现。该方法接受一个 boolean 参数 mayInterruptIfRunning*。但是在 *CompletableFuture 的场景中没啥作用,因为中断不用于为 CompletableFuture 控制处理过程。

下面是示例异步方法的修改版:

1
2
3
4
5
6
7
8
9
10
11
public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();

Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return null;
});

return completableFuture;
}

当我们调用 Future.get() 方法阻塞以获取结果,如果 future 已经被取消,则会抛 CancellationException 异常:

1
2
Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException

译者注:实际上没有抛出 CancellationException 异常

4 封装计算逻辑的 CompletableFuture

上述代码可以让我们选择任何并发执行机制,但是如果我们想不用脚手架代码,只是想简单地异步执行某些代码呢?

静态方法 runAsyncsupplyAsync 分别用于创建 RunnableSupplier 函数式类型的 CompletableFuture 实例。

RunnableSupplier 都是函数式接口,Java 8 中可以用 lambda 表达式来表示它们的实例。

Runnable 接口也是一个用在线程中的老接口,它没有返回值。

Supplier 接口是一个泛型函数式接口,只有一个方法且无传参有参数化类型(parameterized type)的返回值。

它允许使用含有执行计算和返回结果的 lambda 表达式提供 Supplier 实例。和下面代码一样简单:

1
2
3
4
5
6
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");

// ...

assertEquals("Hello", future.get());

5 异步计算的处理结果

处理计算结果的最泛化(generic)的方式是交给函数。thenApply 方法就是这样:接受一个 Function 实例,用它处理结果并通过函数返回 持有值的 Future

1
2
3
4
5
6
7
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
.thenApply(s -> s + " World");

assertEquals("Hello World", future.get());

如果你不需要返回值给 Future chain*,可以使用 *Consumer 函数式接口的实例。它的方法有一个传参和返回 void

下面代码最后的 future.get() 调用返回 Void 类型的实例。

1
2
3
4
5
6
7
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
.thenAccept(s -> System.out.println("Computation returned: " + s));

future.get();

最后,如果你既不需要传值供计算也不想要有返回值,那您可以传一个 Runnable lambdathenRun 方法。下面例子中,调用 future.get() 方法后,简单地打印了一行日志到控制台:

1
2
3
4
5
6
7
CompletableFuture<String> completableFuture 
= CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
.thenRun(() -> System.out.println("Computation finished."));

future.get();

6 合并 Futures

CompletableFuture API 最好的部分是合并 CompletableFuture 实例到计算步骤链(chain of computation steps)的能力。

这个链的结果本身就是 CompletableFuture 所以可以进行下一步链接和合并。这种方式在函数式语言中普遍存在(ubiquitous)并常被认为是一元设计模式(monadic design pattern)。

下面的例子中,我们使用 thenCompose 方法串联两个 Futures

注意这个方法接受一个函数并返回 CompletableFuture 实例。这个函数的参数是上一步计算的结果。它让我们可以在下一步 CompletableFuture 的 lambda 中使用这个值:

1
2
3
4
5
CompletableFuture<String> completableFuture 
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

assertEquals("Hello World", completableFuture.get());

thenComposethenApply 一起实现了一元模式的基础模块。它们和 Java 8 引入的类 StreammapflatMap 方法 以及 Optional 类紧密相关。

两个方法都接收一个函数并应用于计算结果,但是 thenCompose (flatMap) 方法接受的函数返回一个与上个计算结果同类型的值。函数式结果支持将这些类的实例像积木一样组合在一起。

如果你想要执行两个独立的 Futures 并对它们的结果做点事情,可以使用 thenCombine 方法接受 一个 Future 和 一个接受两个参数的 Function 来处理这两个结果(上一个结果和传入的结果):

1
2
3
4
5
6
CompletableFuture<String> completableFuture 
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2));

assertEquals("Hello World", completableFuture.get());

一个更简单的例子,当你想要对两个 Future 的结果做点事情,但不需要将任何结果相关的值再往下传。可以用 thenAcceptBoth 方法:

1
2
3
ompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));

7 thenApply()thenCompose() 的不同点

上一部分,我们展示了 thenApply()thenCompose() 的示例。两类 APIs 都用于将不同的 CompletableFuture 调用链接在一起,但是这两个函数的用法不同。

7.1 thenApply()

该方法用于处理上一个调用的结果。但是记住关键点是返回类型是所有调用的合并。

所以当我们想要转换 CompletableFuture 的调用是很有用:

1
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);

7.2 thenCompose()

thenCompose() 方法和 thenApply 方法类似都是返回一个新的完成阶段(Completion Stage)。但是,thenCompose() 用上一阶段作为参数。它会扁平化(flatten)并直接返回带结果的 Future*,而不是 *thenApply() 返回的嵌套的 future

1
2
3
4
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);

因为,如果想要链接 CompletableFuture 方法,那么最好使用 thenCompose()

同时,注意两个方法的区别类似 the difference between map() and flatMap()

8 并行运行多个 Futures

当我们并发执行多个 Futures 时,通常想要所有 Future 都执行完,然后处理它们的合并结果。

CompletableFuture.allOf 静态方法支持等待所有 Futures 完成并返回一个变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CompletableFuture<String> future1  
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);

// ...

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

注意 CompletableFuture.allOf() 的返回类型是 CompletableFuture*。这个方法的限制是不会返回所有 *Futures 的合并结果。你只能从 Futures 那里一个一个获得结果。没关系,我们可以用 CompletableFuture.join() 方法和 Java 8 Streams API,会是这些操作简化:

1
2
3
4
5
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);

CompletableFuture.join() 方法类似 get 方法,但是它会在Future没有正常完成时抛出未检查型异常(unchecked exception)。在Stream.map() 方法中可以传入 CompletableFuture.join() 方法的引用。

9 处理错误

为了处理异步计算步骤链中的错误,throw/catch 原语(idiom)也要做相应的适配。

取代在语法块(syntactic block)捕获异常的方式,CompletableFuture 将异常传到 handle 方法中。该方法接受两个参数:计算的结果(如果成功结束)和抛出的异常(如果某些计算步骤没有正常完成)。

下面的示例中我们使用 handle 方法返回一个默认值,当异步处理步骤因为没有 name 值提供而抛异常时:

1
2
3
4
5
6
7
8
9
10
11
12
13
String name = null;

// ...

CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
})}).handle((s, t) -> s != null ? s : "Hello, Stranger!");

assertEquals("Hello, Stranger!", completableFuture.get());

在看另外一个场景,与第一个示例不同,我们想要手动完成带值的Future,也有办法在完成时配置异常。completeExceptionally 方法就是用来干这个。下面的示例中 completableFuture.get() 方法会抛出 ExecutionException 异常并将 RuntimeException 作为触发异常原因:

1
2
3
4
5
6
7
8
9
10
CompletableFuture<String> completableFuture = new CompletableFuture<>();

// ...

completableFuture.completeExceptionally(
new RuntimeException("Calculation failed!"));

// ...

completableFuture.get(); // ExecutionException

上面的示例中,我们可以使用 handle 方法异步处理异常,不过使用 get 方法我们可以实现更加典型的同步异常处理的方式。

10 异步方法

CompletableFuture 中的 Fluent API 的大多数方法都有两个变型,一个带 Async 后缀,一个不带。这些方法 Async 方法通常用于在另外一个线程中执行相应的步骤。

不带 Async 后缀的方法使用当前调用的线程执行下一个阶段。 没有 Executor 参数的 Async 方法使用由 Executor 实现的公共 fork/join 线程池,它通过 ForkJoinPool.commonPool() 方法访问。有 Executor 参数的 Async 方法使用传递的 Executor 执行步骤。

下面是一个修改的示例,使用一个 Function 实例处理计算的实例。唯一可见的区别是 thenApplyAsync * 方法。但是在底层,其实函数的应用被封装在 *ForkJoinTask 实例中(关于fork/join框架的更多信息,详见“Guide to the Fork/Join Framework in Java”)。这可以使你的计算更加并行化,更加有效地利用系统资源。

1
2
3
4
5
6
7
CompletableFuture<String> completableFuture  
= CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
.thenApplyAsync(s -> s + " World");

assertEquals("Hello World", future.get());

11 JDK 9 CompletableFuture API

Java 9 中 CompletableFuture API 进一步增强,改变如下:

  • New factory methods added
  • Support for delays and timeouts
  • Improved support for subclassing.

新的实例 APIs 引入:

  • Executor defaultExecutor()
  • CompletableFuture<U> newIncompleteFuture()
  • CompletableFuture<T> copy()
  • CompletionStage<T> minimalCompletionStage()
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)
  • CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)

我们现在也有了一些静态工具方法:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • <U> CompletionStage<U> completedStage(U value)
  • <U> CompletionStage<U> failedStage(Throwable ex)
  • <U> CompletableFuture<U> failedFuture(Throwable ex)

最后,为了处理超时,Java 9 引入了两个新的函数:

  • orTimeout()
  • completeOnTimeout()

详见:Java 9 CompletableFuture API Improvements

12 总结

这篇文章我们介绍了 CompletableFuture 类的方法和典型用例。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 总览

这篇文章中,我们将会看看java.util.concurrent包中的Phaser的结构。 这个结构和协调线程执行的CountDownLatch非常相似。和CountDownLatch相比,它有一些额外的功能。

Phaser是一个关卡,数量不定的线程在继续执行前会等在那里。而在CountDownLatch中线程数量不能动态配置,需要在创建实例的时候就配置好。

2 Phaser API

Phaser让我们可以在线程执行下一步前,自定义等待关卡的逻辑。

我们可以协调多个执行 phases,为各个编程 phase 复用Phaser实例。各个 phase 可以有不同数量的线程等待前进到另外一个 phase。我们后面会看看使用多个 phases 的例子。

线程要想加入上述场景,需要用register()方法将自己注册到Phaser实例中。注意这只是增加了 registerd parties 的数量,并且我们确认不了当前线程是否注册-我们必须通过子类化实现来支持。

通过调用arriveAndAwaitAdvance()线程通知它已经到达关卡,关卡实际上是一个阻塞方法。当 arrived parties 的数量等于 registerd parties 的数量时,程序会继续执行,phase 序号将会增加。我们可以通过调用getPhase()方法获取当前的 phase 序号。

当线程完成任务,我们应该调用arriveAndDeregister()方法提示当前线程在这个特定 phase 不需要再被考虑。

3 使用 Phaser API 实现逻辑

设想一下我们想要协调多个 phases 的行为。三个线程将处理第一 phase,然后两个线程处理第二 phase。

首先创建一个实现 Runnable 接口的 LongRunningAction 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class LongRunningAction implements Runnable {
private String threadName;
private Phaser ph;

LongRunningAction(String threadName, Phaser ph) {
this.threadName = threadName;
this.ph = ph;
ph.register();
}

@Override
public void run() {
ph.arriveAndAwaitAdvance();
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
ph.arriveAndDeregister();
}
}

我们的 action 类实例化后,使用register()方法将它注册到Phaser*实例中。这将会提高使用那个 *Phaser 的线程数量。

调用arriveAndAwaitAdvance()方法会使当前线程在 barrier 处等待。如上所述,当 arrived parties 的数量等于 registered parties 的数量时,执行会接着进行。

处理结束后,当前线程调用arriveAndDeregister()方法注销(deregister)自己。

在我们创建的测试用例中,首先启动三个LongRunningAction线程并阻塞在关卡处。下一步,在当前动作完成后,我们将创建另外两个的LongRunningAction线程执行下一个 phase 的处理。

当创建来自主线程的Phaser实例时,我们传递参数 1 。这等同于在当前线程中调用register()方法。我们为什么这样做,因为当我们创建三个 worker 线程时,主线程是协调者,所以Phaser需要这四个线程都注册。

1
2
3
4
ExecutorService executorService = Executors.newCachedThreadPool();
Phaser ph = new Phaser(1);

assertEquals(0, ph.getPhase());

初始化后的 phase 序号 等于 0 ;

Phaser 类有一个构造方法,可以传入父实例,对于大量的 parties 的场景很有用。这种场景会存在大量同步竞争开销,Phasers 的实例可能被创建,这样 sub-phasers 组可以共享父实例。

下面,我们启动三个LongRunningAction action 线程,它们会在关卡处等待,直到我们在主线程中调用arriveAndAwaitAdvance()方法。

记住我们初始化Phaser时传入了 1 并调用了三次 register()。现在,三个 action 线程已经到达关卡,所以需要再调用一次arriveAndAwaitAdvance(),在主线程中调用。

1
2
3
4
5
6
7
executorService.submit(new LongRunningAction("thread-1", ph));
executorService.submit(new LongRunningAction("thread-2", ph));
executorService.submit(new LongRunningAction("thread-3", ph));

ph.arriveAndAwaitAdvance();

assertEquals(1, ph.getPhase());

在上面的 phase 完成后,因为程序已经完成了第一步执行,所以getPhase() 将会返回 1。

接下来我们想要两个线程进行第二 phase 的处理。我们可以利用 Phaser 来实现动态配置等待在关卡上的线程数。我们启动两个新线程,它们会在主线程调用 arriveAndAwaitAdvance() 之后开始执行(和上面示例一样):

1
2
3
4
5
6
7
executorService.submit(new LongRunningAction("thread-4", ph));
executorService.submit(new LongRunningAction("thread-5", ph));
ph.arriveAndAwaitAdvance();

assertEquals(2, ph.getPhase());

ph.arriveAndDeregister();

上面处理完,getPhase() 会返回 phase 序号 2。当我们想结束程序运行,由于主线程还注册在Phaser中,我们需要调用arriveAndDeregister()方法。当注销操作让 registered parties 的数目变为 0 后,Phaser就会终止。所有对同步方法的调用将不再阻塞并立马返回。

返回程序将打印如下输出(打印内容的完整内容在代码库中):

我看到所有线程都等待执行到关卡打开。执行中的下个 phase 只在上一个 phase 完成后进行。

1
2
3
4
5
6
7
8
9
10
This is phase 0
This is phase 0
This is phase 0
Thread thread-2 before long running action
Thread thread-1 before long running action
Thread thread-3 before long running action
This is phase 1
This is phase 1
Thread thread-4 before long running action
Thread thread-5 before long running action

4 总结

这篇教程,我们学习了下来自 java.util.concurrentPhaser结构,并使用Phaser类实现了多 phases 的协同逻辑。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 介绍

ExecutorService框架使得在多线程中处理任务变得简单。文中会举一些等待线程完成执行的例子。

同时会展示怎样优雅地关闭ExecutorService并等待已经运行的线程结束执行。

2 Executor关闭后

当使用Executor时,通过调用shutdown()或者shutdownNow()方法关闭。不过,它不会等待所有线程都执行完才关闭。

等待现存的线程完成执行再关闭可以通过调用awaitTermination()方法实现。

下面的例子阻塞当前线程直到所有任务都执行完成或者超过指定的超时时间:

1
2
3
4
5
6
7
8
9
10
11
public void awaitTerminationAfterShutdown(ExecutorService threadPool) {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
} catch (InterruptedException ex) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}

3 使用CountDownLatch

下面,看看另外一种解决上面问题的办法-使用CountDownLatch通知任务完成。

我们可以初始化一个值用于调用await()方法的线程在这个值递减到0后会被通知。

例如,如果我们需要当前线程等待另外N个线程完成执行。我们可以用N初始化latch:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService WORKER_THREAD_POOL 
= Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// ...
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

4 使用invokeAll()

第一个我们用于运行线程的方法是invokeAll()。在所有任务执行结束或者超过超时时间后,方法会返回Future对象列表。

同时注意返回的Future对象列表顺序和提供的Callable对象列表顺序一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);

List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));

long startProcessingTime = System.currentTimeMillis();
List<Future<String>> futures = WORKER_THREAD_POOL.invokeAll(callables);

awaitTerminationAfterShutdown(WORKER_THREAD_POOL);

long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;

assertTrue(totalProcessingTime >= 3000);

String firstThreadResponse = futures.get(0).get();

assertTrue("fast thread".equals(firstThreadResponse));

String secondThreadResponse = futures.get(1).get();
assertTrue("slow thread".equals(secondThreadResponse));

5 使用ExecutorCompletionService

另一种运行多线程的方法是使用ExecutorCompletionService。使用传入的ExecutorService执行任务。

invokeAll()的一个区别是返回的Future结果列表顺序与执行任务的返回顺序一样。ExecutorCompletionService使用队列按任务执行完成顺序保存结果,而invokeAll()会返回和给定的任务列表顺序一样的结果列表:

1
2
3
4
5
6
7
8
9
10
CompletionService<String> service
= new ExecutorCompletionService<>(WORKER_THREAD_POOL);

List<Callable<String>> callables = Arrays.asList(
new DelayedCallable("fast thread", 100),
new DelayedCallable("slow thread", 3000));

for (Callable<String> callable : callables) {
service.submit(callable);
}

使用take()方法可以访问结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
long startProcessingTime = System.currentTimeMillis();

Future<String> future = service.take();
String firstThreadResponse = future.get();
long totalProcessingTime
= System.currentTimeMillis() - startProcessingTime;

assertTrue("First response should be from the fast thread",
"fast thread".equals(firstThreadResponse));
assertTrue(totalProcessingTime >= 100
&& totalProcessingTime < 1000);
LOG.debug("Thread finished after: " + totalProcessingTime
+ " milliseconds");

future = service.take();
String secondThreadResponse = future.get();
totalProcessingTime
= System.currentTimeMillis() - startProcessingTime;

assertTrue(
"Last response should be from the slow thread",
"slow thread".equals(secondThreadResponse));
assertTrue(
totalProcessingTime >= 3000
&& totalProcessingTime < 4000);
LOG.debug("Thread finished after: " + totalProcessingTime
+ " milliseconds");

awaitTerminationAfterShutdown(WORKER_THREAD_POOL);

6 总结

根据使用场景,有多种方式实现等待线程完成执行。

CountDownLatch可以用于当我们想要一组操作被线程完成以后要通知其他线程时。

ExecutorCompletionService可以用于当我们需要尽快拿到任务结果时,而其他方法只能等到运行的任务执行完以后才能拿到。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接