[toc]
0.从问题出发
1 | 为什么要设计AbstractQueuedSynchronizer(后面简称AQS)而不直接封装利用(比如pthread_mutex_lock/pthread_cond_wait)? |
---|---|
2 | AQS如何实现公平竞争? |
3 | 什么是Condition,为什么要有条件变量? |
4 | LockSupport里面park/unpark在底层hotspot是如何实现的? |
5 | Thread.interrupt在底层是如何实现的? |
6 | LockSupport里面parkBlockerOffset是如何跟hotspot jvm底层pthread_mutex_lock/pthread_cond_wait建立关联的? |
1.为什么要设计AQS
锁及条件等待以在linux平台上是通过pthread_mutex_lock
和pthread_cond_wait
实现,那么hotspot jvm为什么不直接简单封装来提供对应功能呢?我认为原因有下面几个因素:
- 性能考虑。如果封装实现,那么就需要系统调用,性能损耗大。而AQS利用轻量级
volatile
和CAS
规避了这一点,只有在多个线程同时竞争才会利用pthread相应API进入等待 - 公平性考虑。操作系统调度的不确定性,使得依赖底层函数无法保证公平性。因此,AQS根据加锁先后顺序建立了FIFO等待队列
此外,相对于synchronized
,AQS提供了:
- 更佳的性能,更灵活的控制
- 公平等待队列(Object.notify不保证唤醒顺序)
- 更便捷更多的条件变量
2.AQS特性
AQS是java中几乎所有锁的基类,内置了CAS,等待队列,park/unpark,Condition等核心功能。
- 在不需或者不考虑等待时,直接利用CAS操作抢占锁,是为轻量级
- FIFO队列,唤醒时优先从队列头部开始(不论锁是否公平),也避免了操作系统唤醒的不确定和无序性
- 所谓公平:有人排队则先加入队尾排队,是为公平;先抢,失败再入队尾排队,是为非公平
- 利用park/unpark方法进入休眠和唤醒
- 理论上无限多条件等待变量和队列
3.AQS如何实现锁公平竞争
加锁时,会利用CAS尝试加锁,成功则直接返回。失败则加入等待队列。锁持有者在释放锁时,会优先通知等待队列头部线程。
3.1 加锁过程
java.util.concurrent.locks.AbstractQueuedSynchronizer
1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先看tryAcquire
,由于AQS是基类,以ReentrantLock为例,其内部根据公平与否有两个实现:
java.util.concurrent.locks.ReentrantLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 公平实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
...
return false;
}
// 非公平实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
...
return false;
}
所以此处所谓公平是指如果有等待队列,则不去竞争锁。而非公平锁,是就新加入的竞争线程而言,它不考虑是否有等待队列,直接参与争抢。而已经进入等待队列者,则遵循FIFO原则,都是公平的。
如果上一步失败,则看acquireQueued
,它首先会通过addWaiter
将当前线程封装成一个Node,放入等待队列尾部:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 先进入等待队列尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 在依照顺序在队列中等待拿锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // *A
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // *B
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued
中有两个关键点:
- A 表明队列头部线程会首先尝试加锁。成功后依次出队,保证了加锁的公平性
- 如果当前线程节点不是头部线程,那么会通过
parkAndCheckInterrupt
进入休眠等待
3.2 锁释放及通知过程
java.util.concurrent.locks.AbstractQueuedSynchronizer
1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
从上述代码可见,当持有者释放锁后,会优先通知等待队列头部线程。因此在释放通知时也保证了公平
4.AQS如何解决操作系统线程唤醒不确定性问题
且看acquireQueued
,虽然前面提到了这个代码,但是此处是从不同角度来看这个问题。假设队列中某非头部线程被错误唤醒(注释B处),那么会再次进入for循环,发现自己并不是头部节点,再次休眠等待。这样就解决了操作系统唤醒的随机性问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // *A
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // *B
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
5.什么是Condition,为什么要有Condition?
-
锁的核心功能是线程间互斥
-
Condition的核心功能是线程间协作
-
锁保护的关键数据可能处于不同的状态,因而可能需要多个Condition
-
每个Condition可以提供独立的等待队列,这个是合理的;每个队列各自仍符合FIFO原则
-
Condition调用signal时,会将Condition队列头部线程移交到锁等待队列(sync queue)的尾部;
也就是说条件等待线程被激活时,仍然要按照公平原则加入锁等待队列
-
比锁更高级的封装,支持更高级的功能
5.1 Condition等待队列与锁同步队列的关系
锁通过生成新的Condition,原则上可以有无限多个等待队列。等待队列只能通过await()方法进入,通过signal()方法退出
case1: await() | |
---|---|
case2: signal() |
5.2 Condition实现
java.util.concurrent.locks.ConditionObject
从代码角度分析,首先看进入等待队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* ConditionObject implements Condition
*
* Implements interruptible condition wait.
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 进入Condition等待队列
Node node = addConditionWaiter();
// 释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 查看是否有被signal()通知,没有则休眠
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 退出前,再次拿锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
再来看看signal()实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
do {
// 将等待队列里第一个线程出队
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 再看看transferForSignal
transferForSignal(first)
// 将退出等待队列的第一个线程放入抢锁队列
enq(node)
6.线程唤醒notify的隐含语义
线程状态 | notify语义1 |
---|---|
java使用的是mesa语义,也就是说仅将激活线程放入ready 队列,并不一定会立即得到操作系统调度机会。这也就意味着,当线程终于执行时,它所等待的状态可能已经改变,所以需要while
循环判断,而不能仅仅if
7.其它疑问
-
LockSupport里面park/unpark在底层hotspot层面是如何实现的?
-
Thread.interrupt在底层是如何实现的?
-
LockSupport里面parkBlockerOffset是如何跟hotspot jvm底层pthread_mutex_lock/pthread_cond_wait建立关联的?
这些都是很有意思的问题,且看下回分解