CAS
模拟高并发场景
需求:模拟一个网站的高并发访问,假设有 100
个用户,同时请求服务器 10
次。记录访问的总次数
代码实现
/**
* @author BNTang
*/
public class CasDemo1 {
/**
* 记录用户访问次数
*/
static int count;
public static void request() {
// 模拟请求耗时1毫秒
SleepTools.sleepMs(1);
count++;
}
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
int threadSize = 100;
// CountDownLatch 类就是要保证完成 100 个用户请求之后再执行后续的代码
CountDownLatch countDownLatch = new CountDownLatch(threadSize);
for (int i = 0; i < threadSize; i++) {
Thread thread = new Thread(() -> {
// 模拟用户行为,访问10次网站
for (int j = 0; j < 10; j++) {
request();
}
// 每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),
countDownLatch.countDown();
});
thread.start();
}
// 当计数器的值变为0时,在 CountDownLatch 上 await() 的线程就会被唤醒
countDownLatch.await();
System.out.println("所耗时长:" + (System.currentTimeMillis() - start));
System.out.println("访问总人数count=" + count);
}
}
查看结果发现,100 个用户每一个用户访问 10 次结果居然不是 1000
存在问题
最后结果却不是 1000
通过 javap -c -v CasDemo1
命令对 class 文件进行反编译,需要注意的是进入到 CasDemo1.class 目录当中执行
javap -c -v CasDemo1.class
request()
方法中的 count++
语句对应的字节码指令包括三个步骤分别如下
- getstatic:将常量值 count 放到方法的操作数栈上
- iconst_1&iadd将常量 1 放到方法的操作数栈上,然后弹出栈顶的前两个元素,执行相加操作,然后将结果压入操作数栈中
- putstatic:将操作数栈栈顶的结果赋值给常量
count
产生原因
- 当有两个线程同时将常量值 count 放到各自方法的操作数栈上时
- 然后同时执行相加操作,最后都赋值给常量 count
- 这个时候发现两个线程相等于只对 count 变量做了一个相加操作,导致结果不够
1000
解决办法
加 synchronized
在 request()
方法上加锁
/**
* @author BNTang
*/
public class CasDemo1 {
/**
* 记录用户访问次数
*/
static int count;
public synchronized static void request() {
// 模拟请求耗时1毫秒
SleepTools.sleepMs(1);
count++;
}
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
int threadSize = 100;
// CountDownLatch 类就是要保证完成 100 个用户请求之后再执行后续的代码
CountDownLatch countDownLatch = new CountDownLatch(threadSize);
for (int i = 0; i < threadSize; i++) {
Thread thread = new Thread(() -> {
// 模拟用户行为,访问10次网站
for (int j = 0; j < 10; j++) {
request();
}
// 每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),
countDownLatch.countDown();
});
thread.start();
}
// 当计数器的值变为0时,在 CountDownLatch 上 await() 的线程就会被唤醒
countDownLatch.await();
System.out.println("所耗时长:" + (System.currentTimeMillis() - start));
System.out.println("访问总人数count=" + count);
}
}
加了 synchronized 关键字修饰之后,结果能保证是 1000 了,但是还有一个更重要的问题要考虑,就是性能,加了 synchronized 关键字之后,程序大概需要执行 5 到 6 秒钟,在高并发场景下,响应时间这么慢是不被允许的,那是因为 synchronized 加锁的开销很大。
如何解决耗时过长的问题呢
首先之所以耗时长是因为,synchronized 的锁粒度很大,那么我们来降低锁粒度 SleepTools.sleepMs(1);
是不需要加锁的,然后 count++
中其实只有在执行第三步的时候会引发高并发中的可见性问题,一个线程在更新 count 值的时候,并不知道其他线程对 count 值的改变,导致结果不一致,所以只需要在 count++ 这段代码加锁即可如下图。
public static void request() {
// 模拟请求耗时1毫秒
SleepTools.sleepMs(1);
synchronized (CasDemo1.class) {
count++;
}
}
对第三步进行一个改良
- 获取 count 的值,记为 A
- 执行加一操作得到结果 B
更新 count 的值
- 获取锁
- 获取 count 的最新值,记为
LV
- 判断 LV 是否等于 A,如果是,则将最新值 B 赋值给 A,返回 true,否则返回 false
- 释放锁
为了获取到内存中 count 的最新值,我们使用 volatile
关键字修饰变量 count
对于第三步增加一个 compareAndSwap()
方法实现
/**
* 比较并交换
*
* @param expectedValue 期望值
* @param newValue 新值
* @return boolean
*/
public static synchronized boolean compareAndSwap(int expectedValue, int newValue) {
if (count == expectedValue) {
count = newValue;
return true;
}
return false;
}
/**
* @author BNTang
*/
public class CasDemo1 {
/**
* 记录用户访问次数
*/
static volatile int count;
public static void request() {
// 模拟请求耗时1毫秒
SleepTools.sleepMs(1);
int expectedValue;
while (!compareAndSwap(expectedValue = count, expectedValue + 1)) {
}
}
/**
* 比较并交换
*
* @param expectedValue 期望值
* @param newValue 新值
* @return boolean
*/
public static synchronized boolean compareAndSwap(int expectedValue, int newValue) {
if (count == expectedValue) {
count = newValue;
return true;
}
return false;
}
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
int threadSize = 100;
// CountDownLatch 类就是要保证完成 100 个用户请求之后再执行后续的代码
CountDownLatch countDownLatch = new CountDownLatch(threadSize);
for (int i = 0; i < threadSize; i++) {
Thread thread = new Thread(() -> {
// 模拟用户行为,访问10次网站
for (int j = 0; j < 10; j++) {
request();
}
// 每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),
countDownLatch.countDown();
});
thread.start();
}
// 当计数器的值变为0时,在 CountDownLatch 上 await() 的线程就会被唤醒
countDownLatch.await();
System.out.println("所耗时长:" + (System.currentTimeMillis() - start));
System.out.println("访问总人数count=" + count);
}
}
悲观锁
-
synchronized
是悲观锁,这种线程一旦得到锁,其他需要锁的线程就挂起的情况就是悲观锁- 会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁
- 写的多些(增删改)
乐观锁
- 每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止
- CAS 操作的就是乐观锁,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止
- 读的多,写的少
- 通过版本控制
- 乐观锁用到的机制就是
CAS
,Compare and Swap/Set
什么是 CAS
CAS 的全称为 Compare And Swap,直译就是比较并交换。是一条 CPU 的原子指令,其作用是让 CPU 先进行比较两个值是否相等,然后原子地更新某个位置的值,其实现方式是基于硬件平台的汇编指令,在 intel
的 CPU 中,使用的是 cmpxchg
指令,就是说 CAS 是靠硬件实现的,从而在硬件层面提升效率,一种无锁的原子算法,乐观锁
。
过程
- 它包含 3 个参数 CAS(V,E,N)V 表示要更新变量的值,E 表示预期值,N 表示新值
- 仅当 V 值等于 E 值时,才会将 V 的值设为 N,如果 V 值和 E 值不同,则说明已经有其他线程做了更新,则当前线程则什么都不做
- 最后,CAS 返回当前 V 的真实值。CAS 操作时抱着乐观的态度进行的,它总是认为自己可以成功完成操作
作用与特点
- 当多个线程同时使用 CAS 操作一个变量时,只有一个会胜出,并成功更新,其余均会失败
- 失败的线程不会挂起,仅是被告知失败,并且允许再次尝试,当然也允许实现的线程放弃操作
- 基于这样的原理,CAS 操作即使没有锁,也可以发现其他线程对当前线程的干扰
- 与锁相比,使用 CAS 会使程序看起来更加复杂一些,但由于其非阻塞的,它对死锁问题天生免疫,并且,线程间的相互影响也非常小
- 更为重要的是,使用无锁的方式完全没有锁竞争带来的系统开销,也没有线程间频繁调度带来的开销
- 因此,他要比基于锁的方式拥有更优越的性能
总结
CAS 需要你额外给出一个期望值,也就是你认为这个变量现在应该是什么样子的,如果变量不是你想象的那样,哪说明它已经被别人修改过了。你就需要重新读取,再次尝试修改就好了。
CAS 算法底层原理
CAS 操作包含三个操作数,内存位置(V)预期原值(A)和新值(B)如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。
CAS(V,Excepted,NewValue)
- if V == E { V = NewValue }
- otherwise try again or fail
- CPU 原语支持的:硬件厂商老早就在芯片中加入了大量直至并发操作的原语,从而在硬件层面提升效率
CAS 底层原理
- 直接给出结论:CAS 算法在操作系统底层是对应一条
cmpxchg
字节码指令- 指令的作用就是比较并且交换操作数,并且在多处理器的情况下,会在 cmpxchg 字节码指令前面加上
lock
前缀,确保了对内存的读写操作的原子性- cmpxchg 指令会比较
寄存器
中的值B
与内存中的值A
是否相等,如果相等,就将新值C
写入内存中- 如果不相等,则将值 A 赋值到寄存器中,然后循环调用 cmpxchg 指令
lock 前缀的作用
- 带有 lock 前缀的指令在早期的处理器中,是会向总线发送一个 LOCK# 信号,锁住总线,将 CPU 和内存之间的通信锁定了,使得锁定期间,其他处理器也不能操作其他内存地址的数据了,所以总线锁开销比较大。
- 后期的处理器使用了 ”缓存锁定“ 的方法来保证了对共享内存的原子操作,带有 lock 前缀的指令在执行写回内存操作的时候,不会直接在总线上加锁,采用 MESI 的方式完成。
CAS 在 Java 中的应用
/**
* @author BNTang
*/
public class CasDemo1 {
/**
* 记录用户访问次数
*/
static AtomicInteger count = new AtomicInteger();
public static void request() {
// 模拟请求耗时1毫秒
SleepTools.sleepMs(1);
count.getAndIncrement();
}
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
int threadSize = 100;
// CountDownLatch 类就是要保证完成 100 个用户请求之后再执行后续的代码
CountDownLatch countDownLatch = new CountDownLatch(threadSize);
for (int i = 0; i < threadSize; i++) {
Thread thread = new Thread(() -> {
// 模拟用户行为,访问10次网站
for (int j = 0; j < 10; j++) {
request();
}
// 每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),
countDownLatch.countDown();
});
thread.start();
}
// 当计数器的值变为0时,在 CountDownLatch 上 await() 的线程就会被唤醒
countDownLatch.await();
System.out.println("所耗时长:" + (System.currentTimeMillis() - start));
System.out.println("访问总人数count=" + count);
}
}
AtomicInteger 源码分析
首先通过 javap
来进行反编译查看一下效果如下图所示
目前基于的是 jdk1.8 进行查看的源码如果你使用的是 jdk11 会有稍微不同之处,就是变量的命名等等不同之处,首先从 getAndIncrement
点击进入源码当中
首先来看看 unsafe
unsafe
- Unsafe 是 CAS 的核心类,Java 无法直接访问底层操作系统,而是通过本地(native)方法来访问
- 不过尽管如此,JVM 还是开了一个后门
Unsafe
,它提供了硬件级别的原子操作
valueOffset
- 数据在内存中的偏移量
value
- 要修改的值,volatile 修饰保证了可见性
- var1:要操作对象的内存地址
- var2:要操作对象中属性地址的偏移量
- var3:表示需要修改数据的期望值
- var4:表示需要修改数据的新值
JDK 中依次调用的 C++
代码为如下图所示
ABA 问题
什么是 ABA 问题
CAS 需要检查操作值有没有发生改变,如果没有发生改变则更新。但是存在这样一种情况:如果一个值原来是 A,变成了 B,然后又变成了 A,那么在 CAS 检查的时候会发现没有改变,但是实质上它已经发生了改变。
/**
* @author BNTang
**/
public class CasDemo01 {
private static AtomicInteger atomic = new AtomicInteger(10);
public static void test1() {
new Thread(() -> {
// A B
System.out.println(atomic.compareAndSet(10, 11));
}).start();
new Thread(() -> {
SleepTools.sleepSecond(2);
// A
System.out.println(atomic.compareAndSet(11, 10));
}).start();
new Thread(() -> {
SleepTools.sleepSecond(3);
// 期望值与原始值相等, 会进行修改,其实这个10已经不是初始的值了,而是修改回来的值
System.out.println(atomic.compareAndSet(10, 12));
}).start();
}
public static void main(String[] args) {
test1();
}
}
/**
* @author BNTang
**/
public class CasDemo01 {
private static AtomicStampedReference asr = new AtomicStampedReference(10, 1);
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
SleepTools.sleepSecond(2);
System.out.println(asr.compareAndSet(10, 11, asr.getStamp(), asr.getStamp() + 1));
System.out.println(asr.compareAndSet(11, 10, asr.getStamp(), asr.getStamp() + 1));
});
Thread t2 = new Thread(() -> {
int stamp = asr.getStamp();
SleepTools.sleepSecond(4);
System.out.println(asr.compareAndSet(10, 12, stamp, stamp + 1));
});
t1.start();
t2.start();
}
}
通过版本号解决 ABA
问题
CAS 缺点
CAS 虽然高效地解决了原子操作,但是还是存在一些缺陷的,主要表现在三个方法
缺点
循环时间太长
- 如果 CAS 一直不成功呢?这种情况绝对有可能发生,如果自旋 CAS 长时间地不成功
- 则会给 CPU 带来非常大的开销。在 JUC 中有些地方就限制了 CAS 自旋的次数
只能保证一个共享变量的原子操作
- CAS 的实现就知道这只能针对一个共享变量
ABA 问题
- 对于 ABA 问题其解决方案是加上版本号
- 即在每个变量都加上一个版本号,每次改变时加 1
- 即 A → B → A,变成 1A → 2B → 3A
- 在 JDK 中提供了一个
AtomicStampedReference
类来解决 ABA 问题
完结????