0. Questions
Why dig into how LockSupport's park()/unpark() are implemented? Because waiting and wake-up of threads in AQS rely on exactly these two methods.
1. park/unpark in LockSupport
2. How park/unpark is implemented inside the JVM
3. LockSupport sets the lock as the current thread's parkBlocker. Is that lock object connected to the JVM's underlying implementation, such as pthread_cond_wait?
1. Word trivia: what "park" means
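Out of curiosity about what "park" originally means in English, here is part of the Merriam-Webster definition:
[Figure: Merriam-Webster entry for "park"]
Sense 3b best matches the role park plays in lock implementations: staying in one place for a while.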
2. park and unpark at the JDK level
public class LockSupport {
// Disables the current thread for thread scheduling purposes unless the permit is available.
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
// Makes available the permit for the given thread, if it was not already available.
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
private static void setBlocker(Thread t, Object arg) {
UNSAFE.putObject(t, parkBlockerOffset, arg);
}
}
As the code shows, park/unpark itself is thin; the real work is delegated to the Unsafe class:
public final class Unsafe {
public native void unpark(Object thread);
public native void park(boolean isAbsolute, long time);
}
The Unsafe methods, however, are native, which means the real implementation has to be found in the HotSpot source.
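The comment on park mentions a permit. Before going into HotSpot, its behavior can be observed with the public LockSupport API alone. The sketch below (PermitDemo and its methods are hypothetical names chosen for illustration) shows that an unpark issued before park makes park return at once, that repeated unparks still leave at most one permit, and the usual pattern of re-checking a condition in a loop around park. Timings are printed rather than relied on, because park is allowed to return spuriously.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Illustrates the permit semantics of LockSupport.park/unpark (hypothetical demo class).
final class PermitDemo {

    // A permit granted before park() makes park() return without blocking.
    static long parkAfterSelfUnparkNanos() {
        LockSupport.unpark(Thread.currentThread()); // permit becomes available
        long start = System.nanoTime();
        LockSupport.park();                          // consumes the permit, does not block
        return System.nanoTime() - start;
    }

    // Several unparks still leave at most one permit, so the second park normally
    // waits for its timeout (not guaranteed: park may return spuriously).
    static long secondParkNanos(long timeoutNanos) {
        Thread self = Thread.currentThread();
        for (int i = 0; i < 3; i++) {
            LockSupport.unpark(self);
        }
        LockSupport.park();                          // uses up the single permit
        long start = System.nanoTime();
        LockSupport.parkNanos(timeoutNanos);         // no permit left
        return System.nanoTime() - start;
    }

    // The usual pattern: park in a loop on a condition that another thread sets.
    static boolean wokenByOtherThread() {
        Thread waiter = Thread.currentThread();
        AtomicBoolean ready = new AtomicBoolean(false);
        Thread waker = new Thread(() -> {
            ready.set(true);
            LockSupport.unpark(waiter);
        });
        waker.start();
        while (!ready.get()) {
            LockSupport.park();                      // re-check after every return
        }
        try {
            waker.join();                            // make sure its unpark has happened
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ready.get();
    }

    public static void main(String[] args) {
        System.out.println("park after unpark took ns: " + parkAfterSelfUnparkNanos());
        System.out.println("second park took ms: " + secondParkNanos(200_000_000L) / 1_000_000);
        System.out.println("woken by other thread: " + wokenByOtherThread());
    }
}
```

On a typical run one would expect the first value to be tiny and the second to be close to the 200 ms timeout.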
3. Tracing park/unpark into the JVM
The analysis below focuses on HotSpot's implementation on Linux.
3.1 Unsafe in HotSpot
jdk8u/hotspot/src/share/vm/prims/unsafe.cpp
First, the park method:
// These are the methods for 1.8.0
// Java method name -> native function mapping
static JNINativeMethod methods_18[] = {
...
...
{CC "park", CC "(ZJ)V", FN_PTR(Unsafe_Park)},
{CC "unpark", CC "(" OBJ ")V", FN_PTR(Unsafe_Unpark)}
};
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
UnsafeWrapper("Unsafe_Park");
EventThreadPark event;
...
JavaThreadParkedState jtps(thread, time != 0);
// the key call
thread->parker()->park(isAbsolute != 0, time);
...
UNSAFE_END
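JavaThreadParkedState(thread, time != 0) marks the thread as parked, timed or untimed, for the duration of the call, and from the Java side this shows up as Thread.State. The (ZJ)V signature is the (isAbsolute, time) pair: in the JDK 8 LockSupport source, park() passes (false, 0L), parkNanos passes (false, nanos) and parkUntil passes (true, deadline in epoch milliseconds). A small sketch that observes this; ParkStateDemo is a hypothetical name and the 5-second polling limit is an arbitrary choice.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Observes the Thread.State of a parked thread, untimed vs timed (hypothetical demo class).
final class ParkStateDemo {

    static Thread.State observeParkedState(boolean timed) {
        AtomicBoolean done = new AtomicBoolean(false);
        Thread parker = new Thread(() -> {
            while (!done.get()) {
                if (timed) {
                    LockSupport.parkNanos(10_000_000_000L); // relative time: park(false, nanos)
                } else {
                    LockSupport.park();                     // untimed: park(false, 0L)
                }
            }
        });
        parker.start();
        Thread.State expected = timed ? Thread.State.TIMED_WAITING : Thread.State.WAITING;
        Thread.State seen = parker.getState();
        long deadline = System.nanoTime() + 5_000_000_000L;
        // Poll until the thread is actually parked, or give up after 5 s.
        while (seen != expected && System.nanoTime() < deadline) {
            Thread.yield();
            seen = parker.getState();
        }
        done.set(true);
        LockSupport.unpark(parker);                         // let it see `done` and exit
        try {
            parker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    // parkUntil takes an absolute deadline in epoch milliseconds: park(true, deadline).
    static long parkUntilElapsedMillis(long delayMillis) {
        long start = System.currentTimeMillis();
        LockSupport.parkUntil(start + delayMillis);
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        System.out.println("park()      -> " + observeParkedState(false));
        System.out.println("parkNanos() -> " + observeParkedState(true));
        System.out.println("parkUntil(+50 ms) returned after ms: " + parkUntilElapsedMillis(50));
    }
}
```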
Next, the unpark method:
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
UnsafeWrapper("Unsafe_Unpark");
Parker* p = NULL;
...
java_thread = JNIHandles::resolve_non_null(jthread);
// the JVM-internal JavaThread behind this java.lang.Thread
JavaThread* thr = java_lang_Thread::thread(java_thread);
p = thr->parker();
if (p != NULL) {
// the key call
p->unpark();
}
UNSAFE_END
So the key piece is the Parker object held by each JavaThread.
3.2 JavaThread
java.lang.Thread is the JDK's application-level thread, and JavaThread is HotSpot's implementation behind it. Their relationship:
[Figure: threads in the JVM]
jdk8u/hotspot/src/share/vm/runtime/thread.hpp->JavaThread
// Class hierarchy
// - Thread
//   - NamedThread
//     - VMThread
//     - ConcurrentGCThread
//     - WorkerThread
//       - GangWorker
//       - GCTaskThread
//   - JavaThread
//   - WatcherThread
class Thread: public ThreadShadow {
protected:
// OS data associated with the thread
OSThread* _osthread; // Platform-specific thread information
......
};
class JavaThread: public Thread {
oop _threadObj; // The Java level thread object
// JSR166 per-thread parker
private:
Parker* _parker;
......
};
We are not at the bottom yet; next comes Parker.
3.3 Parker and PlatformParker
jdk8u/hotspot/src/share/vm/runtime/park.hpp
class Parker : public os::PlatformParker {
private:
// Key field: 0/1 represents the permit and decides whether park blocks
volatile int _counter ;
JavaThread * AssociatedWith ; // Current association
public:
Parker() : PlatformParker() {
_counter = 0 ;
AssociatedWith = NULL ;
}
public:
void park(bool isAbsolute, jlong time);
void unpark();
...
...
};
Parker extends os::PlatformParker, i.e. the blocking machinery is implemented separately for each platform.
Here is the Linux version: jdk8u/hotspot/src/os/linux/vm/os_linux.hpp->PlatformParker
class PlatformParker : public CHeapObj<mtInternal> {
int _cur_index; // which cond is in use: -1, 0, 1
// mutex
pthread_mutex_t _mutex [1] ;
// condition variables
pthread_cond_t _cond [2] ; // one for relative times and one for abs.
...
...
};
The final park()/unpark() implementation is in jdk8u/hotspot/src/os/linux/vm/os_linux.cpp:
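void Parker::park(bool isAbsolute, jlong time) {
// If the old _counter was non-zero, a permit was available: no need to wait.
// Either way, xchg leaves _counter at 0.
if (Atomic::xchg(0, &_counter) > 0) return;
// Try to take the mutex; if the thread is interrupted or the mutex is busy,
// return immediately (park is allowed to return spuriously)
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
......
if (time == 0) {
// untimed wait
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
// timed wait
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
}
......
// Returning from the wait normally means unpark() set _counter = 1 and signaled
// (a timeout or a spurious wake-up also ends up here).
// The permit has now been used once, so discard it.
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
}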
void Parker::unpark() {
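// Overall: lock the mutex, set the permit, signal a parked thread if there is one, unlock
// (the signal comes before or after the unlock depending on WorkAroundNPTLTimedWaitHang)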
int s, status ;
status = pthread_mutex_lock(_mutex);
s = _counter;
_counter = 1;
if (s < 1) {
// thread might be parked
if (_cur_index != -1) {
// thread is definitely parked
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (&_cond[_cur_index]);
status = pthread_mutex_unlock(_mutex);
} else {
int index = _cur_index;
status = pthread_mutex_unlock(_mutex);
status = pthread_cond_signal (&_cond[index]);
}
} else {
pthread_mutex_unlock(_mutex);
}
} else {
pthread_mutex_unlock(_mutex);
}
}
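To make the interplay of _counter, the mutex and the condition variable concrete, here is a rough Java model of Parker, with ReentrantLock and Condition standing in for pthread_mutex_t and pthread_cond_t. It is a teaching sketch under simplifying assumptions, not HotSpot's code: one condition instead of the two in _cond[2], a boolean instead of _cur_index, and signal always issued while holding the lock (Java's Condition.signal requires this; it corresponds to the WorkAroundNPTLTimedWaitHang branch). ModelParker is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified Java model of HotSpot's Parker: a 0/1 permit plus a mutex and a condition.
final class ModelParker {
    private final AtomicInteger counter = new AtomicInteger(0); // ~ _counter
    private final ReentrantLock mutex = new ReentrantLock();    // ~ _mutex
    private final Condition cond = mutex.newCondition();        // ~ _cond (one instead of two)
    private boolean waiting;                                    // ~ _cur_index != -1, guarded by mutex

    // nanos == 0 means "no timeout", like Parker::park(false, 0).
    void park(long nanos) {
        // Fast path, like Atomic::xchg(0, &_counter) > 0: consume a permit and return.
        if (counter.getAndSet(0) > 0) {
            return;
        }
        // Like the trylock in Parker::park: if interrupted or the mutex is busy, just return.
        if (Thread.currentThread().isInterrupted() || !mutex.tryLock()) {
            return;
        }
        try {
            if (counter.get() > 0) {            // permit arrived meanwhile: no wait needed
                return;
            }
            waiting = true;
            if (nanos == 0) {
                cond.await();                   // ~ pthread_cond_wait
            } else {
                cond.awaitNanos(nanos);         // ~ safe_cond_timedwait
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // park leaves the interrupt status set
        } finally {
            waiting = false;
            counter.set(0);                     // the permit, if any, is used up
            mutex.unlock();
        }
    }

    void unpark() {
        mutex.lock();
        try {
            int s = counter.getAndSet(1);       // s = _counter; _counter = 1
            if (s < 1 && waiting) {
                cond.signal();                  // ~ pthread_cond_signal, lock held
            }
        } finally {
            mutex.unlock();
        }
    }

    // Demo: a pre-granted permit, then a waiter woken by another thread.
    static boolean demo() {
        ModelParker p = new ModelParker();
        p.unpark();
        long t0 = System.nanoTime();
        p.park(0);                              // returns at once: the permit was available
        boolean fastReturn = System.nanoTime() - t0 < 1_000_000_000L;

        AtomicBoolean ready = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            while (!ready.get()) {
                p.park(0);                      // re-check the condition after every return
            }
        });
        waiter.start();
        ready.set(true);
        p.unpark();
        try {
            waiter.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return fastReturn && !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("model parker works: " + demo());
    }
}
```

The property the model preserves is that unpark sets the permit under the same mutex that park holds while re-checking it, so an unpark cannot slip into the gap between park's check and its wait.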
At this point we have a fairly clear, if broad-brush, picture of how locking and thread synchronization are implemented underneath.
3.4 ParkEvent
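ParkEvent is very similar to Parker and is not analyzed here. According to comments in the JVM source, ParkEvent is meant to eventually replace Parker.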
3.5 How the classes relate
The low-level classes introduced above relate as follows:
[Figure: threads, their key fields, and the method call relationships]
3.6 Summary
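Locking and condition waiting therefore rest on the following functions (partial list):
- pthread_mutex_lock / pthread_mutex_trylock
- pthread_cond_wait / pthread_cond_timedwait
- pthread_cond_signal
- pthread_mutex_unlock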
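These are Linux's (glibc NPTL) implementations of the standard POSIX threads interfaces, providing mutual exclusion and condition waiting.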
4.pthread_cond_wait?
4.1 Digging further
Having traced things down to pthread_cond_wait, the overall structure is clear. But how does pthread_cond_wait itself work?
Following this section's reference, strace takes us one step further. The sample code is excerpted from that reference.
#include <stdio.h>
...
int main() {
pthread_cond_t cond;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
if (initializeCond(&cond)) {
return -1;
}
...
struct timespec start, end, deadline;
if (pthread_mutex_lock(&lock)) {
return -1;
}
clock_gettime(CLOCK_TYPE, &start);
addNanos(&start, &deadline, deltaNanos);
int iteration = 0;
while (!isDeadlineReached(&deadline)) {
// pthread_cond_timedwait(&cond, &lock, &deadline);
pthread_cond_wait(&cond, &lock);
iteration++;
}
...
clock_gettime(CLOCK_TYPE, &end);
return 0;
}
Compile: gcc test.c -o a.out -lpthread
Trace: strace -e futex ./a.out
[Figure: strace output (futex calls)]
Reference for this section: 1
So pthread_cond_wait and pthread_cond_timedwait are implemented with futex. What, then, is a futex?
4.2 futex
From the futex(2) man page: The futex() system call provides a method for waiting until a certain condition becomes true. It is typically used as a blocking construct in the context of shared-memory synchronization. When using futexes, the majority of the synchronization operations are performed in user space. A user-space program employs the futex() system call only when it is likely that the program has to block for a longer time until the condition becomes true. Other futex() operations can be used to wake any processes or threads waiting for a particular condition.
The key futex operations are FUTEX_WAIT and FUTEX_WAKE, which roughly correspond to wait/notify.
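Why "fast"? Most of the time a lock is uncontended, so it can be acquired with an atomic operation in user space, which is cheap. Only in the minority of cases where a thread really has to wait does it enter the kernel, which is the heavyweight operation (references 2, 3, 4).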
[Figure: fast user space locking]
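The same fast/slow split can be mimicked in Java. In this sketch (FastPathLock is a hypothetical class, an analogue of the idea and not futex itself), an uncontended acquire is a single compareAndSet in user space; only a thread that finds the lock taken falls back to LockSupport.park, which on HotSpot/Linux leads into the pthread and futex machinery traced above. The slow path follows the structure of the FIFOMutex example in the LockSupport Javadoc; the slow-path counter exists only for the demonstration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// A futex-like lock in spirit: CAS fast path, park only on contention (hypothetical class).
final class FastPathLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();
    private final AtomicInteger slowPathEntries = new AtomicInteger();

    void lock() {
        // Fast path: an uncontended acquire is one CAS in user space, no blocking.
        if (locked.compareAndSet(false, true)) {
            return;
        }
        // Slow path: queue up and park until we are at the head and the CAS succeeds.
        slowPathEntries.incrementAndGet();
        Thread current = Thread.currentThread();
        boolean wasInterrupted = false;
        waiters.add(current);
        while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) {       // ignore interrupts while waiting...
                wasInterrupted = true;
            }
        }
        waiters.remove();                     // we are the head: dequeue ourselves
        if (wasInterrupted) {
            current.interrupt();              // ...but restore the status afterwards
        }
    }

    void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek());   // wake the head waiter; unpark(null) does nothing
    }

    int slowPathEntries() {
        return slowPathEntries.get();
    }

    // Single thread: every acquire takes the fast path.
    static int uncontendedSlowPaths(int iterations) {
        FastPathLock lock = new FastPathLock();
        for (int i = 0; i < iterations; i++) {
            lock.lock();
            lock.unlock();
        }
        return lock.slowPathEntries();
    }

    // Several threads increment a shared counter under the lock.
    static int contendedCount(int threads, int iterations) {
        FastPathLock lock = new FastPathLock();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < iterations; i++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("slow-path entries, 1 thread: " + uncontendedSlowPaths(100_000));
        System.out.println("count with 4 threads: " + contendedCount(4, 100_000));
    }
}
```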
5. Is parkBlocker related to the JVM implementation, such as pthread_cond_wait?
Consider this passage from the documentation of class LockSupport:
The three forms of park each also support a blocker object parameter. This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. (Such tools may access blockers using method getBlocker(java.lang.Thread).) The use of these forms rather than the original forms without this parameter is strongly encouraged. The normal argument to supply as a blocker within a lock implementation is this.
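In other words, the lock object is stored in the current thread's parkBlocker field purely for monitoring and diagnostics. Strictly from the implementation's point of view, the Java-level lock object has nothing to do with the underlying mechanism. Searching the OpenJDK HotSpot source for openjdk/hotspot/src/share/vm/classfile/javaClasses.cpp::_park_blocker_offset and openjdk/hotspot/src/share/vm/runtime/thread.cpp::current_park_blocker() leads to the same conclusion.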
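That the blocker is only bookkeeping can be checked from Java: while a thread is parked, LockSupport.getBlocker returns the object it passed, and once park returns the field is cleared again. The second method shows that a thread queued on a ReentrantLock reports the lock's internal AQS object as its blocker, in line with the Javadoc's advice to pass this. A sketch; BlockerDemo and its helper methods are hypothetical names, and the 5-second polling limit is arbitrary.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Shows that the park blocker is just a recorded reference (hypothetical demo class).
final class BlockerDemo {

    // Polls LockSupport.getBlocker(t) until it is non-null or 5 s have passed.
    private static Object awaitBlocker(Thread t) {
        long deadline = System.nanoTime() + 5_000_000_000L;
        Object seen = LockSupport.getBlocker(t);
        while (seen == null && System.nanoTime() < deadline) {
            Thread.yield();
            seen = LockSupport.getBlocker(t);
        }
        return seen;
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {blocker seen while parked, blocker after the thread has finished}.
    static Object[] observeBlocker(Object blocker) {
        AtomicBoolean done = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            while (!done.get()) {
                LockSupport.park(blocker); // setBlocker(t, blocker); park; setBlocker(t, null)
            }
        });
        t.start();
        Object whileParked = awaitBlocker(t);
        done.set(true);
        LockSupport.unpark(t);
        joinQuietly(t);
        return new Object[] {whileParked, LockSupport.getBlocker(t)};
    }

    // A thread queued on a ReentrantLock reports the lock's internal AQS object as blocker.
    static Object blockerOfReentrantLockWaiter() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        Thread t = new Thread(() -> {
            lock.lock();
            lock.unlock();
        });
        t.start();
        Object seen = awaitBlocker(t);
        lock.unlock();
        joinQuietly(t);
        return seen;
    }

    public static void main(String[] args) {
        Object marker = new Object();
        Object[] r = observeBlocker(marker);
        System.out.println("visible while parked: " + (r[0] == marker) + ", after: " + r[1]);
        System.out.println("ReentrantLock waiter's blocker: " + blockerOfReentrantLockWaiter());
    }
}
```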
Now look at LockSupport.park again:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
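UNSAFE.park(false, 0L) itself has nothing to do with the blocker. Setting the blocker is only a marker recording what is blocking the current thread.
- Locks depend on the underlying implementation, so in a sound design the underlying implementation should not depend on higher-level objects.
- In AQS, mutual exclusion is implemented at the Java level by CAS on an int variable, while waiting and sleeping are implemented with park/unpark.
- However many locks a thread uses, all of its lock waits go through the same per-thread cond (pthread_cond_t *) and mutex (pthread_mutex_t *). This works because a single thread never waits on more than one lock at the same time.
So from the standpoint of how waiting is implemented, the blocker and the underlying mechanism are not directly related.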
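The second point can be made concrete with a minimal non-reentrant mutex built on AQS: tryAcquire is nothing more than a CAS of the int state from 0 to 1, and when it fails AQS queues the thread and parks it with LockSupport.park; on release, state goes back to 0 and AQS unparks a queued successor. A sketch in the spirit of the Mutex example in the AbstractQueuedSynchronizer Javadoc; SimpleMutex is a hypothetical name.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Minimal non-reentrant mutex on AQS (hypothetical class, modeled on the AQS Javadoc example).
final class SimpleMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Mutual exclusion at the Java level: CAS state 0 (free) -> 1 (held).
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // AQS then enqueues the thread and parks it via LockSupport.park
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);  // volatile write: the lock is free again
            return true;  // AQS then unparks the next queued thread, if any
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1);
    }

    void unlock() {
        sync.release(1);
    }

    // Several threads increment a shared counter under the mutex.
    static int contendedCount(int threads, int iterations) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < iterations; i++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("count with 4 threads: " + contendedCount(4, 100_000));
    }
}
```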
5. Comparison with java.lang.Object's wait/notify
public class Object {
// Causes the current thread to wait until it is awakened,
// typically by being notified or interrupted
public final void wait() throws InterruptedException {
wait(0L);
}
public final native void wait(long timeoutMillis) throws InterruptedException;
// Wakes up a single thread that is waiting on this object's monitor.
// If any threads are waiting on this object, one of them is chosen to be awakened.
// The choice is arbitrary and occurs at the discretion of the implementation.
// A thread waits on an object's monitor by calling one of the wait methods.
//
// The awakened thread will not be able to proceed until the current thread
// relinquishes the lock on this object. The awakened thread will compete in the
// usual manner with any other threads that might be actively competing to synchronize
// on this object; for example, the awakened thread enjoys no reliable privilege or
// disadvantage in being the next thread to lock this object.
//
// This method should only be called by a thread that is the owner of this object's
// monitor. A thread becomes the owner of the object's monitor in one of three ways:
public final native void notify();
}
The notify comment shows that waking is not required to be fair; which thread is woken is left to the implementation.
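The same Javadoc says notify may only be called by the owner of the object's monitor, which is a practical difference from park/unpark: LockSupport.unpark can be called from anywhere without holding any lock. A small check (MonitorOwnershipDemo is a hypothetical name):

```java
import java.util.concurrent.locks.LockSupport;

// Contrasts monitor ownership rules of notify() with lock-free unpark() (hypothetical demo class).
final class MonitorOwnershipDemo {

    // notify() without owning the monitor is rejected at run time.
    static boolean notifyWithoutMonitorThrows() {
        Object lock = new Object();
        try {
            lock.notify();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Inside synchronized the same call is legal (with no waiters it does nothing).
    static boolean notifyInsideMonitorSucceeds() {
        Object lock = new Object();
        synchronized (lock) {
            lock.notify();
        }
        return true;
    }

    // unpark needs no lock at all; the permit then lets park() return immediately.
    static boolean unparkWithoutAnyLock() {
        LockSupport.unpark(Thread.currentThread());
        LockSupport.park();
        return true;
    }

    public static void main(String[] args) {
        System.out.println("notify without monitor throws: " + notifyWithoutMonitorThrows());
        System.out.println("notify inside monitor ok: " + notifyInsideMonitorSucceeds());
        System.out.println("unpark without any lock ok: " + unparkWithoutAnyLock());
    }
}
```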
5.1 JDK level
wait() above delegates to wait(0L), that is, to the native method public final native void wait(long timeoutMillis); notify() is itself native.
The corresponding native methods are registered in jdk/src/share/native/java/lang/Object.c:
static JNINativeMethod methods[] = {
{"hashCode", "()I", (void *)&JVM_IHashCode},
{"wait", "(J)V", (void *)&JVM_MonitorWait},
{"notify", "()V", (void *)&JVM_MonitorNotify},
{"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll},
{"clone", "()Ljava/lang/Object;", (void *)&JVM_Clone},
};
So inside the JVM the two are implemented by JVM_MonitorWait and JVM_MonitorNotify respectively.
5.2 JVM level
openjdk8: hotspot/src/share/vm/prims/jvm.cpp
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
JVMWrapper("JVM_MonitorWait");
Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
JavaThreadInObjectWaitState jtiows(thread, ms != 0);
if (JvmtiExport::should_post_monitor_wait()) {
JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
}
ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END
JVM_ENTRY(void, JVM_MonitorNotify(JNIEnv* env, jobject handle))
JVMWrapper("JVM_MonitorNotify");
Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
ObjectSynchronizer::notify(obj, CHECK);
JVM_END
Tracing one step further into hotspot/src/share/vm/runtime/objectMonitor.cpp:
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
...
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ; // *A
Thread::SpinRelease (&_WaitSetLock) ;
if (millis <= 0) {
Self->_ParkEvent->park () ; // *B
} else {
ret = Self->_ParkEvent->park (millis) ;
}
...
}
void ObjectMonitor::notify(TRAPS) {
...
ObjectWaiter * iterator = DequeueWaiter() ; // *C
...
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ; // *D
...
}
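In wait, the current thread is first appended to the tail of the wait set (*A), then ParkEvent->park() is called (*B). In notify, the waiter at the head of the wait set is dequeued (*C), then unpark is called on its event (*D). So in OpenJDK the thread chosen by notify is picked in FIFO order. This is a HotSpot implementation detail rather than a guarantee of the Java specification, and, as the notify Javadoc above says, the woken thread still has to compete to reacquire the monitor.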
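Which waiter notify picks can be observed from Java. The sketch below (NotifyOrderDemo is a hypothetical name) puts three threads into wait() one after another, then issues one notify at a time and records who wakes. Given the DequeueWaiter logic above one would expect the order 0, 1, 2 on HotSpot, but the specification does not promise it and spurious wake-ups can reorder things, so treat the printed order as an observation, not a guarantee.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BooleanSupplier;

// Records the order in which notify() wakes waiters (hypothetical demo class).
final class NotifyOrderDemo {
    private final Object monitor = new Object();
    private int tickets;                                   // guarded by monitor
    private final List<Integer> wakeOrder = Collections.synchronizedList(new ArrayList<>());

    private void awaitTicket(int id) {
        synchronized (monitor) {
            while (tickets == 0) {                         // guard against spurious wake-ups
                try {
                    monitor.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            tickets--;
            wakeOrder.add(id);
        }
    }

    private static void waitUntil(BooleanSupplier condition) {
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() > deadline) {
                throw new IllegalStateException("timed out");
            }
            Thread.yield();
        }
    }

    // Starts n waiters one by one, then notifies them one at a time; returns the wake-up order.
    static List<Integer> run(int n) {
        NotifyOrderDemo d = new NotifyOrderDemo();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final int id = i;
            Thread t = new Thread(() -> d.awaitTicket(id));
            t.start();
            waitUntil(() -> t.getState() == Thread.State.WAITING); // now inside monitor.wait()
            threads.add(t);
        }
        for (int i = 0; i < n; i++) {
            synchronized (d.monitor) {
                d.tickets++;
                d.monitor.notify();                        // wake one waiter
            }
            final int expected = i + 1;
            waitUntil(() -> d.wakeOrder.size() == expected); // let it finish before the next notify
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new ArrayList<>(d.wakeOrder);
    }

    public static void main(String[] args) {
        System.out.println("wake-up order: " + run(3));
    }
}
```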
6. A reference implementation of thread sleep and wait()
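When a thread waiting for a lock needs to give up its time slice voluntarily, the system underneath calls the familiar sleep/wait/yield primitives to release the CPU. Below we look at how xv6, a small teaching operating system, implements these three in proc.c, as a glimpse into how Linux works. The core idea: enter the kernel via a system call, then context-switch into the process scheduler, which picks another runnable thread to execute.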
// Give up the CPU for one scheduling round.
void yield(void)
{
acquire(&ptable.lock); //DOC: yieldlock
myproc()->state = RUNNABLE;
sched();
release(&ptable.lock);
}
// Wait for a child process to exit and return its pid.
int wait(void)
{
...
for(;;){
for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
if(p->state == ZOMBIE){
// reclaim the child process's resources
pid = p->pid;
p->killed = 0;
...
p->state = UNUSED;
return pid;
}
}
sleep(curproc, &ptable.lock);
}
}
void sleep(void *chan, struct spinlock *lk)
{
...
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
...
}
So the core of all three functions is sched():
// Enter scheduler
void sched(void)
{
int intena;
struct proc *p = myproc();
...
intena = mycpu()->intena;
// save this process's context, switch to the scheduler's context and enter the scheduler
swtch(&p->context, mycpu()->scheduler);
mycpu()->intena = intena;
}
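swtch.S is a short assembly routine: it saves the current thread's registers and loads the scheduler's saved context into the registers, which completes the switch.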
# Context switch
#
# void swtch(struct context **old, struct context *new);
#
# Save the current registers on the stack, creating
# a struct context, and save its address in *old.
# Switch stacks to new and pop previously-saved registers.
.globl swtch
swtch:
movl 4(%esp), %eax
movl 8(%esp), %edx
# Save old callee-saved registers
pushl %ebp
pushl %ebx
pushl %esi
pushl %edi
# Switch stacks
movl %esp, (%eax)
movl %edx, %esp
# Load new callee-saved registers
popl %edi
popl %esi
popl %ebx
popl %ebp
ret
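The chan recorded by sleep() is one half of xv6's protocol: wakeup(chan) in the same proc.c (not excerpted here) marks every process sleeping on that channel RUNNABLE again, and the scheduler later switches back to it. Below is a toy, single-threaded Java model of just that bookkeeping. It assumes no real concurrency and no real context switch (sched() and swtch are reduced to picking the next RUNNABLE entry); ToyProcTable and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of xv6 process states and sleep/wakeup channels (hypothetical, single-threaded).
final class ToyProcTable {
    enum State { UNUSED, RUNNABLE, RUNNING, SLEEPING, ZOMBIE }

    static final class Proc {
        final int pid;
        State state = State.RUNNABLE;
        Object chan;                          // channel the process sleeps on, like p->chan

        Proc(int pid) {
            this.pid = pid;
        }
    }

    private final List<Proc> procs = new ArrayList<>();

    Proc spawn(int pid) {
        Proc p = new Proc(pid);
        procs.add(p);
        return p;
    }

    // Like yield(): give up the CPU but stay runnable (xv6 then calls sched()).
    void yieldCpu(Proc p) {
        p.state = State.RUNNABLE;
    }

    // Like sleep(): record the channel and mark the process SLEEPING (xv6 then calls sched()).
    void sleep(Proc p, Object chan) {
        p.chan = chan;
        p.state = State.SLEEPING;
    }

    // Like xv6 wakeup(): every process sleeping on chan becomes RUNNABLE.
    void wakeup(Object chan) {
        for (Proc p : procs) {
            if (p.state == State.SLEEPING && p.chan == chan) {
                p.state = State.RUNNABLE;
            }
        }
    }

    // Stand-in for the scheduler loop: run the first RUNNABLE process, or return null.
    Proc schedule() {
        for (Proc p : procs) {
            if (p.state == State.RUNNABLE) {
                p.state = State.RUNNING;
                p.chan = null;                // like the "Tidy up" step after sched() returns
                return p;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        ToyProcTable table = new ToyProcTable();
        Proc a = table.spawn(1);
        table.spawn(2);
        Object disk = new Object();
        table.sleep(a, disk);                 // a waits on the "disk" channel
        System.out.println("scheduled pid: " + table.schedule().pid);
        table.wakeup(new Object());           // unrelated channel: a keeps sleeping
        System.out.println("a after unrelated wakeup: " + a.state);
        table.wakeup(disk);
        System.out.println("a after wakeup(disk): " + a.state);
    }
}
```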