前言
AbstractQueuedSynchronizer(AQS)是 locks 包下一个用于实现锁与同步器的工具类。这是 jdk 中提供的一个简单的实现类:
1 | class Mutex implements Lock, java.io.Serializable { |
可以看到,通过 AQS,我们只需实现更改锁状态等非常简单的方法,就可以完成一个自定义的锁。通过 AQS,我们不仅可以实现独占锁,还可以实现共享锁。简单来说,AQS 通过一个双向的先进先出(FIFO)队列(下称同步队列)来管理等待线程,如果某个线程发现前驱的线程释放了锁,便会获得锁。乍看之下,AQS 是公平锁,但实际上,线程加入队列前会先尝试获取一次锁,失败后才会加入队列按序等待。所以,在没有特殊处理的情况下,AQS 的锁分配并不具备公平性,这与 synchronized 是十分相似的。
导航
独占锁
顾名思义,一旦一个线程拥有了独占锁,其他的线程都将被阻塞。一般来说,线程的阻塞可以通过挂起线程来实现。但挂起线程的消耗比较大,于是,另一种方法出现了:让线程空转,也就是自旋:
1 | while(!tryAcquire()); |
自旋锁、Ticket Lock、MCS 锁、CLH 锁
通过自旋来阻塞线程的锁叫做自旋锁:
1 | class SpinLock { |
自旋锁可以避免线程频繁挂起与恢复,但却无法保证公平性,可能存在一个线程长时间无法获得锁的现象。为了解决公平性的问题,各种排队自旋锁出现了,如 Ticket Lock、MCS 锁与 CLH 锁等等。
Ticket Lock 类似于排队叫号业务。线程按取号,按照号码等待执行。
1 | public class TicketLock { |
虽然 Ticket Lock 保证了公平性,但 serviceNum 与 ticketNum 作为多个线程的共享变量,会被频繁进行读写。而使用 MCS 锁则可以减少使用共享变量而造成的额外开销。
MCS 锁使用队列来使线程排队获得锁。当一个线程释放锁时,将会通知自己的后继线程获得锁:
1 | class MCSLock { |
CLH 锁与 MCS 锁很类似,不同的是,MCS 中自旋轮询己身状态来等待前驱结点的通知,而 CLH 锁则自旋轮询前驱结点的状态来判断是否结束自旋。
1 | class CLHLock { |
AQS 中的 独占锁
不响应中断
AQS 使用 CLH 锁原理实现,但将等待锁的操作,调整为通过 LockSupport.park()让线程阻塞。一个线程结点获得锁大体分为三步:尝试获取锁->失败则加入尾端->等待获得锁三步:
1 | public final void acquire(int arg) { |
释放锁大体分为两步:尝试释放锁->恢复后继线程
1 | public final boolean release(int arg) { |
响应中断
AQS 还支持相应中断的解锁:如果等正在等待锁的线程被中断,则抛出 InterruptedException 并返回:
1 | public final void acquireInterruptibly(int arg) |
限时抢占
AQS 还支持在一定时间内尝试获取锁:
1 | public final boolean tryAcquireNanos(int arg, long nanosTimeout) |
共享锁
除了独占锁,AQS 还支持共享锁:同一时刻可以有多个线程获取到锁。与独占锁一样,共享锁也支持不响应中断、响应中断、限时抢占三种方式。三者实现的区别与独占锁相似,这里不再多述,只以不响应中断的锁举例。
结点的 waitStatus 有不同的几个状态:
- SIGNAL:后继线程处于或将被 park,表明需要 unpark 后继线程
- CANCELLED:被取消
- CONDITION:等待条件
- PROPAGATE:共享状态将会传播下去
所以,线程通过不断传播共享状态来实现锁的共享。
相对于加锁,释放共享锁相对简单:如果后继线程已经或将被 park(waitStatus=Node.SIGNAL),则 unpark 后继线程。
1 | public final boolean releaseShared(int arg) { |
而共享锁的加锁流程与独占锁大同小异:
1 | public final void acquireShared(int arg) { |
看到这里,可能会有疑惑:为什么释放锁的实现中,拥有 SIGNAL 状态,而在加锁方法中并没有出现?其实,SIGNAL 状态的设置,在 shouldParkAfterFailedAcquire 中:
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
Condition
AQS 中实现 Condition 接口的是 ConditionObject。ConditionObject 采用队列来管理线程(以下称为条件队列),firstWaiter 表示头结点,lastWaiter 表示尾结点。当执行 signal()方法时,会首先判断锁是否被当前线程独占,独占则进行 signal 操作,否则抛出 IllegalMonitorStateException。
1 | public class ConditionObject implements Condition, java.io.Serializable { |
从 singal 方法,我们可以大体知道,AQS 通过将结点从条件队列转移至同步队列来实现 signal。因此,我们可以大致想象出 await 的过程:
- 将结点加入条件队列
- 将结点从同步队列中移出,并更新后继结点状态
- 等待
1 |
|
为什么要从尾部遍历呢?因为加入同步队列的操作,保证 node.prev 不为空,但不保证已经增加至同步队列:
1 | // .. |