前言
Condition
是一个与Object
中的wait() / nofity() / notifyAll()
功能相似的接口,AQS的内部类ConditionObject
实现了该接口。它与Object
提供的这些方法一样用来协调线程间的同步关系,而不同之处在于Object
中的这些方法需要配合Synchronized
关键字使用(否则会抛出异常),而Condition
中的方法则要配合锁(独占锁)来使用(否则也会抛出异常)。
ConditionObject
内部维护了一个条件队列,当线程不满足某些条件的时候就会通过await()
方法将当前线程加入到条件队列中,而当条件队列上等待的线程被signal() / signalAll()
后,又会被转移到AQS的同步队列中尝试获取锁。接下来就其中最核心的三个方法await()
、signal()
、signalAll()
说起,它们也分别对标了Object
中的那三个方法。
await()
await()
方法负责将当前线程包装成一个Node
后加入到条件队列中,并且需要释放持有的独占锁进入阻塞状态。这里先总结一下它的大致流程:
- 将当前线程加入到条件队列中
- 完全释放互斥锁
- 如果当前线程未在同步队列中,就将其阻塞
- 否则,重新获取锁并根据是否发生中断而做出不同反应(抛出异常或重新中断)
1 | public final void await() throws InterruptedException { |
addConditionWaiter()
首先看看第一个关键步骤,也就是将当前线程加入到条件队列中:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果条件队列中最后一个节点的状态是 CANCELLED
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); // 清理队列
t = lastWaiter; // 重读 lastWaiter
}
// 将当前线程封装为一个 Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) // 如果当前队列没有节点
firstWaiter = node;
else // 将当前队列的尾节点连接到新节点
t.nextWaiter = node;
// 将新节点作为新尾节点
lastWaiter = node;
// 返回新节点
return node;
}
这里会先判断条件队列中的最后一个节点是否为取消状态,如果是的话就调用unlinkCancelledWaiters()
进行清理,清理的过程其实就是将条件队列中所有取消的节点都移除。之后将当前线程封装成Node
后与当前队列最后一个节点的nextWaiter
关联即可。
fullyRelease()
此时已经将线程加入到条件队列中了,调用fullyRelease()
方法完全释放同步状态。这里的“完全”指的是对于重入锁来说,每次加锁都会将AQS的整型成员变量state++
,而每次解锁时会将state--
,因此这里将state
的数量完全释放掉。
1 | final int fullyRelease(Node node) { |
isOnSyncQueue()
该方法用于判断某个节点是否转移到了同步队列上(因为别的线程有可能通过signal() / signalAll()
将其转移了),如果没有就将其阻塞。
1 | final boolean isOnSyncQueue(Node node) { |
checkInterruptWhileWaiting()
checkInterruptWhileWaiting()
方法用于检测线程在等待期间是否发生了中断,注意该方法是在LockSupport.park(this);
这一行之后,也就是说此时线程已经从阻塞中返回了,返回的原因有可能是因为中断,也有可能是因为signal() / signalAll()
。
1 | private int checkInterruptWhileWaiting(Node node) { |
执行完上面几个方法,当从while (!isOnSyncQueue(node))
循环中跳出时,说明节点已经转移到了同步队列中了,此时通过acquireQueued(node, savedState)
方法重新获取锁,并且如果线程发生过中断则根据THROW_IE
或是REINTERRUPT
分别抛出异常或者重新中断。
signal() / signalAll()
上面的await()
方法中从while (!isOnSyncQueue(node))
循环跳出可不是自己独立就能做到的,它是需要signal() / signalAll()
配合的。signal() / signalAll()
的工作就是负责将条件队列中的节点转移到同步队列中,两个方法的区别在于signal()
只会转移首节点,而signalAll()
会转移队列上的所有节点。
1 | public final void signal() { |
可以看出,signal()
一定会转移条件队列中的一个节点,除非队列中彻底空了。
1 | public final void signalAll() { |
signalAll()
与signal()
的主要不同在于循环条件中,因为它会将条件队列中的所有节点都转移,因此实现起来稍微简单一些。
JDK BUG
这里再讲一下jdk在上面实现中的一个bug。对比上面await()
和signal() / signalAll()
的源码可以发现,await()
方法并没有做同步控制,也就是signal() / signalAll()
方法开头的if (!isHeldExclusively()) throw new IllegalMonitorStateException();
。因此,如果没有获取锁就调用该方法,会产生线程竞争的情况,导致条件队列的结构被破坏。例如,以下添加节点到条件队列的方法:
1 | private Node addConditionWaiter() { |
如果有两个线程同时执行到if (t == null)
时,可能会造成firstWaiter
先指向其中一个,之后却被另一个给覆盖了,那么此时其中一个线程将会一直阻塞下去,因为这个线程的node并不在条件队列中,也就永远不会被signal() / signalAll()
转移到同步队列上,唯一能从阻塞中返回的可能就是被中断。