0. Questions
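There are already plenty of clear, well-written articles online about how Java thread pools work [1][2][3], so this post won't rehash them. Instead, I had doubts about a few implementation details and try to explain them here. The main questions are: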
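No. | Question |
---|---|
1 | How does a Future obtain the execution result asynchronously? |
2 | Why does ThreadPoolExecutor::Worker extend AQS, and how does it deal with interrupts while running? |
3 | For the different pools returned by Executors, how are the task queue, core thread count and maximum thread count configured, and how do they relate? |
4 | What do thread pools look like in real software, e.g. the Tomcat thread pool, database pools, Redis pools? |
5 | What are the different BlockingQueue implementations? |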
1. Why thread pools exist: an overview
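Based on the official documentation and my own understanding, thread pools solve two problems and also offer one convenience:
- They avoid the overhead of repeatedly creating and destroying threads
- They bound resources such as threads and the task queue
- Ordinary users don't have to write their own asynchronous task-scheduling framework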
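The first point can be seen in a small experiment. This is a sketch of my own, and the class name ReuseDemo is made up for illustration. A fixed pool of two threads runs twenty tasks, and each task records the name of the thread that ran it. If the pool reuses its threads rather than creating one per task, only two distinct names should show up.

```java
import java.util.Set;
import java.util.concurrent.*;

class ReuseDemo {
    // Runs `tasks` trivial tasks on a fixed pool of `threads` threads and
    // returns the names of the threads that actually executed them.
    static Set<String> threadNamesUsed(int threads, int tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                            // no new tasks; queued ones still run
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = threadNamesUsed(2, 20);
        System.out.println(names.size() + " threads ran 20 tasks: " + names);
    }
}
```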
To study the implementation, read through the executor sources in java.util.concurrent, with ThreadPoolExecutor as the main focus.
Here is the scheduling diagram, borrowed from [3]:
Note: the source code is from JDK 1.8, and some screenshots come from the referenced posts.
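The scheduling diagram describes the order of decisions in execute(). Core threads are used first, then the queue, then extra threads up to maximumPoolSize, and only after that is a task rejected. You can watch this happen directly. The sketch below is my own illustration, and the class and method names are hypothetical. The pool has corePoolSize=1, maximumPoolSize=2 and an ArrayBlockingQueue of capacity 1. Every task blocks on a latch so that no worker frees up during the test, and after each submission we record the pool size and the queue length.

```java
import java.util.Arrays;
import java.util.concurrent.*;

class ExecuteOrderDemo {
    // Returns {poolSize, queueSize} after each of the first three submissions,
    // followed by 1 if the fourth submission was rejected (0 otherwise).
    static int[] observe() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                 // keep the worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] out = new int[7];
        for (int i = 0; i < 3; i++) {
            pool.execute(blocker);
            out[2 * i] = pool.getPoolSize();
            out[2 * i + 1] = pool.getQueue().size();
        }
        try {
            pool.execute(blocker);               // core busy, queue full, max reached
        } catch (RejectedExecutionException e) {
            out[6] = 1;                          // the default AbortPolicy throws
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(observe())); // [1, 0, 1, 1, 2, 1, 1]
    }
}
```

Reading the result: the first task gets a core thread, the second waits in the queue, and the third causes a non-core thread to be created. The fourth is rejected.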
2. How does a Future obtain the execution result asynchronously?
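```java
// requires: import java.util.concurrent.*;
ExecutorService service = Executors.newSingleThreadExecutor();
Future<?> future = service.submit((Callable<Object>) () -> {
    int i = 0;
    long sum = 0; // long: the sum of 1..100000 overflows int
    while (i++ < 100000) {
        sum += i;
    }
    return sum;
});
try {
    System.out.println(future.get()); // blocks until the task completes
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    service.shutdown();
}
```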
This code should look familiar: once a task is submitted, its result can be fetched asynchronously. How does that work?
First, look at what submit in java/util/concurrent/AbstractExecutorService.java does:
```java
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task); // wrap the Callable in a FutureTask
    execute(ftask);                             // hand it to the pool
    return ftask;                               // the same object is returned as the Future
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
```
So submit wraps the task in a FutureTask, passes it to the pool via execute, and returns that same task object to the caller.
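Since submit does nothing more than wrap the Callable in a FutureTask and pass it to execute, you can get the same effect without a pool at all. A FutureTask is itself a Runnable, so a plain Thread can run it while another thread waits in get(). Below is a minimal sketch; FutureTaskDemo is a made-up name.

```java
import java.util.concurrent.*;

class FutureTaskDemo {
    static long sumInBackground(int n) throws InterruptedException, ExecutionException {
        FutureTask<Long> task = new FutureTask<>(() -> {
            long sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            return sum;
        });
        new Thread(task, "worker").start(); // FutureTask is a Runnable...
        return task.get();                  // ...and a Future: get() blocks until run() completes
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumInBackground(100000)); // 5000050000
    }
}
```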
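So the key question is what a FutureTask actually is. Looking back at the class diagram: FutureTask implements RunnableFuture, and RunnableFuture extends both Runnable and Future. That means the same object can be run by a thread and also queried for its result.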
It is worth listing the Future interface briefly, since Runnable needs no introduction:
```java
public interface Future<V> {
    /**
     * Attempts to cancel execution of this task.
     * ...
     */
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
```
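The interface methods other than the plain get() are easy to overlook. Here is a small sketch of my own (FutureApiDemo and the trace format are invented for illustration) that exercises them against a deliberately slow task:
- the timed get() gives up with a TimeoutException;
- cancel(true) interrupts the running task;
- after cancellation, isCancelled() and isDone() both report true, and get() throws CancellationException.

```java
import java.util.concurrent.*;

class FutureApiDemo {
    // Returns a comma-separated trace of what happened to a slow task.
    static String trace() throws Exception {
        ExecutorService service = Executors.newSingleThreadExecutor();
        Future<String> slow = service.submit(() -> {
            Thread.sleep(10_000); // simulates a long computation
            return "finished";
        });
        StringBuilder sb = new StringBuilder();
        try {
            slow.get(50, TimeUnit.MILLISECONDS);   // waits at most 50 ms
        } catch (TimeoutException e) {
            sb.append("timeout");
        }
        slow.cancel(true);                         // true: interrupt the running task
        sb.append(slow.isCancelled() ? ",cancelled" : ",not-cancelled");
        sb.append(slow.isDone() ? ",done" : ",not-done");
        try {
            slow.get();
        } catch (CancellationException e) {
            sb.append(",cancellation");
        }
        service.shutdown();
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(trace()); // timeout,cancelled,done,cancellation
    }
}
```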
To repeat: the object that submit executes and the object it returns are one and the same. Now let's focus on how FutureTask::get() is implemented:
```java
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            // *A  WaitNode() { thread = Thread.currentThread(); }
            q = new WaitNode();
        else if (!queued)
            // *B  q.next = waiters: a CAS pushes the new node (holding the
            //     current thread) onto the head of the waiters list
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            // *C
            LockSupport.park(this);
    }
}
```
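Setting the details aside, the core is this: get() can block because of LockSupport.park(this) at C. For the mechanics, see my earlier posts on Java synchronization. LockSupport is the primitive that JDK locks use to suspend threads. Here no mutual exclusion is needed, only waiting. Several threads may call get() at the same time, so a wait queue (a linked list of WaitNodes, pushed at the head) is built as well. Where there is waiting, there must also be a notification or wake-up. Where does it happen? It should happen when the task completes, which means it lives inside FutureTask::run: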
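```java
public void run() {
    // ... (elided)
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // ... (elided)
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;                                     // *A
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

/**
 * Removes and signals all waiting threads, invokes done(), and
 * nulls out callable.
 */
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); // *B
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}
```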
public void run() {
...
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // *A
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); // *B
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
Look at A and B. set() stores the result in outcome, and finishCompletion() then calls LockSupport.unpark(t) to wake every thread that is blocked in get().
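Because finishCompletion walks the whole waiters list, a single set() releases every thread blocked in get() on the same FutureTask. In the sketch below (WaitersDemo is a hypothetical name), three reader threads wait on one task before it is allowed to finish, and then we count how many of them received the result.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class WaitersDemo {
    static int readersWoken(int readers) throws InterruptedException {
        CountDownLatch go = new CountDownLatch(1);
        FutureTask<String> task = new FutureTask<>(() -> {
            go.await();            // hold the task open until the readers are waiting
            return "result";
        });
        AtomicInteger woken = new AtomicInteger();
        Thread[] ts = new Thread[readers];
        for (int i = 0; i < readers; i++) {
            ts[i] = new Thread(() -> {
                try {
                    // typically parks in awaitDone until finishCompletion unparks it
                    if ("result".equals(task.get())) woken.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            });
            ts[i].start();
        }
        new Thread(task).start(); // run() -> set() -> finishCompletion()
        Thread.sleep(100);        // give readers time to block (not needed for correctness)
        go.countDown();
        for (Thread t : ts) t.join();
        return woken.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(readersWoken(3)); // 3
    }
}
```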
That should give a general picture of how the asynchronous result is delivered, both the principle and the implementation.
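To pull the mechanism together, here is a deliberately simplified future of my own. MiniFuture is hypothetical and not a JDK class. It keeps the two essential ideas:
- get() parks the caller with LockSupport.park until a volatile flag says the result is ready;
- set() publishes the result and then unparks every registered waiter.

Unlike FutureTask, it keeps waiters in a ConcurrentLinkedQueue rather than a CAS-managed list, and it ignores cancellation, exceptions and timeouts.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

class MiniFuture<V> {
    private volatile boolean done;   // plays the role of FutureTask.state
    private V outcome;               // written before `done`, read after it
    private final ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();

    V get() throws InterruptedException {
        while (!done) {
            waiters.add(Thread.currentThread()); // register before re-checking
            if (done) break;                     // result arrived while registering
            LockSupport.park(this);              // may return spuriously; the loop re-checks
            if (Thread.interrupted()) throw new InterruptedException();
        }
        return outcome;
    }

    void set(V v) {
        outcome = v;
        done = true;                  // volatile write publishes outcome
        Thread t;
        while ((t = waiters.poll()) != null) {
            LockSupport.unpark(t);    // wake every thread blocked in get()
        }
    }

    public static void main(String[] args) throws Exception {
        MiniFuture<Integer> f = new MiniFuture<>();
        Thread producer = new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            f.set(42);
        });
        producer.start();
        System.out.println(f.get()); // 42
        producer.join();
    }
}
```

The "register, then re-check, then park" order is what prevents a lost wake-up. A waiter added after set() has drained the queue is guaranteed to see done == true. The same reasoning explains why FutureTask re-reads state on every loop iteration.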
2. Why the pool's Worker extends AQS, and how it handles interrupts
2.1 Extending AQS
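Early versions were in fact built on ReentrantLock, but that carried a latent problem [4], so later versions implemented the lock directly on top of AQS.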
2.2 Why introduce a lock at all?
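According to the source comments, a Worker thread should not be interrupted unless the pool is being stopped. Worker is an internal object that outside code does not normally get hold of, so a Worker gets interrupted or terminated in roughly the following cases:
- a. A task throws, the exception propagates, and the current Worker thread exits
- b. setCorePoolSize or setMaximumPoolSize is called
  - if the pool now has more workers than the new limit, it tries to interrupt idle Workers
- c. ThreadPoolExecutor::shutdown() is called
  - new tasks are rejected
  - tasks that are already queued or running still run to completion
  - it tries to interrupt idle Workers
- d. ThreadPoolExecutor::shutdownNow() is called
  - new tasks are rejected
  - all queued tasks are removed (and returned to the caller)
  - all started Workers are interrupted, idle or busy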
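Case c is easy to check. The sketch below is my own (ShutdownDemo is a hypothetical name). It keeps the only worker busy, queues a few more tasks behind it, and then calls shutdown(). If the bullets above are right, every queued task should still run, and a submission made after shutdown() should be rejected.

```java
import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    // Returns {queued tasks that still ran, 1 if a later submission was rejected}.
    static int[] run(int queued) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        AtomicInteger ran = new AtomicInteger();
        pool.execute(() -> {
            try {
                Thread.sleep(100);   // keeps the only worker busy while we shut down
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (int i = 0; i < queued; i++) {
            pool.execute(ran::incrementAndGet);
        }
        pool.shutdown();             // stop accepting, but keep draining the queue
        int rejected = 0;
        try {
            pool.execute(ran::incrementAndGet);
        } catch (RejectedExecutionException e) {
            rejected = 1;            // new work is refused after shutdown()
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { ran.get(), rejected };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run(3))); // [3, 1]
    }
}
```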
2.2.1 The Worker's core loop
```java
Worker(Runnable firstTask) {
    // *A
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // *B
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            // *C
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // *D
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
```
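Start with the comments at A and B. Understanding them requires the shutdown/shutdownNow analysis below, which I am bringing forward here. The constructor sets the lock state to -1, and since tryLock only succeeds through a CAS from 0 to 1, nobody can acquire the lock until w.unlock() at B resets the state to 0. Only after that point can a later tryLock succeed, which is what allows shutdown to interrupt the worker. shutdownNow likewise interrupts only workers whose state is >= 0. This is why I found it a little puzzling at first that the comment "allow interrupts" sits on an unlock() call. For reference, the Worker source documents the state values as: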
```java
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
```
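Next, C: once a task starts executing it must not be interrupted (by shutdown, for example), so the Worker takes its own lock before running the task.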
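Then D, whose condition is worth reading carefully. If the pool is not stopping, any interrupt the Worker has received is cleared by Thread.interrupted(), which resets the flag, so the interrupt is effectively ignored. Only when the pool has reached STOP does the Worker make sure its own interrupt flag is set, calling wt.interrupt() if needed. Note that this does not skip the task the Worker has already taken. The task still runs, but with the interrupt flag set, so a task that responds to interrupts can stop early. The loop itself ends on the next getTask() call, which returns null once the pool is in STOP.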
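The "ignore interrupts while the pool is running" behaviour can be observed from user code. In the sketch below (InterruptClearDemo is a made-up name), one task interrupts its own worker thread, and the next task on the same single worker checks whether the flag is still set. I expect false. My reading of the JDK 8 source is that the flag is normally consumed by the interruptible take() inside getTask(), which throws InterruptedException and then retries. If the flag were still set after that, the Thread.interrupted() call at D would clear it.

```java
import java.util.concurrent.*;

class InterruptClearDemo {
    // Task 1 interrupts its own worker thread; task 2, run by the same single
    // worker, reports whether the interrupt flag is still set.
    static boolean nextTaskSeesInterrupt() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable selfInterrupt = () -> Thread.currentThread().interrupt();
        Callable<Boolean> check = () -> Thread.currentThread().isInterrupted();
        pool.submit(selfInterrupt).get();        // leaves the worker's flag set
        boolean seen = pool.submit(check).get(); // flag is cleared before task 2 runs
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(nextTaskSeesInterrupt()); // false
    }
}
```

This is also why a task should not rely on "leaking" an interrupt to whatever runs next on a pooled thread.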
2.2.2 The shutdown implementation
```java
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) { // *E
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
```
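E shows that a Worker thread is interrupted only when tryLock succeeds, that is, only when it is not running a task. If tryLock fails, the Worker is busy executing a task (or has not started yet, because its state is still -1), and it is left alone.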
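The consequence is that shutdown() never disturbs a task that is already running. The sketch below (BusyWorkerDemo is a hypothetical name) makes sure the task has started before calling shutdown(). By the time the latch is released, runWorker has already taken the Worker's lock, so tryLock at E fails and the sleep runs to completion.

```java
import java.util.concurrent.*;

class BusyWorkerDemo {
    // Returns "completed" if the running task finished its sleep undisturbed,
    // or "interrupted" if shutdown() had interrupted it.
    static String runningTaskAfterShutdown() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        Future<String> f = pool.submit(() -> {
            started.countDown();      // the worker already holds its own lock here
            try {
                Thread.sleep(200);
                return "completed";
            } catch (InterruptedException e) {
                return "interrupted";
            }
        });
        started.await();
        pool.shutdown();              // tryLock() fails on the busy worker: no interrupt
        return f.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runningTaskAfterShutdown()); // completed
    }
}
```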
2.2.3 The shutdownNow implementation
```java
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // *F
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}
```
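F shows that as long as the lock state is >= 0 (the Worker has started), the thread is interrupted whether or not the Worker holds its lock and whether or not it is running a task. For a Worker that has work:
- If it has taken a task and acquired the lock but not yet run it, the check at D sees STOP and keeps the interrupt flag set, so the task starts with the flag already set
- If it is already running the task, the interrupt reaches the task, and an interrupt-aware task stops early. On the next loop iteration, getTask() sees that the pool is stopping and returns null, so the loop ends and the thread exits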
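For contrast with the shutdown() experiment, here is shutdownNow() in a similar setup. This is again my own sketch, and ShutdownNowDemo is a hypothetical name. The queued tasks never run and come back as the returned list. The running task is interrupted through interruptIfStarted, because its Worker's state is 1, which is >= 0.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

class ShutdownNowDemo {
    // Returns {tasks handed back by shutdownNow(), 1 if the running task saw the interrupt}.
    static int[] run(int queued) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        int[] sawInterrupt = new int[1];
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                sawInterrupt[0] = 1;   // interruptIfStarted() reached the busy worker
            }
            finished.countDown();
        });
        for (int i = 0; i < queued; i++) {
            pool.execute(() -> { });
        }
        started.await();
        List<Runnable> drained = pool.shutdownNow(); // queued tasks are removed and returned
        finished.await(5, TimeUnit.SECONDS);         // latch makes sawInterrupt visible here
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { drained.size(), sawInterrupt[0] };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run(3))); // [3, 1]
    }
}
```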
2.3 A closer look at Thread.interrupt
```java
Thread abnormal = new Thread(() -> {
    System.out.println("abnormal thread start");
    int i = 0;
    while (true) { // *A: never checks the interrupt flag
        i++;
    }
}, "abnormal_thread");
abnormal.start();
TimeUnit.MILLISECONDS.sleep(100);
abnormal.interrupt();                         // *B: only sets the flag
System.out.println(abnormal.isInterrupted()); // prints true
abnormal.join();                              // never returns: the thread ignores the flag
```
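If you run the code above, you will see that abnormal_thread does not exit, even though interrupt() is called at B. interrupt() only sets a soft flag, and the thread has to cooperate in order to exit promptly. This is far gentler than the old, deprecated Thread.stop(), which killed the thread outright. Changing the loop condition at A to !Thread.interrupted() makes the thread cooperate and exit.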
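Here is the cooperative variant described above, with the loop condition at A changed to !Thread.interrupted(). It is a sketch; the class name is hypothetical. I also replaced the unbounded join() with a bounded one, so that a thread that ignores the interrupt cannot hang the caller.

```java
import java.util.concurrent.TimeUnit;

class CooperativeInterruptDemo {
    // Same busy loop as above, but the loop condition checks the interrupt flag.
    static boolean stopsAfterInterrupt() throws InterruptedException {
        Thread worker = new Thread(() -> {
            long i = 0;
            while (!Thread.interrupted()) { // *A, now cooperating with interrupt()
                i++;
            }
            System.out.println("cooperative thread exits after " + i + " iterations");
        }, "cooperative_thread");
        worker.start();
        TimeUnit.MILLISECONDS.sleep(100);
        worker.interrupt();
        worker.join(1_000);                 // bounded wait instead of a possibly endless join()
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(stopsAfterInterrupt()); // true
    }
}
```

Thread.interrupted() also clears the flag. If code further up the call stack needs to see the interrupt, check Thread.currentThread().isInterrupted() instead, which leaves the flag set.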
2.4 Puzzling over another comment
```java
/**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution.  This protects against interrupts that are
 * intended to wake up a worker thread waiting for a task from
 * instead interrupting a task being run.  We implement a simple
 * non-reentrant mutual exclusion lock rather than use
 * ReentrantLock because we do not want worker tasks to be able to
 * reacquire the lock when they invoke pool control methods like
 * setCorePoolSize.  Additionally, to suppress interrupts until
 * the thread actually starts running tasks, we initialize lock
 * state to a negative value, and clear it upon start (in
 * runWorker).
 */
private final class Worker ...
```
This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run
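This sentence took me quite a while to parse. What it means is this: during shutdown, for example, the lock guarantees that only idle Workers waiting for a task get interrupted, and a Worker in the middle of a task is never interrupted. Whether a Worker counts as idle is decided precisely by whether its lock can be acquired.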
2.5 Could Worker do without a lock?
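Reference [4] describes a latent bug in one scenario, which is why Worker must extend AQS with a non-reentrant lock, replacing the earlier ReentrantLock [5]. As the Javadoc above puts it, a task that calls a pool control method such as setCorePoolSize must not be able to reacquire the lock of its own Worker.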
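Judging by the current code, wouldn't a single variable be enough? Looking only at mutual exclusion and the current logic, it seems it would, as long as the variable is updated atomically with CAS. But extending AQS is more convenient. Besides tryLock, AQS supplies a blocking lock(), and runWorker relies on it whenever interruptIdleWorkers is holding the Worker's lock while interrupting it.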
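To make the AQS part concrete, here is a stripped-down lock in the spirit of Worker. It is a hypothetical sketch: the class WorkerStyleLock and its state() helper are my own names, and the acquire/release logic mirrors what I read in the JDK 8 Worker. The state values are:
- -1: not started
- 0: idle, unlocked
- 1: running a task, locked

Only a CAS from 0 to 1 acquires the lock. That single rule makes the lock non-reentrant and also keeps it unavailable before "start". AQS contributes the queueing and parking behind lock().

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical, simplified Worker-style lock (not the JDK class).
class WorkerStyleLock extends AbstractQueuedSynchronizer {
    WorkerStyleLock() { setState(-1); }       // like Worker's constructor: inhibit until started

    @Override
    protected boolean tryAcquire(int unused) {
        // Non-reentrant: only 0 -> 1 succeeds, even for the current owner
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);                          // also turns the initial -1 into 0
        return true;
    }

    @Override
    protected boolean isHeldExclusively() { return getState() != 0; }

    void lock()       { acquire(1); }         // parks while another thread holds it
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }
    int state()       { return getState(); }

    public static void main(String[] args) {
        WorkerStyleLock lock = new WorkerStyleLock();
        System.out.println(lock.tryLock());   // false: state is -1 before "start"
        lock.unlock();                        // what runWorker's w.unlock() does
        System.out.println(lock.tryLock());   // true: idle, so shutdown could interrupt it
        System.out.println(lock.tryLock());   // false: non-reentrant, even for the owner
        lock.unlock();
    }
}
```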
3. The different thread pools in Executors
Thread pool | Construction (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue) |
---|---|
newSingleThreadExecutor | ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, new LinkedBlockingQueue<>()), wrapped in a delegating ExecutorService |
newFixedThreadPool | ThreadPoolExecutor(nThreads, nThreads, 0L, MILLISECONDS, new LinkedBlockingQueue<>()) |
newCachedThreadPool | ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, SECONDS, new SynchronousQueue<>()) |
newScheduledThreadPool | ScheduledThreadPoolExecutor(corePoolSize), which calls super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()) |
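The Queuing section [6] discusses these parameters and how they interact in detail. It also partly explains why the default Executors factories leave either the thread count or the queue unbounded.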
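The table can be checked at runtime by casting the returned pools back to ThreadPoolExecutor and reading their configuration. The sketch below is my own (ExecutorsConfigDemo and the "core/max/keepAlive/queue" format are invented). The cast works for the fixed, cached and scheduled factories, since ScheduledThreadPoolExecutor is a subclass. It does not work for newSingleThreadExecutor, which hands back a delegating wrapper.

```java
import java.util.concurrent.*;

class ExecutorsConfigDemo {
    // "core/max/keepAlive/queue type" of a pool created by an Executors factory
    static String describe(ExecutorService es) {
        ThreadPoolExecutor p = (ThreadPoolExecutor) es;
        return p.getCorePoolSize() + "/" + p.getMaximumPoolSize() + "/"
                + p.getKeepAliveTime(TimeUnit.SECONDS) + "s/"
                + p.getQueue().getClass().getSimpleName();
    }

    static String[] describeFactories() {
        ExecutorService[] pools = {
            Executors.newFixedThreadPool(4),
            Executors.newCachedThreadPool(),
            Executors.newScheduledThreadPool(2)
        };
        String[] out = new String[pools.length];
        for (int i = 0; i < pools.length; i++) {
            out[i] = describe(pools[i]);
            pools[i].shutdown();
        }
        return out;
    }

    public static void main(String[] args) {
        for (String s : describeFactories()) {
            System.out.println(s);
        }
        // 4/4/0s/LinkedBlockingQueue
        // 0/2147483647/60s/SynchronousQueue
        // 2/2147483647/0s/DelayedWorkQueue
        ExecutorService single = Executors.newSingleThreadExecutor();
        System.out.println(single instanceof ThreadPoolExecutor); // false: it is a wrapper
        single.shutdown();
    }
}
```

The output makes the unbounded parts visible. The fixed pool has an unbounded LinkedBlockingQueue, while the cached and scheduled pools allow up to Integer.MAX_VALUE threads.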
4. Some common pool implementations
- tomcat: StandardThreadExecutor
- alibaba: druid
- redis: lettuce
4.1 tomcat
```java
/**
 * max number of threads
 */
protected int maxThreads = 200;

/**
 * min number of threads
 */
protected int minSpareThreads = 25;

/**
 * idle time in milliseconds
 */
protected int maxIdleTime = 60000;

/**
 * The maximum number of elements that can queue up before we reject them
 */
protected int maxQueueSize = Integer.MAX_VALUE;

protected void startInternal() throws LifecycleException {
    taskqueue = new TaskQueue(maxQueueSize);
    TaskThreadFactory tf = new TaskThreadFactory(namePrefix, daemon, getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS, taskqueue, tf);
    executor.setThreadRenewalDelay(threadRenewalDelay);
    taskqueue.setParent(executor);
    setState(LifecycleState.STARTING);
}
```

```yaml
# Configuration from a real project (Spring Boot)
server:
  tomcat:
    accept-count: 1000
    threads:
      max: 500
      min-spare: 50
```
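So Tomcat's thread pool is built on the JDK ThreadPoolExecutor model. It uses Tomcat's own org.apache.tomcat.util.threads.ThreadPoolExecutor, which adds methods such as setThreadRenewalDelay. Unlike the Executors factories, it caps the thread count by default (maxThreads = 200), while its queue is unbounded by default (maxQueueSize = Integer.MAX_VALUE).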
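One detail in startInternal is easy to miss: taskqueue.setParent(executor) gives the queue a reference to its executor. As far as I understand Tomcat's TaskQueue, it uses that reference in offer() to report itself as "full" while the pool can still grow. That way the executor creates threads up to maxThreads before it starts queuing, which reverses the JDK's default order. The sketch below imitates only that idea. GrowFirstQueue, forceOffer and GrowFirstDemo are hypothetical names, not Tomcat's API, and Tomcat's real TaskQueue performs additional checks.

```java
import java.util.Arrays;
import java.util.concurrent.*;

class GrowFirstDemo {
    // Submits five blocking tasks to a 1..3 thread pool; returns {poolSize, queueSize}.
    static int[] run() throws InterruptedException {
        GrowFirstQueue queue = new GrowFirstQueue();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS, queue,
                (r, p) -> {
                    // a new worker could not be added: fall back to the real queue
                    if (p.isShutdown() || !queue.forceOffer(r)) {
                        throw new RejectedExecutionException("pool saturated");
                    }
                });
        queue.setParent(pool);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 5; i++) {
            pool.execute(blocker);
        }
        int[] observed = { pool.getPoolSize(), queue.size() };
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run())); // [3, 2]
    }
}

// Hypothetical "grow threads before queuing" queue; not Tomcat's TaskQueue.
class GrowFirstQueue extends LinkedBlockingQueue<Runnable> {
    private volatile ThreadPoolExecutor parent;

    void setParent(ThreadPoolExecutor parent) { this.parent = parent; }

    @Override
    public boolean offer(Runnable r) {
        ThreadPoolExecutor p = parent;
        // Report "full" while the pool can still grow, so execute() adds a worker instead
        if (p != null && p.getPoolSize() < p.getMaximumPoolSize()) {
            return false;
        }
        return super.offer(r);
    }

    boolean forceOffer(Runnable r) { return super.offer(r); }
}
```

With a plain unbounded LinkedBlockingQueue, the same 1..3 pool would stay at a single thread with four tasks queued. It never grows past corePoolSize, because offer() always succeeds.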
4.2 Connection pools
- alibaba: druid
- redis: lettuce

These two are typical examples of connection pools, which share a lot with thread pools. I'll study them later.
5. BlockingQueue implementations
To be studied later.