AbstractQueuedSynchronizer

前言

AbstractQueuedSynchronizer(AQS)是 locks 包下一个用于实现锁与同步器的工具类。这是 jdk 中提供的一个简单的实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Mutex implements Lock, java.io.Serializable {

// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// ...
}

// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();

public void lock() {
sync.acquire(1);
}

public void unlock() {
sync.release(1);
}
// ...
}

可以看到,通过 AQS,我们只需实现更改锁状态等非常简单的方法,就可以完成一个自定义的锁。通过 AQS,我们不仅可以实现独占锁,还可以实现共享锁。简单来说,AQS 通过一个双向的先进先出(FIFO)队列(下称同步队列)来管理等待线程,如果某个线程发现前驱的线程释放了锁,便会获得锁。乍看之下,AQS 是公平锁,但实际上,线程加入队列前会先尝试获取一次锁,失败后才会加入队列按序等待。所以,在没有特殊处理的情况下,AQS 的锁分配并不具备公平性,这与 synchronized 是十分相似的。

导航

独占锁

顾名思义,一旦一个线程拥有了独占锁,其他的线程都将被阻塞。一般来说,线程的阻塞可以通过挂起线程来实现。但挂起线程的消耗比较大,于是,另一种方法出现了:让线程空转,也就是自旋:

1
while(!tryAcquire());

自旋锁、Ticket Lock、MCS 锁、CLH 锁

通过自旋来阻塞线程的锁叫做自旋锁:

1
2
3
4
5
6
7
8
9
10
11
12
class SpinLock {

private AtomicBoolean state = new AtomicBoolean(false);

public void lock() {
while (!state.compareAndSet(false, true)) ;
}

public void unlock() {
state.compareAndSet(true, false);
}
}

自旋锁可以避免线程频繁挂起与恢复,但却无法保证公平性,可能存在一个线程长时间无法获得锁的现象。为了解决公平性的问题,各种排队自旋锁出现了,如 Ticket Lock、MCS 锁与 CLH 锁等等。

Ticket Lock 类似于排队叫号业务。线程按取号,按照号码等待执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TicketLock {
private AtomicInteger serviceNum = new AtomicInteger(); // 服务号
private AtomicInteger ticketNum = new AtomicInteger(); // 排队号
private static final ThreadLocal<Integer> myNum = new ThreadLocal<>();

public void lock() {
// 获得排队号
myNum.set(ticketNum.getAndIncrement());
while (serviceNum.get() != myNum.get());
}

public void unlock() {
serviceNum.compareAndSet(myNum.get(), serviceNum.get() + 1)
}
}

虽然 Ticket Lock 保证了公平性,但 serviceNum 与 ticketNum 作为多个线程的共享变量,会被频繁进行读写。而使用 MCS 锁则可以减少使用共享变量而造成的额外开销。

MCS 锁使用队列来使线程排队获得锁。当一个线程释放锁时,将会通知自己的后继线程获得锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class MCSLock {
// 队列尾部
volatile MCSNode tail;

private volatile ThreadLocal<MCSNode> curNode=new ThreadLocal<>();

static class MCSNode {
// 是否被阻塞
volatile boolean isBlock = true;
MCSNode next;
}

public void lock() {
MCSNode mcsNode = new MCSNode();
// 使用CAS添加至同步队列
while (true) {
MCSNode oldTail = tail;
if (TAIL.compareAndSet(this, oldTail, mcsNode)) {
oldTail.next = mcsNode;
break;
}
}
// 等待前驱结点通知
while (mcsNode.isBlock) ;
}

public void unlock() {
// 通知后继结点
curNode.get().next.isBlock = false;
}

private static final VarHandle TAIL;

static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
TAIL = l.findVarHandle(MCSLock.class, "tail", MCSNode.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new ExceptionInInitializerError(e);
}
}
}

CLH 锁与 MCS 锁很类似,不同的是,MCS 中自旋轮询己身状态来等待前驱结点的通知,而 CLH 锁则自旋轮询前驱结点的状态来判断是否结束自旋。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class CLHLock {
volatile CLHNdoe tail;

private volatile ThreadLocal<CLHNdoe> curNode=new ThreadLocal<>();

static class CLHNdoe {
CLHNdoe prev;
volatile boolean isLock = true;
}

public void lock() {
CLHNdoe clhNdoe = new CLHNdoe();
curNode.set(clhNdoe);
// 使用CAS添加至同步队列
while (true) {
CLHNdoe oldTail = tail;
if (TAIL.compareAndSet(this, oldTail, clhNdoe)) {
clhNdoe.prev = oldTail;
break;
}
}
// 前驱结点是否释放了锁
while (clhNdoe.prev.isLock) ;
}

public void unlock() {
// 释放锁
curNode.get().isLock = false;
}

private static final VarHandle TAIL;

static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
TAIL = l.findVarHandle(MCSLock.class, "tail", CLHNdoe.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new ExceptionInInitializerError(e);
}
}
}

AQS 中的 独占锁

不响应中断

AQS 使用 CLH 锁原理实现,但将等待锁的操作,调整为通过 LockSupport.park()让线程阻塞。一个线程结点获得锁大体分为三步:尝试获取锁->失败则加入尾端->等待获得锁三步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试获取锁,也就是是新线程加入队列之前,会首先尝试争抢锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 独占模式(EXCLUSIVE),将线程结点加入队列尾部(addWaiter),并尝试获得锁(acquireQueued)
selfInterrupt();
}

// 子类需重写
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

