Questions about parameter design, locks, and interrupts in Java's ThreadPoolExecutor implementation

Posted by My Blog on April 11, 2024

0. Questions

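There are already plenty of clear, well-written articles on how Java thread pools work [1] [2] [3], and this post does not try to add an inferior sequel to them. Instead, I had doubts about a few implementation details and try to offer some explanations here. The main questions are: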

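No. Question
1. How does Future obtain the execution result asynchronously?
2. Why does ThreadPoolExecutor::Worker extend AQS, and how does it deal with interrupts while running?
3. For the different thread pools returned by Executors, how are the internal task queue, the core thread count, and the maximum thread count configured, and how do they relate?
4. How are pools implemented in real applications, such as the Tomcat thread pool, database pools, and Redis pools?
5. What are the different BlockingQueue implementations?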

1. Why thread pools exist, and an overview

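Going by the official documentation plus my own understanding, thread pools address the following problems (the first two come from the Javadoc, the third is my own addition):

  • They avoid the overhead of frequently starting and destroying threads
  • They bound resources such as threads and the task queue
  • Ordinary users no longer have to write their own asynchronous task scheduling framework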

To study the implementation, skim the following sources, with the focus on ThreadPoolExecutor:

[image] [image]

Borrowing the scheduling diagram from [3]:

image

Note: the source code is JDK 1.8, and some screenshots come from the referenced posts.

2. How does Future obtain the execution result asynchronously?

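ExecutorService service = Executors.newSingleThreadExecutor();
// Use long: the sum 1 + 2 + ... + 100000 = 5000050000 overflows an int.
Future<Long> future = service.submit(() -> {
    long sum = 0;
    for (int i = 1; i <= 100000; i++) {
        sum += i;
    }
    return sum;
});
try {
    System.out.println(future.get()); // blocks until the task has completed
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    service.shutdown(); // otherwise the worker thread keeps the JVM alive
}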

The code above should look familiar: after submitting a task, the result can be fetched asynchronously. How is that done?

First, look at what java/util/concurrent/AbstractExecutorService.java::submit does:

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

So submit creates a FutureTask, hands it to the pool for execution, and returns that same task.
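To make the point that one object plays both roles concrete, here is a minimal sketch that skips the pool entirely and runs a FutureTask on a plain thread. The class name FutureTaskDemo and the method sumTo are made up for illustration; only FutureTask itself comes from the JDK.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

final class FutureTaskDemo {
    /** Runs a FutureTask on a plain thread and reads the result from the same object. */
    static long sumTo(int n) {
        FutureTask<Long> task = new FutureTask<>(() -> {
            long sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            return sum;
        });
        new Thread(task, "futuretask-demo").start(); // the task used as a Runnable
        try {
            return task.get();                        // the same task used as a Future
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(sumTo(100_000));
    }
}
```

The pool's submit does essentially the same thing, except that execute(ftask) replaces the explicit new Thread(...).start().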

The key question is then: what exactly is FutureTask? Look back at this diagram:

image

It is worth briefly listing Future, since Runnable is already very familiar:

public interface Future<V> {

    /**
     * Attempts to cancel execution of this task. 
     * ...
     */
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

To repeat: the object that submit hands to the pool and the object it returns are the same. Now focus on the implementation of FutureTask::get():

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                // *A WaitNode() { thread = Thread.currentThread(); }
                q = new WaitNode(); 
            else if (!queued)
                // *B
                // q.next = waiters: use CAS to push this node (holding the current thread) as the new head of the waiter list
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                // *C
                LockSupport.park(this);
        }
    }

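Ignoring the details and looking at the core: get can block because of LockSupport.park(this) at C (for the underlying principle, see my post on Java synchronization). LockSupport is the primitive that the JDK locks use to put a thread into a waiting state; here there is no need for mutual exclusion, only for waiting. Because several threads may call get at the same time, the code also builds a list of waiters (A and B). Where there is waiting there must be a notification or wake-up, so where is it? It should happen when the task finishes, which means it should be inside FutureTask::run: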

public void run() {
        // ...
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } 
    }

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v; // *A
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

/**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t); // *B
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

At A the result is stored, and at B LockSupport.unpark(t) wakes every thread that is blocked in get.

That should cover the principle and implementation of fetching the result asynchronously.
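The park/unpark handshake can be reduced to a few dozen lines. The following is only a sketch of the idea, not FutureTask itself: ParkingFuture, WaitNode, complete, and demo are hypothetical names, it uses an AtomicReference where the JDK uses Unsafe, it assumes complete is called exactly once, and it leaves out cancellation, exceptions, timed waits, and the removal of stale waiter nodes.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

final class ParkingFuture<V> {
    /** One node per waiting thread, linked into a stack. */
    private static final class WaitNode {
        final Thread thread = Thread.currentThread();
        WaitNode next;
    }

    private final AtomicReference<WaitNode> waiters = new AtomicReference<>();
    private volatile boolean done;
    private V outcome; // written before the volatile write to done, read after it

    V get() throws InterruptedException {
        WaitNode node = null;
        boolean queued = false;
        while (!done) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (node == null) {
                node = new WaitNode();                       // like *A in awaitDone
            } else if (!queued) {
                node.next = waiters.get();                   // like *B: CAS-push as new head
                queued = waiters.compareAndSet(node.next, node);
            } else {
                LockSupport.park(this);                      // like *C: wait, no lock needed
            }
        }
        return outcome;
    }

    void complete(V value) {
        outcome = value;
        done = true;
        // Detach the whole stack, then wake every waiter in it.
        for (WaitNode q = waiters.getAndSet(null); q != null; q = q.next) {
            LockSupport.unpark(q.thread);
        }
    }

    /** A producer thread completes the future while the caller blocks in get. */
    static int demo() {
        ParkingFuture<Integer> future = new ParkingFuture<>();
        new Thread(() -> {
            LockSupport.parkNanos(50_000_000L); // pretend to work for about 50 ms
            future.complete(42);
        }).start();
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A waiter always rechecks done after enqueueing and before parking, and an unpark that arrives before the park simply makes that park return immediately, so a wake-up cannot be lost.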

2. Why the pool's Worker extends AQS, and how it handles interrupts

2.1 Extending AQS

Early versions were actually based on ReentrantLock, but that had a potential problem [4], so the implementation later switched to AQS.

2.2 Why introduce a lock at all?

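According to the source comments, a Worker thread should not be interrupted unless the pool is being stopped. Worker is an internal object, so it cannot be interrupted directly from outside. Broadly, a Worker thread exits or gets interrupted in the following cases:

  • a. A task fails: the error propagates and the current Worker thread exits
  • b. setCorePoolSize or setMaximumPoolSize is called
    • Tries to interrupt idle Workers
  • c. ThreadPoolExecutor::shutdown() is called
    • Rejects new tasks
    • Still runs tasks that are already submitted or in progress
    • Tries to interrupt idle Workers
  • d. ThreadPoolExecutor::shutdownNow() is called
    • Rejects new tasks
    • Removes all queued tasks
    • Interrupts all Workers, both idle ones and those running a task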
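Case c can be observed from the outside with a single-thread pool. The sketch below is an illustration only; the class name ShutdownDemo and the task labels are made up. It blocks one task on a latch, queues a second, calls shutdown(), and then checks that a new submission is rejected while both earlier tasks still complete.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

final class ShutdownDemo {
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);

        // "A" occupies the only worker until the gate opens; "B" waits in the queue.
        Future<String> a = pool.submit(() -> { gate.await(); return "A"; });
        Future<String> b = pool.submit(() -> "B");

        pool.shutdown(); // SHUTDOWN state: only idle workers would be interrupted

        boolean rejected = false;
        try {
            pool.submit(() -> "C");
        } catch (RejectedExecutionException e) {
            rejected = true; // new tasks are refused after shutdown()
        }

        gate.countDown(); // let "A" finish; "B" is then taken from the queue
        try {
            return a.get() + b.get() + ", rejected=" + rejected;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "AB, rejected=true"
    }
}
```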

2.2.1 The Worker main loop

Worker(Runnable firstTask) {
            // *A
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // *B
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                // *C
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // *D
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

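Start with A and B (fully understanding them requires the shutdown and shutdownNow walkthroughs further down). At construction the lock state is set to -1, so nobody else can acquire the lock: tryLock is a CAS from 0 to 1, and interruptIfStarted requires the state to be >= 0. Only after the unlock at B, which sets the state to 0, can a later tryLock succeed and let shutdown or shutdownNow deliver an interrupt. That is what the comment "allow interrupts" refers to, although it is a little puzzling at first sight.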

// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

Now C: while a task is running, the Worker must not be interrupted as if it were idle, so the lock is taken before the task runs.

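Now D, whose condition is worth analyzing carefully. If the ThreadPoolExecutor is not stopping (run state below STOP), any interrupt the Worker has received is cleared by Thread.interrupted() and thus ignored; the second runStateAtLeast check covers a race with shutdownNow while the flag is being cleared. If the pool is stopping, the Worker makes sure its own interrupt flag is set by calling wt.interrupt(). Note that this does not leave the loop: the task that was already taken still runs, with the interrupt flag set, and it is up to the task to respond to it.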
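The "ignored while the pool is running" half of this can be checked with a small experiment. This is a sketch with made-up names (StrayInterruptDemo, leaveFlagSet, probe): the first task deliberately leaves the worker thread's interrupt flag set, and the second task, which runs on the same worker, reports whether it still sees the flag. Depending on timing, the flag is consumed either inside getTask (the blocking queue throws InterruptedException and getTask retries) or by Thread.interrupted() at D; in both cases the second task starts with a clean flag.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class StrayInterruptDemo {
    static boolean secondTaskSeesInterrupt() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Task 1 returns with the worker thread's interrupt flag still set.
            Runnable leaveFlagSet = () -> Thread.currentThread().interrupt();
            // Task 2 runs on the same worker thread and reports the flag.
            Callable<Boolean> probe = () -> Thread.currentThread().isInterrupted();

            pool.submit(leaveFlagSet);
            Future<Boolean> second = pool.submit(probe);
            return second.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTaskSeesInterrupt()); // prints "false"
    }
}
```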

2.2.2 The shutdown implementation

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) { // *E
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

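E shows that a Worker thread is interrupted only if tryLock succeeds, that is, only when the Worker is not running a task. If tryLock fails, the Worker is running a task (or has not started yet and still has state -1).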

2.2.3 The shutdownNow implementation

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // *F
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

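F shows that as long as the lock state is >= 0, the Worker is interrupted regardless of whether its lock is held or whether it is running a task. For a Worker that has a task:

  • If it has taken a task and acquired the lock but not yet started running it, the code at D re-asserts the interrupt flag, so the task starts with the flag set and can end early if it responds to interrupts
  • If the task is already running, the interrupt only sets the flag (or wakes a blocking call); on the next iteration getTask sees that the pool is stopping and returns null, so the loop ends and the thread exits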
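The effect of shutdownNow on a running task and on the queue can be seen in a short experiment. This is a sketch with made-up names (ShutdownNowDemo and the result strings): one task is known to be running and sleeping, a second one sits in the queue, and shutdownNow() is then called.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ShutdownNowDemo {
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);

        // This task cooperates with interruption: sleep throws InterruptedException.
        Future<String> running = pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
                return "finished";
            } catch (InterruptedException e) {
                return "interrupted";
            }
        });
        pool.submit(() -> "never runs"); // stays in the queue behind the sleeper

        try {
            started.await(); // the first task is now inside task.run()
            List<Runnable> drained = pool.shutdownNow(); // STOP, interrupt, drain queue
            return running.get() + ", drained=" + drained.size();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "interrupted, drained=1"
    }
}
```

Had the first task ignored the interrupt (for example a busy loop that never checks the flag), shutdownNow would still return immediately, but the worker thread would only exit once that task finished on its own.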

2.3 Thread.interrupt revisited

Thread abnormal = new Thread(() -> {
    System.out.println("abnormal thread start");
    int i = 0;
    while (true) { // *A
        i++;
    }
}, "abnormal_thread");

abnormal.start();
TimeUnit.MILLISECONDS.sleep(100);
abnormal.interrupt(); // *B
System.out.println(abnormal.isInterrupted());
abnormal.join();

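Try the code above: although interrupt is called at B, abnormal_thread does not exit, so the join never returns. interrupt only sets a soft flag, and the thread has to cooperate in order to exit promptly. It is not as brutal as the old stop method, which could terminate a thread outright. If the condition at A is changed to !Thread.interrupted(), the thread cooperates and exits.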
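For comparison, here is the cooperative variant as a self-contained sketch. The class name CooperativeInterruptDemo and the 5-second join timeout are illustrative choices, not part of the original experiment.

```java
import java.util.concurrent.TimeUnit;

final class CooperativeInterruptDemo {
    /** Returns true if the busy thread has exited after being interrupted. */
    static boolean stopsAfterInterrupt() {
        Thread cooperative = new Thread(() -> {
            long i = 0;
            // Thread.interrupted() returns the flag and clears it, which ends the loop.
            while (!Thread.interrupted()) {
                i++;
            }
        }, "cooperative_thread");

        cooperative.start();
        try {
            TimeUnit.MILLISECONDS.sleep(100);
            cooperative.interrupt();
            cooperative.join(5_000); // bounded wait so a bug cannot hang the demo
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !cooperative.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(stopsAfterInterrupt()); // prints "true"
    }
}
```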

2.4 Another puzzling comment

/**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution.  This protects against interrupts that are
 * intended to wake up a worker thread waiting for a task from
 * instead interrupting a task being run.  We implement a simple
 * non-reentrant mutual exclusion lock rather than use
 * ReentrantLock because we do not want worker tasks to be able to
 * reacquire the lock when they invoke pool control methods like
 * setCorePoolSize.  Additionally, to suppress interrupts until
 * the thread actually starts running tasks, we initialize lock
 * state to a negative value, and clear it upon start (in
 * runWorker).
 */
 private final class Worker ...

This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run

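This sentence took me a while to work out. It means that during shutdown, for example, the lock ensures that only idle Worker threads waiting for a task are interrupted, and no attempt is made to interrupt Workers that are running one. Whether a Worker counts as idle is decided precisely by whether its lock can be acquired.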
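The locking part of Worker is small enough to reproduce in isolation. The sketch below is modeled on the behavior described in the comment rather than copied from the JDK; IdleProbeLock and demo are hypothetical names, and the owner-thread bookkeeping is omitted. It shows the three properties that matter: the lock cannot be taken before the worker starts (state -1), it can be taken while the worker is idle (state 0), and it is not reentrant (state 1).

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class IdleProbeLock extends AbstractQueuedSynchronizer {
    IdleProbeLock() {
        setState(-1); // "not started": every tryLock fails until the first unlock
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Only 0 -> 1 succeeds, so a thread that already holds the lock fails too.
        return compareAndSetState(0, 1);
    }

    @Override
    protected boolean tryRelease(int unused) {
        setState(0);
        return true;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    static String demo() {
        IdleProbeLock w = new IdleProbeLock();
        boolean beforeStart = w.tryLock(); // false: state is still -1
        w.unlock();                        // what runWorker does first: state becomes 0
        boolean whileIdle = w.tryLock();   // true: an idle worker can be probed
        boolean reentrant = w.tryLock();   // false: already held, no reentrancy
        w.unlock();
        return beforeStart + "," + whileIdle + "," + reentrant;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "false,true,false"
    }
}
```

The third result is the reason a ReentrantLock does not fit here: if a task called setCorePoolSize from inside the pool, a reentrant tryLock on its own Worker would succeed and the task would interrupt itself.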

2.5 Could Worker do without a lock?

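[4] describes a potential bug in one scenario, which is why Worker has to extend AQS and be non-reentrant, replacing the earlier ReentrantLock [5].

Judging from the current code, would a single variable be enough to control this? Purely from the standpoint of mutual exclusion and the current implementation, it seems it would. For development convenience, however, Worker simply extends and uses AQS.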

3. The different thread pools in Executors

| Factory method | Key parameters: corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue |
| --- | --- |
| newSingleThreadExecutor | ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, LinkedBlockingQueue<>()) |
| newFixedThreadPool | ThreadPoolExecutor(nThreads, nThreads, 0L, MILLISECONDS, LinkedBlockingQueue<>()) |
| newCachedThreadPool | ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, SECONDS, SynchronousQueue<>()) |
| newScheduledThreadPool | ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, DelayedWorkQueue()) |

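Queuing [6] discusses these parameters and how they relate in detail. It also partly answers why the default Executors factories do not put an upper bound on threads or on the queue.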
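How corePoolSize, maximumPoolSize, and the queue interact is easiest to see with a deliberately tiny, bounded pool. The following sketch uses illustrative values (1 core thread, 2 maximum threads, a queue of capacity 1) and a made-up class name, BoundedPoolDemo. Every task blocks on a latch, so the pool size after each execute shows which path the task took.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolDemo {
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);               // 1st: starts the core thread
            int afterFirst = pool.getPoolSize();
            pool.execute(blocker);               // 2nd: core thread busy, task is queued
            int afterSecond = pool.getPoolSize();
            pool.execute(blocker);               // 3rd: queue full, a non-core thread starts
            int afterThird = pool.getPoolSize();
            boolean rejected = false;
            try {
                pool.execute(blocker);           // 4th: queue full and pool at maximum
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            return afterFirst + "," + afterSecond + "," + afterThird + ",rejected=" + rejected;
        } finally {
            gate.countDown(); // release the blocked tasks
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "1,1,2,rejected=true"
    }
}
```

With an unbounded LinkedBlockingQueue the third step never happens, because the queue never fills up; that is why maximumPoolSize has no effect in newFixedThreadPool and newSingleThreadExecutor.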

4. Some commonly used pool implementations

  • tomcat: StandardThreadExecutor
  • apache: druid
  • redis: lettuce

4.1 tomcat

StandardThreadExecutor

    /**
     * max number of threads
     */
    protected int maxThreads = 200;

    /**
     * min number of threads
     */
    protected int minSpareThreads = 25;

    /**
     * idle time in milliseconds
     */
    protected int maxIdleTime = 60000;

    /**
     * The maximum number of elements that can queue up before we reject them
     */
    protected int maxQueueSize = Integer.MAX_VALUE;

protected void startInternal() throws LifecycleException {
        taskqueue = new TaskQueue(maxQueueSize);
        TaskThreadFactory tf = new TaskThreadFactory(namePrefix, daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS, taskqueue, tf);
        executor.setThreadRenewalDelay(threadRenewalDelay);
        taskqueue.setParent(executor);
        setState(LifecycleState.STARTING);
    }

# A configuration from a real project
server:
  tomcat:
    accept-count: 1000
    threads:
      max: 500
      min-spare: 50

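So Tomcat's executor follows the JDK ThreadPoolExecutor design, and by default it bounds the number of threads. Note that the class instantiated above must be Tomcat's own org.apache.tomcat.util.threads.ThreadPoolExecutor, since setThreadRenewalDelay is not a method of the JDK class; as far as I know, in older Tomcat versions that class extends the JDK one.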

4.2 Connection pools

  • apache: druid
  • redis: lettuce

These two are typical connection pools and have a lot in common with thread pools; to be studied later.

5. BlockingQueue implementations

To be studied later.

6. References