1 介绍

简单地讲,锁是一种比标准的synchronized代码块更加灵活和精巧的线程同步机制。

Lock自Java1.5引入。定义在java.util.concurrent.lock包中,提供与锁相关的可扩展操作。

这篇文章研究一下Lock接口类的不同实现和它们的应用场景。

2 Lock和Synchronized Block的区别

一些synchronized blockLockAPI之间的区别:

  • synchronized block整个存在于一个方法中 - *LockAPI的lock()和*unlock() 操作可以在不同的方法中
  • synchronized block不支持公平,任何线程在锁释放后都可以获得锁,不能设置获取锁的条件。而使用LockAPI通过设置公平属性使获取锁的保证公平。这样可以确保最长等待线程优先获得锁。
  • 如果线程没有获得synchronized block的访问权就会被阻塞。LockAPI提供了tryLock() 方法,线程可以在适当的时候获取锁。这样可以减少线程阻塞的事件。
  • 线程处于等待获取synchronized block访问权时,不能被中断。LockAPI提供lockInterruptibly() 方法当等待锁时可以中断。

3 Lock API

看看Lock接口类中的方法:

  • void lock() - 请求锁,如果拿不到锁则线程会阻塞,直到锁释放
  • void lockInterruptibly() - 和lock()相似,但是它允许阻塞线程抛出java.lang.InterruptedException中断请求锁而继续执行
  • boolean tryLock() - 这是lock() 方法的非阻塞版本;它尝试立即获得锁,如果锁成功返回true
  • boolean tryLock(long timeout, TimeUnit timeUnit) - 和tryLock() 相似,在放弃获取锁之前等待给定超时时间
  • void unlock() - 解锁Lock实例

一个被锁的实例应该总是要执行unlock以避免死锁情况。使用锁阻塞线程的推荐写法应该要有try/catchfinally块:

1
2
3
4
5
6
7
Lock lock = ...; 
lock.lock();
try {
// access to the shared resource
} finally {
lock.unlock();
}

除了Lock接口类,还有一个ReadWriteLock接口类维护一对锁,一个用于只读操作,一个用于写操作。只读锁可以同时由多个线程持有。

ReadWriteLock的请求只读或者写锁的方法声明:

  • Lock readLock() - 返回只读锁
  • Lock writeLock() - 返回写锁

4 Lock实现

4.1 ReentrantLock

ReentrantLock类实现了Lock接口类。它提供相同的并发和内存语义,使用synchronized方法和指令访问内部的监控器锁,已经一些扩张功能:

下面看看,怎么将ReentrantLock用于同步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class SharedObject {
//...
ReentrantLock lock = new ReentrantLock();
int counter = 0;

public void perform() {
lock.lock();
try {
// Critical section here
count++;
} finally {
lock.unlock();
}
}
//...
}

确保将lock()unlock()调用放在try-finally块中以避免死锁情况。

下面看看tryLock()怎样工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void performTryLock(){
//...
boolean isLockAcquired = lock.tryLock(1, TimeUnit.SECONDS);

if(isLockAcquired) {
try {
//Critical section here
} finally {
lock.unlock();
}
}
//...
}

这个例子中,线程调用tryLock(), 等待一秒钟没有拿到锁就放弃。

4.2 ReentrantReadWriteLock

ReentrantReadWriteLock类实现了ReadWriteLock接口类。

看看一个线程获取ReadLock或者WriteLock的规则:

  • 读锁 - 如果没有线程获得写锁或者请求写锁,那么多个线程可以获得读锁
  • 写锁 - 如果没有线程在读或者写,那么有且仅有一个线程可以获得写锁

下面看看怎样使用ReadWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class SynchronizedHashMapWithReadWriteLock {

Map<String,String> syncHashMap = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();
// ...
Lock writeLock = lock.writeLock();

public void put(String key, String value) {
try {
writeLock.lock();
syncHashMap.put(key, value);
} finally {
writeLock.unlock();
}
}
...
public String remove(String key){
try {
writeLock.lock();
return syncHashMap.remove(key);
} finally {
writeLock.unlock();
}
}
//...
}

对于写方法,需要用写锁将临界区包住,只有一个线程可以访问它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Lock readLock = lock.readLock();
//...
public String get(String key){
try {
readLock.lock();
return syncHashMap.get(key);
} finally {
readLock.unlock();
}
}

public boolean containsKey(String key) {
try {
readLock.lock();
return syncHashMap.containsKey(key);
} finally {
readLock.unlock();
}
}

对于读锁,需要用读锁将临界区抱住。如果当前没有写操作,那么多个线程可以访问这个临界区。

4.3 StampedLock

StampedLockJava8引入。它也支持读写锁。不过,锁获取方法会放回一个邮戳(stamp),用于释放锁或者检查锁是否有效:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class StampedLockDemo {
Map<String,String> map = new HashMap<>();
private StampedLock lock = new StampedLock();

public void put(String key, String value){
long stamp = lock.writeLock();
try {
map.put(key, value);
} finally {
lock.unlockWrite(stamp);
}
}

public String get(String key) throws InterruptedException {
long stamp = lock.readLock();
try {
return map.get(key);
} finally {
lock.unlockRead(stamp);
}
}
}

StampedLock还提供了乐观锁(optimistic locking)。大多数时候,读操作不需要等待写操作完成再读。因此,严格的读锁并不需要。

下面,我们改下读锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public String readWithOptimisticLock(String key) {
long stamp = lock.tryOptimisticRead();
String value = map.get(key);

if(!lock.validate(stamp)) {
stamp = lock.readLock();
try {
return map.get(key);
} finally {
lock.unlock(stamp);
}
}
return value;
}

5 使用Conditions

Condition类支持一个线程在执行临界区的时候等待某些条件发生。

