java并发系列-介绍 CompletableFuture

1 介绍

这篇文章介绍 CompletableFuture 类的功能和用例 - 作为 Java 8 并发 API 增强的一部分引入。

2 Java 中的异步计算

异步计算比较复杂。通常我们把计算想象成一系列步骤。但是在异步计算场景中,表示成回调的操作常常会分散在代码中或者互相嵌套。当我们处理错误的时候,如果是发生在其中的一个步骤中,就变得很难排查。

Java 5 中添加的 Future 接口用于接收异步计算的结果,但是它没有提供方法来合并计算或者处理可能的错误。

Java 8 中引入了 CompletableFuture 类。除了 Future 接口,它也实现了 CompletionStage 接口。该接口定义了契约用于异步计算步骤,可以与其他异步计算步骤合并。

同时,CompletableFuture 是一个积木(building block)和框架,包含 50 个不同的方法用于组合,合并,执行异步计算步骤和处理错误。

这样一个大型 API 会让人很烦,但是可喜的是几乎所有方法都有明确和独特的用例。

3 将 CompletableFuture 作为一个简单的Future使用

首先,CompletableFuture 类实现了 Future 接口,因此你可以将它当做 Future 实现来用,不过会有额外的完成逻辑。

例如,你可以用无参构造方法(no-arg constructor)创建这个类的实例表示某些 future 结果,把它分发给消费者并在某一时刻调用 complete 方法来完成它。消费者可以通过 get 方法阻塞当前线程等待结果返回。

下面的例子提供了一个方法,方法里创建了 CompletableFuture 的实例,然后拆分一些计算到另一个线程中处理并立即返回 Future

当计算完成,方法中通过调用 complete 并传入结果完成 Future

1
2
3
4
5
6
7
8
9
10
11
12
public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture
= new CompletableFuture<>();

Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});

return completableFuture;
}

为了拆分计算,我们使用 Executor API 详见文章 “Introduction to Thread Pools in Java”,不过创建和完成 CompletableFuture 的这个方法可以结合任何并发机制或者含有原始线程(raw thread)的 API 一起使用。

注意 calculateAsync 方法返回一个 Future 实例。

我们简单地调用这个方法,拿到 Future 实例,当准备好阻塞以获取结果时调用该实例的 get 方法。

另外注意 get 方法会抛一些检查型异常(checked exception),也就是 ExecutionException (发生在计算执行期间的异常) 和 InterruptedException (通知执行某个方法的线程被中断):

1
2
3
4
5
6
Future<String> completableFuture = calculateAsync();

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

如果你已经知道计算的结果,可以直接使用 completedFuture 静态方法并传入计算的结果作为参数。而 Futureget 方法将不会阻塞,会立马返回结果:

1
2
3
4
5
6
7
Future<String> completableFuture = 
CompletableFuture.completedFuture("Hello");

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

考虑另外一种场景,你想要结束 Future 的执行。

假设我们不需要将拿到结果和决定取消异步执行一起做。这样可以通过 Futurecancel 方法实现。该方法接受一个 boolean 参数 mayInterruptIfRunning*。但是在 *CompletableFuture 的场景中没啥作用,因为中断不用于为 CompletableFuture 控制处理过程。

下面是示例异步方法的修改版:

1
2
3
4
5
6
7
8
9
10
11
public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();

Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return null;
});

return completableFuture;
}

当我们调用 Future.get() 方法阻塞以获取结果,如果 future 已经被取消,则会抛 CancellationException 异常:

1
2
Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException

译者注:实际上没有抛出 CancellationException 异常

4 封装计算逻辑的 CompletableFuture

上述代码可以让我们选择任何并发执行机制,但是如果我们想不用脚手架代码,只是想简单地异步执行某些代码呢?

静态方法 runAsyncsupplyAsync 分别用于创建 RunnableSupplier 函数式类型的 CompletableFuture 实例。

RunnableSupplier 都是函数式接口,Java 8 中可以用 lambda 表达式来表示它们的实例。

Runnable 接口也是一个用在线程中的老接口,它没有返回值。

