java同步机制之LockSupport::park/unpark实现溯源-part4

Posted by My Blog on April 3, 2024

0.问题

为什么要探究LockSuport的park()/unpark()实现,因为AQS线程等待和唤醒即依赖此二者。

1. LockSupport里的park/unpark
2. park/unpark在jvm层面是如何实现的
3. LockSupport里将锁设置为当前线程的parkBlocker,那么该锁对象是否与jvm底层实现如pthread_cond_wait具有关联

1.茴字写法

好奇park英文原意,下面是韦氏大词典的部分解释:

3.b语意解释比较符合park在锁实现中起的作用,即在某个地方停留一段时间
dictionary

2. jdk层面的park和unpark

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class LockSupport {
  // Disables the current thread for thread scheduling purposes unless the permit is available.
  public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
  
  public static void unpark(Thread thread) {
        if (thread != null)
            U.unpark(thread);
  }
  
  private static void setBlocker(Thread t, Object arg) {
        UNSAFE.putObject(t, parkBlockerOffset, arg);
  }
}

从上面代码来看,park/unpark很简单,主要实现在Unsafe类:

1
2
3
4
public final class Unsafe {
	public native void unpark(Object thread);
	public native void park(boolean isAbsolute, long time);
}

但是Unsafe中的定义是native,也就意味着,得去hotspot源码中看真正的底层实现

3.jvm层面的park/unpark溯源

后续主要以hotspot在linux平台实现进行分析。

3.1 hotspot中的Unsafe实现

jdk8u/hotspot/src/share/vm/prims/unsafe.cpp

首先看park方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// These are the methods for 1.8.0
// 方法名映射
static JNINativeMethod methods_18[] = {
    ...
    ...
    {CC "park",               CC "(ZJ)V",                  FN_PTR(Unsafe_Park)},
    {CC "unpark",             CC "(" OBJ ")V",               FN_PTR(Unsafe_Unpark)}
};

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  UnsafeWrapper("Unsafe_Park");
  EventThreadPark event;
  ...
  JavaThreadParkedState jtps(thread, time != 0);
  // 重要方法
  thread->parker()->park(isAbsolute != 0, time);
	...
UNSAFE_END

再看unpark方法

1
2
3
4
5
6
7
8
9
10
11
12
13
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
  UnsafeWrapper("Unsafe_Unpark");
  Parker* p = NULL;
  ...
  java_thread = JNIHandles::resolve_non_null(jthread);
  // jvm线程
  JavaThread* thr = java_lang_Thread::thread(java_thread);
  p = thr->parker();
  if (p != NULL) {
    // 关键点
    p->unpark();
  }
UNSAFE_END

可见,其关键实现在JavaThread中的parker对象

3.2 JavaThread

Java.lang.Thread是jdk应用层面的线程,JavaThread是hotspot中对前者的实现,关系如下

jvm中的线程
header

jdk8u/hotspot/src/share/vm/runtime/thread.hpp->JavaThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Class hierarchy
// - Thread
//   - NamedThread
//     - VMThread
//     - ConcurrentGCThread
//     - WorkerThread
//       - GangWorker
//       - GCTaskThread
//   - JavaThread
//   - WatcherThread

class Thread: public ThreadShadow {
  
  protected:
  // OS data associated with the thread
  OSThread* _osthread;  // Platform-specific thread information
  
  ......
}

class JavaThread: public Thread {
	oop          _threadObj; // The Java level thread object
	
	// JSR166 per-thread parker
	private:
  	Parker*    _parker;  
  
  ......
}

还没到,我们还需要进一步看Parker实现

3.3 Parker和PlatformParker

jdk8u/hotspot/src/share/vm/runtime/park.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Parker : public os::PlatformParker {
private:
  // 重要变量,通过0/1表示授权,决定是否阻塞
  volatile int _counter ;
  JavaThread * AssociatedWith ; // Current association

public:
  Parker() : PlatformParker() {
    _counter       = 0 ;
    AssociatedWith = NULL ;
  }

public:
  void park(bool isAbsolute, jlong time);
  void unpark();
  ...
  ...
};

