并发编程专题-基础-12、AQS
1. 是什么
1.1. 大致结构
在 AbstractQueuedSynchronizer 类中,有几个属性和一个双向队列(CLH 队列)
AQS 就是并发包下的一个基类
AQS = state + CLH 队列
Node = waitStatud + Thread
1.2. 涉及锁
1.3. 模板设计模式
由子类实现具体方法逻辑
1.4. 类关系图
1.5. 加锁过程
- 如果是第一个线程 t1,那么和队列无关,线程直接持有锁。并且也不会初始化队列,如果接下来的线程都是交替执行,那么永远和 AQS 队列无关,都是直接线程持有锁
- 如果发生了竞争,比如 t1 持有锁的过程中 t2 来 lock,那么这个时候就会初始化 AQS,初始化 AQS 的时候会在队列的头部虚拟一个 Thread 为 NULL 的 Node,因为队列当中的 head 永远是持有锁的那个 node(除了第一次会虚拟一个,其他时候都是持有锁的那个线程锁封装的 node)
- 现在第一次的时候持有锁的是 t1 ,而 t1 不在队列当中所以虚拟了一个 node 节点,队列当中的除了 head 之外的所有的 node 都在 park
- 当 t1 释放锁之后 unpark 某个(基本是队列当中的第二个,为什么是第二个呢?前面说过 head 永远是持有锁的那个 node,当有时候也不会是第二个,比如第二个被 cancel 之后,至于为什么会被 cancel,不在我们讨论范围之内,cancel 的条件很苛刻,基本不会发生)node 之后,node 被唤醒,假设 node 是 t2,那么这个时候会首先把 t2 变成 head(sethead)
- 在 sethead 方法里面会把 t2 代表的 node 设置为 head,并且把 node 的 Thread 设置为 null,为什么需要设置 null?其实原因很简单,现在 t2 已经拿到锁了,node 就不要排队了,那么 node 对 Thread 的引用就没有意义了。所以队列的 head 里面的 Thread 永远为 null
- JUC AQS ReentrantLock源码分析(一)
1.6. 解锁过程
- 调用 unlock(),会调用到 release(),进而调用到 tryRelease(arg)
- tryRelease() 由 Sync 进行实现,其内部对 state 进行减一,完成一次释放锁,如果锁还有其它层未释放,返回 false,然后不做其它操作
- 如果锁已经完全释放,即 state=0,则将队列中的 head 节点的 waitStatusCAS 地设置为 0, 然后将第二个节点设置为 head 节点,并将其代表的 Thread,unpark 唤醒。该线程获得锁
- 如果第二个节点被取消,则从 tail 向前获得一个为被取消的 node 进行 Thread 唤醒。
2. 面试题
2.1. 谈谈对 AQS 的理解
AQS 是多线程同步器,它是 J.U.C 包中多个组件的底层实现,如 Lock、 CountDownLatch、Semaphore 等都用到了 AQS. 从本质上来说,AQS 提供了两种锁机制,分别是排它锁,和共享锁。排它锁,就是存在多线程竞争同一共享资源时,同一时刻只允许一个线程访问该共享资源,也就是多个线程中只能有一个线程获得锁资源,比如 Lock 中的 ReentrantLock 重入锁实现就是用到了 AQS 中的排它锁功能。共享锁也称为读锁,就是在同一时刻允许多个线程同时获得锁资源,比如 CountDownLatch 和 Semaphore 都是用到了 AQS 中的共享锁功能。
设计 AQS 整个体系需要解决的三个核心的问题:
①互斥变量的设计以及多线程同时更新互斥变量时的安全性
②未竞争到锁资源的线程的等待以及竞争到锁资源的线程释放锁之后的唤醒
③锁竞争的公平性和非公平性。
AQS 采用了一个 volatile int
类型的互斥变量 state
用来记录锁竞争的一个状态,0 表示当前没有任何线程竞争锁资源,而大于等于 1 表示已经有线程正在持有锁资源。
一个线程来获取锁资源的时候,首先判断 state 是否等于 0,如果是 (无锁状态),则把这个 state 通过 CAS 更新成 1,表示占用到锁。此时如果多个线程进行同样的操作,会造成线程安全问题。AQS 采用了 CAS 机制来保证互斥变量 state 的原子性。
未获取到锁资源的线程通过 LockSupport 的 park 方法 (底层是 Unsafe 类的 park 方法) 对线程进行阻塞,把阻塞的线程按照先进先出的原则加入到一个双向链表的结构中,当获得锁资源的线程释放锁之后,会从双向链表的头部去唤醒下一个等待的线程再去竞争锁。
另外关于公平性和非公平性问题,AQS 的处理方式是,在竞争锁资源的时候,公平锁需要判断双向链表中是否有阻塞的线程,如果有,则需要去排队等待;而非公平锁的处理方式是,不管双向链表中是否存在等待锁的线程,都会直接尝试更改互斥变量 state 去竞争锁。
2.2. AQS 为什么要使用双向链表
https://www.bilibili.com/video/BV1br4y1g7Mm?t=208.9
加入队列等待时需要判断前驱节点状态
竞争锁是需要判断前驱节点是否是头节点
需要查找 cancel 的节点
3. 源码解析
3.1. 重要变量
3.2. lock->A CAS 成功抢到锁
1 |
|
3.2.1. 非公平锁
通过 CAS的方式
尝试将 state 从 0 修改为 1,如果返回 true,代表修改成功,如果修改失败,返回 false。将一个属性设置为当前线程,这个属性是 AQS 的父类提供的
A 线程 compareAndSetState(0, 1)
成功
3.2.2. 区别
3.3. acquire
有 3 个判断方法:tryAcquire、acquireQueued、selfInterrupt
1 |
|
3.3.1. tryAcquire
1 |
|
3.3.1.1. nonfairTryAcquire->B 抢占失败
B 线程进入 nonfairTryAcquire() 方法,返回 false,取反后为 true,判断&&后面的条件,从而进入 addWaiter
方法
3.3.2. addWaiter->B 节点参与竞争抢占失败
1 |
|
3.3.2.1. B 第一个等待 enq
3.3.2.2. 死循环先生成 1 个占位节点
3.3.2.3. 第二次循环 B 节点入队
3.3.2.4. 其他节点类似 C 节点入队
3.3.3. acquireQueued->处理队列中的节点
第二个死循环
1 |
|
3.3.3.1. tryAcquire
acquireQueued
方法处理队列,因为 B 在队头,所以先处理 B。如果 p 是占位节点,继续&&后面的判断,即B 线程会执行 tryAcquire,再次进行尝试
predecessor 返回前驱节点,如果前驱节点为 null,抛出 NPE
B 节点 tryAcquire,再次尝试仍然失败,进入下面 if 判断中的 shouldParkAfterFailedAcquire 方法
3.3.3.2. shouldParkAfterFailedAcquire
3.3.3.2.1. 第 1 次循环 ->将前面节点 waitStatus 置为 -1
队头第一个线程节点 B 再次尝试失败时进入 shouldParkAfterFailedAcquire
方法
自旋进入第 1 次循环时,会走最下面的分支,将占位节点的 waitStatus
改为 -1
compareAndSetWaitStatus
1 |
|
3.3.3.2.2. 第 2 次循环 ->B park 住
自旋进入第 2 次循环时,shouldParkAfterFailedAcquire
中的 if 判断 ws == Node.SIGNAL
返回 ture,继续进行 acquireQueued 方法中第 2 个 if 判断中的 parkAndCheckInterrupt()
3.3.3.3. parkAndCheckInterrupt->park B 节点
3.3.3.4. C 节点重复 shouldParkAfterFailedAcquire 中的循环
第 1 次循环,将其前面的 B 节点的 waitStatus 置为 -1 (规律就是后面节点将前面节点的 waitStatus 置为 -1,表示已就绪)
第 2 次循环,执行 parkAndCheckInterrupt,将自己 park 住
被 park 后,线程 B、C 等都进入 WATING
状态,等待被唤醒
线程状态复习:并发基础-7、Thread
3.4. unlock
release(1)
3.5. release
3.5.1. tryRelease->返回 true
3.5.2. unparkSuccessor
3.5.2.1. compareAndSetWaitStatus
3.5.2.2. unpark
unpark 就会唤醒上面 parkAndCheckInterrupt 中 park 的线程,线程 B 被唤醒后,从parkAndCheckInterrupt
方法中的 837 行继续执行,出来后在自旋中继续循环
假设没有线程中断的特殊情况,B 节点出来 parkAndCheckInterrupt 方法之后,还在 acquireQueued 方法的死循环中继续循环。此时 node 是 B 节点,那么 p 就是占位节点,进入 if 判断中,p==head
为 true,继续&&后面的 tryAcquire 方法
3.5.2.2.1. B 在 tryAcquire 方法中 CAS 成功抢到锁
state 变为 1,占有窗口的 Thread 变为 Thread B
将节点 B 设置为 head,占位节点的 next 置为 null
也就是将原来的占位节点出队,B 变为新的占位节点
3.6. selfInterrupt
该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这里简单分析一下:
因为 LockSupport.park 阻塞线程后,有两种可能被唤醒。
第一种情况,前节点是头节点,释放锁后,会调用 LockSupport.unpark 唤醒当前线程。整个过程没有涉及到中断,最终 acquireQueued 返回 false 时,不需要调用 selfInterrupt。
第二种情况,LockSupport.park 支持响应中断请求,能够被其他线程通过 interrupt() 唤醒。但这种唤醒并没有用,因为线程前面可能还有等待线程,在 acquireQueued 的循环里,线程会再次被阻塞。parkAndCheckInterrupt 返回的是 Thread.interrupted(),不仅返回中断状态,还会清除中断状态,保证阻塞线程忽略中断。最终 acquireQueued 返回 true 时,真正的中断状态已经被清除,需要调用 selfInterrupt 维持中断状态,即要进行“中断补偿”操作。
优雅退出线程中,也有设计中断补偿:并发基础-7、Thread
4. 实战经验
5. 参考与感谢
5.1. 尚硅谷周阳
https://www.bilibili.com/video/BV1uX4y1u7ht
5.2. 测试程序
/Users/taylor/Nutstore Files/Obsidian_data/pages/002-schdule/001-Arch/001-Subject/013-DemoCode/juc-day02
[[ReentrantLock3.java]]
[[AQSDemo.java]]
selfInterrupt:[[Java 并发之 AQS 详解(下) HeapDump性能社区]]
5.3. 整体流程图
https://www.processon.com/view/5e29b0e8e4b04579e40c15a7?fromnew=1