1. 是什么

image.png

1.1. 大致结构

在 AbstractQueuedSynchronizer 类中,有几个属性和一个双向队列(CLH 队列)

AQS 就是并发包下的一个基类

AQS = state + CLH 队列

Node = waitStatud + Thread

1.2. 涉及锁

1.3. 模板设计模式


由子类实现具体方法逻辑

1.4. 类关系图

1.5. 加锁过程

  1. 如果是第一个线程 t1,那么和队列无关,线程直接持有锁。并且也不会初始化队列,如果接下来的线程都是交替执行,那么永远和 AQS 队列无关,都是直接线程持有锁
  2. 如果发生了竞争,比如 t1 持有锁的过程中 t2 来 lock,那么这个时候就会初始化 AQS,初始化 AQS 的时候会在队列的头部虚拟一个 Thread 为 NULL 的 Node,因为队列当中的 head 永远是持有锁的那个 node(除了第一次会虚拟一个,其他时候都是持有锁的那个线程锁封装的 node)
  3. 现在第一次的时候持有锁的是 t1 ,而 t1 不在队列当中所以虚拟了一个 node 节点,队列当中的除了 head 之外的所有的 node 都在 park
  4. 当 t1 释放锁之后 unpark 某个(基本是队列当中的第二个,为什么是第二个呢?前面说过 head 永远是持有锁的那个 node,当有时候也不会是第二个,比如第二个被 cancel 之后,至于为什么会被 cancel,不在我们讨论范围之内,cancel 的条件很苛刻,基本不会发生)node 之后,node 被唤醒,假设 node 是 t2,那么这个时候会首先把 t2 变成 head(sethead)
  5. 在 sethead 方法里面会把 t2 代表的 node 设置为 head,并且把 node 的 Thread 设置为 null,为什么需要设置 null?其实原因很简单,现在 t2 已经拿到锁了,node 就不要排队了,那么 node 对 Thread 的引用就没有意义了。所以队列的 head 里面的 Thread 永远为 null
  6. JUC AQS ReentrantLock源码分析(一)

1.6. 解锁过程

  1. 调用 unlock(),会调用到 release(),进而调用到 tryRelease(arg)
  2. tryRelease() 由 Sync 进行实现,其内部对 state 进行减一,完成一次释放锁,如果锁还有其它层未释放,返回 false,然后不做其它操作
  3. 如果锁已经完全释放,即 state=0,则将队列中的 head 节点的 waitStatusCAS 地设置为 0, 然后将第二个节点设置为 head 节点,并将其代表的 Thread,unpark 唤醒。该线程获得锁
  4. 如果第二个节点被取消,则从 tail 向前获得一个为被取消的 node 进行 Thread 唤醒。

2. 面试题

2.1. 谈谈对 AQS 的理解

AQS 是多线程同步器,它是 J.U.C 包中多个组件的底层实现,如 Lock、 CountDownLatch、Semaphore 等都用到了 AQS. 从本质上来说,AQS 提供了两种锁机制,分别是排它锁,和共享锁。排它锁,就是存在多线程竞争同一共享资源时,同一时刻只允许一个线程访问该共享资源,也就是多个线程中只能有一个线程获得锁资源,比如 Lock 中的 ReentrantLock 重入锁实现就是用到了 AQS 中的排它锁功能。共享锁也称为读锁,就是在同一时刻允许多个线程同时获得锁资源,比如 CountDownLatch 和 Semaphore 都是用到了 AQS 中的共享锁功能。

设计 AQS 整个体系需要解决的三个核心的问题:
①互斥变量的设计以及多线程同时更新互斥变量时的安全性
②未竞争到锁资源的线程的等待以及竞争到锁资源的线程释放锁之后的唤醒
③锁竞争的公平性和非公平性。

AQS 采用了一个 volatile int 类型的互斥变量 state 用来记录锁竞争的一个状态,0 表示当前没有任何线程竞争锁资源,而大于等于 1 表示已经有线程正在持有锁资源。
一个线程来获取锁资源的时候,首先判断 state 是否等于 0,如果是 (无锁状态),则把这个 state 通过 CAS 更新成 1,表示占用到锁。此时如果多个线程进行同样的操作,会造成线程安全问题。AQS 采用了 CAS 机制来保证互斥变量 state 的原子性。
未获取到锁资源的线程通过 LockSupport 的 park 方法 (底层是 Unsafe 类的 park 方法) 对线程进行阻塞,把阻塞的线程按照先进先出的原则加入到一个双向链表的结构中,当获得锁资源的线程释放锁之后,会从双向链表的头部去唤醒下一个等待的线程再去竞争锁。
另外关于公平性和非公平性问题,AQS 的处理方式是,在竞争锁资源的时候,公平锁需要判断双向链表中是否有阻塞的线程,如果有,则需要去排队等待;而非公平锁的处理方式是,不管双向链表中是否存在等待锁的线程,都会直接尝试更改互斥变量 state 去竞争锁。

