Executor与线程池

前言

“池”这个字恐怕大家都耳熟能详,诸如连接池、对象池以及线程池等等。我们可以将暂时用不到的资源搁置在池中,等待需要使用的时候,就可以直接从池中获取。池不仅可以重复利用资源,而无需重新进行资源的构建工作,还可以因此简化资源的管理与调用,提升效率与简化编码。

而线程池,就是一种常见的“池”。比起降低新建线程的开销,线程池更可以帮助我们处理线程的切换,减少线程上下文切换的开销。同时,也可以帮助任务的调度与管理。

导航

Executor

在了解线程池 ThreadPoolExecutor 之前,我们先来看一下它实现的顶级接口——Executor:执行器。

1
2
3
public interface Executor {
void execute(Runnable command);
}

Executor 可以说是非常简单了,只有一个 execute 方法。使用过线程池的读者们对这个方法大概很熟悉,那么 Executor 接口是用来做什么的?这是文档上的解释:

这个接口解耦了任务的提交与执行。

线程池无疑解耦了任务的提交与执行,所以实现了 Executor。而 Executor 接口下,并不止线程池这一部分。

执行器框架

顺着上图,我们也就可以理解在 java 中,线程池的实现类属于一种执行器,所以,线程池才被命名为 ThreadPoolExecutor。

ThreadPoolExecutor

这是我们所熟知的线程池实现类,先上示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class NetworkService implements Runnable {
private final ExecutorService pool; // ExecutorService接口继承了Executor接口,额外提供对线程的终止,与对异步任务的支持等

public NetworkService(int port, int poolSize)
throws IOException {
pool = Executors.newFixedThreadPool(poolSize); // 通过Executors类的工厂方法来获取线程池
}

public void run() { // run the service
for (; ; ) {
pool.execute(new Runnable() { // 提交任务
@Override
public void run() {
// do something
}
});
}
}
}

示例代码往往展现了最基础的功能。由上述代码,我们可以看到,线程池的关键步骤有两个:

  • 线程池的创建
  • 任务的提交

线程池的创建

Executors.newFixedThreadPool(poolSize) 方法究竟做了什么呢?

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

emmm,参数有点多。这是完整参数版的构造方法:

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}

在介绍这些参数的含义之前,我们约定一些名词:

  • workers:工作线程,也就是执行任务的线程,以下简称线程
  • alive:线程存活,也就是线程没有被终止

那么,这几个参数的意义是:

  • corePoolSize:核心线程数,允许长期空闲的线程数量
  • maximumPoolSize:最大线程数量
  • keepAliveTime:超过 corePoolSize 的空闲线程存活时间
  • RejectedExecutionHandler:拒绝执行处理器,当任务被拒绝执行时触发
  • workQueue:暂存任务的工作队列,下称队列
  • threadFactory: 创建线程的线程工厂

要了解这些参数的作用,还得先清除线程池的执行流程。因此,我们再来看另一关键步骤:execute。

任务的提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get(); // 获取池的状态

if (workerCountOf(c) < corePoolSize) { // 如果线程数量少于核心线程
if (addWorker(command, true)) // 创建新核心线程执行
return; // 成功则返回
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 加入队列
int recheck = ctl.get(); // 重新检查状态,以防池关闭或者线程终止
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 创建新线程执行(可以是非核心线程)
reject(command);
}

因此,线层执行的三个步骤是非常清楚的:

线程池执行

了解了大体流程,我们再来关注 execute 方法的细节。在笔者看来,execute 方法有着两个关键点:ctl 与 worker。下面,我们就来这两个对象。

ctl

java 中有不少使用一个字段来控制/记录对象状态的,ctl 就是其一。线程池一共有 5 中状态,分别是:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。这些状态的表示如下:

1
2
3
4
5
private static final int RUNNING    = -1 << COUNT_BITS; // 接受新任务并且处理任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接受新任务但处理任务
private static final int STOP = 1 << COUNT_BITS; // 不接受新任务,不处理任务,并且会中断正在执行的任务
private static final int TIDYING = 2 << COUNT_BITS; // 无任务,无存在线程,将会执行terminated()方法
private static final int TERMINATED = 3 << COUNT_BITS; // terminated()方法执行完成,

读者们可能已经意识到了,这些状态是有递增顺序的,并且与值的大小相对应。因此,这些协助判断状态的方法也不难理解了:

1
2
3
4
5
6
7
8
9
10
11
private static boolean runStateLessThan(int c, int s) {
return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

对线程池的状态有了初步的了解,我们再来看 ctl。ctl 是 AtomicInteger 类型,value 是 32 位的 int。ctl 用最高 3 位表示线程池状态(2^3=8>5),低 29 位表示线程数量。因此,相关的方法如下:

1
2
3
4
5
6
7
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

private static int runStateOf(int c) { return c & ~COUNT_MASK; } // 高3位
private static int workerCountOf(int c) { return c & COUNT_MASK; } // 低29位
private static int ctlOf(int rs, int wc) { return rs | wc; } // 线程池状态|线程数量

worker

worker 继承了 AQS,通过锁来防止线程正在运行时被中断。而我们优先的关注点,自然在线程执行的 run()方法上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {

public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null; // 更新firstTask字段
w.unlock(); // allow interrupts
boolean completedAbruptly = true; // 执行中抛出异常
try {
while (task != null ||
(task = getTask()) != null) { // 从队列中获取任务
w.lock(); // 加锁,防止执行任务过程中被中断
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) // 线程池不执行任务,保证线程中断,不再执行任务
&& !wt.isInterrupted()) // 线程池可以执行任务,保证线程没被中断
wt.interrupt(); // 处理被清除的中断状态
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock(); // 释放锁
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 收拾一下
}
}
}