线程池底层源码分析

1. 线程池创建

先使用ThreadPoolExecutor手动创建一个线程池

根据阿里巴巴Java开发手册里面的要求,线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式
这样的处理方式更加明确线程池的运行规则,规避资源耗尽的风险
说明:Executors返回的线程池对象的弊端如下:

  1. FixedThreadPoolSingleThreadPool: 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求,从而导致OOM
  1. CachedThreadPoolScheduledThreadPool: 允许的创建线程数长度为Integer.MAX_VALUE,可能会创建大量线程,从而导致OOM
1
2
3
4
5
6
7
ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());

ThreadPoolExecutor七大参数:

  1. corePoolSize:核心线程池大小
  2. maximumPoolSize:最大线程池大小
  3. keepAliveTime:空闲线程存活时间
  4. unit:时间单位
  5. workQueue:阻塞队列
  6. threadFactory:线程工厂:创建线程的,一般不用动
  7. handler:拒绝策略
    • new ThreadPoolExecutor.AbortPolicy() // 不执行新任务,直接抛出异常,提示线程池已满
    • new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里!由调用线程处理该任务
    • new ThreadPoolExecutor.DiscardPolicy() //不执行新任务,也不抛出异常
    • new ThreadPoolExecutor.DiscardOldestPolicy() //丢弃队列最前面的任务,然后重新提交被拒绝的任务。

ThreadPoolExecutor构造方法源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

2. 线程池的基础属性和方法

在线程池的源码中,会通过一个 AtomicInteger类型的变量 ctl 来表示线程池的状态和当前线程池中的工作线程数量

1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

一个Integer占4个字节,也就是 32 个bit,线程池有5个状态:

  1. RUNNING
  2. SHUTDOWN
  3. STOP
  4. TIDYING
  5. TERMINATED
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
     源码关于5中状态的说明:
    /* ******
    * The runState provides the main lifecycle control, taking on values:
    *
    * RUNNING: Accept new tasks and process queued tasks
    * SHUTDOWN: Don't accept new tasks, but process queued tasks
    * STOP: Don't accept new tasks, don't process queued tasks,
    * and interrupt in-progress tasks
    * TIDYING: All tasks have terminated, workerCount is zero,
    * the thread transitioning to state TIDYING
    * will run the terminated() hook method
    * TERMINATED: terminated() has completed
    *
    * The numerical order among these values matters, to allow
    * ordered comparisons. The runState monotonically increases over
    * time, but need not hit each state. The transitions are:
    *
    * RUNNING -> SHUTDOWN
    * On invocation of shutdown(), perhaps implicitly in finalize()
    * (RUNNING or SHUTDOWN) -> STOP
    * On invocation of shutdownNow()
    * SHUTDOWN -> TIDYING
    * When both queue and pool are empty
    * STOP -> TIDYING
    * When pool is empty
    * TIDYING -> TERMINATED
    * When the terminated() hook method has completed
    */
    2个 bit 能表示4种状态, 那5种状态就至少需要3个bit位,比如在线程池的源码中就是这么表示的
    1
    2
    3
    4
    5
    6
    7
    8
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    // runState is stored in the high-order bits
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;
    Integer.SIZE 为 32, 所以 COUNT_BITS = 29, 最终各个状态对应的二进制为:
  • RUNNING: 11100000 00000000 00000000 00000000
  • SHUTDOWN: 00000000 00000000 00000000 00000000
  • STOP: 00100000 00000000 00000000 00000000
  • TIDYING: 01000000 00000000 00000000 00000000
  • TERMINATED: 01100000 00000000 00000000 00000000

所以,只需要使用一个Integer数字的最高3个bit,就可以表示5种线程池状态,而剩下的29个bit就可以用来表示工作线程数,
比如:假设ctl为:11100000 00000000 00000000 00001010 就表示线程池的状态为RUNNING,线程池池目前在工作的线程有10个,
这里说的”在工作”指的是线程活着,要么在执行任务,要么在阻塞等待任务。

同时,线程池中也提供一些方法来获取线程池状态和工作线程数,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//29 二进制为:00000000 00000000 00000000 00011101
private static final int COUNT_BITS = Integer.SIZE - 3;
//二进制为:00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// Packing and unpacking ctl
//~CAPACITY: 11100000 00000000 00000000 00000000
// & 操作之后, 得到的就是 c 的高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }

//CAPACITY: 00011111 11111111 11111111 11111111
// & 操作之后, 得到的就是 c 的低29位
private static int workerCountOf(int c) { return c & CAPACITY; }