2.2. AQS 为什么要使用双向链表

https://www.bilibili.com/video/BV1br4y1g7Mm?t=208.9

加入队列等待时需要判断前驱节点状态
竞争锁是需要判断前驱节点是否是头节点
需要查找 cancel 的节点

3. 源码解析

3.1. 重要变量

3.2. lock->A CAS 成功抢到锁

1
2
3
4
public void lock() {
// sync分为了公平和非公平
sync.lock();
}

3.2.1. 非公平锁

通过 CAS的方式 尝试将 state 从 0 修改为 1,如果返回 true,代表修改成功,如果修改失败,返回 false。将一个属性设置为当前线程,这个属性是 AQS 的父类提供的

A 线程 compareAndSetState(0, 1) 成功

3.2.2. 区别


3.3. acquire

有 3 个判断方法:tryAcquire、acquireQueued、selfInterrupt

1
2
3
4
5
6
7
8
public final void acquire(int arg) {
// tryAcquire再次尝试获取锁资源,如果尝试成功,返回true
if (!tryAcquire(arg) &&
// 获取锁资源失败后,需要将当前线程封装成一个Node,追加到AQS的队列中
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 线程中断
selfInterrupt();
}

3.3.1. tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取AQS的state的值
int c = getState();
// 如果state为0,代表,尝试再次获取锁资源
if (c == 0) {
// CAS尝试修改state,从0-1,如果成功,设置ExclusiveOwnerThread属性为当前线程
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前占有锁资源的线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
// 将state + 1
int nextc = c + acquires;
// 如果加1后,小于0,超所锁可重入的最大值,抛出Error
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 没问题,就重新对state进行复制
setState(nextc);
// 锁重入成功
return true;
}
return false;
}

3.3.1.1. nonfairTryAcquire->B 抢占失败

B 线程进入 nonfairTryAcquire() 方法,返回 false,取反后为 true,判断&&后面的条件,从而进入 addWaiter 方法

3.3.2. addWaiter->B 节点参与竞争抢占失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 说明前面获取锁资源失败,放到队列中等待。。。
private Node addWaiter(Node mode) {
// 创建Node类,并且设置thread为当前线程,设置为排它锁
Node node = new Node(Thread.currentThread(), mode);
// 获取AQS中队列的尾部节点
Node pred = tail;
// 如果tail != null,现在队列有人排队
if (pred != null) {
// 将当前节点的prev,设置为刚才的尾部节点
node.prev = pred;
// 基于CAS的方式,将tail节点设置为当前节点
if (compareAndSetTail(pred, node)) {
// 将之前的为节点的next,设置为当前节点
pred.next = node;
return node;
}
}
// 查看下面~
enq(node);
return node;
}

//-------------------------------
// 现在没人排队,我是第一个~~, 如果前面CAS失败,也会进到这个位置重新往队列尾巴塞。
private Node enq(final Node node) {
// 死循环~~
for (;;) {
// 重新获取当前的tail节点为t
Node t = tail;
if (t == null) {
// 现在没人排队, 我是第一个,没头没尾,都是空
if (compareAndSetHead(new Node())) // 初始化一个Node作为head,而这个head没有意义。
// 将头尾都指向了这个初始化的Node
tail = head;
} else {
// 有人排队,往队列尾巴塞
// 当前节点的上一个指向tail。
node.prev = t;
// 基于CAS的方式,将tail节点设置为当前节点
if (compareAndSetTail(t, node)) {
// 将之前的为节点的next,设置为当前节点
t.next = node;
return t;
}
}
}
}

3.3.2.1. B 第一个等待 enq


3.3.2.2. 死循环先生成 1 个占位节点


3.3.2.3. 第二次循环 B 节点入队


3.3.2.4. 其他节点类似 C 节点入队


3.3.3. acquireQueued->处理队列中的节点

