searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Java AQS框架基础

2023-08-01 12:32:51
7
0

AQS是AbstractQueuedSynchronizer类的简称,Java并发工具包的大部分类的实现基础都基于这个类。AQS的核心是通过一个共享变量来同步状态,AQS只负责:
• 线程阻塞队列的维护
• 线程阻塞和唤醒
共享变量的修改都是通过Unsafe类提供的CAS操作来实现的,AQS的主要方法是acquire和release,是个模板方法类。
并发工具包中AQS的类关系图

一、AQS的简单实现

下面通过一个demo来看怎么使用AQS和Lock接口来实现一个互斥锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class Mutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){
this.setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
if(getState() == 0) throw new IllegalMonitorStateException();
this.setExclusiveOwnerThread(null);
this.setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}


Condition newCondition(){
return new ConditionObject();
}
}

private final Sync sync = new Sync();

@Override
public void lock() {
sync.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(0);
}

public boolean isLocked(){
return sync.isHeldExclusively();
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public static void main(String[] args) {
final Mutex lock = new Mutex();
for(int i = 0; i < 10; i++){
new Thread(new Runnable(){
@Override
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getId() + " acquired the lock.");
lock.unlock();
}
}).start();
//简单地让线程让for循环的顺序阻塞在lock上
SleepUtils.second(10);
}
}

}

通过一个内部类来继承AbstractQueuedSynchronizer类,而Lock接口中的方法主要靠Sync内部类来实现。

二、CLH lock Queue
AQS的实现是基于CLH lock Queue线程阻塞队列的,CLH lock Queue是一个FIFO的双向队列。当线程获取同步状态失败时,同步器会将当前的线程和等待状态信息构造成一个Node对象并放到同步队列中,同时会阻塞当前线程。当同步状态释放时,会将首节点中的线程唤醒,使其再次尝试获取同步状态。
内部类Node就是CLH lock Queue的一个变种,通过自旋来等待而不是睡眠,自旋就一个循环的过程。
基于CLH lock Queue实现自旋锁的一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class ClhSpinLock {


private final ThreadLocal<Node> prev;
private final ThreadLocal<Node> node;
private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());

public ClhSpinLock(){
this.node = new ThreadLocal<Node>(){
protected Node initialValue(){
return new Node();
}
};
this.prev = new ThreadLocal<Node>(){
protected Node initialValue(){
return null;
}
};
}

public void lock(){
final Node node = this.node.get();
node.locked = true;
//一个CAS操作即可将当前线程对应的节点加到队列中,
//并且同时获取前继节点的引用,然后等待前继节点释放锁
Node prev = this.tail.getAndSet(node);
this.prev.set(prev);
System.out.println(Thread.currentThread().getName() + " waiting for lock");
while(prev.locked){//进入自旋
}
System.out.println(Thread.currentThread().getName() + " get the lock");
}

public void unlock(){
final Node node = this.node.get();
node.locked = false;
this.node.set(this.prev.get());
System.out.println(Thread.currentThread().getName() + " releases the lock.");
}

private static class Node{
private volatile boolean locked;
}

public static void main(String[] args) {
final ClhSpinLock lock = new ClhSpinLock();
lock.lock();
for(int i = 0; i < 10; i++){
new Thread(new Runnable(){
@Override
public void run() {
lock.lock();
SleepUtils.mills(10);
lock.unlock();
}
},"Thread"+i).start();

}
lock.unlock();
}

}

执行后输出结果为:
main waiting for lock
main get the lock
Thread0 waiting for lock
Thread3 waiting for lock
Thread1 waiting for lock
Thread4 waiting for lock
Thread2 waiting for lock
Thread7 waiting for lock
main releases the lock.
Thread0 get the lock
Thread8 waiting for lock
Thread5 waiting for lock
Thread0 releases the lock.
Thread9 waiting for lock
Thread6 waiting for lock
Thread3 get the lock
Thread3 releases the lock.
Thread1 get the lock
Thread1 releases the lock.
Thread4 get the lock
Thread4 releases the lock.
Thread2 get the lock
Thread7 get the lock
Thread2 releases the lock.
Thread7 releases the lock.
Thread8 get the lock
Thread5 get the lock
Thread8 releases the lock.
Thread5 releases the lock.
Thread9 get the lock
Thread6 get the lock
Thread9 releases the lock.
Thread6 releases the lock.