Supplier 接口是一个泛型函数式接口,只有一个方法且无传参有参数化类型(parameterized type)的返回值。

它允许使用含有执行计算和返回结果的 lambda 表达式提供 Supplier 实例。和下面代码一样简单:

1
2
3
4
5
6
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");

// ...

assertEquals("Hello", future.get());

5 异步计算的处理结果

处理计算结果的最泛化(generic)的方式是交给函数。thenApply 方法就是这样:接受一个 Function 实例,用它处理结果并通过函数返回 持有值的 Future

1
2
3
4
5
6
7
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
.thenApply(s -> s + " World");

assertEquals("Hello World", future.get());

如果你不需要返回值给 Future chain*,可以使用 *Consumer 函数式接口的实例。它的方法有一个传参和返回 void

下面代码最后的 future.get() 调用返回 Void 类型的实例。

1
2
3
4
5
6
7
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
.thenAccept(s -> System.out.println("Computation returned: " + s));

future.get();

最后,如果你既不需要传值供计算也不想要有返回值,那您可以传一个 Runnable lambdathenRun 方法。下面例子中,调用 future.get() 方法后,简单地打印了一行日志到控制台:

1
2
3
4
5
6
7
CompletableFuture<String> completableFuture 
= CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
.thenRun(() -> System.out.println("Computation finished."));

future.get();

6 合并 Futures

CompletableFuture API 最好的部分是合并 CompletableFuture 实例到计算步骤链(chain of computation steps)的能力。

这个链的结果本身就是 CompletableFuture 所以可以进行下一步链接和合并。这种方式在函数式语言中普遍存在(ubiquitous)并常被认为是一元设计模式(monadic design pattern)。

下面的例子中,我们使用 thenCompose 方法串联两个 Futures

注意这个方法接受一个函数并返回 CompletableFuture 实例。这个函数的参数是上一步计算的结果。它让我们可以在下一步 CompletableFuture 的 lambda 中使用这个值:

1
2
3
4
5
CompletableFuture<String> completableFuture 
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

assertEquals("Hello World", completableFuture.get());

thenComposethenApply 一起实现了一元模式的基础模块。它们和 Java 8 引入的类 StreammapflatMap 方法 以及 Optional 类紧密相关。

两个方法都接收一个函数并应用于计算结果,但是 thenCompose (flatMap) 方法接受的函数返回一个与上个计算结果同类型的值。函数式结果支持将这些类的实例像积木一样组合在一起。

如果你想要执行两个独立的 Futures 并对它们的结果做点事情,可以使用 thenCombine 方法接受 一个 Future 和 一个接受两个参数的 Function 来处理这两个结果(上一个结果和传入的结果):

1
2
3
4
5
6
CompletableFuture<String> completableFuture 
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2));

assertEquals("Hello World", completableFuture.get());

一个更简单的例子,当你想要对两个 Future 的结果做点事情,但不需要将任何结果相关的值再往下传。可以用 thenAcceptBoth 方法:

1
2
3
ompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));

7 thenApply()thenCompose() 的不同点

上一部分,我们展示了 thenApply()thenCompose() 的示例。两类 APIs 都用于将不同的 CompletableFuture 调用链接在一起,但是这两个函数的用法不同。

7.1 thenApply()

该方法用于处理上一个调用的结果。但是记住关键点是返回类型是所有调用的合并。

所以当我们想要转换 CompletableFuture 的调用是很有用:

1
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);

7.2 thenCompose()

thenCompose() 方法和 thenApply 方法类似都是返回一个新的完成阶段(Completion Stage)。但是,thenCompose() 用上一阶段作为参数。它会扁平化(flatten)并直接返回带结果的 Future*,而不是 *thenApply() 返回的嵌套的 future

1
2
3
4
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);

因为,如果想要链接 CompletableFuture 方法,那么最好使用 thenCompose()

同时,注意两个方法的区别类似 the difference between map() and flatMap()

8 并行运行多个 Futures

当我们并发执行多个 Futures 时,通常想要所有 Future 都执行完,然后处理它们的合并结果。