// 将线程结点加入队列
private Node addWaiter(Node mode) {
Node node = new Node(mode);

for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail); // 设置前驱结点
if (compareAndSetTail(oldTail, node)) { // 通过CAS将结点加入队列
oldTail.next = node; // 加入成功,则设置next连接
return node;
}
} else {
initializeSyncQueue(); // 初始化队列:设置头结点Head与尾结点tail
}
}
}

final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
// 尝试获得锁,失败则阻塞(park),等待恢复(unpark)
for (;;) {
final Node p = node.predecessor(); // 获取前驱结点
if (p == head && tryAcquire(arg)) { // head为队列首节点,也就是占有锁的结点,也就是说当前线程通过排队可以获得锁、并且成功争抢到了锁
setHead(node); // 设置头结点
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)) // 判断是否需要park
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}

private void setHead(Node node) {
head = node;
node.thread = null; // 前驱释放锁、unpark后继(node)之后,node才可能加锁成功,才会调用此方法,因此thread不再需要
node.prev = null;
}

释放锁大体分为两步:尝试释放锁->恢复后继线程

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // unpark 后继线程
return true;
}
return false;
}

响应中断

AQS 还支持相应中断的解锁:如果等正在等待锁的线程被中断,则抛出 InterruptedException 并返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 先行检查
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg)) // 加锁失败
doAcquireInterruptibly(arg); // 支持中断的加锁
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE); // 增加至队列尾部
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 如果线程被中断,则抛出异常
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

限时抢占

AQS 还支持在一定时间内尝试获取锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout); // 限时抢占
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE); // 增加至队列尾部
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) { // 超时
cancelAcquire(node); // 取消抢占
return false;
}
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) // 时限大于SPIN_FOR_TIMEOUT_THRESHOLD才会采取park线程策略,否则自旋
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException(); // 中断异常
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

共享锁

除了独占锁,AQS 还支持共享锁:同一时刻可以有多个线程获取到锁。与独占锁一样,共享锁也支持不响应中断、响应中断、限时抢占三种方式。三者实现的区别与独占锁相似,这里不再多述,只以不响应中断的锁举例。

结点的 waitStatus 有不同的几个状态:

  • SIGNAL:后继线程处于或将被 park,表明需要 unpark 后继线程
  • CANCELLED:被取消
  • CONDITION:等待条件
  • PROPAGATE:共享状态将会传播下去

所以,线程通过不断传播共享状态来实现锁的共享。

相对于加锁,释放共享锁相对简单:如果后继线程已经或将被 park(waitStatus=Node.SIGNAL),则 unpark 后继线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 子类需重写
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // head的后继线程已经或者将会park
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) // 将状态设置成初始化状态
continue; // loop to recheck cases
unparkSuccessor(h); // unpark head的后继结点
}
else if (ws == 0 && // 是初始化状态
!h.compareAndSetWaitStatus(0, Node.PROPAGATE)) // 设置成传播状态(share模式,后继线程可以获得锁)失败
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

而共享锁的加锁流程与独占锁大同小异:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 子类需重写:负数失败,0表示成功但后续线程不能继续获取锁,正数表示成功且后续线程能继续获取锁
doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 结点是share模式
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { // 加锁成功
setHeadAndPropagate(node, r); // 设置头结点并且通知后继线程是否可以同时获取锁
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// 结点容许锁共享或者新旧head线程不是取消与初始化状态
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // 共享传播
}
}

看到这里,可能会有疑惑:为什么释放锁的实现中,拥有 SIGNAL 状态,而在加锁方法中并没有出现?其实,SIGNAL 状态的设置,在 shouldParkAfterFailedAcquire 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 前驱处于SIGNAL态,直接返回true
return true;
if (ws > 0) { // 前驱被取消
// 向前遍历,找到没有被取消的线程结点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前驱waitStatus为0或者PROPAGATE,则设置成SIGNAL
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}

Condition

AQS 中实现 Condition 接口的是 ConditionObject。ConditionObject 采用队列来管理线程(以下称为条件队列),firstWaiter 表示头结点,lastWaiter 表示尾结点。当执行 signal()方法时,会首先判断锁是否被当前线程独占,独占则进行 signal 操作,否则抛出 IllegalMonitorStateException。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

public final void signal() {
if (!isHeldExclusively()) // 是否独占,子类需重写
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // signal first结点
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 转移至同步队列
(first = firstWaiter) != null);
}

// 将条件队列中的线程结点转移至同步队列中
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;

Node p = enq(node); // 加入同步队列,返回原tail结点
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) // 被取消或CAS失败。
LockSupport.unpark(node.thread);
return true;
}

从 singal 方法,我们可以大体知道,AQS 通过将结点从条件队列转移至同步队列来实现 signal。因此,我们可以大致想象出 await 的过程:

  • 将结点加入条件队列
  • 将结点从同步队列中移出,并更新后继结点状态
  • 等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); // 中断异常
Node node = addConditionWaiter(); // 添加至条件队列
int savedState = fullyRelease(node); // 释放node的锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // node不在同步队列中
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 被中断
break;
}
// 被唤醒后
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // to THROW_IE:throw InterruptedException on exit from wait
interruptMode = REINTERRUPT; // REINTERRUPT: reinterrupt on exit from wait
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); // 遍历全部
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode); // 中断处理
}

// 是否在同步队列中
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;

return findNodeFromTail(node); // 从尾部遍历,判断是否在同步队列中
}
}

为什么要从尾部遍历呢?因为加入同步队列的操作,保证 node.prev 不为空,但不保证已经增加至同步队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ..
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) { // CAS操作不保证成功
oldTail.next = node;
return oldTail;
}
} else {
initializeSyncQueue();
}
}
// ..