/**
* 把运行状态和线程数量进行合并,传入的两个int 数字有限制, rs的低29位都必须是9
* wc的高3位都必须位0,这样经过或运算之后,才能得到准确的 ctl
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }

execute方法

当执行线程池的execute方法时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取ctl, ctl初始值时 ctlOf(RUNNING, 0), 表示线程池处于运行中,工作线程数位0
int c = ctl.get();

//工作线程数于 corePoolSize, 则添加工作线程数,并把command作为该线程要执行的任务
if (workerCountOf(c) < corePoolSize) {
//true 表示添加的时核心工作线程,在addWorker内部会判断当前工作线程数是不是超过了corePoolSize
//如果超过了就会添加失败,addWorker返回false,表示不能开启新的线程来执行任务,而是应该先 入队
if (addWorker(command, true))
return;

//如果添加核心工作线程失败,要重新获取ctl, 可能是线程池状态被其他线程修改了
// 也可能是其他线程也在向线程池提交任务,导致核心工作线程已经超出了corePoolSize
c = ctl.get();
}
// 线程池状态是否还是RUNNING,如果是就把任务添加到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
//在任务入队时,线程池的状态也可能发生改变
int recheck = ctl.get();

// 再次检查线程池的状态,如果不是RUNNING,就不能再接受任务了,就把任务从队列中移除,并进行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
//不过为了确保刚刚入队的任务有线程回去处理它,需要判断以下工作线程数,如果位0,则添加一个非核心的工作线程
addWorker(null, false);
}
//如果线程池状态不是RUNNING,或者线程池状态时RUNNING但是队列满了,则去添加一个非核心工作线程
//实际上,addWorker中会判断线程池状态不是RUNNING,是不会添加工作线程的
//false表示非核心工作线程,作用是,在addWorker内部会判断当前工作线程数已经超过了maximumPoolSize
//如果超过了则会添加不成功,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

addWorker方法

addWorker方法是核心方法,用于添加线程的, core参数表示条件的是核心线程还是非核心线程

添加线程:实际上就是开启一个线程,不管是核心线程还是非核心线程都只是一个普通的线程,而核心和非核心的区别在于:

  1. 如果要添加核心工作线程,那么就要判断目前的工作线程数是否超过corePoolSize

    a. 如果没超过,则直接开启新的工作线程执行任务

    b. 如果超过了,则不会开启新的工作线程,而是把任务进行入队

  2. 如果添加非核心线程,那么就要判断目前的工作线程数是否超过maximumPoolSize

    a. 如果没超过,则直接开启新的工作线程执行任务

    b. 如果超过了,则拒绝执行任务

所以在addWorker方法中,首先就要判断工作线程有没有超过限制,如果没有超过限制再去开启一个线程。

并且在addWorker方法中,还得判断线程池的状态,如果线程池的状态不是RUNNING状态了,那就没必要要去添加线程了,当然有一种特例,就是线程池的状态是SHUTDOWN,但是队列中有任务,那此时还是需要添加添加一个线程的。

我们前面提到的都是开启新的工作线程,那么工作线程怎么回收呢?不可能开启的工作线程一直活着,因为如果任务由多变少,那也就不需要过多的线程资源,所以线程池中会有机制对开启的工作线程进行回收,如何回收的,后文会提到,我们这里先分析,有没有可能线程池中所有的线程都被回收了,答案的是有的。

首先非核心工作线程被回收是可以理解的,那核心工作线程要不要回收掉呢?其实线程池存在的意义,就是提交生成好线程资源,需要线程的时候直接使用就可以,而不需要临时去开启线程,所以正常情况下,开启的核心工作线程是不用回收掉的,就算暂时没有任务要处理,也不用回收,就让核心工作线程在那等着就可以了。

但是,在线程池中有这么一个参数:allowCoreThreadTimeOut表示是否允许核心工作线程超时,意思就是是否允许核心工作线程回收。默认这个参数为false,但是我们可以调用allowCoreThreadTimeOut(boolean value)来把这个参数改为true,只要改了,那么核心工作线程也就会被回收了,那这样线程池中的所有工作线程都可能被回收掉,那如果所有工作线程都被回收掉之后,阻塞队列中来了一个任务,这样就形成了特例情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

//线程池状态不是RUNNING,并且不是特例情况(线程池状态是SHUTDOWN并且队列不为空)
//如果是RUNNING或者是特例情况,就准备新建工作线程
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
//判断工作线程数是否超过了限制
//如果超过了,则return false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
//如果没有超过限制,则修改ctl,增加工作线程数,cas成功则退出外层retry循环,去创建新的工作线程
break retry;
//如果cas失败,则表示有其他线程也在提交任务,也在增加工作线程数,此时重新获取ctl
c = ctl.get(); // Re-read ctl

//如果发现线程池的状态发生了变化,则继续回到retry,重新判断线程池的状态是否是RUNNING
//如果没有发生变化,则继续利用CAS来增加工作线程数,直到cas成功
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//ctl 修改成功,也就是工作线程数 +1 成功
//接下来就要开启一个新的工作线程了
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//Workker实现了Runnable接口,构造一个Worker对象时,就会利用ThreadFactory新建一个线程
//Worker对象有两个属性:
//Runnable firstTask: 表示worker待执行的第一个任务,第二个任务会从阻塞队列中获取
// Thread thread: 表示Worker对应的线程,就是这个线程来获取并执行任务的
w = new Worker(firstTask);

//拿出线程对象,还没有start
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

// 如果线程池的状态是RUNNING
//或者线程池的状态变成了SHUTDOWN,但是当前线程没有自己的第一个任务,那就表示当前调用addWorker方法是为了从队列中获取任务来执行
// 正常情况下线程池的状态如果是SHUTDOWN,是不能创建新的工作线程的,但是队列中如果有任务,那就是上面说的特例情况
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();

// workers用来记录当前线程池中工作线程
workers.add(w);

//largestPoolSize 用来跟踪线程池在运行中工作线程数据的峰值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//运行线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//在上述过程中如果抛出了异常,需要从workers中移除添加的work,并且还要修改ctl , 工作线程数据 -1,表示新建工作线程失败
if (! workerStarted)
addWorkerFailed(w);
}
//表示添加工作线程成功
return workerStarted;
}

所以,对于addWorker方法,核心逻辑就是:

  1. 先判断工作线程数是否超过了限制
  2. 修改ctl,使工作线程数 +1
  3. 构造Work对象,并把它添加到workers集合中
  4. 启动Work对象对应的工作线程