前言

在Java中实现异步任务的处理,我们通常会使用Executor框架,而它的子类ThreadPoolExecutor则提供了线程池的实现,避免了线程频繁创建与销毁所带来的性能开销,为线程做了统一的管理与监控。因此,本文将从Executor接口开始逐层向下分析,重点关注ThreadPoolExecutor,这也是我们平时使用最多的一个类,深入理解它的原理还是很重要的。

继承体系

Executor接口位于整个体系的最顶层,它只包含一个execute()方法;ExecutorService也是接口,它在Executor接口的基础上添加了很多的接口方法,所以一般我们会使用这个接口;AbstractExecutorService是抽象类,实现了部分的方法,而把其它一些核心方法交给了子类去实现;ThreadPoolExecutor是最核心的一个类,它真正的实现了线程池的相关功能,是重点需要分析的一个类;ScheduledExecutorService是定时任务相关的接口,本文不会去分析该类。

除此之外,该体系还涉及一个Executors工具类,它提供了很多创建线程池的静态方法,为我们省去了创建线程池时需要关心的参数细节。

Executor接口

1
2
3
public interface Executor {
void execute(Runnable command);
}

Executor接口非常简单,只有一个execute()方法,用来提交一个任务去执行。注意参数是Runnable类型的,表示一个任务。

ExecutorService

由于Executor接口只有提交任务的功能,我们更多使用的是ExecutorService,它定义的方法比较丰富,大部分情况下已经能满足我们的需求了。例如 :

1
2
ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);

下面看看它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public interface ExecutorService extends Executor {
// 关闭线程池:停止接受外部提交的新任务,而等到正在执行的任务以及队列中等待的任务执行完才真正关闭
void shutdown();

// 关闭线程池:停止接受外部提交的新任务,忽略队列里等待的任务,尝试将正在跑的任务中断,然后返回未执行的任务列表
List<Runnable> shutdownNow();

// 线程池是否关闭
boolean isShutdown();

// 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回 true
// 这个方法必须在调用 shutdown() 或 shutdownNow() 方法之后调用才会返回 true
boolean isTerminated();

// 关闭线程池后等待所有任务完成,并设置超时时间:调用 shutdown() 或 shutdownNow() 方法后,调用该方法阻塞直到所有任务执行完毕或发生了超时,返回 false 表示发生了超时
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

// 提交一个 Callable 任务,返回一个 Future
<T> Future<T> submit(Callable<T> task);

// 提交一个 Runnable 任务,返回一个 Future,第二个参数会放到 Future 中作为返回值
<T> Future<T> submit(Runnable task, T result);

// 提交一个 Runnable 任务
Future<?> submit(Runnable task);

// ...
}

可以看出,这个接口提供了大部分我们需要的功能,一些不太常用的如invokeAll()invokeAny()等上面将其省略了,不进行分析。

AbstractExecutorService

AbstractExecutorService抽象类实现了几个实用的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public abstract class AbstractExecutorService implements ExecutorService {

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
// 将 Runnable 包装成 FutureTask,内部其实会通过 Executors#callable 方法将这个 Runnable 转换成 Callable
return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

// 提交任务,和 execute() 不同的是这个会返回一个 Future
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); // 将 Runnable 包装成 FutureTask
execute(ftask); // 执行这个任务,execute() 方法交由子类实现,FutureTask 间接实现了 Runnable 接口
return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result); // 将 Runnable 包装成 FutureTask
execute(ftask); // 执行这个任务,execute() 方法交由子类实现,FutureTask 间接实现了 Runnable 接口
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task); // 将Callable 包装成 FutureTask
execute(ftask); // 执行这个任务,execute() 方法交由子类实现,FutureTask 间接实现了 Runnable 接口
return ftask;
}

// ...
}

这个抽象类封装了一些基本的方法如submit(),但是都没有真正开启线程来执行任务,它们都只是在方法内部调用了execute()方法,而将该方法交由子类去实现。

ThreadPoolExecutor

ThreadPoolExecutor是JDK中的线程池实现,这个类实现了一个线程池需要的各个方法,比如任务提交、线程管理、监控等等。关于这个方法内容比较多,因此将会拆开来分析。

核心参数

我们先看看该类的构造函数:

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// ...
}

核心参数的作用分别如下:

  • corePoolSize:核心线程数,当线程数小于该值时,线程池会创建新线程来执行新任务
  • maximumPoolSize:线程池允许创建的最大线程数
  • keepAliveTime:空闲线程的存活时间,但要注意这个值不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数corePoolSize,那么这些线程不会因为空闲太长时间而被关闭,但也可以通过调用allowCoreThreadTimeOut(true)使核心线程数内的线程也可以被回收。
  • workQueue:任务队列,用来存储未执行的任务,是BlockingQueue接口的某个实现
  • threadFactory:线程工厂,可通过工厂为新建的线程设置更有意义的名字
  • handler:拒绝策略,当线程池和任务队列均处于饱和状态时该使用的处理方式,默认为抛出异常