Parker继承自os::PlatformParker,也就是说,由各个平台具体实现

下面我们看linux下的实现:jdk8u/hotspot/src/os/linux/vm/os_linux.hpp->PlatformParker

1
2
3
4
5
6
7
8
9
class PlatformParker : public CHeapObj<mtInternal> {
    int _cur_index;  // which cond is in use: -1, 0, 1
    // 锁
    pthread_mutex_t _mutex [1] ;
    // 条件变量
    pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.
    ...
    ...
};

park()/unpark()最终实现jdk8u/hotspot/src/os/linux/vm/os_linux.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void Parker::park(bool isAbsolute, jlong time) {
  // 原_counter不为零,有权限,不需等待
  // _counter 被设置为0
  if (Atomic::xchg(0, &_counter) > 0) return;
  
	// 加锁
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

	......
	
  if (time == 0) {
  	// 等待
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  } else {
    // 计时等待
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
  }
  
  ......
  
	// 从pthread_cond_wait return表明接收到signal,_counter=1
	// 授权已经使用一次,作废
  _counter = 0 ;
  status = pthread_mutex_unlock(_mutex) ;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void Parker::unpark() {
  // 整体按照加锁->通知->解锁的顺序
  int s, status ;
  status = pthread_mutex_lock(_mutex);
  s = _counter;
  _counter = 1;
  if (s < 1) {
    // thread might be parked
    if (_cur_index != -1) {
      // thread is definitely parked
      if (WorkAroundNPTLTimedWaitHang) {
        status = pthread_cond_signal (&_cond[_cur_index]);
        status = pthread_mutex_unlock(_mutex);
      } else {
        int index = _cur_index;
        status = pthread_mutex_unlock(_mutex);
        status = pthread_cond_signal (&_cond[index]);
      }
    } else {
      pthread_mutex_unlock(_mutex);
    }
  } else {
    pthread_mutex_unlock(_mutex);
  }
}

到这里,我们应该可以说是较为透彻但仅是粗线条的搞清了锁和线程同步底层实现的脉络

3.4 ParkEvent

与Parker非常类似,不做分析。按jvm注释,后续会用ParkEvent取代Parker。

3.5 类的关系

上面介绍的几个底层实现类,其关系如下

线程与关键属性及方法调用关系
header

3.6 综上

可见,锁及条件等待依赖于下面三个函数(仅列出部分):

  • pthread_mutex_lock
  • pthread_cond_signal
  • pthread_mutex_unlock

上述函数即为linux对posix标准接口的实现,提供锁互斥及条件等待功能。

4.pthread_cond_wait?

4.1 继续追踪

追踪至pthread_cond_wait,可以说明确了实现脉络,其内部究竟如何实现呢?

通过本节参考,使用strace,可以再进一步。示例代码节选自本节参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <stdio.h>
...

int main() {
    pthread_cond_t cond;
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    if (initializeCond(&cond)) {
        return -1;
    }
    ...
    
    struct timespec start, end, deadline;
    if (pthread_mutex_lock(&lock)) {
        return -1;
    }
    clock_gettime(CLOCK_TYPE, &start);
    addNanos(&start, &deadline, deltaNanos);

    int iteration = 0;
    while (!isDeadlineReached(&deadline)) {
        // pthread_cond_timedwait(&cond, &lock, &deadline);
    	  pthread_cond_wait(&cond, &lock);
        iteration++;;
    }
		...
    clock_gettime(CLOCK_TYPE, &end);
    return 0;
}

编译:gcc test.c -o a.out -lpthread

追踪:strace -e futex ./a.out

futex
header

本节参考1

由此可见,pthread_cond_wait/pthread_cond_timedwait由futex实现。那么futex又是什么东西呢?

4.2 futex

The futex() system call provides a method for waiting until a certain condition becomes true. It is typically used as a blocking construct in the context of shared-memory synchronization. When using futexes, the majority of the synchronization operations are performed in user space. A user-space program employs the futex() system call only when it is likely that the program has to block for a longer time until the condition becomes true. Other futex() operations can be used to wake any processes or threads waiting for a particular condition.

futex方法里重要的是FUTEX_WAITFUTEX_WAKE,基本对应了wait/notify。

何谓fast,大多数情况下,锁是没有竞争的,通过用户态原子操作即可完成,轻量级;少数需要等待情况下,才进入内核态,是谓重量级操作2 3 4

fast user space locking
header

5.parkBlocker是否与jvm实现如pthread_cond_wait具有关联

且看Class LockSupport里的一段话:

The three forms of park each also support a blocker object parameter. This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. (Such tools may access blockers using method getBlocker(java.lang.Thread).) The use of these forms rather than the original forms without this parameter is strongly encouraged. The normal argument to supply as a blocker within a lock implementation is this.

​ 上面表达的意思是,锁对象被设置到当前线程的parkBlocker字段,但是仅作为监测和诊断使用。从严格实现角度来将,java层面的锁对象,与底层实现是无关的。在openjdk/hotspot源码中,对openjdk/hotspot/src/share/vm/classfile/javaClasses.cpp::_park_blocker_offsetopenjdk/hotspot/src/share/vm/runtime/thread.cpp::current_park_blocker()进行检索分析也能得到同样结论。

​ 再看LockSupport::park实现:

1
2
3
4
5
6
public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

UNSAFE.park(false, 0L)来看也与blocker没有直接关系。设置blocker仅为标记,谁阻塞了当前线程。

  • 锁依赖底层实现,那么底层实现正常是不应该依赖顶层对象的
  • AQS的锁互斥是在java层面通过CAS操作整型变量实现的,而等待休眠则是通过park/unpark实现的
  • 不论一个线程有多少个锁,所有的锁等待依赖于线程内部同一组cond (pthread_cond_t *) 和 mutex (pthread_mutex_t *)来实现。对于同一个线程来说,不会同时等待多个锁,是没有问题的

因而从等待休眠的实现角度来说,二者没有直接关联。

5.对比java.lang.Object的wait/notify实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Object {
  // Causes the current thread to wait until it is awakened,
  // typically by being notified or interrupted
	public final void wait() throws InterruptedException {
        wait(0L);
  }
  
  public final native void wait(long timeoutMillis) throws InterruptedException;
  
  // Wakes up a single thread that is waiting on this object's monitor. 
  // If any threads are waiting on this object, one of them is chosen to be awakened. 
  // The choice is arbitrary and occurs at the discretion of the implementation. 
  // A thread waits on an object's monitor by calling one of the wait methods.
  // 
  // The awakened thread will not be able to proceed until the current thread 
  // relinquishes the lock on this object. The awakened thread will compete in the 
  // usual manner with any other threads that might be actively competing to synchronize 
  // on this object; for example, the awakened thread enjoys no reliable privilege or
  // disadvantage in being the next thread to lock this object.
  // 
  // This method should only be called by a thread that is the owner of this object's
  // monitor. A thread becomes the owner of the object's monitor in one of three ways:
  public final native void notify();
}

本节参考5 6

notify的注释来看,唤醒并不强制要求公平性,依赖于底层实现。

5.1 jdk层面

上面wait(0L)public final native void wait(long timeout) t实现,notify()则直接为native。

可以追踪对应native接口定义:jdk/src/share/native/java/lang/Object.c

1
2
3
4
5
6
7
static JNINativeMethod methods[] = {
    {"hashCode",    "()I",                    (void *)&JVM_IHashCode},
    {"wait",        "(J)V",                   (void *)&JVM_MonitorWait},
    {"notify",      "()V",                    (void *)&JVM_MonitorNotify},
    {"notifyAll",   "()V",                    (void *)&JVM_MonitorNotifyAll},
    {"clone",       "()Ljava/lang/Object;",   (void *)&JVM_Clone},
};

可见在jvm内部,两者分别由JVM_MonitorWaitJVM_MonitorNotify实现。

5.2 jvm层面

openjdk8: hotspot/src/share/vm/prims/jvm.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
  JVMWrapper("JVM_MonitorWait");
  Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
  JavaThreadInObjectWaitState jtiows(thread, ms != 0);
  if (JvmtiExport::should_post_monitor_wait()) {
    JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
  }
  ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END

JVM_ENTRY(void, JVM_MonitorNotify(JNIEnv* env, jobject handle))
  JVMWrapper("JVM_MonitorNotify");
  Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
  ObjectSynchronizer::notify(obj, CHECK);
JVM_END

再进一步追踪hotspot/src/share/vm/runtime/objectMonitor.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   ...
   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ; // *A
   Thread::SpinRelease (&_WaitSetLock) ;
  
   if (millis <= 0) {
      Self->_ParkEvent->park () ; // *B
   } else {
      ret = Self->_ParkEvent->park (millis) ;
   }
   ...
}