这个可以用在一个线程申请临界区的访问权但是没有执行操作必要的条件。例如,一个阅读器线程获得一个共享队列的锁,但是没有数据可以消费。

一般,Java提供wait()notify()notifyAll()方法用于线程间通信。Conditions也有相似的机制,但是它还可以提供多条件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ReentrantLockWithCondition {

Stack<String> stack = new Stack<>();
int CAPACITY = 5;

ReentrantLock lock = new ReentrantLock();
Condition stackEmptyCondition = lock.newCondition();
Condition stackFullCondition = lock.newCondition();

public void pushToStack(String item){
try {
lock.lock();
while(stack.size() == CAPACITY) {
stackFullCondition.await();
}
stack.push(item);
stackEmptyCondition.signalAll();
} finally {
lock.unlock();
}
}

public String popFromStack() {
try {
lock.lock();
while(stack.size() == 0) {
stackEmptyCondition.await();
}
return stack.pop();
} finally {
stackFullCondition.signalAll();
lock.unlock();
}
}
}

6 总结

这篇文章,列举了Lock接口类的不同实现和新引进的StampedLock类。然后,展示如何使用Condition类处理多条件的场景。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 介绍

这篇文章介绍 *CountDownLatch*类,并通过例子演示它的用法。

基本上,使用CountDownLatch会在其他线程完成给定的任务之前阻塞线程。

2 并发编程的用法

简单地讲,CountDownLatch有个counter字段,对它进行递减操作。我们可以用它来阻塞当前线程直到counter归零。

如果我们做一些并发处理,可以实例化一个CountDownLatch,让它的counter值等于工作者线程数。每个线程结束时,都调用countdown() 方法,这样可以一直阻塞当前线程直到工作者线程都执行完成。

3 等待线程池完成

首先,创建一个Worker类,里面有一个类型为CountDownLatch的变量,用于通知它完成执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Worker implements Runnable {
private List<String> outputScraper;
private CountDownLatch countDownLatch;

public Worker(List<String> outputScraper, CountDownLatch countDownLatch) {
this.outputScraper = outputScraper;
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
doSomeWork();
outputScraper.add("Counted down");
countDownLatch.countDown();
}
}

然后,创建一个单元测试,证明通过CountDownLatch等待Worker完成执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion()
throws InterruptedException {

List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
countDownLatch.await();
outputScraper.add("Latch released");

assertThat(outputScraper)
.containsExactly(
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Latch released"
);
}

“Latch released”永远最后输出。因为程序于依赖CounDownLatch释放。

如果没有调用await(),则不能保证线程的执行顺序,单元测试会失败。

4 线程池等待开始

如果采用上面的例子,这一次会设置上千个线程而不是5个,结果可能是前面的线程会在后面的线程还没调用start()的时候就已经处理完成。这样就很难重新并发的场景。

为了实现这个场景,我们通过不同的方式来使用CountdownLatch。与阻塞父线程等待子线程处理完成相关,我们阻塞各个子线程直到所有子线程都开始。

下面我们修改run()方法,在处理任务之前阻塞:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class WaitingWorker implements Runnable {

private List<String> outputScraper;
private CountDownLatch readyThreadCounter;
private CountDownLatch callingThreadBlocker;
private CountDownLatch completedThreadCounter;

public WaitingWorker(
List<String> outputScraper,
CountDownLatch readyThreadCounter,
CountDownLatch callingThreadBlocker,
CountDownLatch completedThreadCounter) {

this.outputScraper = outputScraper;
this.readyThreadCounter = readyThreadCounter;
this.callingThreadBlocker = callingThreadBlocker;
this.completedThreadCounter = completedThreadCounter;
}

@Override
public void run() {
readyThreadCounter.countDown();
try {
callingThreadBlocker.await();
doSomeWork();
outputScraper.add("Counted down");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
completedThreadCounter.countDown();
}
}
}

现在,修改单元测试。首先阻塞,直到所有Worker开始解除阻塞。然后结束前阻塞,直到所有Worker处理完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime()
throws InterruptedException {

List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch readyThreadCounter = new CountDownLatch(5);
CountDownLatch callingThreadBlocker = new CountDownLatch(1);
CountDownLatch completedThreadCounter = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new WaitingWorker(
outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
readyThreadCounter.await();
outputScraper.add("Workers ready");
callingThreadBlocker.countDown();
completedThreadCounter.await();
outputScraper.add("Workers complete");

assertThat(outputScraper)
.containsExactly(
"Workers ready",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Counted down",
"Workers complete"
);
}

这种方式对于重现并发问题很有帮助,因为可以测试上千线程并发处理某个逻辑。

5 早点终止CountdownLatch

有时候,可能会出现WorkerCountDownLatch倒计时完之前就错误终止。这会导致CountDownLatch永远不会数到0,await()也永远不会终止:

1
2
3
4
5
6
7
8
@Override
public void run() {
if (true) {
throw new RuntimeException("Oh dear, I'm a BrokenWorker");
}
countDownLatch.countDown();
outputScraper.add("Counted down");
}

这里我们修改之前的单元测试使用BrokenWorker类,来展示await()永远阻塞的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck()
throws InterruptedException {

List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
.generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
.limit(5)
.collect(toList());

workers.forEach(Thread::start);
countDownLatch.await();
}

当然这不是我们想要的,最好让应用继续执行而不是永远阻塞。
怎么解决这个问题呢,就是给await()指定超时时间:

1
2
boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
assertThat(completed).isFalse();

这样这个测试会超时,然后返回false

6 总结

这篇文章,我们展示如何使用CountDownLatch阻塞线程等待其他线程完成处理。

同时,也展示了怎样让线程并行执行来调试并发问题。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 介绍

Java8引入了流的概念,一种对数据执行批量操作的有效方式。并行流用在支持并发的场景中。

以多线程为代价,这些流能够提高处理效率。

这篇文章介绍一下流API最大的限制之一以及怎样使用自定义的线程池实例处理并行流工作,也可以通过这个来实现。

2 并行流(Parallel Stream)

首先展示一个小例子。在任一集合类型上调用parallelStream方法,返回一个并行流。

1
2
3
4
5
6
7
@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
List<Long> aList = new ArrayList<>();
Stream<Long> parallelStream = aList.parallelStream();

assertTrue(parallelStream.isParallel());
}

在这个流中的默认处理方式是使用ForkJoinPool.commonPool(),这个公共线程池供整个应用共享。

3 自定义线程池

当处理流时,我们可以传递了一个自定义线程池。

下面的例子让并行流使用自定义线程池计算1到10000000的总和:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {

long firstNum = 1;
long lastNum = 1000000;

List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());

ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();

assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}

首先,使用ForkJoinPool构造方法创建一个并发等级为4的实例。有些试验要求为不同环境指定最优的等级,不过也可以简单地根据自己的cpu核数来指定。

接着,处理并行流的内容。调用reduce求和。

这个例子没有展现出使用自定义线程池的所有用处,但是在某些场景好处显而易见。比如公共线程池在执行长任务,或者公共线程池被用于应用的其它组件。

4 总结

这篇文档,简要地介绍了如何使用自定义线程池处理并行流。在正确的环境下,使用合适的并行等级,能很好地提高处理能力。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 介绍

Java7引入了fork/join框架。它利用所有可用的处理器来加速并行处理,通过分而治之的方法完成。

实践中,这意味着框架先“forks”,递归地将任务分解成更小的独立子任务,直到它们足够简单,再异步进行处理。

然后,“join”部分开始,所有子任务的结果递归地合并处理成单个结果,或者如果任务的返回是void,程序简单地等待每个子任务执行完。

为了让并发执行更有效率,fork/join框架使用叫ForkJoinPool的线程池,它管理ForkJoinWorkerThread类型的线程。

2 ForkJoinPool

ForkJoinPool是框架核心,是ExecutorService的实现类,管理工作者线程并且提供获取线程池信息和性能的工具。

工作者线程一次处理一个任务,但是ForkJoinPool不会为每个子任务创建单独的线程。相反,每个线程池的线程都会有一个双向队列存储任务。

这个机制,用到了工作窃取算法,对于平衡线程的工作负载很重要。

2.1 工作窃取算法

简单来说,空闲线程会尝试从繁忙线程的双向队列窃取工作。

一个工作线程默认会从它的双向队列头部拿去任务。当自己的队列为空后,线程就会从另外一个忙线程的双向队列尾部获取线程或者从全局队列获取,因为这里保存有最多的工作。

因为线程会先去找存在大量工作的队列,所以这个方法极大降低了线程竞争任务的可能,也减少了线程寻找任务的次数。

2.2 ForkJoinPool实例化

Java8中,最便捷的访问ForkJoinPool实例的方式是使用静态方法 *commonPool()。和它的名字一样,它会返回一个通用线程池的引用,处理ForkJoinTask*的默认线程池。

根据Oracle文档,使用预定义的通用线程池会减少资源浪费,它不会为每个任务创建单独的线程池。

1
ForkJoinPool commonPool = ForkJoinPool.commonPool();

类似于上述操作,Java7中可以通过创建一个ForkJoinPool,并把它保存在一个工具类的静态字段中。

1
public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

现在,可以访问它了。

1
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

通过ForkJoinPool的构造方法,可以在创建线程池时指定并发级别,线程工厂和异常处理方法。例如上面代码,线程池的并发等级是2,即这个线程池使用两个处理器内核。

3 ForkJoinTask

ForkJoinTask是在ForkJoinPool中执行的任务的基本类型。在实际使用中,任务类需要继承它的两个子类之一。其中,RecursiveAction的任务没有返回值,RecursiveTask的任务有返回值。它们都有一个抽象方法compute()用于定义任务具体逻辑。

3.1 RecursiveAction - 一个例子

下面的例子,处理的工作量由String类型的变量workload表示。例子的任务没有实际意义,只是简单地将输入数据转成大写并打印日志。

为了演示这个框架的forking行为,例子里在workload的长度大于指定阈值时,用createSubtask()分解任务。

字符串递归地被分割成子字符串,并基于这些子字符串创建CustomRecursiveTask实例。

最后,方法返回一个List<CustomRecursiveAction类型的结果。

上面的列表通过invokeAll()方法提交给ForkJoinPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class CustomRecursiveAction extends RecursiveAction {

private String workload = "";
private static final int THRESHOLD = 4;

private static Logger logger =
Logger.getAnonymousLogger();

public CustomRecursiveAction(String workload) {
this.workload = workload;
}

@Override
protected void compute() {
if (workload.length() > THRESHOLD) {
ForkJoinTask.invokeAll(createSubtasks());
} else {
processing(workload);
}
}

private List<CustomRecursiveAction> createSubtasks() {
List<CustomRecursiveAction> subtasks = new ArrayList<>();

String partOne = workload.substring(0, workload.length() / 2);
String partTwo = workload.substring(workload.length() / 2, workload.length());

subtasks.add(new CustomRecursiveAction(partOne));
subtasks.add(new CustomRecursiveAction(partTwo));

return subtasks;
}

private void processing(String work) {
String result = work.toUpperCase();
logger.info("This result - (" + result + ") - was processed by "
+ Thread.currentThread().getName());
}
}

3.2 RecursiveTask

对于有返回值的任务,除了计算任务结果,这里的逻辑都比较简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class CustomRecursiveTask extends RecursiveTask<Integer> {
private int[] arr;

private static final int THRESHOLD = 20;

public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}

@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
} else {
return processing(arr);
}
}

private Collection<CustomRecursiveTask> createSubtasks() {
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, 0, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
return dividedTasks;
}

private Integer processing(int[] arr) {
return Arrays.stream(arr)
.filter(a -> a > 10 && a < 27)
.map(a -> a * 10)
.sum();
}
}

这个例子中,工作被保存在CustomRecursiveTask类的名为arr的列表类型的字段里。createSubtask()方法递归地将任务分解成小于阈值的子任务,然后使用invokeAll()提交这些子任务到线程池,最后返回Future类型的结果列表。

每个子任务调用join()触发执行。

代码使用了Java8的Stream APIsum()方法将子任务结果合并成最终结果。

4 提交任务给ForkJoinPool

提交任务给线程池,有以下一些方法。
submit()或者execute()方法(它们的用法一样):

1
2
forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

invoke()方法fork任务并且等待结果,不需要主动join:

1
int result = forkJoinPool.invoke(customRecursiveTask);

invokeAll()方法是最便捷的方式提交一系列ForkJoinTasks任务到ForkJoinPool。它将任务作为参数,fork它们并返回Future类型的对象列表,列表按结果产生先后顺序。

1
2
customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();

在这个RecursiveTask的例子中,使用了invokeAll()方法提交一系列子任务到线程池。使用fork()join()也可以完成相同的工作。

为了避免混淆,通常推荐使用invokeAll()方法提交多个任务到ForkJoinPool

5 总结

使用fork/join框架能够加速处理大量任务,不过要遵循一些准则:

  • 尽量少使用线程池 - 最好每个应用或者系统使用一个线程池
  • 使用默认通用线程池,如果没有特殊要求
  • 使用合适的阈值拆解ForkJoinTask
  • 避免在ForkJoinTask中做阻塞动作

本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 简介

ExecutorService是由JDK提供的用于简化任务异步执行的框架。ExecutorService会自动创建线程池,并提供接口提交任务。

2 实例化ExecutorService

2.1 Executors类的工厂方法

创建ExecutorService最简单方式就是使用Executors类的工厂方法之一。
例如,下面的代码将会创建10个线程的线程池。

1
ExecutorService executor = Executors.newFixedThreadPool(10);

还有其他工厂方法创建一些满足具体场景的预定义ExecutorService。更多关于介绍查看Oracle’s official documentation

2.2 直接创建一个ExecutorService

因为ExecutorService*是接口类,所有创建一个它的实现类的实例就可以。在 *java.util.concurrent 包中有几个它实现类。

1
2
3
ExecutorService executorService = 
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());

你可能会注意到上述代码与工厂方法newSingleThreadExecutor()源码很相似。

3 给ExecutorService分配任务

ExecutorService可以执行RunnableCallable任务。下面代码使用lambda表达式定义任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};

Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};

List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);

将任务分配给ExecutorService有几个方法,包括execute()集成自Executor,还有submit(),invokeAny(),invokeAll()等。
execute()方法没有返回值,执行这个方法,得不到任务的执行结果也不能检查任务的状态。

1
executorService.execute(runnableTask);

submit()方法接收一个CallableRunnable任务,并返回Future类型的结果。

1
2
Future<String> future = 
executorService.submit(callableTask);

invokeAny()接收一个任务集合,批量执行,任意一个任务执行成功就返回。

1
String result = executorService.invokeAny(callableTasks);

invokeAll接收一个任务集合,批量执行,所有任务执行结果都放在Future对象中以列表的形式返回。

1
List<Future<String>> futures = executorService.invokeAll(callableTasks

在进一步讨论前,还需要讨论下两个内容:关闭ExecutorService和处理返回类型Future。

4 关闭ExecutorService

通常,当没有任务处理的时候,ExecutorService不会自动摧毁。它会一直等待新任务。

在某些情况下很有用。例如,应用中任务出现时间不规则或者任务的数量在编译阶段不知道。

另外,应用已经执行完,但是它不会停下来,因为处于等待状态的ExecutorService会导致JVM保持运行状态。

关闭ExecutorService有两个方法分别为shutdown()shutdownNow()

shutdownNow()方法会立即尝试销毁ExecutorService,它不能保证所有运行中的线程同时停止。这个方法会返回等待被处理的任务列表。供开发者决定如何处理。

1
List<Runnable> notExecutedTasks = executorService.shutDownNow();

一个好的关闭ExecutorService的方法(Oracle推荐)是使用这些方法时和awaitTermination()结合使用。使用awaitTermination()方法,ExecutorService首先会停止接收新任务,提供一定时间让所有任务完成。如果时间超出,执行会立马停止:

1
2
3
4
5
6
7
8
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}

5 Future接口类

submit()invokeAll() 方法返回一个Future类型的对象或者列表,通过Future对象可以拿到任务执行结果或者校验任务状态。

Future接口类提供了一个阻塞式的方法get()Callable任务会返回实际执行结果,Runnable任务则返回null。当任务在执行的时候调用get()会造成当前线程阻塞,直到任务执行完成可以拿到结果后才恢复执行。

1
2
3
4
5
6
7
Future<String> future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

如果调用get()方法要阻塞很久,那么应用性能就降级了。如果执行结果不重要,那么在调用时指定超时时间来避免这种问题:

1
String result = future.get(200, TimeUnit.MILLISECONDS);

如果执行时间比指定超时时间长,则会抛出TimeoutException

isDone()方法用来校验分配的任务是否执行完毕。

Future接口类也提供了结束任务执行的方法cancel()方法,以及校验任务是否结束的isCancelled()方法:

1
2
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();

6 ScheduledExecutorService接口类

