java同步机制之lock(基于AbstractQueuedSynchronizer)-part2

Posted by My Blog on June 3, 2021

[toc]

0.从问题出发

1 为什么要设计AbstractQueuedSynchronizer(后面简称AQS)而不直接封装利用(比如pthread_mutex_lock/pthread_cond_wait)?
2 AQS如何实现公平竞争?
3 什么是Condition,为什么要有条件变量?
4 LockSupport里面park/unpark在底层hotspot是如何实现的?
5 Thread.interrupt在底层是如何实现的?
6 LockSupport里面parkBlockerOffset是如何跟hotspot jvm底层pthread_mutex_lock/pthread_cond_wait建立关联的?

1.为什么要设计AQS

锁及条件等待以在linux平台上是通过pthread_mutex_lockpthread_cond_wait实现,那么hotspot jvm为什么不直接简单封装来提供对应功能呢?我认为原因有下面几个因素:

  • 性能考虑。如果封装实现,那么就需要系统调用,性能损耗大。而AQS利用轻量级volatileCAS规避了这一点,只有在多个线程同时竞争才会利用pthread相应API进入等待
  • 公平性考虑。操作系统调度的不确定性,使得依赖底层函数无法保证公平性。因此,AQS根据加锁先后顺序建立了FIFO等待队列

此外,相对于synchronized,AQS提供了:

  • 更佳的性能,更灵活的控制
  • 公平等待队列(Object.notify不保证唤醒顺序)
  • 更便捷更多的条件变量

2.AQS特性

AQS是java中几乎所有锁的基类,内置了CAS,等待队列,park/unpark,Condition等核心功能。

  • 在不需或者不考虑等待时,直接利用CAS操作抢占锁,是为轻量级
  • FIFO队列,唤醒时优先从队列头部开始(不论锁是否公平),也避免了操作系统唤醒的不确定和无序性
  • 所谓公平:有人排队则先加入队尾排队,是为公平;先抢,失败再入队尾排队,是为非公平
  • 利用park/unpark方法进入休眠和唤醒
  • 理论上无限多条件等待变量和队列

3.AQS如何实现锁公平竞争

加锁时,会利用CAS尝试加锁,成功则直接返回。失败则加入等待队列。锁持有者在释放锁时,会优先通知等待队列头部线程。

3.1 加锁过程

java.util.concurrent.locks.AbstractQueuedSynchronizer

1
2
3
4
5
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

首先看tryAcquire,由于AQS是基类,以ReentrantLock为例,其内部根据公平与否有两个实现:

java.util.concurrent.locks.ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 公平实现
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            ...
            return false;
        }
// 非公平实现
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            ...
            return false;
        }

所以此处所谓公平是指如果有等待队列,则不去竞争锁。而非公平锁,是就新加入的竞争线程而言,它不考虑是否有等待队列,直接参与争抢。而已经进入等待队列者,则遵循FIFO原则,都是公平的。

如果上一步失败,则看acquireQueued,它首先会通过addWaiter将当前线程封装成一个Node,放入等待队列尾部:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 先进入等待队列尾部
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
// 在依照顺序在队列中等待拿锁
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { // *A
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // *B
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireQueued中有两个关键点:

  • A 表明队列头部线程会首先尝试加锁。成功后依次出队,保证了加锁的公平性
  • 如果当前线程节点不是头部线程,那么会通过parkAndCheckInterrupt进入休眠等待

3.2 锁释放及通知过程

java.util.concurrent.locks.AbstractQueuedSynchronizer

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

从上述代码可见,当持有者释放锁后,会优先通知等待队列头部线程。因此在释放通知时也保证了公平

4.AQS如何解决操作系统线程唤醒不确定性问题

且看acquireQueued,虽然前面提到了这个代码,但是此处是从不同角度来看这个问题。假设队列中某非头部线程被错误唤醒(注释B处),那么会再次进入for循环,发现自己并不是头部节点,再次休眠等待。这样就解决了操作系统唤醒的随机性问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { // *A
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // *B
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

5.什么是Condition,为什么要有Condition?

  • 锁的核心功能是线程间互斥

  • Condition的核心功能是线程间协作

  • 锁保护的关键数据可能处于不同的状态,因而可能需要多个Condition

  • 每个Condition可以提供独立的等待队列,这个是合理的;每个队列各自仍符合FIFO原则

  • Condition调用signal时,会将Condition队列头部线程移交到锁等待队列(sync queue)的尾部;

    也就是说条件等待线程被激活时,仍然要按照公平原则加入锁等待队列

  • 比锁更高级的封装,支持更高级的功能

5.1 Condition等待队列与锁同步队列的关系

锁通过生成新的Condition,原则上可以有无限多个等待队列。等待队列只能通过await()方法进入,通过signal()方法退出

thread state case1: await()
thread state case2: signal()

5.2 Condition实现

java.util.concurrent.locks.ConditionObject从代码角度分析,首先看进入等待队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
        /**
         * ConditionObject implements Condition
         * 
         * Implements interruptible condition wait.
         */
public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 进入Condition等待队列
            Node node = addConditionWaiter();
            // 释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 查看是否有被signal()通知,没有则休眠
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 退出前,再次拿锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

再来看看signal()实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
                	do {
                	  // 将等待队列里第一个线程出队
                		if ( (firstWaiter = first.nextWaiter) == null)
                    		lastWaiter = null;
                		first.nextWaiter = null;
            				} while (!transferForSignal(first) &&
                     				(first = firstWaiter) != null);
        }
        
        // 再看看transferForSignal
        transferForSignal(first)
        // 将退出等待队列的第一个线程放入抢锁队列
        enq(node)

6.线程唤醒notify的隐含语义

线程状态 notify语义1
thread state thread state

java使用的是mesa语义,也就是说仅将激活线程放入ready 队列,并不一定会立即得到操作系统调度机会。这也就意味着,当线程终于执行时,它所等待的状态可能已经改变,所以需要while循环判断,而不能仅仅if

7.其它疑问

  • LockSupport里面park/unpark在底层hotspot层面是如何实现的?

  • Thread.interrupt在底层是如何实现的?

  • LockSupport里面parkBlockerOffset是如何跟hotspot jvm底层pthread_mutex_lock/pthread_cond_wait建立关联的?

    这些都是很有意思的问题,且看下回分解

8.references