1 概述

Future接口从Java1.5开始出现,用于异步调用和并发处理。

2 Future使用场景

我们可以使用Future接口异步处理一些执行时间比较长的方法。

以下是适用场景

  • 计算密集型的任务,如学术和科学计算;
  • 处理大数据;
  • 远程调用,如下载文件、爬取网页,web service等

3 如何使用Future

3.1 准备测试类

准备了如下的类,我们通过Thread.sleep()让执行时间久一点。

1
2
3
4
5
6
7
8
9
10
11
12
public class SquareCalculator {    

private ExecutorService executor
= Executors.newSingleThreadExecutor();

public Future<Integer> calculate(Integer input) {
return executor.submit(() -> {
Thread.sleep(1000);
return input * input;
});
}
}

上面的类涉及到CallableExecutorService的使用。

这个类创建了Callable的实例,再通过ExecutorService执行器创建一个新的线程来执行这个实例方法,并返回Future对象。

Executors有多种工厂方法,这里使用了最简单的一种叫newSingleThreadExecutor,仅仅创建一个线程。

3.2 使用isDone()和get()获取结果

Future.isDone(): 判断executor是否执行完任务

Future.get(): 获取执行结果,当未执行完时,或被阻塞

如下例子简单测试了isDone和get两个方法的效果

1
2
3
4
5
6
7
8
Future<Integer> future = new SquareCalculator().calculate(10);

while(!future.isDone()) {
System.out.println("Calculating...");
Thread.sleep(300);
}

Integer result = future.get();

另外Future.get()方法提供了另一种使用方式. 当经过指定时间还没有获得结果时,抛出TimeoutException异常。

1
Integer result = future.get(500, TimeUnit.MILLISECONDS);

3.3 使用Future.cancel()结束Future

通过使用Future.cancel(boolean)可以让executor停止操作并中断线程。

1
2
3
Future<Integer> future = new SquareCalculator().calculate(4);

boolean canceled = future.cancel(true);

上述例子的操作,原文说永远完成不了(留疑)。

如果在调用了cancel()后调用get(),程序会报CancellationException.所以我们在使用get()之前,可以通过Future.isCancelled()来获知Future是否已经被结束。

cancel(boolean)中的boolean用于控制,是否直接中断任务执行。

3.4 多线程的例子

上面我们使用了Executors.newSingleThreadExecutor来初始化executor。
下面的例子提现了单线程的效果,可以看到两个计算是同步进行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
SquareCalculator squareCalculator = new SquareCalculator();

Future<Integer> future1 = squareCalculator.calculate(10);
Future<Integer> future2 = squareCalculator.calculate(100);

while (!(future1.isDone() && future2.isDone())) {
System.out.println(
String.format(
"future1 is %s and future2 is %s",
future1.isDone() ? "done" : "not done",
future2.isDone() ? "done" : "not done"
)
);
Thread.sleep(300);
}

Integer result1 = future1.get();
Integer result2 = future2.get();

System.out.println(result1 + " and " + result2);

squareCalculator.shutdown();
1
2
3
4
5
6
7
8
9
10
calculating square for: 10
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
calculating square for: 100
future1 is done and future2 is not done
future1 is done and future2 is not done
future1 is done and future2 is not done
100 and 10000

现在我们使用Executors.newFixedThreadPool()来指定多个线程。

下面例子可以看到我们创建了有两个线程的线程池,于是计算结果并行处理完成。

1
2
3
4
5
6
public class FixedThreadSquareCalculator {

private ExecutorService executor = Executors.newFixedThreadPool(2);

//...
}
1
2
3
4
5
6
7
calculating square for: 10
calculating square for: 100
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
100 and 10000

4 ForkJoinTask概述

ForkJoinTask是实现了Future的抽象类,可以通过ForkJoinPool提供若干线程来执行大量的任务。

ForkJoinTask通过fork()方法产生新的任务,通过join方法将所有任务的执行结果合并。

目前有两个类实现了ForkJoinTask分别是RecursiveTask和RecursiveAction,前者完成时返回结果后者不返回,都用于处理递归任务。

现在我们来做一个例子。给定一个正整数n,计算1到n的平方和。

首先定义一个类,这个类继承了RecursiveTask这个用于处理递归任务的抽象类,然后实现处理递归任务的compute()方法。当n不小于1时,通过fork()产生新的计算任务,通过join()来求和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class FactorialSquareCalculator extends RecursiveTask<Integer> {

private Integer n;

public FactorialSquareCalculator(Integer n) {
this.n = n;
}

@Override
protected Integer compute() {
if (n <= 1) {
return n;
}

FactorialSquareCalculator calculator
= new FactorialSquareCalculator(n - 1);

calculator.fork();

return n * n + calculator.join();
}
}

最后,我们通过ForkJoinPool实例来执行RecursiveTask的实现类FactorialSquareCalculator。

1
2
3
4
5
ForkJoinPool forkJoinPool = new ForkJoinPool();

FactorialSquareCalculator calculator = new FactorialSquareCalculator(10);

forkJoinPool.execute(calculator);

5 总结

这篇文章,简单过了一遍Future接口以及它的所有方法,什么时候使用多线程,ForkJoinTask的主要方法fork()和join()。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 本站源码链接


Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
$ hexo generate

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment