java并发系列-Java中的wait()和notify()方法

1 简介

这篇文章讨论下java中最基础的机制之一-线程同步。

首先,介绍一些并发相关的术语和方法。然后,通过实现一个用于解决并发问题的简单例子来更好的理解waitnotify方法。

2 Java中的线程同步

在多线程环境中,多个线程会尝试修改同一资源。如果线程管理不得当,就会引起一致性问题。

2.1 Java中的Guarded Blocks

Guarded Blocks(保护块)是Java中用于协调多线程行为的方法之一。这些保护块会在重新开始执行之前检查是否满足具体条件。

知道这些后,接下来将会用到如下方法:

  • Object.wait()-挂起线程
  • Object.notify()-唤醒进程

下图展示了线程的生命周期,可以让帮助我们理解。

线程生命周期

其实有很多方法可以操控线程生命周期。不过这篇文章,我们只关注wait()notify()方法。

3 wait()方法

简单地讲,当调用wait()时,当前线程会被切换到等待状态,直到处于同一个对象(object)上的其他线程调用notify()notifyAll,才重新启动。
为此,当前线程必须拥有该对象监视器(object monitor)。根据官方文档,拥有对象监视器的情况有以下三种:

  • 执行该对象synchronized实例方法时
  • 执行该对象synchronized代码块时
  • 执行该对象类的synchronized静态方法时

注意:对象监视器同一时刻只能被一个活的线程拥有

接下来,看看wait() 的三个重载方法。

3.1 wait()

wait()方法会让当前线程处于无限期地等待状态,直到另外一个线程调用notify()notifyAll,才重新启动。

3.2 wait(long timeout)

这个方法指定了超时时间,当线程等待超过超时时间,就会自动被唤醒。当然,也可以在超时时间之前调用notify()notifyAll()方法唤醒线程。

调用wait(0)wait()效果一样。

3.3 wait(long timeout, int nanos)

这个方法和上面的方法类似,不过它能提供更精确的超时时间。它的超时时间是100000*timeout + nanos,单位是纳秒。

4 notify()和notifyAll()

notify()方法用来唤醒那些等待对象监视器访问权的线程。有两种方式唤醒等待的线程。

4.1 notify()

对于所有等待对象监视器的线程(调用了任意一个wait()方法),notify()方法会通知其中的一个线程。具体唤醒哪个线程我们不知道, 这依赖于具体实现。

因为notify()方法会随机唤醒一个线程,所以在线程都执行相似任务的场景下它可以用来实现互斥锁。但大多数时候,推荐使用notifyAll()

4.2 notifyAll()

这个方法简单地唤醒所有等待对象监视器的线程。

唤醒的线程不处意外会执行完成。

但是在允许它们继续执行之前,系统总会快速检查一下是否满足线程的执行条件。因为可能某些场景,线程没有接收到通知就被唤醒。(这种场景后面会在例子里讨论)

5 Sender-Receiver 同步问题

了解了基础内容,现在看看一个简单的发送者-接收者程序。这里通过wait()notify()方法在它们之间实现同步:

  • 发送者发送一个数据包给接受者
  • 接收者等发送者完成发送后才能处理数据包
  • 同样,发送者等接收者已经处理完上一个包以后才可以发送下一个数据包

创建一个数据类,首先定义从发送者发到接收者的数据包结构。然后在发送和接收方法里使用wait()notifyAll()方法来实现同步效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Data {
private String packet;

// True if receiver should wait
// False if sender should wait
private boolean transfer = true;

public synchronized void send(String packet) {
while (!transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
transfer = false;

this.packet = packet;
notifyAll();
}

public synchronized String receive() {
while (transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
transfer = true;

notifyAll();
return packet;
}
}

下面解释下上面的代码:

  • 变量packet表示传输的数据
  • 布尔变量transfer在发送者和接收者做同步的时候会用到:
    • 如果为true,则接收者要等待发送者发送完数据
    • 如果为false,则发送者要等待接收者接收完数据
  • 发送者调用send()方法发送数据到接收者:
    • 如果transfer为false,则调用wait()方法等待
    • 如果transfer为true,则将transfer置为false,设置数据并调用notifyAll()方法唤醒其它线程。其它线程被唤醒后,会检查它们是否能够执行。
  • 同样,接收者调用receive()方法:
    • 如果transfer为true,则调用wait()方法等待
    • 如果transfer为false,则将transfer置为true,调用notifyAll()方法唤醒其他等待的线程并返回数据包。

5.1 为什么将wait()放在while循环中?

因为notify()notifyAll()会随机唤醒等待对象监视器的线程,所以是否满足条件就不一定重要。有时候会出现线程被唤醒,但是实际条件不满足的情况。

也可以通过为那些不需要接收通知就能唤醒线程的情况增加一个检查来避免这种假唤醒。

5.2 为什么需要同步send()和receive()方法?

将方法放到同步(synchronized)方法中可以拿到内在锁(intrinsic lock)。如果线程调用wait()方法没有内在锁,就会报错。

现在创建发送者和接收者类,它们都实现了Runnable接口,这样可以让线程执行它们的实例。

首先,看看发送者如何工作:

  • 创建了一些数据用于发送
  • 通过send()方法发送每个数据包
  • 调用Thread.sleep()方法,通过休眠随机时间来模拟服务端正在处理繁重的任务
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class Sender implements Runnable {
    private Data data;

    // standard constructors

    public void run() {
    String packets[] = {
    "First packet",
    "Second packet",
    "Third packet",
    "Fourth packet",
    "End"
    };

    for (String packet : packets) {
    data.send(packet);

    // Thread.sleep() to mimic heavy server-side processing
    try {
    Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    Log.error("Thread interrupted", e);
    }
    }
    }
    }

然后,实现接收者。这里简单的在循环中调用locd.recevie()方法,直到接收到End数据包后结束:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Receiver implements Runnable {
private Data load;

// standard constructors

public void run() {
for(String receivedMessage = load.receive();
!"End".equals(receivedMessage);
receivedMessage = load.receive()) {

System.out.println(receivedMessage);

// ...
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
}
}

接下来,看看下面的小例子:

1
2
3
4
5
6
7
8
public static void main(String[] args) {
Data data = new Data();
Thread sender = new Thread(new Sender(data));
Thread receiver = new Thread(new Receiver(data));

sender.start();
receiver.start();
}

打印了如下数据:

1
2
3
4
First packet
Second packet
Third packet
Fourth packet

这里成功地在发送者和接收者之间建立了通信,并正确有序地接收到了所有数据包。

6 总结

这篇文章,讨论了如何使用wait()notify()解决同步问题,并通过小例子进行了实践。

另外,虽然这些传统的低级方法,如wait()notify()notifyAll()都能解决问题,但是高级方法会更简单有效,如Java中的LockCondition接口(包java.util.concurrent.locks)。

更多关于java.util.concurrent包的介绍,查看这篇文章java并发系列-java.util.concurrent概览LockCondition的介绍在这里guide to java.util.concurrent.Locks

7 旁白

文章中讲到的notify()notifyAll在唤醒线程前会做一个是否满足条件的检查,不太理解。检查什么?


本文为译文,作者通过翻译达到学习目的。 原文链接 | 原文源码链接 | 本站源码链接