前言

Condition是一个与Object中的wait() / nofity() / notifyAll()功能相似的接口,AQS的内部类ConditionObject实现了该接口。它与Object提供的这些方法一样用来协调线程间的同步关系,而不同之处在于Object中的这些方法需要配合Synchronized关键字使用(否则会抛出异常),而Condition中的方法则要配合锁(独占锁)来使用(否则也会抛出异常)。

ConditionObject内部维护了一个条件队列,当线程不满足某些条件的时候就会通过await()方法将当前线程加入到条件队列中,而当条件队列上等待的线程被signal() / signalAll()后,又会被转移到AQS的同步队列中尝试获取锁。接下来就其中最核心的三个方法await()signal()signalAll()说起,它们也分别对标了Object中的那三个方法。

await()

await()方法负责将当前线程包装成一个Node后加入到条件队列中,并且需要释放持有的独占锁进入阻塞状态。这里先总结一下它的大致流程:

  1. 将当前线程加入到条件队列中
  2. 完全释放互斥锁
  3. 如果当前线程未在同步队列中,就将其阻塞
  4. 否则,重新获取锁并根据是否发生中断而做出不同反应(抛出异常或重新中断)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final void await() throws InterruptedException {
if (Thread.interrupted()) // 响应中断
throw new InterruptedException();
// <1> 将当前线程包装成 Node 并加入到条件队列中
Node node = addConditionWaiter();
// <2> 完全释放互斥锁(不论锁是否可以重入),如果没有持锁,会抛出异常
int savedState = fullyRelease(node);
int interruptMode = 0;

while (!isOnSyncQueue(node)) {
// <3> 只要仍未转移到同步队列就阻塞,转移的情况如下:
// 1. 其它线程调用 signal 将当前线程节点转移到同步队列并唤醒当前线程
// 2. 其它线程调用 signalAll
// 3. 其它线程中断了当前线程,当前线程会自行尝试进入同步队列
LockSupport.park(this);
// 获取中断模式。在线程从park中被唤醒的时候,需要判断此时是否被中断,若中断则尝试转移到同步队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// <4> 重新获取互斥锁过程中如果发生中断并且 interruptMode 不为 THROW_IE,则将 interruptMode 设置为 REINTERRUPT
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果线程发生过中断则根据 THROW_IE 或是 REINTERRUPT 分别抛出异常或者重新中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

addConditionWaiter()

首先看看第一个关键步骤,也就是将当前线程加入到条件队列中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果条件队列中最后一个节点的状态是 CANCELLED
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); // 清理队列
t = lastWaiter; // 重读 lastWaiter
}
// 将当前线程封装为一个 Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) // 如果当前队列没有节点
firstWaiter = node;
else // 将当前队列的尾节点连接到新节点
t.nextWaiter = node;
// 将新节点作为新尾节点
lastWaiter = node;
// 返回新节点
return node;
}

这里会先判断条件队列中的最后一个节点是否为取消状态,如果是的话就调用unlinkCancelledWaiters()进行清理,清理的过程其实就是将条件队列中所有取消的节点都移除。之后将当前线程封装成Node后与当前队列最后一个节点的nextWaiter关联即可。

fullyRelease()