void ObjectMonitor::notify(TRAPS) {
  ...
  ObjectWaiter * iterator = DequeueWaiter() ; // *C
  ...
  ParkEvent * ev = iterator->_event ;
  iterator->TState = ObjectWaiter::TS_RUN ;
  OrderAccess::fence() ;
  ev->unpark() ; // *D
  ...
}

wait中首先将当前线程加入等待队列尾部(A处),然后调用ParkEvent->park()方法(B处)。在notify方法中,首先从队列头部拿出等待线程(C处),然后调用unpark方法(D处),可见openjdk中notify实现具备公平性,遵守FIFO原则。

6.线程sleep和wait()的一个参考实现

锁等待状态需要主动放弃时间片时,底层需要调用常见方法sleep/wait/yield,起到释放的作用。下面参考 xv6 proc.c操作系统对上面三个方法的实现,以管窥linux系统。其核心为通过系统调用,进入内核态,然后通过上下文切换进入进程调度器,由调度器选择其它可执行线程执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Give up the CPU for one scheduling round.
void yield(void)
{
  acquire(&ptable.lock);  //DOC: yieldlock
  myproc()->state = RUNNABLE;
  sched();
  release(&ptable.lock);
}

// Wait for a child process to exit and return its pid.
int wait(void)
{
  ...
  for(;;){
    for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
      if(p->state == ZOMBIE){
        // 回收子进程资源
        pid = p->pid;
        p->killed = 0;
        ...
        p->state = UNUSED;
        return pid;
      }
    }

    sleep(curproc, &ptable.lock);
  }
}