第二个死循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 已经将node加入到了双向队列中,然后执行当前方法
final boolean acquireQueued(final Node node, int arg) {
// 标识!!!!
boolean failed = true;
try {
// 标识!!!!
boolean interrupted = false;
for (;;) {
// 获取当前节点的上一个节点p
final Node p = node.predecessor();
// 如果p是头,再次尝试获取锁资源(state从0-1,锁重入操作),成功返回true,失败返回false
if (p == head && tryAcquire(arg)) {
// 拿到锁资源设置head节点为当前节点,将thread,prev设置为null,因为拿到锁资源了,排队跟我没关系
setHead(node);
p.next = null; // 帮助GC回收
failed = false; // 将标识修改为false
return interrupted; // 返回interrupted
}
// 保证上一个节点是-1,才会返回true,才会将线程阻塞,等待唤醒获取锁资源
if (shouldParkAfterFailedAcquire(p, node) &&
// 基于Unsafe类的park方法,挂起线程~~~
parkAndCheckInterrupt()) // 针对fail属性,这里是唯一可能出现异常的地方,JVM内部出现问题时,可以这么理解,fianlly代码块中的内容,执行的几率约等于0~
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

3.3.3.1. tryAcquire

acquireQueued 方法处理队列,因为 B 在队头,所以先处理 B。如果 p 是占位节点,继续&&后面的判断,即B 线程会执行 tryAcquire,再次进行尝试

predecessor 返回前驱节点,如果前驱节点为 null,抛出 NPE

B 节点 tryAcquire,再次尝试仍然失败,进入下面 if 判断中的 shouldParkAfterFailedAcquire 方法

3.3.3.2. shouldParkAfterFailedAcquire

3.3.3.2.1. 第 1 次循环 ->将前面节点 waitStatus 置为 -1

队头第一个线程节点 B 再次尝试失败时进入 shouldParkAfterFailedAcquire 方法
自旋进入第 1 次循环时,会走最下面的分支,将占位节点waitStatus 改为 -1
compareAndSetWaitStatus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// node是当前节点,pred是上一个节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点的状态
int ws = pred.waitStatus;
// 如果上一个节点状态为SIGNAL,一切正常!
if (ws == Node.SIGNAL)
return true;
// 如果上一个节点已经失效了
if (ws > 0) {
do {
// 将当前节点的prev指针指向了上一个的上一个!
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); // 一致找到小于等于0的
// 将重新标识好的最近的有效节点的next
pred.next = node;
} else {
// 小于等于0,不等于-1,将上一个有效节点状态修改为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

3.3.3.2.2. 第 2 次循环 ->B park 住

自旋进入第 2 次循环时,shouldParkAfterFailedAcquire 中的 if 判断 ws == Node.SIGNAL 返回 ture,继续进行 acquireQueued 方法中第 2 个 if 判断中的 parkAndCheckInterrupt()

3.3.3.3. parkAndCheckInterrupt->park B 节点

3.3.3.4. C 节点重复 shouldParkAfterFailedAcquire 中的循环

第 1 次循环,将其前面的 B 节点的 waitStatus 置为 -1 (规律就是后面节点将前面节点的 waitStatus 置为 -1,表示已就绪)
第 2 次循环,执行 parkAndCheckInterrupt,将自己 park 住

被 park 后,线程 B、C 等都进入 WATING 状态,等待被唤醒

线程状态复习:并发基础-7、Thread

3.4. unlock

release(1)

3.5. release

3.5.1. tryRelease->返回 true


3.5.2. unparkSuccessor

3.5.2.1. compareAndSetWaitStatus

3.5.2.2. unpark

unpark 就会唤醒上面 parkAndCheckInterrupt 中 park 的线程,线程 B 被唤醒后,从parkAndCheckInterrupt 方法中的 837 行继续执行,出来后在自旋中继续循环

假设没有线程中断的特殊情况,B 节点出来 parkAndCheckInterrupt 方法之后,还在 acquireQueued 方法的死循环中继续循环。此时 node 是 B 节点,那么 p 就是占位节点,进入 if 判断中,p==head 为 true,继续&&后面的 tryAcquire 方法

3.5.2.2.1. B 在 tryAcquire 方法中 CAS 成功抢到锁

state 变为 1,占有窗口的 Thread 变为 Thread B

将节点 B 设置为 head,占位节点的 next 置为 null

也就是将原来的占位节点出队,B 变为新的占位节点

3.6. selfInterrupt

该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这里简单分析一下:

因为 LockSupport.park 阻塞线程后,有两种可能被唤醒。

第一种情况,前节点是头节点,释放锁后,会调用 LockSupport.unpark 唤醒当前线程。整个过程没有涉及到中断,最终 acquireQueued 返回 false 时,不需要调用 selfInterrupt。

第二种情况,LockSupport.park 支持响应中断请求,能够被其他线程通过 interrupt() 唤醒。但这种唤醒并没有用,因为线程前面可能还有等待线程,在 acquireQueued 的循环里,线程会再次被阻塞。parkAndCheckInterrupt 返回的是 Thread.interrupted(),不仅返回中断状态,还会清除中断状态,保证阻塞线程忽略中断。最终 acquireQueued 返回 true 时,真正的中断状态已经被清除,需要调用 selfInterrupt 维持中断状态,即要进行“中断补偿”操作。

优雅退出线程中,也有设计中断补偿:并发基础-7、Thread

4. 实战经验

5. 参考与感谢

5.1. 尚硅谷周阳

https://www.bilibili.com/video/BV1uX4y1u7ht

5.2. 测试程序

/Users/taylor/Nutstore Files/Obsidian_data/pages/002-schdule/001-Arch/001-Subject/013-DemoCode/juc-day02
[[ReentrantLock3.java]]
[[AQSDemo.java]]
selfInterrupt:[[Java 并发之 AQS 详解(下) HeapDump性能社区]]

5.3. 整体流程图

https://www.processon.com/view/5e29b0e8e4b04579e40c15a7?fromnew=1