此时已经将线程加入到条件队列中了,调用fullyRelease()方法完全释放同步状态。这里的“完全”指的是对于重入锁来说,每次加锁都会将AQS的整型成员变量state++,而每次解锁时会将state--,因此这里将state的数量完全释放掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取同步状态数值
int savedState = getState();
// 调用 release() 释放指定数量的同步状态
if (release(savedState)) {
failed = false;
return savedState; // 返回释放的数量
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 如果释放时出现异常,将该 Node 的等待状态设置为 CANCELLED
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

isOnSyncQueue()

该方法用于判断某个节点是否转移到了同步队列上(因为别的线程有可能通过signal() / signalAll()将其转移了),如果没有就将其阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final boolean isOnSyncQueue(Node node) {
// 等待状态如果是 CONDITION 则一定是在条件队列,或者如果 prev 为 null 也一定是在条件队列
// (同步队列新入队的节点的 prev 值是不可能为 null 的,因为有dummy节点的存在)
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false; // 不在同步队列中,直接返回 false

// 条件队列的节点是通过 nextWaiter 来维护的,不用 next 和 prev,因此如果节点在条件队列中则 next 和 prev 应该都为 null
// 如果 next 不为 null,则说明一定是在同步队列中
// 这里还要说明的是在 cancelAcquire() 方法中,一个节点取消的时候会把自己的 next 域指向自己,即 node.next = next; 而不是node.next = null;
// 通过这种方式,在这里就可以将其和在同步队列上的情况归一化判断,都返回 true
// 如果 cancelAcquire() 方法中写成 node.next = null; 的形式,这里的判断不满足条件,那么又要往底下进一步判断
if (node.next != null)
return true;

// 虽然 node.prev 为 null 可以说明此时节点不在同步队列中,
// 但如果 node.next 为 null 并不能说明 node 就不在同步队列中,因为新节点入队时会先设置 prev 然后再设置 next
// 此时由 tail 节点开始从后向前遍历一次,确定节点是否真的不在同步队列中
return findNodeFromTail(node);
}

checkInterruptWhileWaiting()

checkInterruptWhileWaiting()方法用于检测线程在等待期间是否发生了中断,注意该方法是在LockSupport.park(this);这一行之后,也就是说此时线程已经从阻塞中返回了,返回的原因有可能是因为中断,也有可能是因为signal() / signalAll()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private int checkInterruptWhileWaiting(Node node) {
// 检测线程在等待期间是否发生了中断,如果未发生中断,直接返回0
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

/**
* 判断中断发生的时期,分为两种:
* 1. 中断在节点被转移到同步队列前发生,此时返回 true
* 2. 中断在节点被转移到同步队列后发生,此时返回 false
*/
final boolean transferAfterCancelledWait(Node node) {
// 第一种情况,中断在节点被转移到同步队列前发生
// 此时自行将节点转移到同步队列上,并返回 true
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}

// 如果上面的CAS失败了,说明已经有线程调用 signal() / signalAll() 方法了,
// 这两个方法都会先将节点等待状态由 CONDITION 设为0后,再调用 enq() 方法转移节点
// 此时有可能仅设置了等待状态而没来得及将节点转移到同步队列中就被切换走了,
// 此时自旋等待节点成功进入同步队列
while (!isOnSyncQueue(node))
Thread.yield(); // 让出 CPU
return false;
}

执行完上面几个方法,当从while (!isOnSyncQueue(node))循环中跳出时,说明节点已经转移到了同步队列中了,此时通过acquireQueued(node, savedState)方法重新获取锁,并且如果线程发生过中断则根据THROW_IE或是REINTERRUPT分别抛出异常或者重新中断。

signal() / signalAll()

上面的await()方法中从while (!isOnSyncQueue(node))循环跳出可不是自己独立就能做到的,它是需要signal() / signalAll()配合的。signal() / signalAll()的工作就是负责将条件队列中的节点转移到同步队列中,两个方法的区别在于signal()只会转移首节点,而signalAll()会转移队列上的所有节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final void signal() {
// 检查线程是否获取了独占锁,未获取独占锁调用 signal 方法是不允许的
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null) // 头节点不为 null
doSignal(first); // 将头节点转移到同步队列中
}

private void doSignal(Node first) {
do {
// 如果下面这个条件满足了,说明条件队列中只有一个节点,此时 lastWaiter 设为 null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 调用 transferForSignal() 将节点转移到同步队列中,如果失败,且 firstWaiter 不为null,则继续尝试,transferForSignal() 成功了或者队列中没节点了,while 循环就结束了
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

// 这个方法用于将条件队列中的节点转移到同步队列中
final boolean transferForSignal(Node node) {
// 如果将节点的等待状态由 CONDITION 设为0失败,则表明节点被取消
// 注意:因为 transferForSignal() 不存在竞争的问题,所以唯一的可能就是节点被取消
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

// 调用 enq() 方法将 node 转移到同步队列中,并返回 node 的前驱节点(原尾节点)p
Node p = enq(node);
int ws = p.waitStatus;
// 如果前驱结点状态为取消或者无法将状态CAS到SIGNAL,
// 则需要唤醒参数node节点对应的线程,使其能开始尝试争锁
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

可以看出,signal()一定会转移条件队列中的一个节点,除非队列中彻底空了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final void signalAll() {
// // 检查线程是否获取了独占锁,未获取独占锁调用 signalAll() 方法是不允许的
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
// 将条件队列中所有节点都转移到同步队列中,与 doSignal() 的主要区别在于 while 循环条件上
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first); // 调用 transferForSignal() 将节点转移到同步队列中
first = next;
} while (first != null);
}

signalAll()signal()的主要不同在于循环条件中,因为它会将条件队列中的所有节点都转移,因此实现起来稍微简单一些。

JDK BUG

这里再讲一下jdk在上面实现中的一个bug。对比上面await()signal() / signalAll()的源码可以发现,await()方法并没有做同步控制,也就是signal() / signalAll()方法开头的if (!isHeldExclusively()) throw new IllegalMonitorStateException();。因此,如果没有获取锁就调用该方法,会产生线程竞争的情况,导致条件队列的结构被破坏。例如,以下添加节点到条件队列的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果条件队列中最后一个节点的状态是 CANCELLED
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); // 清理队列
t = lastWaiter; // 重读 lastWaiter
}
// 将当前线程封装为一个 Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) // 如果当前队列没有节点
firstWaiter = node;
else // 将当前队列的尾节点连接到新节点
t.nextWaiter = node;
// 将新节点作为新尾节点
lastWaiter = node;
// 返回新节点
return node;
}

如果有两个线程同时执行到if (t == null)时,可能会造成firstWaiter先指向其中一个,之后却被另一个给覆盖了,那么此时其中一个线程将会一直阻塞下去,因为这个线程的node并不在条件队列中,也就永远不会被signal() / signalAll()转移到同步队列上,唯一能从阻塞中返回的可能就是被中断。

参考资料