ScheduledExecutorService执行任务可以设置预定义延迟和周期。实例化ScheduledExecutorService的最佳方式是使用Executors类的工厂方法。
下面代码片段,创建了单线程的ScheduledExecutorService

1
2
ScheduledExecutorService executorService = Executors
.newSingleThreadScheduledExecutor();

在固定延迟后调用任务,可以使用ScheduledExecutorServicescheduled()方法。有两个方法一个用于执行Runnable任务,一个用于执行Callable任务。

下面的代码在执行callableTask之前会延迟1秒:

1
2
Future<String> resultFuture = 
executorService.schedule(callableTask, 1, TimeUnit.SECONDS);

scheduleAtFixedRate()方法会在固定延迟后周期性执行一个任务。

下面的代码在延迟100毫秒后执行一个任务,并且每隔450毫秒再次执行相同任务。如果处理器需要比周期时间更长的事件来处理任务,ScheduledExecutorService将等待当前任务执行完成再开始下一个:

1
2
Future<String> resultFuture = service
.scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);

如果任务迭代执行中间需要固定长度的延迟时间,则要用到scheduleWithFixedDelay()。例如,下面的代码在上一个任务结束到下一个任务开始间有一个150毫秒的固定暂停。

1
service.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);

根据scheduleAtFixedRate()scheduleWithFixedDelay()方法声明,任务的执行周期会因为ExecutorService的终止或者任务抛出异常而终止。

7 ExecutorService vs. Fork/Join

Java7发布以后,许多开发者认为ExecutorService框架应该被fork/join框架代替。这个观点不太对。尽管fork/join框架用法更简单性能更佳,但是会控制并发执行的开发者人数也下降了。

ExecutorService是开发者可以控制线程的数量和任务的粒度。ExecutorService最佳使用场景是独立任务的处理,比如事务或者请求,根据一个线程执行一个任务的原则。

相比较,根据Oracle文档,fork/join框架设计成加速任务处理,递归地将工作分解成更小的单元。

8 总结

虽然ExecutorService比较简单,但是也有一些常见的问题。总结如下:

保持一个未用的ExecutorService处于执行状态:第四节做了详细解释
错误的线程池容量当使用固定长度的线程池时:判断需要多少线程才能有效地执行任务非常重要。线程池太大会导致不必要的浪费,创建的线程大多处于等待状态。太少会让应用看起来没有响应,因为任务需要等待很长时间才能分配到线程处理。
调用Future的get()方法在任务结束后:尝试获取已经结束的任务的执行结果会抛CancellationException
Future的get()方法导致不可预期的阻塞时间:应该指定超时时间来避免不可预期的等待。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接


1 简介

这篇文章讨论守护进程有什么用途,以及它们跟用户线程的区别。

2 守护线程和用户线程的区别

Java提供两类线程:用户线程(user thread)和守护线程(daemon thread)。

用户线程是高优先级线程。JVM在终止用户线程之前,会等待它们完成任务。

相反,守护线程是低优先级线程,它唯一角色就是为用户线程提供服务。

因为守护线程是服务用户线程的并且只用在用户线程执行的时候,所以当所有用户线程结束执行时,守护线程不会阻止JVM退出。

这就是为什么守护进程存在无限循环也不会有问题的原因。因为一旦所有用户线程结束执行,任何代码包括finally块都不会执行。也因此,对于I/O任务,不推荐使用守护线程。

不过,也有一些情况不遵循这个规则。比如,在运行的守护线程上调用Thread.join()就可以阻塞程序退出。

3 守护线程的用法

守护线程对后台辅助任务很有用,比如垃圾收集(gc),释放无用对象的内存和从缓存删除不需要的条目、大多数的JVM线程是守护线程。

4 创建一个守护线程

将线程设置为守护线程,只需要调用Thread.setDaemon()。例如,下面继承了ThreadNewThread类:

1
2
3
NewThread daemonThread = new NewThread();
daemonThread.setDaemon(true);
daemonThread.start();

任何线程设置了守护状态即创建了守护线程。因为main线程是用户线程,所以任何在main方法中创建的线程默认都是用户线程。

setDaemon()方法只能在Thread对象被创建后还没有调用start()前调用。尝试对运行中的线程调用setDaemon()方法会抛出IllegalThreadStateException

1
2
3
4
5
6
@Test(expected = IllegalThreadStateException.class)
public void whenSetDaemonWhileRunning_thenIllegalThreadStateException() {
NewThread daemonThread = new NewThread();
daemonThread.start();
daemonThread.setDaemon(true);
}

5 校验线程是不是守护线程

最后,验证一个线程是不是守护线程,只需调用 isDaemon() 就可以。

1
2
3
4
5
6
7
8
9
10
11
@Test
public void whenCallIsDaemon_thenCorrect() {
NewThread daemonThread = new NewThread();
NewThread userThread = new NewThread();
daemonThread.setDaemon(true);
daemonThread.start();
userThread.start();

assertTrue(daemonThread.isDaemon());
assertFalse(userThread.isDaemon());
}

6 总结

这篇文章,介绍了守护线程是什么,用在什么场景中。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 简介

这篇文章讨论Thread类的多个join()方法。了解下这些方法的细节,写几个例子。

就像wait()notify方法,join()也用于线程内同步。

关于wait()notify更多内容,查看这篇文章

2 Thread.join()方法

join方法定义在Thread类中:

public final void join() throws InterruptedException

Waits for this thread to die.

当调用一个线程的join()方法时,当前的线程会进入等待状态,直到被调用的线程终止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class SampleThread extends Thread {
public int processingCount = 0;

SampleThread(int processingCount) {
this.processingCount = processingCount;
LOGGER.info("Thread Created");
}

@Override
public void run() {
LOGGER.info("Thread " + this.getName() + " started");
while (processingCount > 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOGGER.info("Thread " + this.getName() + " interrupted");
}
processingCount--;
}
LOGGER.info("Thread " + this.getName() + " exiting");
}
}