void sleep(void *chan, struct spinlock *lk)
{
  ...
  // Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;

  sched();

  // Tidy up.
  p->chan = 0;
  ...
}

可见这三个方法的核心是sched()

1
2
3
4
5
6
7
8
9
10
11
// Enter scheduler
void sched(void)
{
  int intena;
  struct proc *p = myproc();
  ...
  intena = mycpu()->intena;
  // 切换当前进程上下文,调度器上下文,进入调度器
  swtch(&p->context, mycpu()->scheduler);
  mycpu()->intena = intena;
}

swtch.S是一段汇编代码,保存当前线程寄存器,并将寄存器内容设置为调度器上下文,以此完成切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Context switch
#
#   void swtch(struct context **old, struct context *new);
# 
# Save the current registers on the stack, creating
# a struct context, and save its address in *old.
# Switch stacks to new and pop previously-saved registers.

.globl swtch
swtch:
  movl 4(%esp), %eax
  movl 8(%esp), %edx

  # Save old callee-saved registers
  pushl %ebp
  pushl %ebx
  pushl %esi
  pushl %edi

  # Switch stacks
  movl %esp, (%eax)
  movl %edx, %esp

  # Load new callee-saved registers
  popl %edi
  popl %esi
  popl %ebx
  popl %ebp
  ret

7.参考