java并发系列-java.util.concurrent概览

1 概述

java.util.concurrent包提供了创建并发应用的一系列功能类。

2 主要成员

java.util.concurrent包提供了太多特性,这篇文章不会一一详细讲述。本文主要将笔墨集中在这个包中若干最有用的功能上。包括如下类:

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • Future
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Locks
  • Phaser

在下面每个类的介绍中也提供了它的详细文档地址。

2.1 Executor

Executor 是一个接口,用于描述执行给定任务的对象。

通过实现Executor,可以让任务在新线程或者当前线程中执行。因此使用这个接口,我们可以解耦实际任务执行过程中的任务执行流。

这里需要注意Executor没有严格要求任务执行必须是异步的。举一个最简单的例子,一个executor可以在正在唤醒的线程中立马唤醒提交的任务。

我们需要创建Invoker一个来创建evecutor实例:

1
2
3
4
5
6
public class Invoker implements Executor {
@Override
public void execute(Runnable r) {
r.run();
}
}

现在,我们可以使用这个invoker执行任务。

1
2
3
4
5
6
public void execute() {
Executor executor = new Invoker();
executor.execute( () -> {
// task to be performed
});
}

另外,如果executor不接受这种任务来执行,则它会抛出异常RejectedExecutionException

2.2 ExecutorService

ExecutorService是一个完整的异步处理解决方案。它管理一个内存队列,将提交的任务调度到可用线程上。

要是用ExecutorService,需要创建一个Runnable类。

1
2
3
4
5
6
public class Task implements Runnable {
@Override
public void run() {
// task details
}
}

现在我们来创建ExecutorService实例并调度执行任务。当创建的时候,需要为它执行线程池大小。

1
ExecutorService executor = Executors.newFixedThreadPool(10);

如果想要创建一个单线程ExecutorService,那么可以使用newSingleThreadExecutor(ThreadFactory threadFactory) 创建实例。

创建好executor,就可以使用它来提交任务。

1
2
3
public void execute() { 
executor.submit(new Task());
}

当提交任务的时候,也可以提交Runnable实例

1
2
3
executor.submit(() -> {
new Task();
});

它同时提供了两个中止执行的方法,分别是shutdown()shutdownNow()shutdown()会等待所有已提交的任务执行完中止。而shutdownNow()会立即中止等待或执行中的任务。

另外,还有一个awaitTermination(long timeout, TimeUnit unit)方法。在触发关闭事件,执行超时或者执行线程被中断发生时,该方法可以阻塞当前线程,等待所有任务执行完成,或者超过z指定的等待时间,才会继续执行走下一步。

1
2
3
4
5
try {
executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
e.printStackTrace();
}

2.3 ScheduledExecutorService

ScheduledExecutorService是继承了ExecutorService的一个简单接口类,具有周期执行任务的特性。

ExecutorExecutorService的方法通过java系统安排怎样运行,而没有引入任何人为延迟干预。0或者负数意味着请求需要即可执行。

接下来,我们使用Runnable和Callable接口来定义任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void execute() {
ScheduledExecutorService executorService
= Executors.newSingleThreadScheduledExecutor();

Future<String> future = executorService.schedule(() -> {
// ...
return "Hello world";
}, 1, TimeUnit.SECONDS);

ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
// ...
}, 1, TimeUnit.SECONDS);

executorService.shutdown();
}

ScheduledExecutorService可以设定固定延迟时间,并在过了延迟时间后执行任务。

1
2
3
4
5
6
7
executorService.scheduleAtFixedRate(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);

这里,scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit ) 方法创建并执行一个周期动作。首先过了初始延迟时间后任务被唤醒,接下来都按给定周期唤醒任务,直到服务实例被关闭。

scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit ) 创建并执行一个周期动作。首先过了初始延迟时间后任务被唤醒,然后老任务执行结束后,新任务开始之前都会延迟指定时间。

2.4 Future

Future用来描述异步操作的结果。它有一组方法用于校验异步操作是否完成,获取计算结果等等。

另外,cancel(boolean mayInterruptIfRunning)方法结束操作并释放正在运行的线程。如果mayInterruptIfRunning是true,则正在执行的任务会马上被中止。否则,允许正在执行中的任务先执行完。

通过下面代码段可以创建一个future实例:

1
2
3
4
5
6
7
8
9
public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);