@Test
public void givenStartedThread_whenJoinCalled_waitsTillCompletion()
throws InterruptedException {
Thread t2 = new SampleThread(1);
t2.start();
LOGGER.info("Invoking join");
t2.join();
LOGGER.info("Returned from join");
assertFalse(t2.isAlive());
}

正常情况下会打印如下日志:

1
2
3
4
5
INFO: Thread Created
INFO: Invoking join
INFO: Thread Thread-1 started
INFO: Thread Thread-1 exiting
INFO: Returned from join

当被调用的线程中断时,join() 方法也会返回。这种情况下,被调用线程会抛InterruptedException

最后,如果被调用的线程已经结束或者没有启动,调用join()方法也会立即返回。

1
2
Thread t1 = new SampleThread(0);
t1.join(); //returns immediately

3 带有超时时间的Thread.join()方法

被调用线程被阻塞或者执行很久,join() 方法会一直处于等待状态。这会导致当前线程处于无响应状态。为了解决这种问题,我们可以使用 join() 的重载方法,它可以指定超时时间。

有两个这样的重载方法:

public final void join(long millis) throws InterruptedException

Waits at most millis milliseconds for this thread to die. A timeout of 0 means to wait forever.

public final void join(long millis,int nanos) throws InterruptedException

Waits at most millis milliseconds plus nanos nanoseconds for this thread to die.

这样使用重载方法:

1
2
3
4
5
6
7
8
@Test
public void givenStartedThread_whenTimedJoinCalled_waitsUntilTimedout()
throws InterruptedException {
Thread t3 = new SampleThread(10);
t3.start();
t3.join(1000);
assertTrue(t3.isAlive());
}

上述例子中,当前线程最多花1秒钟等待线程t3完成,如果超过1秒钟还没有完成,join()方法会立即返回,把控制权交给当前线程。

带超时时间的join()方法,定时机制依赖于操作系统,所以超时时间不一定精确执行。

4 Thread.join()方法和同步

调用join方法,除了会一直等待到被调用线程结束,还有另外一个同步的副作用。会创建一个happens-before的关系。

All actions in a thread happen-before any other thread successfully returns from a join() on that thread.

一个线程的所有动作都会在成功调用join()之前发生。

这里的意思是当线程t1调用t2.join(),t2做的所有改变当join()返回后都是可见的。但是,如果我们没有调用join()或者使用其他的同步方式,就不能保证其他线程发生的变化能被当前线程看到,即使其他线程已经完成。

因此,就算调用join()方法的时候,线程已经处于结束状态时而且会马上返回,我们在某些场景仍然需要调用一次。

看看下面不正确的同步例子:

1
2
3
4
5
6
SampleThread t4 = new SampleThread(10);
t4.start();
// not guaranteed to stop even if t4 finishes.
do {

} while (t4.processingCount > 0);

为了正确地让上述代码达到同步效果,需要在上面循环中增加一句 t4.join() 或者使用其他同步机制。

5 总结

这篇文章讨论了join()方法和它们的行为,并展示了使用它们的小例子。可以看到join()方法在线程内同步中非常有用。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 简介

从Java早期开始,多线程就是它主要的组成部分。Runnable是用于表示多线程任务的核心接口,Callable在Java1.5引入,是Runnable的增强版。

这篇文章来探索一下两个接口的应用差异。

2 执行机制

两个接口都是被设计成表示多线程执行的任务。Runnable任务可以由Thread类或者ExecutorService执行,而Callable任务只能由ExecutorService执行。

3 返回值

让我们深入看看这两个接口如何处理返回值。

3.1 使用Runnable

Runnable是一个函数式接口类,只有一个run()方法。不接收传参,也不返回结果。

这适用于不需要线程执行结果的场景,比如事件日志。

1
2
3
public interface Runnable {
public void run();
}

通过下述例子理解下:

1
2
3
4
5
6
7
8
9
public class EventLoggingTask implements  Runnable{
private Logger logger
= LoggerFactory.getLogger(EventLoggingTask.class);

@Override
public void run() {
logger.info("Message");
}
}

在这个例子中,线程仅仅从队列读取消息并记录到日志文件中。这个任务没有返回值,可以由ExecutorService执行:

1
2
3
4
5
public void executeTask() {
executorService = Executors.newSingleThreadExecutor();
Future future = executorService.submit(new EventLoggingTask());
executorService.shutdown();
}

这里的Future对象不会持有任何值。

3.2 使用Callable

Callable是一个泛型接口类,含有一个call()方法,返回泛型值V:

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

看看一个数值阶乘的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class FactorialTask implements Callable<Integer> {
int number;

// standard constructors

public Integer call() throws InvalidParamaterException {
int fact = 1;
// ...
for(int count = number; count > 1; count--) {
fact = fact * count;
}

return fact;
}
}

下面测试中call()方法返回值由Future对象持有。

1
2
3
4
5
6
7
@Test
public void whenTaskSubmitted_ThenFutureResultObtained(){
FactorialTask task = new FactorialTask(5);
Future<Integer> future = executorService.submit(task);

assertEquals(120, future.get().intValue());
}

4 异常处理

接下来看看怎样处理异常合适。

4.1 使用Runnable

因为它的方法声明没有抛出异常,所以没有办法传递异常出去。

4.2 使用Callable

Callable’s call()方法含有“throws Exception”,因此可以抛异常出去。

1
2
3
4
5
6
7
8
9
10
public class FactorialTask implements Callable<Integer> {
// ...
public Integer call() throws InvalidParamaterException {

if(number < 0) {
throw new InvalidParamaterException("Number should be positive");
}
// ...
}
}

