前言
AQS就是java.util.concurrent.locks
包下的AbstractQueuedSynchronizer
类,这个类也是整个并发包的核心之一。并发包中像ReentrantLock
、CountDownLatch
等同步组件都有一个内部类Sync
,而所有的Sync
都是继承自AbstractQueuedSynchronizer
,因此,可以看出AQS的重要性是十分高的。
AQS主要的工作是维护线程同步队列(CLH)并且负责线程的阻塞和唤醒,它的方法基本可以分为三类:
- 独占式获取与释放同步状态
- 共享式获取与释放同步状态
- 查询同步队列中的等待线程情况
所谓独占就是一次只有一个线程能够获取,其它线程必须等它释放,共享则可以有多个线程同时获取。
CLH队列
AQS内部维护着一个FIFO双向队列,该队列就是CLH同步队列,AQS依赖它来完成同步状态的管理:
- 当前线程如果获取同步状态失败时,AQS会将当前线程以及等待状态等信息构成一个节点
Node
并将其加入到CLH同步队列,同时会阻塞当前线程。 - 当同步状态释放时,会把首节点唤醒,使其再次尝试获取同步状态。
CLH同步队列的结构图如下:
Node
Node
是AQS的静态内部类:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38static final class Node {
// 共享式节点,标记节点在共享模式下等待
static final Node SHARED = new Node();
// 独占式节点,标记节点在独占模式下等待
static final Node EXCLUSIVE = null;
// 因为超时或中断,节点会被设置为取消状态,不会参与到竞争当中
static final int CANCELLED = 1;
// 表示当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
static final int SIGNAL = -1;
// 节点在条件队列中,节点线程等待在 Condition 上,当其它线程对 Condition 调用了 signal() / signalAll() 后,该节点会从条件队列转移到同步队列,加入到同步状态的获取中
static final int CONDITION = -2;
// 下一次共享模式的同步状态获取会无条件地传播下去
static final int PROPAGATE = -3;
// 等待状态,也就是上面这几个,不过初始值为0
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继节点
volatile Node next;
// 此节点的线程
volatile Thread thread;
// 见 ConditionObject
Node nextWaiter;
// ...
}
入队
入队操作过程是很简单的,只需要将tail
指向新节点、新节点的prev
指向当前最后的节点、当前最后的节点的next
指向当前节点即可。但是在CLH的实现中需要考虑并发的情况,它通过CAS的方式,来保证正确的添加Node
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 新建节点
// 记录原尾节点
Node pred = tail;
if (pred != null) {
// 设置新Node的前驱结点为原尾节点
node.prev = pred;
// CAS设置新Node为新的尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node; // 设置成功,将原尾节点的后继节点设为新节点
return node; // 返回新节点
}
}
enq(node); // 失败,多次尝试直到成功
return node; // 返回新节点
}
private Node enq(final Node node) {
for (;;) { // 多次尝试直到成功
Node t = tail; // 记录原尾节点
if (t == null) { // 原尾节点不存在
// 创建首尾节点都为 new Node(),作为一个占位节点(空节点)
if (compareAndSetHead(new Node()))
tail = head;
} else { // 原尾节点存在
// 将原尾节点设置为新节点的前驱节点
node.prev = t;
// CAS设置新Node为新的尾节点
if (compareAndSetTail(t, node)) {
t.next = node; // 设置成功,将原尾节点的后继节点设为新节点
return t; // 返回新节点
}
}
}
}
总体来说就是使用CAS设置新节点为尾节点,如果设置成功则返回新节点,如果失败则继续不断自旋CAS设置新节点为尾节点直到成功。
出队
首节点的线程释放同步状态后,会唤醒它的后继节点,后继节点在获取同步状态成功时将自己设置为首节点,因为只有一个线程能够成功获取到同步状态,所以该过程不需要CAS来保证:1
2
3
4
5private void setHead(Node node) {
head = node;
node.thread = null; // 将未使用的字段设为null以帮助GC
node.prev = null;
}
LockSupport
在进入AQS的核心源码之前有必要先了解一下大量使用到的LockSupport
类。
LockSupport
也是基于UNSAFE
的操作,提供park()
用来阻塞线程和unpark()
用来唤醒线程,LockSupport
的机制是每次unpark()
给线程一个“许可”,并且这个许可最多只能为1,如果当前线程有许可,那么park()
方法会消耗一个并返回,否则会阻塞线程直到线程重新获得许可,在线程启动之前调用park() / unpark()
没有任何效果。
这里简单的讲一下LockSupport.park() / unpark()
与object.wait() / notify()
的区别,他们主要的区别在于语义上的不同,阻塞和唤醒是对于线程来说的,LockSupport
的park() / unpark()
以线程作为方法的参数,更符合这个语义;而notify()
只能随机唤醒一个线程,notifyAll()
会唤醒所有线程,无法准确的控制某一个线程。
Thread.interrupt
AQS中大量用了中断,也就是Thread
的interrupt()
方法,但要注意的是该方法并不会中断一个正在运行的线程:当调用一个线程的该方法时,如果该线程处于被阻塞状态(如Object.wait()
、Thread.join()
、Thread.sleep()
),那么线程将立即退出被阻塞状态,并抛出一个InterruptedException
异常;而如果线程正在运行,那么会将该线程的中断标志设置为true
,此时线程将继续正常运行。
LockSupport.park()
也能响应中断信号,但它不会抛出InterruptedException
异常,因此要想知道线程是被unpark()
还是中断,就依赖于该线程的中断标志,可以通过Thread
的interrupted()
或isInterrupted()
方法获取该值,两个方法的区别是interrupted()
获取后会将标志位重置为false
。
同步状态的获取与释放
接下来进入AQS比较关键的部分:同步状态的获取与释放。这里主要分为以下两类进行分析:
- 独占式获取与释放同步状态
- 共享式获取与释放同步状态
独占式获取
独占就是一次只有一个线程能够获取到同步状态。首先来看独占式同步状态获取的方法:1
2
3
4
5public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
该方法是一个模板方法,其中tryAcquire(arg)
方法需要自定义同步组件(子类)自己去实现,它尝试获取同步状态,获取成功则设置锁状态并返回true
,此时该方法就可以直接返回了;否则获取失败返回false
,调用addWaiter(Node mode)
方法将当前线程包装成Node
加入到CLH同步队列尾部(上面已经介绍过),并且mode
参数为Node.EXCLUSIVE
,表示独占模式。
接下来会调用boolean acquireQueued(final Node node, int arg)
方法,每个线程包装成Node
进入同步队列后都会在该方法中自旋,一旦条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 记录是否获取同步状态成功
try {
boolean interrupted = false; // 记录该过程中是否发生过线程中断
for (;;) { // 开始自旋
final Node p = node.predecessor(); // 获取当前线程的前驱节点
if (p == head && tryAcquire(arg)) { // 当前线程的前驱节点是头节点,且获取同步状态成功
setHead(node); // 将当前线程设置为头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果获取同步状态失败,则根据条件判断是否应该阻塞自己
// 如果不阻塞,CPU 就会处于忙等状态,这样会浪费 CPU 资源
// 并且从阻塞中返回时,要判断是否是因为中断造成的
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 因为 parkAndCheckInterrupt 方法中会将中断标志清除,所以这里重新设为true
}
} finally {
if (failed) // 获取同步状态发生异常,取消获取
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 获得前驱节点的等待状态
if (ws == Node.SIGNAL)
// 表示pred的下一个节点也就是node的线程需要阻塞等待。在pred的线程释放同步状态时会对node的线程进行唤醒通知,所以这里返回true表示当前线程可以放心的被park
return true;
if (ws > 0) { // Node.CANCELLED
// 等待状态为CANCELLED时,表示此时前驱结点已经等待超时或者被中断了,需要从CLH队列中将前驱节点删除,循环回溯,直到前一个节点状态 <= 0
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 0 或 Node.PROPAGATE
// CAS将状态修改为Node.SIGNAL,但是会返回false,这一次不会park
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 上面的方法一旦返回true了,就会执行此方法将当前线程park
return Thread.interrupted(); // 判断是否因为中断而醒的,并且将中断标志清除
}
private void cancelAcquire(Node node) {
if (node == null)
return;
// 节点的等待线程置空
node.thread = null;
// 跳过已经取消了的前驱结点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 记录非取消状态的前驱节点的后继节点,注意不是当前节点node
Node predNext = pred.next;
// 将当前节点等待状态设为 CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,则通过CAS设置前驱节点pred为尾节点
// 以下两个CAS即使失败了也没关系,失败了说明pred此时已经是尾节点了
if (node == tail && compareAndSetTail(node, pred)) {
// 如果设置成功,就通过CAS将pred的next置空,那么中间被取消的节点就“消失”了
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 根据条件判断是唤醒后继节点,还是将前驱节点和后继节点连接到一起
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
// 这里使用 CAS 设置 pred 的 next,表明多个线程同时在取消,这里存在竞争。
// 不过此处没针对 compareAndSetNext 方法失败后做一些处理,表明即使失败了也没关系。
// 实际上,多个线程同时设置 pred 的 next 引用时,只要有一个能设置成功即可
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
/*
* 唤醒后继节点对应的线程。这里简单讲一下为什么要唤醒后继线程,考虑下面一种情况:
* head node1 node2 tail
* ws=0 ws=1 ws=-1 ws=0
* +------+ prev +-----+ prev +-----+ prev +-----+
* | | <---- | | <---- | | <---- | |
* | | ----> | | ----> | | ----> | |
* +------+ next +-----+ next +-----+ next +-----+
*
* 头结点初始状态为 0,node1、node2 和 tail 节点依次入队。node1 自旋过程中调用
* tryAcquire 出现异常,进入 cancelAcquire。head 节点此时等待状态仍然是 0,它
* 会认为后继节点还在运行中,所它在释放同步状态后,不会去唤醒后继等待状态为非取消的
* 节点 node2。如果 node1 再不唤醒 node2 的线程,该线程面临无法被唤醒的情况。此
* 时,整个同步队列就回全部阻塞住。
*/
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 通过 CAS 将等待状态设为 0,让后继节点线程多一次尝试获取同步状态的机会
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 当前节点的后继节点
Node s = node.next;
// 后继节点为null或者其状态 > 0(表示超时或者被中断了)
if (s == null || s.waitStatus > 0) {
s = null;
// 从tail节点开始向前遍历找到最前面的可用节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后继节点
if (s != null)
LockSupport.unpark(s.thread);
}
在上面的unparkSuccessor()
方法中有一个地方很有意思,就是当判断当前节点的next
引用为null
的时候,还需要从tail
节点向前遍历找到可用的节点,之所以要这么做的原因在于:在之前的addWaiter()
方法将新节点入队时,有这么一段代码:1
2
3
4
5
6
7// 设置新Node的前驱结点为原尾节点
node.prev = pred;
// CAS设置新Node为新的尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node; // 设置成功,将原尾节点的后继节点设为新节点
return node; // 返回新节点
}
也就是此时可能已经将新节点的prev
指向了原尾节点,但原尾节点的next
还并未指向新节点,因此不能从前往后遍历,而应从后往前遍历。
以下是上述步骤的流程图总结:
可以看出,虽然获取同步状态的过程表面上是自旋的操作,但是为了避免CPU资源的浪费,在获取同步状态失败后大部分情况还是进入了阻塞,但由于从阻塞中醒来不一定代表就可以获得同步状态了(有可能因为中断),所以此时会通过这个自旋循环再一次的去尝试获取同步状态,看看能不能获取成功。
独占式获取(响应中断)
上面的acquire(int arg)
方法对中断不响应,也就是线程被中断后,仅仅会通过selfInterrupt()
方法(其实就是Thread.currentThread().interrupt()
)将该线程的中断标志设置为true
,然后线程继续正常运行。为了响应中断,AQS提供了acquireInterruptibly(int arg)
方法,该方法在线程等待获取同步状态时如果被中断了,会立刻响应中断,抛出InterruptedException
异常:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) // 首先判断线程是否已经中断了,如果是的话就直接抛出异常
throw new InterruptedException();
if (!tryAcquire(arg)) // 尝试获取同步状态
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException { // 方法声明抛出异常
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return; // 不再返回是否发生过中断
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 与之前不响应中断的acquireQueued()方法唯一的区别在于这里判断发生了中断后会直接抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
独占式获取(超时)
AQS除了提供上面两个独占式获取的方法外,还提供了一个增强版的方法tryAcquireNanos()
,该方法除了能响应中断外,还提供了超时控制,即如果当前线程没有在指定时间内获取同步状态,则会返回false
,否则返回true
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted()) // 首先判断线程是否已经中断了,如果是的话就直接抛出异常
throw new InterruptedException();
return tryAcquire(arg) || // 尝试获取同步状态
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 计算超时时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 走到这说明获取失败,重新计算需要休眠的时间(剩余时间)
nanosTimeout = deadline - System.nanoTime();
// 如果已经超时了,那么返回false
if (nanosTimeout <= 0L)
return false;
// 如果没有超时,判断了可以被park,并且nanosTimeout大于一个阈值,那么就进入休眠
// 当nanosTimeout小于阈值的时候不需要休眠,直接快速自旋
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException(); // 响应中断,抛出异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
独占式释放
当线程获取了同步状态,执行完相应逻辑后,就需要释放同步状态。AQS提供了release(int arg)
方法,释放同步状态:1
2
3
4
5
6
7
8
9public final boolean release(int arg) {
if (tryRelease(arg)) { // 由自定义同步组件自己实现
Node h = head;
if (h != null && h.waitStatus != 0) // 如果头节点不为null且状态不为0(等于0说明后继节点正在运行中,不需要唤醒)
unparkSuccessor(h); // 将后继节点唤醒
return true;
}
return false;
}
共享式获取
与独占模式不同,共享模式下,同一时刻会有多个线程获取共享同步状态。共享模式是实现读写锁中的读锁、CountDownLatch
和Semaphore
等同步组件的基础。
共享式同步状态获取的方法是acquireShared()
,对标独占式的acquire()
方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 由自定义同步组件自己实现。返回负数表示获取失败;返回0表示成功,但是后继争用线程不会成功;返回正数表示获取成功,并且后继争用线程也可能成功。
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 与之前独占式的参数不同,这里传入一个Node.SHARED
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 获取当前线程的前驱节点
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { // 前驱节点是头节点并且获取同步状态成功
// 设置当前节点为头节点并且向后传播,不断唤醒下一个共享式节点,从而实现多个节点线程同时获取共享同步状态
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录原首节点
setHead(node); // 设置当前节点为新首节点
// 这里除了使用条件 propagate > 0 判断是否唤醒后继节点,还有其它的一些判断依据
// 比如 propagate 是 tryAcquireShared() 的返回值,也是这是决定是否传播唤醒的依据之一
// 更为详细的解释可以参考 https://www.cnblogs.com/micrari/p/6937995.html
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* 这是共享锁中的核心唤醒函数,主要做的事情就是唤醒下一个线程或者设置传播状态。
* 后继线程被唤醒后,会尝试获取共享锁,如果成功之后,则又会调用setHeadAndPropagate,将唤醒传播下去。
* 这个函数的作用是保障在 acquire 和 release 存在竞争的情况下,保证队列中处于等待状态的节点能够有办法被唤醒。
*/
private void doReleaseShared() {
/*
* 以下的循环做的事情就是,在队列存在后继线程的情况下,唤醒后继线程;
* 或者由于多线程同时释放共享锁由于处在中间过程,读到head节点等待状态为0的情况下,
* 虽然不能unparkSuccessor,但为了保证唤醒能够正确稳固传递下去,设置节点状态为PROPAGATE。
* 这样的话获取锁的线程在执行setHeadAndPropagate时可以读到PROPAGATE,从而由获取锁的线程去释放后继等待线程。
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
/*
* ws = 0 的情况下,这里要尝试将状态从 0 设为 PROPAGATE,保证唤醒向后
* 传播。setHeadAndPropagate 在读到 h.waitStatus < 0 时,可以继续唤醒
* 后面的节点。
*/
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
共享式释放
共享式释放同步状态的主要逻辑都在上面的doReleaseShared()
方法中,共享节点线程在获取同步状态和释放同步状态时都会调用doReleaseShared()
,所以doReleaseShared
是多线程竞争集中的地方。
1 | public final boolean releaseShared(int arg) { |
关于共享式同步状态的获取与释放,其实这里介绍的较为简单了,由于时间原因,很多细节没有解释清楚,更为详细的分析可以参考这篇优质博客:AbstractQueuedSynchronizer源码解读