线程池状态

ThreadPoolExecutor采用一个32位的整数来存放线程池的状态和当前池中的线程数,其中高3位用于存放线程池状态,低29位表示线程数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** 状态控制变量,该变量用于表示线程池的状态和线程数 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/** 该方法用于组合线程池的状态和线程数,通过按位或的方式 */
private static int ctlOf(int rs, int wc) { return rs | wc; }

/** 就是29 */
private static final int COUNT_BITS = Integer.SIZE - 3;
/** 线程池的最大线程数,也就是(2^29-1):000 11111111111111111111111111111 */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 线程池的状态存放在高3位中
private static final int RUNNING = -1 << COUNT_BITS; // 111 0000...
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 0000...,对应 shutdown()
private static final int STOP = 1 << COUNT_BITS; // 001 0000...,对应 shutdownNow()
private static final int TIDYING = 2 << COUNT_BITS; // 010 0000...
private static final int TERMINATED = 3 << COUNT_BITS; // 011 0000...

// 获取线程池的运行状态,~运算符i会将0、1取反
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程池中的线程数
private static int workerCountOf(int c) { return c & CAPACITY; }

这里面的状态还是比较清晰的,并且状态值是顺序递增的。RUNNING表示线程池的初始状态,而SHUTDOWNSTOP分别是调用了shutdown()shutdownNow()方法后进入的状态,其中在tryTerminate()方法中转换成TIDYING状态,表示在SHUTDOWN / STOP后任务队列和线程池都清空了,此时执行钩子方法terminated(),而当terminated()方法结束后,线程池的状态就会变为TERMINATED

Worker

WorkerThreadPoolExecutor的内部类,用于封装线程池中的工作线程,也就是用来执行任务的,而任务是Runnable(内部变量名叫taskcommand)。要注意的是,该类继承了AQS,用于实现一个简单的互斥锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

// 工作线程,用来执行任务的
final Thread thread;

// 初始任务
Runnable firstTask;

// 存放此线程完成的任务数
volatile long completedTasks;

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 通过构造线程池时传入的线程工厂来创建一个新线程
}

public void run() {
// 调用了外部的 runWorker() 方法
runWorker(this);
}


// 以下为AQS相关的方法:
// 0 表示解锁状态
// 1 表示加锁状态

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

runWorker

Worker实现了Runnable接口,并将run()方法的实现委托给了外部类ThreadPoolExecutorrunWorker()方法,这个方法就是不断的从任务队列中拿取任务运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null; // help GC
w.unlock(); // allow interrupts
boolean completedAbruptly = true; // 用于标记完成任务时是否有异常
try {
// 循环:初始任务(首次)或者从阻塞队列中拿一个(后续)
// 这也体现了线程池的意义,工作线程在执行完一个任务后,会再次到任务队列中获取新的任务,实现了”线程复用“
while (task != null || (task = getTask()) != null) {
// 获取互斥锁,在获取互斥锁时,调用 shutdown() 方法不会中断线程,但是 shutdownNow() 方法无视互斥锁,会中断所有线程
w.lock();
// 判断是否需要中断当前线程。如果线程池的状态 >= STOP ,当前线程未中断,则中断当前线程,否则清除线程中断位
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 交由子类实现的前置处理钩子
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正的执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 交由子类实现的后置处理钩子
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++; // 该 Worker 完成的任务数加一
w.unlock();
}
}
// while 循环之外

completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 处理工作线程退出
}
}

getTask

