前言
在Java中实现异步任务的处理,我们通常会使用Executor框架,而它的子类ThreadPoolExecutor
则提供了线程池的实现,避免了线程频繁创建与销毁所带来的性能开销,为线程做了统一的管理与监控。因此,本文将从Executor
接口开始逐层向下分析,重点关注ThreadPoolExecutor
,这也是我们平时使用最多的一个类,深入理解它的原理还是很重要的。
继承体系
Executor
接口位于整个体系的最顶层,它只包含一个execute()
方法;ExecutorService
也是接口,它在Executor
接口的基础上添加了很多的接口方法,所以一般我们会使用这个接口;AbstractExecutorService
是抽象类,实现了部分的方法,而把其它一些核心方法交给了子类去实现;ThreadPoolExecutor
是最核心的一个类,它真正的实现了线程池的相关功能,是重点需要分析的一个类;ScheduledExecutorService
是定时任务相关的接口,本文不会去分析该类。
除此之外,该体系还涉及一个Executors
工具类,它提供了很多创建线程池的静态方法,为我们省去了创建线程池时需要关心的参数细节。
Executor接口
1 | public interface Executor { |
Executor
接口非常简单,只有一个execute()
方法,用来提交一个任务去执行。注意参数是Runnable
类型的,表示一个任务。
ExecutorService
由于Executor
接口只有提交任务的功能,我们更多使用的是ExecutorService
,它定义的方法比较丰富,大部分情况下已经能满足我们的需求了。例如 :1
2ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);
下面看看它的源码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public interface ExecutorService extends Executor {
// 关闭线程池:停止接受外部提交的新任务,而等到正在执行的任务以及队列中等待的任务执行完才真正关闭
void shutdown();
// 关闭线程池:停止接受外部提交的新任务,忽略队列里等待的任务,尝试将正在跑的任务中断,然后返回未执行的任务列表
List<Runnable> shutdownNow();
// 线程池是否关闭
boolean isShutdown();
// 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回 true
// 这个方法必须在调用 shutdown() 或 shutdownNow() 方法之后调用才会返回 true
boolean isTerminated();
// 关闭线程池后等待所有任务完成,并设置超时时间:调用 shutdown() 或 shutdownNow() 方法后,调用该方法阻塞直到所有任务执行完毕或发生了超时,返回 false 表示发生了超时
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交一个 Callable 任务,返回一个 Future
<T> Future<T> submit(Callable<T> task);
// 提交一个 Runnable 任务,返回一个 Future,第二个参数会放到 Future 中作为返回值
<T> Future<T> submit(Runnable task, T result);
// 提交一个 Runnable 任务
Future<?> submit(Runnable task);
// ...
}
可以看出,这个接口提供了大部分我们需要的功能,一些不太常用的如invokeAll()
、invokeAny()
等上面将其省略了,不进行分析。
AbstractExecutorService
AbstractExecutorService
抽象类实现了几个实用的方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
// 将 Runnable 包装成 FutureTask,内部其实会通过 Executors#callable 方法将这个 Runnable 转换成 Callable
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
// 提交任务,和 execute() 不同的是这个会返回一个 Future
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); // 将 Runnable 包装成 FutureTask
execute(ftask); // 执行这个任务,execute() 方法交由子类实现,FutureTask 间接实现了 Runnable 接口
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result); // 将 Runnable 包装成 FutureTask
execute(ftask); // 执行这个任务,execute() 方法交由子类实现,FutureTask 间接实现了 Runnable 接口
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task); // 将Callable 包装成 FutureTask
execute(ftask); // 执行这个任务,execute() 方法交由子类实现,FutureTask 间接实现了 Runnable 接口
return ftask;
}
// ...
}
这个抽象类封装了一些基本的方法如submit()
,但是都没有真正开启线程来执行任务,它们都只是在方法内部调用了execute()
方法,而将该方法交由子类去实现。
ThreadPoolExecutor
ThreadPoolExecutor
是JDK中的线程池实现,这个类实现了一个线程池需要的各个方法,比如任务提交、线程管理、监控等等。关于这个方法内容比较多,因此将会拆开来分析。
核心参数
我们先看看该类的构造函数:
1 | public ThreadPoolExecutor(int corePoolSize, |
核心参数的作用分别如下:
corePoolSize
:核心线程数,当线程数小于该值时,线程池会创建新线程来执行新任务maximumPoolSize
:线程池允许创建的最大线程数keepAliveTime
:空闲线程的存活时间,但要注意这个值不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数corePoolSize
,那么这些线程不会因为空闲太长时间而被关闭,但也可以通过调用allowCoreThreadTimeOut(true)
使核心线程数内的线程也可以被回收。workQueue
:任务队列,用来存储未执行的任务,是BlockingQueue
接口的某个实现threadFactory
:线程工厂,可通过工厂为新建的线程设置更有意义的名字handler
:拒绝策略,当线程池和任务队列均处于饱和状态时该使用的处理方式,默认为抛出异常
线程池状态
ThreadPoolExecutor
采用一个32位的整数来存放线程池的状态和当前池中的线程数,其中高3位用于存放线程池状态,低29位表示线程数。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21/** 状态控制变量,该变量用于表示线程池的状态和线程数 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/** 该方法用于组合线程池的状态和线程数,通过按位或的方式 */
private static int ctlOf(int rs, int wc) { return rs | wc; }
/** 就是29 */
private static final int COUNT_BITS = Integer.SIZE - 3;
/** 线程池的最大线程数,也就是(2^29-1):000 11111111111111111111111111111 */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池的状态存放在高3位中
private static final int RUNNING = -1 << COUNT_BITS; // 111 0000...
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 0000...,对应 shutdown()
private static final int STOP = 1 << COUNT_BITS; // 001 0000...,对应 shutdownNow()
private static final int TIDYING = 2 << COUNT_BITS; // 010 0000...
private static final int TERMINATED = 3 << COUNT_BITS; // 011 0000...
// 获取线程池的运行状态,~运算符i会将0、1取反
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程池中的线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
这里面的状态还是比较清晰的,并且状态值是顺序递增的。RUNNING
表示线程池的初始状态,而SHUTDOWN
和STOP
分别是调用了shutdown()
和shutdownNow()
方法后进入的状态,其中在tryTerminate()
方法中转换成TIDYING
状态,表示在SHUTDOWN / STOP
后任务队列和线程池都清空了,此时执行钩子方法terminated()
,而当terminated()
方法结束后,线程池的状态就会变为TERMINATED
。
Worker
Worker
是ThreadPoolExecutor
的内部类,用于封装线程池中的工作线程,也就是用来执行任务的,而任务是Runnable
(内部变量名叫task
或command
)。要注意的是,该类继承了AQS,用于实现一个简单的互斥锁。
1 | private final class Worker extends AbstractQueuedSynchronizer implements Runnable { |
runWorker
Worker
实现了Runnable
接口,并将run()
方法的实现委托给了外部类ThreadPoolExecutor
的runWorker()
方法,这个方法就是不断的从任务队列中拿取任务运行:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null; // help GC
w.unlock(); // allow interrupts
boolean completedAbruptly = true; // 用于标记完成任务时是否有异常
try {
// 循环:初始任务(首次)或者从阻塞队列中拿一个(后续)
// 这也体现了线程池的意义,工作线程在执行完一个任务后,会再次到任务队列中获取新的任务,实现了”线程复用“
while (task != null || (task = getTask()) != null) {
// 获取互斥锁,在获取互斥锁时,调用 shutdown() 方法不会中断线程,但是 shutdownNow() 方法无视互斥锁,会中断所有线程
w.lock();
// 判断是否需要中断当前线程。如果线程池的状态 >= STOP ,当前线程未中断,则中断当前线程,否则清除线程中断位
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 交由子类实现的前置处理钩子
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正的执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 交由子类实现的后置处理钩子
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++; // 该 Worker 完成的任务数加一
w.unlock();
}
}
// while 循环之外
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 处理工作线程退出
}
}
getTask
在runWorker()
方法中,会尝试通过阻塞队列获取任务来执行,而这个获取任务的逻辑则封装到了getTask()
这个核心方法中。在以下几种情况会返回null
从而接下来线程退出(runWorker()
方法中的循环结束):
- 当前工作线程数超过了
maximumPoolSize
(由于maximumPoolSize
可以动态调整,这是可能的) - 线程池状态为
STOP
(因为STOP
状态不处理阻塞队列中的任务了) - 线程池状态为
SHUTDOWN
,但阻塞队列为空 - 线程数量大于
corePoolSize
或allowCoreThreadTimeOut
设置为true
,当线程空闲时间超过keepAliveTime
(这里说的空闲时间其实就是poll()
方法阻塞在队列上的时间)
1 | private Runnable getTask() { |
执行任务
execute
有了上面的一些概念后,接下来我们看看最核心的execute()
方法,它包含了提交任务时的几大过程:
1 | public void execute(Runnable command) { |
这里先对上面的三大步骤做个抽象层面的梳理:
- 如果当前运行的线程数少于
corePoolSize
,则创建新线程来执行任务 - 如果运行的线程数大于或等于
corePoolSize
,则将任务加入阻塞队列 - 如果阻塞队列也满了,则以
maximumPoolSize
为界创建新线程,如果线程数比maximumPoolSize
还大,则执行拒绝策略
addWorker
下面开始更细的去分析上述三大流程中涉及的一些方法,首先是addWorker()
:
1 | /** |
这里笔者刚开始很疑惑的一点是,为什么t.start();
会执行到Worker
中的run()
方法,它不是Worker
中的属性吗,它自己本身并没有传入一个Runnable
吧。但实际上,在通过线程工厂创建这个线程的时候,是传入了一个Runnable
的,它就是Worker
本身:1
2
3
4
5Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 注意这里的 this
}
因此,当我们使用t.start()
开启这个Thread
的时候,这个Thread
中的target
是为Worker
本身的,所以才会执行它的run()
方法:1
2
3
4
5
6
7// Thread.java
public void run() {
if (target != null) {
target.run();
}
}
此时就与之前分析Worker
时候的方法串起来了,Worker
的run()
方法的执行逻辑其实是委托给外部类的runWorker()
方法来完成,而runWorker()
方法最终调用的就是传入的firstTask
或者从阻塞队列中取到的某个任务,执行它的run()
方法。
addWorkerFailed
线程成功启动后的逻辑已经分析完了,接下来看看线程如果启动失败时会发生什么:
1 | private void addWorkerFailed(Worker w) { |
其实就是回滚后尝试终止线程池:
- 从
workers
中删除失败的Worker workerCount
减一- 调用
tryTerminate()
尝试终止线程池
线程池的关闭
shutdown
shutdown()
方法关闭线程池比较优雅,线程池进入SHUTDOWN
后不会再接受新任务,并且中断所有空闲线程(阻塞在队列上的线程),但是任务队列中已有的任务将会继续处理。
1 | public void shutdown() { |
shutdownNow
shutdownNow()
方法关闭线程池相比shutdown()
暴力了一些,会中断所有线程,哪怕线程正在执行任务。线程池进入STOP
状态后既不会接受新任务,也不会处理任务队列中已有的任务。需要注意的是,即便shutdownNow()
会中断正在执行任务的线程,但不代表任务一定会挂,因为如果提交的任务里面的代码没有对线程中断敏感的逻辑的话,线程中断是不会有任何效果的。
1 | public List<Runnable> shutdownNow() { |
拒绝策略
在execute()
方法中我们可以看到,有两种情况会调用reject()
拒绝策略来处理任务,一个是当任务加入阻塞队列后的短暂空窗期线程池已经关闭了,此时再次查看线程池的状态不为RUNNING
就会将任务移出队列并执行拒绝策略,另一个是当线程数超过了maximumPoolSize
,无法再创建新线程了。
1 | private volatile RejectedExecutionHandler handler; |
RejectedExecutionHandler
在ThreadPoolExecutor
中有四个已经定义好的实现类可供我们直接使用,当然,我们也可以实现自己的策略,不过一般也没有必要。
1 | // 如果线程池没有被关闭,那么由提交任务的线程自己来执行这个任务 |
Executors
Executors
是一个工具类,所有的方法都是static
的,它为我们创建线程池提供了很大的便利。
newFixedThreadPool
生成一个固定大小的线程池,最大线程数设置为与核心线程数相等,此时keepAliveTime
设置为0(因为这里它是没用的,即使不为0,线程池默认也不会回收corePoolSize
内的线程),阻塞队列采用LinkedBlockingQueue
无界队列。
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
newSingleThreadExecutor
生成只有一个线程的线程池,与newFixedThreadPool
唯一的不同在于核心线程和最大线程数都为1,不需要指定。
1 | public static ExecutorService newSingleThreadExecutor() { |
newCachedThreadPool
生成一个需要的时候就创建新线程的线程池。这种线程池对于任务可以比较快速地完成的情况下有比较好的性能,如果线程空闲了60秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有线程的关闭,整个线程池不会占用任何的系统资源。
1 | public static ExecutorService newCachedThreadPool() { |
总结
本文从Executor
顶层接口逐层向下分析,重点讲解了ThreadPoolExecutor
的源码实现,包括核心参数、线程创建过程、执行任务、拒绝策略和线程池的关闭等,由于Executor体系本身内容还是比较多的,因此有些地方依然没有关注到,例如定时相关的ScheduledExecutorService
接口和同时实现了ThreadPoolExecutor
与ScheduledExecutorService
的ScheduledThreadPoolExecutor
,并且关于ThreadPoolExecutor
的线程池关闭这一块,也还有几个方法没有深入分析,将来有时间一定补上。