CompletableFuture.allOf 静态方法支持等待所有 Futures 完成并返回一个变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CompletableFuture<String> future1  
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);

// ...

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

注意 CompletableFuture.allOf() 的返回类型是 CompletableFuture*。这个方法的限制是不会返回所有 *Futures 的合并结果。你只能从 Futures 那里一个一个获得结果。没关系,我们可以用 CompletableFuture.join() 方法和 Java 8 Streams API,会是这些操作简化:

1
2
3
4
5
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);

CompletableFuture.join() 方法类似 get 方法,但是它会在Future没有正常完成时抛出未检查型异常(unchecked exception)。在Stream.map() 方法中可以传入 CompletableFuture.join() 方法的引用。

9 处理错误

为了处理异步计算步骤链中的错误,throw/catch 原语(idiom)也要做相应的适配。

取代在语法块(syntactic block)捕获异常的方式,CompletableFuture 将异常传到 handle 方法中。该方法接受两个参数:计算的结果(如果成功结束)和抛出的异常(如果某些计算步骤没有正常完成)。

下面的示例中我们使用 handle 方法返回一个默认值,当异步处理步骤因为没有 name 值提供而抛异常时:

1
2
3
4
5
6
7
8
9
10
11
12
13
String name = null;

// ...

CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
})}).handle((s, t) -> s != null ? s : "Hello, Stranger!");

assertEquals("Hello, Stranger!", completableFuture.get());

在看另外一个场景,与第一个示例不同,我们想要手动完成带值的Future,也有办法在完成时配置异常。completeExceptionally 方法就是用来干这个。下面的示例中 completableFuture.get() 方法会抛出 ExecutionException 异常并将 RuntimeException 作为触发异常原因:

1
2
3
4
5
6
7
8
9
10
CompletableFuture<String> completableFuture = new CompletableFuture<>();

// ...

completableFuture.completeExceptionally(
new RuntimeException("Calculation failed!"));

// ...

completableFuture.get(); // ExecutionException

上面的示例中,我们可以使用 handle 方法异步处理异常,不过使用 get 方法我们可以实现更加典型的同步异常处理的方式。

10 异步方法

CompletableFuture 中的 Fluent API 的大多数方法都有两个变型,一个带 Async 后缀,一个不带。这些方法 Async 方法通常用于在另外一个线程中执行相应的步骤。

不带 Async 后缀的方法使用当前调用的线程执行下一个阶段。 没有 Executor 参数的 Async 方法使用由 Executor 实现的公共 fork/join 线程池,它通过 ForkJoinPool.commonPool() 方法访问。有 Executor 参数的 Async 方法使用传递的 Executor 执行步骤。

下面是一个修改的示例,使用一个 Function 实例处理计算的实例。唯一可见的区别是 thenApplyAsync * 方法。但是在底层,其实函数的应用被封装在 *ForkJoinTask 实例中(关于fork/join框架的更多信息,详见“Guide to the Fork/Join Framework in Java”)。这可以使你的计算更加并行化,更加有效地利用系统资源。

1
2
3
4
5
6
7
CompletableFuture<String> completableFuture  
= CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
.thenApplyAsync(s -> s + " World");

assertEquals("Hello World", future.get());

11 JDK 9 CompletableFuture API

Java 9 中 CompletableFuture API 进一步增强,改变如下:

  • New factory methods added
  • Support for delays and timeouts
  • Improved support for subclassing.

新的实例 APIs 引入:

  • Executor defaultExecutor()
  • CompletableFuture<U> newIncompleteFuture()
  • CompletableFuture<T> copy()
  • CompletionStage<T> minimalCompletionStage()
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)
  • CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)

我们现在也有了一些静态工具方法:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • <U> CompletionStage<U> completedStage(U value)
  • <U> CompletionStage<U> failedStage(Throwable ex)
  • <U> CompletableFuture<U> failedFuture(Throwable ex)

最后,为了处理超时,Java 9 引入了两个新的函数:

  • orTimeout()
  • completeOnTimeout()

详见:Java 9 CompletableFuture API Improvements

12 总结

这篇文章我们介绍了 CompletableFuture 类的方法和典型用例。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接