Future<String> future = executorService.submit(() -> {
// ...
Thread.sleep(10000l);
return "Hello world";
});
}

下面代码段展示了判断future结果是否准备好,若计算已经完成,则获取结果。

1
2
3
4
5
6
7
if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

也可以为获取结果方法指定超时时间。如果超过指定时间,则抛出TimeoutException异常。

1
2
3
4
5
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}

2.5 CountDownLatch

CountDownLatch工具类,jdk5开始出现,用于阻塞线程集合直到完成某些操作。

CountDownLatch通过counter(Integer type)进行初始化;计数器随着相关联的线程任务执行完成而减数。当计数器数值减为0时,被阻塞的线程获得释放。

2.6 CyclicBarrier

CyclicBarrierCountDownLatch相比功能差不多,除了CyclicBarrier可以复用。它允许线程间可以通过await()方法(阻碍条件)等待其他线程,等到所有线程都处于await()时,再调用最终任务。

接下来,我们创建一个Runnable任务实例初始化阻碍条件(barrier condition):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Task implements Runnable {

private CyclicBarrier barrier;

public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}

@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() + " is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() + " is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}

}

调用三个线程做竞争。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void start() {

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});

Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");

if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}

这里,isBroken()方法检查,当时是否有线程被中断。我们应该总是在执行实际逻辑时,执行这个检查。

2.7 Semaphore

Semaphore用于阻塞对某些物理或者逻辑资源的线程级访问。一个semaphore包含一个permit集合;无论何时当一个线程想要访问临界区,它都需要检查semaphore还有没有可用的permit。

如果没有一个permit可用(via tryAcquire()),那么这个线程就不允许进入临界区;反之,则授予访问权限,同时permit计数器减一。

一旦正在执行的线程释放了临界区,那么permit计数器就会加一(通过release()方法)。

tryAcquire(long timeout, TimeUnit unit) 可以让我们为获取的访问权限指定超时时间。

我们还可以获取可用permit的数量或请求permit的线程的数量。

下面的代码段来实现一个semaphore。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static Semaphore semaphore = new Semaphore(10);

public void execute() throws InterruptedException {

LOG.info("Available permit : " + semaphore.availablePermits());
LOG.info("Number of threads waiting to acquire: " +
semaphore.getQueueLength());

if (semaphore.tryAcquire()) {
semaphore.acquire();
// ...
semaphore.release();
}

}

可以使用Semaphore实现类Mutex的数据结果(互斥数据结构)。

2.8 ThreadFactory

ThreadFactory类似于一个线程池(实际不存在),按需创建新线程。它封装了大量用于有效创建线程的基础代码,让我们可以方便创建线程。

下面定义一个ThreadFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class MyThreadFactory implements ThreadFactory {
private int threadId;
private String name;

public MyThreadFactory(String name) {
threadId = 1;
this.name = name;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
LOG.info("created new thread with id : " + threadId +
" and name : " + t.getName());
threadId++;
return t;
}
}

通过newThread(Runnable r)方法再运行时创建新线程。

1
2
3
4
5
6
MyThreadFactory factory = new MyThreadFactory( 
"MyThreadFactory");
for (int i = 0; i < 10; i++) {
Thread t = factory.newThread(new Task());
t.start();
}

2.9 BlockingQueue

异步编程中,最常见的集成模式(设计模式)之一是生产者消费者模式java.util.concurrent包里面有一个数据结构BlockingQueue,在异步场景中非常有用。

2.10 DelayQueue

DelayQueue是一个无限阻塞队列,超过指定过期时间后,里面的元素只能被拉取(pull)。因此,最顶层的元素将经过最长的延迟时间被弹出(poll)。

2.11 Locks

Lock是一个工具类,用于阻塞当前的线程和别的线程访问特定部分的代码。

Lock和同步阻塞(synchronized)的主要区别是同步阻塞的实现完完全全就在一个方法里面,而Lock可以在不同的方法中执行它提供的lock()unlock()操作。。

2.12 Phaser

Phaser是一个比CyclicBarrierCountDownLatch更灵活的方案。表现的像一个可复用的barrier,在继续执行下一阶段前,当前参与的线程都会为彼此等待。通过在各个程序阶段复用Phaser实例,我们可以协调多个线程按阶段完成任务。

3 总结

这篇文章主要过了一遍java.util.concurrent包的各个功能。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接