如果使用ExecutorService执行Callable,异常会存在Future对象中,可通过调用Future.get()方法来核实。实际上,它会抛出ExecutionException, 封装了原来的异常。

1
2
3
4
5
6
7
@Test(expected = ExecutionException.class)
public void whenException_ThenCallableThrowsIt() {

FactorialCallableTask task = new FactorialCallableTask(-5);
Future<Integer> future = executorService.submit(task);
Integer result = future.get().intValue();
}

上面的测试中,我们传进去了一个不合法的整数导致抛了ExecutionException。可以通过调用这个异常对象的getCause()方法,可以获取原来的异常。

如果没有调用Futureget()方法 ,由call()方法抛出的异常就不会抛到外层,而任务也被标记为完成。

1
2
3
4
5
6
7
@Test
public void whenException_ThenCallableDoesntThrowsItIfGetIsNotCalled(){
FactorialCallableTask task = new FactorialCallableTask(-5);
Future<Integer> future = executorService.submit(task);

assertEquals(false, future.isDone());
}

上面的测试会正常通过,尽管任务会因为传入了非法的整数抛了异常。

5 总结

这篇文章探索了RunnableCallable接口类之间的差异。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 简介

这篇文章讨论下java中最基础的机制之一-线程同步。

首先,介绍一些并发相关的术语和方法。然后,通过实现一个用于解决并发问题的简单例子来更好的理解waitnotify方法。

2 Java中的线程同步

在多线程环境中,多个线程会尝试修改同一资源。如果线程管理不得当,就会引起一致性问题。

2.1 Java中的Guarded Blocks

Guarded Blocks(保护块)是Java中用于协调多线程行为的方法之一。这些保护块会在重新开始执行之前检查是否满足具体条件。

知道这些后,接下来将会用到如下方法:

  • Object.wait()-挂起线程
  • Object.notify()-唤醒进程

下图展示了线程的生命周期,可以让帮助我们理解。

线程生命周期

其实有很多方法可以操控线程生命周期。不过这篇文章,我们只关注wait()notify()方法。

3 wait()方法

简单地讲,当调用wait()时,当前线程会被切换到等待状态,直到处于同一个对象(object)上的其他线程调用notify()notifyAll,才重新启动。
为此,当前线程必须拥有该对象监视器(object monitor)。根据官方文档,拥有对象监视器的情况有以下三种:

  • 执行该对象synchronized实例方法时
  • 执行该对象synchronized代码块时
  • 执行该对象类的synchronized静态方法时

注意:对象监视器同一时刻只能被一个活的线程拥有

接下来,看看wait() 的三个重载方法。

3.1 wait()

wait()方法会让当前线程处于无限期地等待状态,直到另外一个线程调用notify()notifyAll,才重新启动。

3.2 wait(long timeout)

这个方法指定了超时时间,当线程等待超过超时时间,就会自动被唤醒。当然,也可以在超时时间之前调用notify()notifyAll()方法唤醒线程。

调用wait(0)wait()效果一样。

3.3 wait(long timeout, int nanos)

这个方法和上面的方法类似,不过它能提供更精确的超时时间。它的超时时间是100000*timeout + nanos,单位是纳秒。

4 notify()和notifyAll()

notify()方法用来唤醒那些等待对象监视器访问权的线程。有两种方式唤醒等待的线程。

4.1 notify()

对于所有等待对象监视器的线程(调用了任意一个wait()方法),notify()方法会通知其中的一个线程。具体唤醒哪个线程我们不知道, 这依赖于具体实现。

因为notify()方法会随机唤醒一个线程,所以在线程都执行相似任务的场景下它可以用来实现互斥锁。但大多数时候,推荐使用notifyAll()

4.2 notifyAll()

这个方法简单地唤醒所有等待对象监视器的线程。

唤醒的线程不处意外会执行完成。

但是在允许它们继续执行之前,系统总会快速检查一下是否满足线程的执行条件。因为可能某些场景,线程没有接收到通知就被唤醒。(这种场景后面会在例子里讨论)

5 Sender-Receiver 同步问题

了解了基础内容,现在看看一个简单的发送者-接收者程序。这里通过wait()notify()方法在它们之间实现同步:

  • 发送者发送一个数据包给接受者
  • 接收者等发送者完成发送后才能处理数据包
  • 同样,发送者等接收者已经处理完上一个包以后才可以发送下一个数据包