runWorker()方法中,会尝试通过阻塞队列获取任务来执行,而这个获取任务的逻辑则封装到了getTask()这个核心方法中。在以下几种情况会返回null从而接下来线程退出(runWorker()方法中的循环结束):

  1. 当前工作线程数超过了maximumPoolSize(由于maximumPoolSize可以动态调整,这是可能的)
  2. 线程池状态为STOP(因为STOP状态不处理阻塞队列中的任务了)
  3. 线程池状态为SHUTDOWN,但阻塞队列为空
  4. 线程数量大于corePoolSizeallowCoreThreadTimeOut设置为true,当线程空闲时间超过keepAliveTime(这里说的空闲时间其实就是poll()方法阻塞在队列上的时间)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private Runnable getTask() {
// 上次从阻塞队列 poll 任务时是否超时
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// if 条件等价于 rs >= STOP || (rs == SHUTDOWN && workQueue.isEmpty())
// 此时将工作线程数减一
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// allowCoreThreadTimeOut 是用于设置核心线程是否受 keepAliveTime 影响,
// 在 allowCoreThreadTimeOut 为 true 或工作线程数 > corePoolSize的情况下,
// 当前的工作线程会受 keepAliveTime 影响
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 1. 工作线程数 > maximumPoolSize,当前工作线程需要退出
// 2. timed && timedOut == true 说明当前线程受 keepAliveTime 影响并且上次获取任务超时。这种情况下,如果当前线程不是最后一个线程或者队列为空,则可以退出
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 根据 timed 变量的值决定是限时阻塞获取还是一直阻塞获取队列中的任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时退出
workQueue.take(); // 一直阻塞
if (r != null)
return r;
timedOut = true; // 走到这说明 poll 超时了
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

执行任务

execute

有了上面的一些概念后,接下来我们看看最核心的execute()方法,它包含了提交任务时的几大过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get(); // 获取线程池的状态控制变量
// 1. 如果线程数少于核心线程池的大小,则添加一个 Worker 来执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 到这里说明当前线程数大于等于核心线程池大小(或者 addWorker() 失败),如果线程池处于 RUNNING 状态,则将这个任务添加到阻塞队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.1 线程池已经关闭了,则移除队列中刚提交的任务
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.2 没有工作线程了,则添加一个空任务工作线程用于执行提交的任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果阻塞队列满了,那么以 maximumPoolSize 为界创建新的 Worker,
// 如果失败,说明当前线程数已经达到 maximumPoolSize,此时执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

这里先对上面的三大步骤做个抽象层面的梳理:

  1. 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务
  2. 如果运行的线程数大于或等于corePoolSize,则将任务加入阻塞队列
  3. 如果阻塞队列也满了,则以maximumPoolSize为界创建新线程,如果线程数比maximumPoolSize还大,则执行拒绝策略

addWorker

下面开始更细的去分析上述三大流程中涉及的一些方法,首先是addWorker()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* @param firstTask 准备提交给这个线程执行的任务,可以为 null
* @param core 如果为 true,表示使用核心线程数 corePoolSize 作为创建线程的界限;如果为 false,表示使用最大线程数 maximumPoolSize 作为创建线程的界限
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get(); // 获取线程池的状态控制变量
int rs = runStateOf(c); // 获取线程池的状态

// 当线程池状态小于 SHUTDOWN 时,直接往下继续执行
// 当线程池状态等于 SHUTDOWN 时,如果 firstTask 为 null,且 workQueue 不为空,是允许创建新的 Worker 的,因为此时要把 workQueue 中的任务执行完;否则,当其中一个条件不满足时,不会继续往下执行
// 当线程池状态大于 SHUTDOWN 时,不允许创建新的 Worker 提交任务,不会继续往下执行
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c); // 获取线程数
if (wc >= CAPACITY || // 如果超过了 2^29-1 这个上限,或者超过了 corePoolSize 或 maximumPoolSize(由传入参数决定使用哪个),一样不会继续往下执行
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果成功 CAS 新增 Worker 的数目,跳出循环往下走
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 重读状态控制变量,如果线程池状态变了,则重试整个大循环
if (runStateOf(c) != rs)
continue retry;
// 否则,仅仅是 workerCount 变了,也就是 CAS 新增 workerCount 失败,重试内层循环
}
}

// 运行到此处时,线程池线程数已经成功+1,下面进行实质操作

boolean workerStarted = false; // Worker 是否已经启动
boolean workerAdded = false; // Worker 是否已经添加到 workers 中
Worker w = null;
try {
w = new Worker(firstTask); // 在这个 Worker 的构造函数中,会通过线程工厂 new 一个新线程
final Thread t = w.thread; // 获取在构造 Worker 时线程工厂 new 出的新线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 整个线程池的全局锁,因为关闭线程池是需要这个锁的,这能保证持有锁的期间,线程池不会关闭
mainLock.lock();
try {
// 由于获取锁之前线程池状态可能发生了变化,这里需要重新读一次状态
int rs = runStateOf(ctl.get());
// 如果小于 SHUTDOWN 或者等于 SHUTDOWN 但 firstTask == null(不接受新任务但会继续执行阻塞队列中的任务)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // worker 中的 thread 不能是已经启动了的,不然要抛出异常
throw new IllegalThreadStateException();
workers.add(w); // 将新创建的 Worker 加入到 workers 这个 HashSet 中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 记录线程池的历史最大值
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// Worker 添加成功,启动线程
t.start();
workerStarted = true;
}
}
} finally {
// 如果 Worker 线程启动失败,则做一些回滚操作
if (! workerStarted)
addWorkerFailed(w);
}
// 返回线程是否启动成功
return workerStarted;
}

这里笔者刚开始很疑惑的一点是,为什么t.start();会执行到Worker中的run()方法,它不是Worker中的属性吗,它自己本身并没有传入一个Runnable吧。但实际上,在通过线程工厂创建这个线程的时候,是传入了一个Runnable的,它就是Worker本身:

1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 注意这里的 this
}

