ReentrantLock
ReentrantLock 是什么
java 除了使用关键字 synchronized 外,还可以使用 ReentrantLock 实现独占锁的功能。
ReentrantLock 相比 synchronized 而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。
ReentrantLock 与 Synchronzied 对比
都是独占锁
- synchronized 是独占锁,加锁和解锁的过程自动进行,易于操作,但不够灵活。
- ReentrantLock 也是独占锁,加锁和解锁的过程需要手动进行,不易操作,但非常灵活。
都是可重入锁
- synchronized 可重入锁,因为加锁和解锁自动进行,不必担心最后是否释放锁。
- ReentrantLock 也可重入锁,但加锁和解锁需要手动进行,且次数需一样,否则其他线程无法获得锁。
是否响应中断
- synchronized 不可响应中断,一个线程获取不到锁就一直等着。
- ReentrantLock 可以响应中断。
ReentrantLock 独有的
- ReentrantLock 还可以实现公平锁机制。
- 在锁上等待时间最长的线程将获得锁的使用权。
- 通俗的理解就是谁排队时间最长谁先执行获取锁。
接下来来看看几个 ReentrantLock 的示例如下
可重入锁使用 synchronized 实现
/**
* @author BNTang
*/
public class ReentrantLock01 {
synchronized void method1() {
for (int i = 0; i < 10; i++) {
SleepTools.sleepSecond(1);
System.out.println(i);
// 在一个锁当中调用了另一个带有锁的方法
if (i == 2) {
methods2();
}
}
}
synchronized void methods2() {
System.out.println("methods2 start...");
}
public static void main(String[] args) {
ReentrantLock01 rl = new ReentrantLock01();
new Thread(rl::method1).start();
}
}
使用 Reentrantlock 完成可重入锁
/**
* @author BNTang
*/
public class ReentrantLock02 {
Lock lock = new ReentrantLock();
void method1() {
try {
// synchronized(this)
lock.lock();
for (int i = 0; i < 10; i++) {
SleepTools.sleepSecond(1);
System.out.println(i);
}
} finally {
// lock必须手动释放锁,在finally中进行锁的释放
lock.unlock();
}
}
void methods2() {
try {
lock.lock();
System.out.println("methods2 start...");
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLock02 rl = new ReentrantLock02();
new Thread(rl::method1).start();
new Thread(rl::methods2).start();
}
}
tryLock 进行尝试锁
tryLock 进行尝试锁定,不管锁定与否,方法都将继续执行,根据 tryLock 的返回值来判定是否锁定
/**
* @author BNTang
*/
public class ReentrantLock03 {
Lock lock = new ReentrantLock();
void method1() {
try {
// synchronized(this)
lock.lock();
for (int i = 0; i < 3; i++) {
SleepTools.sleepSecond(1);
System.out.println(i);
}
} finally {
// lock必须手动释放锁,在finally中进行锁的释放
lock.unlock();
}
}
void methods2() {
boolean locked = lock.tryLock();
System.out.println("method start ..." + locked);
if (locked) {
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLock03 rl = new ReentrantLock03();
new Thread(rl::method1).start();
new Thread(rl::methods2).start();
}
}
/**
* @author BNTang
*/
public class ReentrantLock04 {
Lock lock = new ReentrantLock();
void method1() {
try {
// synchronized(this)
lock.lock();
for (int i = 0; i < 6; i++) {
SleepTools.sleepSecond(1);
System.out.println(i);
}
} finally {
// lock必须手动释放锁,在finally中进行锁的释放
lock.unlock();
}
}
void methods2() {
boolean locked = false;
try {
// 等待5秒 如果锁还没有释放 就结束 不再等待 如果释放 继续加锁处理
locked = lock.tryLock(5, TimeUnit.SECONDS);
System.out.println("method2 start ..." + locked);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (locked) {
lock.unlock();
}
}
}
public static void main(String[] args) {
ReentrantLock04 rl = new ReentrantLock04();
new Thread(rl::method1).start();
new Thread(rl::methods2).start();
}
}
线程打断
synchronized 没有办法打断线程
/**
* @author BNTang
**/
public class ReentrantLock05 {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (ReentrantLock05.class) {
System.out.println("thread1 start");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread1 end");
}
});
t1.start();
Thread t2 = new Thread(() -> {
synchronized (ReentrantLock05.class) {
System.out.println("thread2 start");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread2 end");
}
});
t2.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打断线程2的等待
t2.interrupt();
}
}
lock 打断线程等待
/**
* @author BNTang
**/
public class ReentrantLock06 {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
try {
lock.lock();
System.out.println("thread1 start");
TimeUnit.SECONDS.sleep(5);
System.out.println("thread1 end");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
t1.start();
Thread t2 = new Thread(() -> {
try {
// lock.lock() 调用此方法不会打断线程
// 调用lockInterruptibly会打断线程等
lock.lockInterruptibly();
System.out.println("thread2 start");
TimeUnit.SECONDS.sleep(2);
System.out.println("thread2 end");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
t2.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打断线程2的等待
t2.interrupt();
}
}
公平锁
/**
* @author BNTang
*/
public class ReentrantLock07 extends Thread {
/**
* 参数为true表示为公平锁
*/
private static final ReentrantLock LOCK = new ReentrantLock(true);
public void run() {
for (int i = 0; i < 100; i++) {
LOCK.lock();
try {
System.out.println(Thread.currentThread().getName() + "获得锁");
} finally {
LOCK.unlock();
}
}
}
public static void main(String[] args) {
ReentrantLock07 rl = new ReentrantLock07();
Thread thread1 = new Thread(rl);
Thread thread2 = new Thread(rl);
thread1.start();
thread2.start();
}
}
CountDownLatch
CountDownLatch 类的作用
- CountDownLatch 能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行
- 使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一
- CountDownLatch 是一个同步工具类
- 当计数器的值为 0 时,表示所有的线程都已经完成一些任务,然后在 CountDownLatch 上等待的线程就可以恢复执行接下来的任务
CountDownLatch 的用法
/**
* @author BNTang
*/
public class CountDownLatch01 {
public static void main(String[] args) {
// 线程池当中创建三个线程
ExecutorService service = Executors.newFixedThreadPool(3);
final CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
Runnable runnable = () -> {
System.out.println("子线程" + Thread.currentThread().getName() + "开始执行");
System.out.println("子线程" + Thread.currentThread().getName() + "执行完成");
// 当前线程调用此方法,则计数器减一
latch.countDown();
};
service.execute(runnable);
}
try {
System.out.println("主线程" + Thread.currentThread().getName() + "等待子线程执行完成...");
// 阻塞当前线程,直到计数器的值为 0
latch.await();
System.out.println("主线程" + Thread.currentThread().getName() + "开始执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如上是一个小案例,接下来来看看一个需求:百米赛跑,4 名运动员选手到达场地等待裁判口令,裁判一声口令,选手听到后同时起跑,当所有选手到达终点,裁判进行汇总排名,如下就是代码实现。
/**
* @author BNTang
**/
public class CountDownLatch02 {
public static void main(String[] args) {
// 创建线程池
ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch cpOrder = new CountDownLatch(1);
final CountDownLatch personAnswer = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
Runnable runnable = () -> {
try {
System.out.println("选手" + Thread.currentThread().getName() + "运行员等待枪响...");
cpOrder.await();
System.out.println("选手" + Thread.currentThread().getName() + "听到指令,开始跑");
Thread.sleep((long)(Math.random() * 10000));
System.out.println("选手" + Thread.currentThread().getName() + "到达终点");
personAnswer.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
service.execute(runnable);
}
try {
// 所有线程就绪
SleepTools.sleepSecond(2);
for (int i = 3; i > 0; i--) {
System.out.println(i);
SleepTools.sleepSecond(1);
}
System.out.println("裁判开枪 " + Thread.currentThread().getName() + "彭~~~~~");
cpOrder.countDown();
System.out.println("裁判" + Thread.currentThread().getName() + "已开枪,等待所有选手到达终点");
// 终点等待 第一个线程中有countDown方法 四个执行完countDown=0 继续执行后续代码
personAnswer.await();
System.out.println("所有选手都到达终点");
System.out.println("裁判" + Thread.currentThread().getName() + "汇总成绩排名");
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();
}
}
CyclicBarrier
CyclicBarrier 是什么
CyclicBarrier 中文意思是 “循环栅栏”
是 java.util.concurrent
包下面的多线程工具类。
它的作用就是会让所有线程都等待完成后才会继续下一步行动。
CyclicBarrier 的基本使用
/**
* @author BNTang
**/
public class CountDownLatch03 {
public static void main(String[] args) {
// 1.会议需要三个人
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// 2、这是三个人都到齐之后会执行的代码
System.out.println("三个人都已到达会议室");
});
// 3、定义三个线程,相当于三个参会的人
for (int i = 0; i < 3; i++) {
SleepTools.sleepSecond(1);
new Thread(() -> {
SleepTools.sleepSecond(1);
System.out.println("第" + Thread.currentThread().getName() + "个人到达会议室");
try {
// 5、等待其他人到会议室
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "开始开会");
}).start();
}
}
}
CountDownLatch 和 CyclicBarrier 的区别
Phaser
Phaser 概述
Phaser 是 JDK7 新增的一个同步辅助类,在功能上跟 CyclicBarrier 和 CountDownLatch 差不多。
/**
* @author BNTang
*/
public class PhaserTest {
private final static Random RANDOM = new Random();
static class Task extends Thread {
private final Phaser phaser;
Task(Phaser phaser) {
this.phaser = phaser;
this.start();
}
public void run() {
SleepTools.sleepSecond(RANDOM.nextInt(3));
System.out.println("线程:" + this.getName() + "完成阶段1");
// 等待所有线程全部执行完阶段 1 之后才会进入下一个阶段
phaser.arriveAndAwaitAdvance();
SleepTools.sleepSecond(RANDOM.nextInt(5));
System.out.println("线程:" + this.getName() + "完成阶段2");
phaser.arriveAndAwaitAdvance();
System.out.println("线程:" + this.getName() + "完成阶段3");
phaser.arriveAndAwaitAdvance();
System.out.println("线程:" + this.getName() + "完成");
}
}
public static void main(String[] args) {
// 参数为参于者,写几, 就要有几个参与线程,如果多的话, 会一直处理阻塞状态
Phaser phaser = new Phaser(5);
for (int i = 0; i < 5; i++) {
new Task(phaser);
}
}
}
/**
* @author BNTang
*/
public class PhaserTest1 {
public static void main(String[] args) {
// 参数为参于者,写几, 就要有几个参与线程,如果多的话, 会一直处理阻塞状态
WorkPhase phaser = new WorkPhase(3, 5);
for (int i = 0; i < 5; i++) {
new PhaserTest.Task(phaser);
}
}
}
class WorkPhase extends Phaser {
private final int totalPhase;
WorkPhase(int totalPhase, int parties) {
super(parties);
this.totalPhase = totalPhase;
}
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(phase + "期工程完工验收,共" + registeredParties + "期");
// 返回true 后续的就不会再执行, 返回false后续的内容会继续执行
return totalPhase == phase || registeredParties == 0;
}
}
ReadWriteLock
ReadWriteLock 概述
- ReadWriteLock 是一个接口
- ReadWriteLock 管理一组锁,一个是只读的锁,一个是写锁
- Java 并发库中 ReetrantReadWriteLock 实现了 ReadWriteLock 接口并添加了可重入锁的特性
ReadLock
- 读锁。加了读锁的资源, 可以在没有写锁的时候被多个线程共享
如果 t1 线程已经获取了读锁,那么此时存在以下状态:
(1) 如果 t2 线程要申请写锁,则 t2 会一直等待 t1 释放读锁
(2) 如果 t2 线程要申请读锁,则 t2 可以直接访问读锁,也就是说 t1 和 t2 可以共享资源,就和没加锁的效果一样
WriteLock
- 写锁,是独占锁。加了写锁的资源,不能再被其他线程读或写
- 如果 t1 已经获取了写锁,那么此时无论线程 t2 要申请写锁还是读锁,都必须等待 t1 释放写锁
/**
* @author BNTang
*/
public class ReadWriteLock1 {
static Lock lock = new ReentrantLock();
private static int value;
public void read(Lock lock) {
lock.lock();
SleepTools.sleepSecond(1);
System.out.println("读完");
lock.unlock();
}
public void write(Lock lock, int v) {
lock.lock();
SleepTools.sleepSecond(1);
value = v;
System.out.println("写完");
lock.unlock();
}
public static void main(String[] args) {
ReadWriteLock1 wrl = new ReadWriteLock1();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
wrl.read(lock);
}).start();
}
for (int i = 0; i < 2; i++) {
new Thread(() -> {
wrl.write(lock, new Random().nextInt());
}).start();
}
}
}
/**
* @author BNTang
*/
public class ReadWriteLock2 {
/**
* 共享锁与排它锁
*/
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
private static int value;
public void read(Lock lock) {
lock.lock();
SleepTools.sleepSecond(1);
System.out.println("读完");
lock.unlock();
}
public void write(Lock lock, int v) {
lock.lock();
SleepTools.sleepSecond(1);
value = v;
System.out.println("写完");
lock.unlock();
}
public static void main(String[] args) {
ReadWriteLock2 rt = new ReadWriteLock2();
// 加读锁的时候, 其它的线程也可以获取到锁
for (int i = 0; i < 10; i++) {
new Thread(() -> {
rt.read(readLock);
}).start();
}
// 写锁是独占
for (int i = 0; i < 2; i++) {
new Thread(() -> {
rt.write(writeLock, new Random().nextInt());
}).start();
}
}
}
Semaphore
Semaphore 概述
- Semaphore 通常我们叫它
信号量
- 使用 Semaphore 可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数
举个栗子
- 可以把它简单的理解成我们停车场入口立着的那个显示屏
- 每有一辆车进入停车场显示屏就会显示剩余车位 减1
- 每有一辆车从停车场出去,显示屏上显示的剩余车辆就会 加1
- 当显示屏上的剩余车位为 0 时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止
Semaphore 方法
方法名 |
作用 |
void acquire() |
从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断 |
void release() |
释放一个许可,将其返回给信号量 |
int availablePermits() |
返回此信号量中当前可用的许可数 |
boolean hasQueuedThreads() |
查询是否有线程正在等待获取 |
/**
* @author BNTang
**/
public class Semaphore01 {
public static void main(String[] args) {
Semaphore s = new Semaphore(1);
new Thread(() -> {
try {
s.acquire();
System.out.println("T1运行中...");
System.out.println("当前已有" + s.availablePermits() + "个并发");
Thread.sleep(200);
System.out.println("T1运行中...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("是否有人等待:" + s.hasQueuedThreads());
System.out.println("T1即将离开");
s.release();
}
}).start();
new Thread(() -> {
try {
s.acquire();
System.out.println("T2 运行中...");
System.out.println("当前有" + s.availablePermits() + "个并发");
Thread.sleep(200);
System.out.println("T2 运行中...");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("是否有人等待:" + s.hasQueuedThreads());
System.out.println("T2即将离开");
s.release();
}
}).start();
}
}
模拟一个需求:
- 停车场容纳总停车量 10
- 当一辆车进入停车场后,显示牌的剩余车位数相应的
减1
- 每有一辆车驶出停车场后,显示牌的剩余车位数相应的 加1
- 停车场剩余车位不足时,车辆只能在外面等待
/**
* @author BNTang
**/
public class Semaphore02 {
/**
* 停车场同时容纳的车辆10
*/
private static final Semaphore semaphore = new Semaphore(10);
public static void main(String[] args) {
// 模拟50辆车进入停车场
for (int i = 0; i < 50; i++) {
Thread thread = new Thread(() -> {
try {
System.out.println("====" + Thread.currentThread().getName() + "来到停车场");
if (semaphore.availablePermits() == 0) {
System.out.println("车位不足,请耐心等待");
}
// 获取令牌尝试进入停车场
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "成功进入停车场");
// 模拟车辆在停车场停留的时间
Thread.sleep(new Random().nextInt(10000));
System.out.println(Thread.currentThread().getName() + "驶出停车场");
// 释放令牌,腾出停车场车位
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, i + "号车");
thread.start();
}
}
}
Exchanger
Exchanger 概述
- Exchanger(交换者)是一个用于线程间协作的工具类
- Exchanger 用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据
- 这两个线程通过 exchange 方法交换数据,如果第一个线程先执行 exchange 方法
- 它会一直等待第二个线程也执行 exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方
将纸制银流通过人工的方式录入成电子银行流水,为了避免错误,采用 AB 岗两人进行录入,录入到 Excel 之后,系统需要加载这两个 Excel,并对这两个 Excel 数据进行校对,看看是否录入的一致。
/**
* @author BNTang
**/
public class Exchanger01 {
private static Exchanger<String> ec = new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(() -> {
try {
// A录入银行流水数据
String A = "银行流水A";
String exchange = ec.exchange(A);
System.out.println(exchange);
} catch (InterruptedException e) {
}
});
threadPool.execute(() -> {
try {
// B录入银行流水数据
String B = "银行流水B";
String A = ec.exchange(B);
System.out.println("A和B数据是否一致:" + A.equals(B));
System.out.println("A录入的是:" + A);
System.out.println("B录入的是:" + B);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadPool.shutdown();
}
}
LockSupport
LockSupport 是什么
- LockSupport 是一个并发编程工具类,主要是为了阻塞和唤醒线程用的
- 可以让线程在任意位置阻塞,也可以在任意位置唤醒
LockSupport 方法
方法名 |
作用 |
park() |
阻塞线程 |
unpark(需要唤醒的线程) |
启动唤醒线程 |
LockSupport 与 wait 和 notify 的区别
获取锁
- wait 和 notify 都是 Object 中的方法,在调用这两个方法前必须先获得锁对象
- park 不需要获取某个对象的锁就可以锁住线程
唤醒
- notify 只能随机选择一个线程唤醒
- 无法唤醒指定的线程,unpark 却可以唤醒一个指定的线程
/**
* @author BNTang
*/
public class LockSupportTest1 {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(i);
if (i == 3) {
LockSupport.park();
}
SleepTools.sleepSecond(1);
}
});
thread.start();
}
}
/**
* @author BNTang
*/
public class LockSupportTest1 {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(i);
if (i == 3) {
LockSupport.park();
}
SleepTools.sleepSecond(1);
}
});
thread.start();
// 先interrupt 时, 程序不会被阻塞
thread.interrupt();
}
}
/**
* @author BNTang
*/
public class LockSupportTest1 {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(i);
if (i == 3) {
LockSupport.park();
}
SleepTools.sleepSecond(1);
}
});
thread.start();
SleepTools.sleepSecond(1);
// 当先执行unpark时, 程序不会被阻塞
LockSupport.unpark(thread);
}
}