创建一个数据类,首先定义从发送者发到接收者的数据包结构。然后在发送和接收方法里使用wait()notifyAll()方法来实现同步效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Data {
private String packet;

// True if receiver should wait
// False if sender should wait
private boolean transfer = true;

public synchronized void send(String packet) {
while (!transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
transfer = false;

this.packet = packet;
notifyAll();
}

public synchronized String receive() {
while (transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
transfer = true;

notifyAll();
return packet;
}
}

下面解释下上面的代码:

  • 变量packet表示传输的数据
  • 布尔变量transfer在发送者和接收者做同步的时候会用到:
    • 如果为true,则接收者要等待发送者发送完数据
    • 如果为false,则发送者要等待接收者接收完数据
  • 发送者调用send()方法发送数据到接收者:
    • 如果transfer为false,则调用wait()方法等待
    • 如果transfer为true,则将transfer置为false,设置数据并调用notifyAll()方法唤醒其它线程。其它线程被唤醒后,会检查它们是否能够执行。
  • 同样,接收者调用receive()方法:
    • 如果transfer为true,则调用wait()方法等待
    • 如果transfer为false,则将transfer置为true,调用notifyAll()方法唤醒其他等待的线程并返回数据包。

5.1 为什么将wait()放在while循环中?

因为notify()notifyAll()会随机唤醒等待对象监视器的线程,所以是否满足条件就不一定重要。有时候会出现线程被唤醒,但是实际条件不满足的情况。

也可以通过为那些不需要接收通知就能唤醒线程的情况增加一个检查来避免这种假唤醒。

5.2 为什么需要同步send()和receive()方法?

将方法放到同步(synchronized)方法中可以拿到内在锁(intrinsic lock)。如果线程调用wait()方法没有内在锁,就会报错。

现在创建发送者和接收者类,它们都实现了Runnable接口,这样可以让线程执行它们的实例。

首先,看看发送者如何工作:

  • 创建了一些数据用于发送
  • 通过send()方法发送每个数据包
  • 调用Thread.sleep()方法,通过休眠随机时间来模拟服务端正在处理繁重的任务
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class Sender implements Runnable {
    private Data data;

    // standard constructors

    public void run() {
    String packets[] = {
    "First packet",
    "Second packet",
    "Third packet",
    "Fourth packet",
    "End"
    };

    for (String packet : packets) {
    data.send(packet);

    // Thread.sleep() to mimic heavy server-side processing
    try {
    Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    Log.error("Thread interrupted", e);
    }
    }
    }
    }

然后,实现接收者。这里简单的在循环中调用locd.recevie()方法,直到接收到End数据包后结束:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Receiver implements Runnable {
private Data load;

// standard constructors

public void run() {
for(String receivedMessage = load.receive();
!"End".equals(receivedMessage);
receivedMessage = load.receive()) {

System.out.println(receivedMessage);

// ...
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
}
}

接下来,看看下面的小例子:

1
2
3
4
5
6
7
8
public static void main(String[] args) {
Data data = new Data();
Thread sender = new Thread(new Sender(data));
Thread receiver = new Thread(new Receiver(data));

sender.start();
receiver.start();
}

打印了如下数据:

1
2
3
4
First packet
Second packet
Third packet
Fourth packet

这里成功地在发送者和接收者之间建立了通信,并正确有序地接收到了所有数据包。

6 总结

这篇文章,讨论了如何使用wait()notify()解决同步问题,并通过小例子进行了实践。

另外,虽然这些传统的低级方法,如wait()notify()notifyAll()都能解决问题,但是高级方法会更简单有效,如Java中的LockCondition接口(包java.util.concurrent.locks)。

更多关于java.util.concurrent包的介绍,查看这篇文章java并发系列-java.util.concurrent概览LockCondition的介绍在这里guide to java.util.concurrent.Locks

7 旁白

文章中讲到的notify()notifyAll在唤醒线程前会做一个是否满足条件的检查,不太理解。检查什么?


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接


1 简介

这篇文章介绍实现Runnable接口类还是继承Thread类在具体场景中,哪种更合适,为什么更合适。

2 使用线程(Thread)

首先,定义一个继承Thread的类SimpleThread:

1
2
3
4
5
6
7
8
9
10
11
public class SimpleThread extends Thread {

private String message;

// standard logger, constructor

@Override
public void run() {
log.info(message);
}
}

然后,看看怎样执行这种类型的线程:

1
2
3
4
5
6
7
8
9
@Test
public void givenAThread_whenRunIt_thenResult()
throws Exception {

Thread thread = new SimpleThread(
"SimpleThread executed using Thread");
thread.start();
thread.join();
}

也可以用ExecutorService执行这个线程:

1
2
3
4
5
6
7
@Test
public void givenAThread_whenSubmitToES_thenResult()
throws Exception {

executorService.submit(new SimpleThread(
"SimpleThread executed using ExecutorService")).get();
}

看起来写了很多代码,为了实现在一个线程中执行记录日志操作。

同时,SimpleThread类也不能再继承其他类,因为Java不支持多继承。

3 实现runnable接口类

首先,创建一个实现了java.lang.Runnable接口类的任务类SimpleRunnable

1
2
3
4
5
6
7
8
9
10
11
class SimpleRunnable implements Runnable {

private String message;

// standard logger, constructor

@Override
public void run() {
log.info(message);
}
}

怎样在线程中执行SimpleRunnable这个任务类?有很多方式,其中一种是使用Thread类:

1
2
3
4
5
6
7
8
@Test
public void givenRunnable_whenRunIt_thenResult()
throws Exception {
Thread thread = new Thread(new SimpleRunnable(
"SimpleRunnable executed using Thread"));
thread.start();
thread.join();
}

也可以使用ExecutorService

1
2
3
4
5
6
7
@Test
public void givenARunnable_whenSubmitToES_thenResult()
throws Exception {

executorService.submit(new SimpleRunnable(
"SimpleRunnable executed using ExecutorService")).get();
}

关于ExecutorService的更多知识在这里

这里的SimpleRunnable类只实现了一个接口类,所以有需要的话还能继承其他类。

从Java8开始,任何只暴露一个抽象方法的接口类也被看作是函数式接口,可以用lambda表达式来表示。所以可以用lambda表达式重写上面Runnable代码。

1
2
3
4
5
6
7
@Test
public void givenARunnableLambda_whenSubmitToES_thenResult()
throws Exception {

executorService.submit(
() -> log.info("Lambda runnable executed!"));
}

4 Runnable 或 Thread

通常比起Thread,更推荐用Runnable,原因如下:

  • 继承Thread类的时候,我们不能覆盖它的任何方法。反之,可以覆盖Runnable的方法(Thread happens to implement ???)。这明显违反了IS-A Thread的原则。(不太明白这句啥意思)
  • 创建Runnable的实现并将它传给Thread类执行,通过组合的方式而不是继承,更加灵活。
  • 继承了Thread类,不能再继承其他类
  • 从Java8开始,Runnable可以通过lambda表达式来表示

5 总结

通过这篇短文知道,实现Runnable接口类的方式比继承Thread类有更多优势。


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接