因此,当我们使用t.start()开启这个Thread的时候,这个Thread中的target是为Worker本身的,所以才会执行它的run()方法:

1
2
3
4
5
6
7
// Thread.java

public void run() {
if (target != null) {
target.run();
}
}

此时就与之前分析Worker时候的方法串起来了,Workerrun()方法的执行逻辑其实是委托给外部类的runWorker()方法来完成,而runWorker()方法最终调用的就是传入的firstTask或者从阻塞队列中取到的某个任务,执行它的run()方法。

addWorkerFailed

线程成功启动后的逻辑已经分析完了,接下来看看线程如果启动失败时会发生什么:

1
2
3
4
5
6
7
8
9
10
11
12
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}

其实就是回滚后尝试终止线程池:

  1. workers中删除失败的Worker
  2. workerCount减一
  3. 调用tryTerminate()尝试终止线程池

线程池的关闭

shutdown

shutdown()方法关闭线程池比较优雅,线程池进入SHUTDOWN后不会再接受新任务,并且中断所有空闲线程(阻塞在队列上的线程),但是任务队列中已有的任务将会继续处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查是否有 shutdown 的权限,非重点
advanceRunState(SHUTDOWN); // 状态切换到 SHUTDOWN
interruptIdleWorkers(); // 中断所有空闲线程,或者说在任务队列上阻塞的线程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终止线程池(状态流转至 TERMINATED)
tryTerminate();
}

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 工作线程在处理任务阶段是被互斥锁保护着的,所以 tryLock() 会返回 false,不会中断到
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

shutdownNow

shutdownNow()方法关闭线程池相比shutdown()暴力了一些,会中断所有线程,哪怕线程正在执行任务。线程池进入STOP状态后既不会接受新任务,也不会处理任务队列中已有的任务。需要注意的是,即便shutdownNow()会中断正在执行任务的线程,但不代表任务一定会挂,因为如果提交的任务里面的代码没有对线程中断敏感的逻辑的话,线程中断是不会有任何效果的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查是否有 shutdown 的权限,非重点
advanceRunState(STOP); // 状态切换到 STOP
interruptWorkers(); // 与 SHUTDOWN 不同的是,直接中断所有线程
tasks = drainQueue(); // 将任务队列中的任务收集到 tasks
} finally {
mainLock.unlock();
}
// 尝试终止线程池(状态流转至 TERMINATED)
tryTerminate();
return tasks;
}

private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

// 此方法在 Worker 类中
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}

拒绝策略

execute()方法中我们可以看到,有两种情况会调用reject()拒绝策略来处理任务,一个是当任务加入阻塞队列后的短暂空窗期线程池已经关闭了,此时再次查看线程池的状态不为RUNNING就会将任务移出队列并执行拒绝策略,另一个是当线程数超过了maximumPoolSize,无法再创建新线程了。

1
2
3
4
5
private volatile RejectedExecutionHandler handler;

final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

RejectedExecutionHandlerThreadPoolExecutor中有四个已经定义好的实现类可供我们直接使用,当然,我们也可以实现自己的策略,不过一般也没有必要。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 如果线程池没有被关闭,那么由提交任务的线程自己来执行这个任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

// 不管怎样,直接抛出 RejectedExecutionException 异常
// 这个是默认的策略,如果在构造线程池时不传相应的 handler 的话,那就会使用这个
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

// 不做任何处理,直接忽略掉这个任务
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

// 如果线程池没有被关闭,那么丢弃任务队列中首部的任务,然后提交该任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

Executors

Executors是一个工具类,所有的方法都是static的,它为我们创建线程池提供了很大的便利。

newFixedThreadPool

生成一个固定大小的线程池,最大线程数设置为与核心线程数相等,此时keepAliveTime设置为0(因为这里它是没用的,即使不为0,线程池默认也不会回收corePoolSize内的线程),阻塞队列采用LinkedBlockingQueue无界队列。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor

生成只有一个线程的线程池,与newFixedThreadPool唯一的不同在于核心线程和最大线程数都为1,不需要指定。

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool

生成一个需要的时候就创建新线程的线程池。这种线程池对于任务可以比较快速地完成的情况下有比较好的性能,如果线程空闲了60秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有线程的关闭,整个线程池不会占用任何的系统资源。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

总结

本文从Executor顶层接口逐层向下分析,重点讲解了ThreadPoolExecutor的源码实现,包括核心参数、线程创建过程、执行任务、拒绝策略和线程池的关闭等,由于Executor体系本身内容还是比较多的,因此有些地方依然没有关注到,例如定时相关的ScheduledExecutorService接口和同时实现了ThreadPoolExecutorScheduledExecutorServiceScheduledThreadPoolExecutor,并且关于ThreadPoolExecutor的线程池关闭这一块,也还有几个方法没有深入分析,将来有时间一定补上。

参考资料