而AQS的线程不单纯是自旋,还包括了反复地休眠和唤醒的过程,AQS的等待队列的后继是通过前继节点唤醒的。AQS结合了自旋和休眠/唤醒的优点。

三、LockSupport

除了LockSupport,Java中的Object对象就有wait/notify来实现线程的阻塞和唤醒。两者之间的区别主要是面向的对象不同。wait/notify是Object对象固有的方法,与对象有关。而LockSupport是与线程相关的,更符合线程语义。wait/notify被唤醒的线程是被动的,要准确控制哪个线程阻塞、唤醒很困难。
在使用wait之前需要获取对象的监视器,而线程唤醒后执行也需要获取对象的监控才行执行。LockSupport为需要获取监视器。LockSupport机制是每次unpark给线程一个许可,而park相反,如果当前线程有许可,则消耗一个许可并返回。否则会阻塞线程直到线程重新获取许可。这个wait/notify的机制是不一样的。

四、处理中断

​ Thread.interrupt()方法不会中断一个正在运行的线程。这一方法实际上完成的是,在线程受到阻塞时抛出一个中断信号,这样线程就得以退出阻塞的状态。更确切的说,如果线程被Object.wait, Thread.join和Thread.sleep三种方法之一阻塞,那么,它将接收到一个中断异常(InterruptedException),从而提早地终结被阻塞状态。
​ LockSupport也能响应中断,但不会抛出InterruptedException,它依赖中断状态(Interrupted status),如果线程被中断退出阻塞,该值设置为true。通过线程的interrupted/isInterrupted可能获取该值,两个方法的区别是interrupted获取后会清理,也就是将Interrupted status设置为false。
处理InterruptedException的原则有两种:
​ • 继续设置Interrupted status
​ • 抛出新的InterruptedException

0条评论
0 / 1000
chuoo
13文章数
0粉丝数
chuoo
13 文章 | 0 粉丝
原创

Java AQS框架基础

2023-08-01 12:32:51
7
0

AQS是AbstractQueuedSynchronizer类的简称,Java并发工具包的大部分类的实现基础都基于这个类。AQS的核心是通过一个共享变量来同步状态,AQS只负责:
• 线程阻塞队列的维护
• 线程阻塞和唤醒
共享变量的修改都是通过Unsafe类提供的CAS操作来实现的,AQS的主要方法是acquire和release,是个模板方法类。
并发工具包中AQS的类关系图

一、AQS的简单实现

下面通过一个demo来看怎么使用AQS和Lock接口来实现一个互斥锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class Mutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){
this.setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
if(getState() == 0) throw new IllegalMonitorStateException();
this.setExclusiveOwnerThread(null);
this.setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}


Condition newCondition(){
return new ConditionObject();
}
}

private final Sync sync = new Sync();

@Override
public void lock() {
sync.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(0);
}

public boolean isLocked(){
return sync.isHeldExclusively();
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public static void main(String[] args) {
final Mutex lock = new Mutex();
for(int i = 0; i < 10; i++){
new Thread(new Runnable(){
@Override
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getId() + " acquired the lock.");
lock.unlock();
}
}).start();
//简单地让线程让for循环的顺序阻塞在lock上
SleepUtils.second(10);
}
}

}

通过一个内部类来继承AbstractQueuedSynchronizer类,而Lock接口中的方法主要靠Sync内部类来实现。

二、CLH lock Queue
AQS的实现是基于CLH lock Queue线程阻塞队列的,CLH lock Queue是一个FIFO的双向队列。当线程获取同步状态失败时,同步器会将当前的线程和等待状态信息构造成一个Node对象并放到同步队列中,同时会阻塞当前线程。当同步状态释放时,会将首节点中的线程唤醒,使其再次尝试获取同步状态。
内部类Node就是CLH lock Queue的一个变种,通过自旋来等待而不是睡眠,自旋就一个循环的过程。
基于CLH lock Queue实现自旋锁的一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class ClhSpinLock {


private final ThreadLocal<Node> prev;
private final ThreadLocal<Node> node;
private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());

