1 学习内容
- notifyAll
- 生产者、消费者经典范式
- 线程休息室 wait set
- synchronized关键字的缺陷
- 自定义显式锁BooleanLock
- 总结
2 具体内容
2.1 多线程通信
2.1.1 notifyAll方法
多线程的通信要用到Object的notifyAll方法,notifyAll方法可以同时唤醒阻塞的全部线程,当然了被唤醒的线程仍需要继续争抢monitor的锁。
2.1.2 生产者消费者
上一节中我们定义了一个EventQueue,此队列的在多线程的环境中会出现数据不一致的情况,其中的情形是:LinkedList中没有元素的时候仍然调用removeFirst方法;当LinkList中的元素超过10个的时候仍旧执行了addLast方法
- LinkedList为空时执行removeFirst方法分析
其实我想着我们在EventQueue的方法中都加了synchronized数据同步,为什么还会出现不一致的情况?我们假设EventQueue的元素为空,两个线程在执行take方法时都陷入阻塞中,另外一个offer线程执行了addLast方法之后唤醒了其中一个阻塞的take线程,该线程消费了一个元素之后刚好唤醒了一个take线程,这时就会执行空LinkedList
2.1.3 改进EventQueue中的offer和take方法
/**
* 如果事件队列没有满则添加到队尾,否则等待
* @param event
*/
public void offer(Event event) {
synchronized (eventQueue) {
while (eventQueue.size() >= max) { //事件队列 > 队列定义的最大值
try {
console("the Queue is full.");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
console("the new event is committed.");
eventQueue.addLast(event);
eventQueue.notifyAll(); //唤醒那些曾经执行monitor的wait方法而陷入阻塞的线程
}
}
/**
* 从队头获取数据,如果队列中可用的数据那么工作线程就会调用wait阻塞
*/
public Event take() {
synchronized (eventQueue) {
if (eventQueue.isEmpty()) {
try {
console("the queue is empty. 没有可以拿的要我怎么办!");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Event event = eventQueue.removeLast();
this.eventQueue.notifyAll(); // 将等待队列中的任务全部唤醒
console("the event" + event + "is handled.");
return event;
}
}
其实只需要将临界值的判断if更改为while,将notify改为notifyAll就可以了。
2.1.4 等待/通知经典范式
如上一节EventQueue例子如是也(有点勉强),范式分为两部份,分别针对等待方(生产者)和通知方(消费者)
- 等待方遵循如下原则
- 获取对象的锁
- 如果条件不满足,则调用对象的wait方法,被通知后仍要检查条件
- 条件满足则执行相应的逻辑
对应伪代码如下
synchronized(对象){
while(条件不满足){
对象.wait();
}
对应的逻辑处理
}
-
通知方遵循如下原则
- 获取对象的锁
- 改变条件
- 通知所有等待在对象上的线程
对应伪代码如下
synchronized(对象){
改变条件
对象.notifyAll();
}
2.2 生产者消费者经典范式(notifyAll)
可以看出Buffer缓冲区作为一个中介,将生产者和消费者分开,使得两部分相对独立,生产者消费者不需要知道对方的实现逻辑,对其中一个的修改,不会影响另一个,从设计模式的角度看,降低了耦合度。而对于图中处在多线程环境中Buffer,需要共享给多个多个生产者和消费者,为了保证读写数据和操作的正确性与时序性,程序需要对Buffer结构进行同步处理。通常情况下,生产者-消费者模式中的广泛使用的Buffer缓冲区结构是阻塞队列。
生产者消费示例(线程假死)
package com.thread.basicmethod.chapter05;
import java.util.stream.Stream;
/********************************
* @Author: kangna
* @Date: 2019/8/23 22:39
* @Version: 2.0
* @Desc: 生产者 消费者
********************************/
public class ProduceConsumerVersion_2 {
private int i = 0;
final private Object LOCK = new Object();
private volatile boolean isProduced = false; // 保证boolean 值可见
public void produce() {
synchronized (LOCK) {
//如果已经生产过,别人还没有消费,那就等一下
if (isProduced) {
try {
LOCK.wait(); // 也可以被打断
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
i++;
System.out.println("produce--->" + i);
LOCK.notify(); // 唤醒wait set中的 消费者
isProduced = true; // 生产者已经生产
}
}
}
public void consume() {
synchronized (LOCK) {
if (isProduced) { // 如果生产者生产
System.out.println("consume--->" + i);
// 消费者消费了之后,通知生产者,再生产
LOCK.notify(); // 唤醒生产者(有多个生产者),赶紧给我 生产,我要
isProduced = false; // 消费之后,将状态置为 false
} else {
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ProduceConsumerVersion_2 pc = new ProduceConsumerVersion_2();
Stream.of("P1","P2").forEach(n ->
new Thread("P") {
@Override
public void run() {
while (true) {
pc.produce();
}
}
}.start()
);
Stream.of("C1","C2").forEach(n ->
new Thread("S") {
@Override
public void run() {
while (true) {
pc.consume();
}
}
}.start()
);
}
}
运行的结果如下:
两个生产者P1,P2,两个消费者C1,C2,为什么线程会进入假死,不进行生产和消费了,我们进行了jstack 信息打印看了一下,也确定没有死锁。问题就出在notify,因为到后边生产者生产了一个货物之后,不知道notify谁,陷入了两难的境地,最后大家都blocked了。
接下来我们修改一下代码其实跟上边的EventQueue一样的
package com.thread.basicmethod.chapter05;
import java.util.stream.Stream;
/********************************
* @Author: kangna
* @Date: 2019/8/23 23:31
* @Version: 3.0
* @Desc: 多线程状态下的生产者和消费者
********************************/
public class ProduceConsumerVersion_3 {
private int i = 0; // 货物
final private Object LOCK = new Object(); // 定义共享锁
private volatile boolean isProduced = false;
public void produce() {
synchronized (LOCK) {
while (isProduced) { // 避免生产者生产 重复数据
try {
LOCK.wait(); // 也可以被打断
} catch (InterruptedException e) {
e.printStackTrace();
}
}
i++;
System.out.println("product--->" + i);
LOCK.notifyAll();
isProduced = true; // 生产者已经生产了。
}
}
public void consume() {
synchronized (LOCK) {
while (!isProduced) { // 避免生产一次,消费多次的状况
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("consume--->" + i);
LOCK.notifyAll();
isProduced = false;
}
}
public static void main(String[] args) {
ProduceConsumerVersion_3 pc = new ProduceConsumerVersion_3();
Stream.of("P1","P2").forEach(n ->
new Thread("P") {
@Override
public void run() {
while (true) {
pc.produce();
}
}
}.start()
);
Stream.of("C1","C2").forEach(n ->
new Thread("S") {
@Override
public void run() {
while (true) {
pc.consume();
}
}
}.start()
);
}
}
运行结果
将循环判断 while 判断改为 if 看看运行结果
- 说明,为什么会出现生产一个数据,有两次消费的情况?
p1,p2只有一个可以抢到锁,首先P1抢到锁,生产了一个数据,P2已经进入wait队列中放弃monitor,p1生产完之后,也会进入wait Set队列中。会执notifyAll,这里就是问题,不会循环判断,进而会继续向下执行(i++),唤醒wait set中的P2,p2又生产一个,P1生产了一个消费者还没有消费,P2又生产了一个。
- 为什么使用while 而不是if
在循环(loop)里调用 wait 和 notify,不是在 If语句,wait应该在被synchronized的背景下和那个被多线程共享的对象上调用,应该永远在while循环,而不是if语句中调用wait。因为线程是在某些条件下等待的——在我们的例子里,即“如果消费者没有消费的话,那么生产者线程应该等待”,你可能直觉就会写一个if语句。但 if 语句存在一些微妙的小问题,我们使用 if 确实出现了问题。所以如果不在线程被唤醒后再次使用while 循环检查唤醒条件是否被满足(满足条件),程序就有可能会出错——例如上面例子中我们生产了重复相同的数据(覆盖),导致消费者消费重复数据。
基于以上认知,下面这个是使用wait和notify函数的规范代码模板:
// The standard idiom for calling the wait method in Java
synchronized(sharedObject) {
while(condition) {
sharedObject.wait();
// (Releases lock, and reacquires on wakeup)
}
// do action based upon condition e.g. take or put into queue
}
在while循环里使用 wait 的目的,是在线程被唤醒的前后都持续检查条件是否被满足。如果条件并未改变,wait被调用之前notify的唤醒通知就来了,那么这个线程并不能保证被唤醒,有可能会导致死锁问题。
注意:
- 永远在synchronized的方法或对象里使用wait、notify和notifyAll,不然Java虚拟机会生成
IllegalMonitorStateException。 - 永远在while循环里而不是if语句下使用wait。这样,循环会在线程睡眠前后都检查wait的条件,并在条件实际上并未改变的情况下处理唤醒通知。
- 永远在多线程间共享的对象(在生产者消费者模型里即缓冲区队列)上使用wait。
2.2线程休息室wait set
在虚拟机规范中存在一个wait set的概念,线程调用了对象的wait方法之后线程会被加入与该对象monitor关联的wait set中,并且释放monitor的所有权,然后会进入可执行状态,继续加入抢锁的队伍中,如果抢到锁,就会继续执行,相反,则不能继续执行。
- 如图,是若干个线程调用wait方法之后被加入与monitor关联的wait set中,当另外一个线程调用该monitor的notify方法之后,其中一个线程会从wait set中弹出,至于是随机弹出还是以先进先出的方式弹出,虚拟机没说
而执行notifyAll则不需要考虑那个线程会被弹出,因为wait set中的所有wait线程都将被弹出
2.3 再看monitor(监视器和wait set)
接下来我们再看下在monitor在Java虚拟机(HotSpot)中的实现,其实现是基于C++的,由ObjectMonitor实现的,其主要数据结构如下:
ObjectMonitor() {
_header = NULL;
_count = 0;
_waiters = 0,
_recursions = 0;
_object = NULL;
_owner = NULL;
_WaitSet = NULL;
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
_EntryList = NULL ;
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
}
ObjectMonitor.hpp源码地址:
在上面的源码我们可以看到ObjectMonitor中有几个关键属性:
- _owner:指向持有ObjectMonitor对象的线程
- _WaitSet:存放处于wait状态的线程队列
- _EntryList:存放处于等待锁block状态的线程队列
- _recursions:锁的重入次数
- _count:用来记录该线程获取锁的次数
当多个线程同时访问一段同步代码时,首先会进入_EntryList队列中,当某个线程获取到对象的monitor后进入_Owner区域并把monitor中的_owner变量设置为当前线程,同时monitor中的计数器_count加1。即获得对象锁。
若持有monitor的线程调用wait()方法,将释放当前持有的monitor,_owner变量恢复为null,_count自减1,同时该线程进入_WaitSet集合中等待被唤醒。若当前线程执行完毕也将释放monitor(锁)并复位变量的值,以便其他线程进入获取monitor(锁)。如下图所示:
2.4 自定义显式锁BooleanLock
2.4.1 synchronized关键字的缺陷
synchronized关键字提供了一种排他式的数据同步机制,某个线程在获取monitor锁的时候可能陷入阻塞,而这种阻塞有两种缺陷:
- 无法控制阻塞时长
- 阻塞不可中断
示例:
public class SynchronizedDefect {
public synchronized void syncMethod(){
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String args[]) throws InterruptedException{
SynchronizedDefect defect = new SynchronizedDefect();
Thread t1 = new Thread(defect :: syncMethod, "T1");
t1.start();
TimeUnit.MILLISECONDS.sleep(2); //主线程休息2毫秒
Thread t2 = new Thread(defect :: syncMethod, "T2");
t2.start();
}
}
上面的代码T1线程会最先进入同步方法,而导致T2阻塞,T2的执行完全取决于T1何时释放,如果T2计划最后一分钟获得所有权,否则就放弃,很显然这种方式是做不到的,其实这就是前面说的阻塞时长无法控制。
另外一个缺陷就是T2线程会因争抢某个monitor的锁而进入阻塞状态,它是无法中断的,虽然T2可以设置中断interrupt标识,但是synchronized不像wait与sleep一样能够捕捉到中断信号。
2.4.2 显式锁BooleanLock
现在我们构造一个显式的BooleanLock,使其不仅具备synchronized关键字的所有功能同时具备可中断可超时的功能。
1.定义Lock接口
public interface Lock {
/**
* lock方法永远阻塞,除非获得了锁,这一点和synchronized相似,但是该 方法可以被中断的,中断时抛出异常InterruptedException
* @throws InterruptedException
*/
public void lock() throws InterruptedException;
/**
* lock方法除了可以被中断外 还增加了超时功能
* @param mills
* @throws InterruptedException
* @throws TimeoutException
*/
public void lock(long mills) throws InterruptedException, TimeoutException;
/**
* 该方法可用来进行锁的释放
*/
public void unlock();
/**
* @return 获取当前有哪些线程被阻塞
*/
public List<Thread> getBlockedThreads();
}
2 实现BooleanLock
BooleanLock是Lock的一个Boolean实现,通过控制一个Boolean变量的开关来决定是否允许当前的线程获取该锁。
package com.kangna.concurrent.chapter05;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import static java.lang.Thread.currentThread;
import static java.lang.System.currentTimeMillis;
public class BooleanLock implements Lock{
//当前拥有锁的线程
private Thread currentThread;
//locked是一个Boolean开关, true代表该锁已经被某个线程获得
private boolean locked = false;
//存储拿些线程在获取当前线程时进入了阻塞状态
private final List<Thread> blockedList = new ArrayList<>();
@Override
public void lock() throws InterruptedException {
synchronized(this){ // 同步代码块进行方法同步
// 如果当前锁已经被某个线程获得,则该线程加入阻塞队列,并且使当前线程wait释放对this monitor的所有权
while(locked){
//暂停当前线程
final Thread tempThread = currentThread();
try {
if (!blockedList.contains(tempThread)) {
blockedList.add(currentThread());
this.wait(); //等待
}
} catch (InterruptedException e) {
//如果当前线程在wait时被中断,则从blockedList中将其删除,避免内存泄漏
blockedList.remove(currentThread());
throw e;
}
}
//如果当前锁没有被其它线程获得,则该线程将尝试从阻塞队列中删除自己,
blockedList.remove(currentThread());
//该锁已经被某个线程获得
this.locked = true;
//记录获取锁的线程
this.currentThread = currentThread();
}
}
@Override
public void lock(long mills) throws InterruptedException, TimeoutException {
synchronized(this){
if(mills <= 0){
this.lock(); // 抛异常也可以
} else {
long remainingMills = mills;
long endMills = currentTimeMillis() + remainingMills;
while(locked){
//如果remainingMills小于等于0,则意味着当前线程被其它线程唤醒或者在指定的wait时间后还没有获得锁,会抛出超时异常
if(remainingMills <= 0){
throw new TimeoutException("can not get the lock during " + mills + " ms.");
}
if(!blockedList.contains(currentThread)){
blockedList.add(currentThread);
//等待remainingMills的毫秒数,
this.wait(remainingMills);
remainingMills = endMills - currentTimeMillis();
}
//获得锁,并且从block列表中删除当前线程,将locked的状态改为true并且指定获得锁的线程就是当前线程
blockedList.remove(currentThread());
this.locked = true;
this.currentThread = currentThread();
}
}
}
}
/**
* 此方法将locked状态改为false,并且唤醒在wait set中的其它线程,再次争抢锁资源,,注意哪个线程 加的锁只能由该线程来解锁
*/
@Override
public void unlock() {
synchronized(this){
//判断当前线程是否为获取锁的线程,只有加了锁的线程才有资格进行解锁,
if(currentThread == currentThread()){
this.locked = false;
Optional.of(currentThread().getName() + "release the lock.").ifPresent(System.out :: println);
//通知wait set 中的线程你们可以再次尝试抢锁了
this.notifyAll();
}
}
}
@Override
public List<Thread> getBlockedThreads() {
return Collections.unmodifiableList(blockedList);
}
}
3.使用BooleanLock
(1)多个线程通过lock()方法争抢锁
package com.kangna.concurrent.chapter05;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.ThreadLocalRandom.current;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class BooleanLockTest {
//定义BooleanLock
private final Lock lock = new BooleanLock();
//使用try finally 语句确保lock每次都能正确的释放
public void synMethod(){
try{
lock.lock();
int randomInt = current().nextInt(10);
System.out.println(currentThread() + "get the lock.");
TimeUnit.SECONDS.sleep(randomInt);
} catch(InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); //释放锁
}
}
public static void main(String args[]){
BooleanLockTest blt = new BooleanLockTest();
//定义一个线程并且启动
IntStream.range(0, 10)
.mapToObj(i -> new Thread(blt :: synMethod))
.forEach(Thread :: start);
}
}
测试结果
Thread[Thread-0,5,main]get the lock.
Thread-0release the lock.
Thread[Thread-9,5,main]get the lock.
Thread-9release the lock.
Thread[Thread-1,5,main]get the lock.
Thread-1release the lock.
Thread[Thread-8,5,main]get the lock.
Thread-8release the lock.
Thread[Thread-2,5,main]get the lock.
Thread-2release the lock.
Thread[Thread-7,5,main]get the lock.
Thread-7release the lock.
Thread[Thread-3,5,main]get the lock.
Thread-3release the lock.
Thread[Thread-6,5,main]get the lock.
Thread-6release the lock.
Thread[Thread-4,5,main]get the lock.
Thread-4release the lock.
Thread[Thread-5,5,main]get the lock.
Thread-5release the lock.
从测试结果我们可以看出,每次确保只有一个线程能够获得锁的执行权限,这一点与synchronized很接近了。
(2)可中断被阻塞的线程
修改测试类
BooleanLockTest blt = new BooleanLockTest();
new Thread(blt :: synMethod, "T1").start();
TimeUnit.MICROSECONDS.sleep(2);
Thread t2 = new Thread(blt :: synMethod, "T2");
t2.start();
TimeUnit.MICROSECONDS.sleep(10);
t2.interrupt(); //T2线程运行10毫秒后主动将其打断
(3)阻塞的线程可超时
public class BooleanLockTest {
//定义BooleanLock
private final Lock lock = new BooleanLock();
public void synMethodTimeoutable(){
try{
lock.lock(1000);
System.out.println(currentThread() + "get the lock.");
int randomInt = current().nextInt(10);
TimeUnit.SECONDS.sleep(randomInt);
} catch (InterruptedException | TimeoutException e){
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String args[]) throws InterruptedException{
BooleanLockTest blt = new BooleanLockTest();
new Thread(blt :: synMethod, "T1").start();
TimeUnit.MICROSECONDS.sleep(2);
Thread t2 = new Thread(blt :: synMethodTimeoutable, "T2");
t2.start();
TimeUnit.MICROSECONDS.sleep(10);
}
}
运行结果
Thread[T1,5,main]get the lock.
T1release the lock.
java.util.concurrent.TimeoutException: can not get the lock during 1000 ms.
T2release the lock.
at com.kangna.concurrent.chapter05.BooleanLock.lock(BooleanLock.java:59)
at com.kangna.concurrent.chapter05.BooleanLockTest.synMethodTimeoutable(BooleanLockTest.java:27)
at java.lang.Thread.run(Unknown Source)
3 总结
- 生产者与消费者是最常用的模型之一
- wait set介绍
- 针对synchronized缺陷开发显式锁BooleanLock