public ClhSpinLock(){
this.node = new ThreadLocal<Node>(){
protected Node initialValue(){
return new Node();
}
};
this.prev = new ThreadLocal<Node>(){
protected Node initialValue(){
return null;
}
};
}

public void lock(){
final Node node = this.node.get();
node.locked = true;
//一个CAS操作即可将当前线程对应的节点加到队列中,
//并且同时获取前继节点的引用,然后等待前继节点释放锁
Node prev = this.tail.getAndSet(node);
this.prev.set(prev);
System.out.println(Thread.currentThread().getName() + " waiting for lock");
while(prev.locked){//进入自旋
}
System.out.println(Thread.currentThread().getName() + " get the lock");
}

public void unlock(){
final Node node = this.node.get();
node.locked = false;
this.node.set(this.prev.get());
System.out.println(Thread.currentThread().getName() + " releases the lock.");
}

private static class Node{
private volatile boolean locked;
}

public static void main(String[] args) {
final ClhSpinLock lock = new ClhSpinLock();
lock.lock();
for(int i = 0; i < 10; i++){
new Thread(new Runnable(){
@Override
public void run() {
lock.lock();
SleepUtils.mills(10);
lock.unlock();
}
},"Thread"+i).start();

}
lock.unlock();
}

}

执行后输出结果为:
main waiting for lock
main get the lock
Thread0 waiting for lock
Thread3 waiting for lock
Thread1 waiting for lock
Thread4 waiting for lock
Thread2 waiting for lock
Thread7 waiting for lock
main releases the lock.
Thread0 get the lock
Thread8 waiting for lock
Thread5 waiting for lock
Thread0 releases the lock.
Thread9 waiting for lock
Thread6 waiting for lock
Thread3 get the lock
Thread3 releases the lock.
Thread1 get the lock
Thread1 releases the lock.
Thread4 get the lock
Thread4 releases the lock.
Thread2 get the lock
Thread7 get the lock
Thread2 releases the lock.
Thread7 releases the lock.
Thread8 get the lock
Thread5 get the lock
Thread8 releases the lock.
Thread5 releases the lock.
Thread9 get the lock
Thread6 get the lock
Thread9 releases the lock.
Thread6 releases the lock.

而AQS的线程不单纯是自旋,还包括了反复地休眠和唤醒的过程,AQS的等待队列的后继是通过前继节点唤醒的。AQS结合了自旋和休眠/唤醒的优点。

三、LockSupport

除了LockSupport,Java中的Object对象就有wait/notify来实现线程的阻塞和唤醒。两者之间的区别主要是面向的对象不同。wait/notify是Object对象固有的方法,与对象有关。而LockSupport是与线程相关的,更符合线程语义。wait/notify被唤醒的线程是被动的,要准确控制哪个线程阻塞、唤醒很困难。
在使用wait之前需要获取对象的监视器,而线程唤醒后执行也需要获取对象的监控才行执行。LockSupport为需要获取监视器。LockSupport机制是每次unpark给线程一个许可,而park相反,如果当前线程有许可,则消耗一个许可并返回。否则会阻塞线程直到线程重新获取许可。这个wait/notify的机制是不一样的。

四、处理中断

​ Thread.interrupt()方法不会中断一个正在运行的线程。这一方法实际上完成的是,在线程受到阻塞时抛出一个中断信号,这样线程就得以退出阻塞的状态。更确切的说,如果线程被Object.wait, Thread.join和Thread.sleep三种方法之一阻塞,那么,它将接收到一个中断异常(InterruptedException),从而提早地终结被阻塞状态。
​ LockSupport也能响应中断,但不会抛出InterruptedException,它依赖中断状态(Interrupted status),如果线程被中断退出阻塞,该值设置为true。通过线程的interrupted/isInterrupted可能获取该值,两个方法的区别是interrupted获取后会清理,也就是将Interrupted status设置为false。
处理InterruptedException的原则有两种:
​ • 继续设置Interrupted status
​ • 抛出新的InterruptedException

文章来自个人专栏
容器
13 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0