1. 线程池创建
先使用ThreadPoolExecutor
手动创建一个线程池
根据阿里巴巴Java开发手册里面的要求,线程池不允许使用
Executors
创建,而是通过ThreadPoolExecutor
的方式
这样的处理方式更加明确线程池的运行规则,规避资源耗尽的风险
说明:Executors返回的线程池对象的弊端如下:
FixedThreadPool
和SingleThreadPool
: 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求,从而导致OOM
CachedThreadPool
和ScheduledThreadPool
: 允许的创建线程数长度为Integer.MAX_VALUE,可能会创建大量线程,从而导致OOM
1 | ThreadPoolExecutor executor = new ThreadPoolExecutor(2, |
ThreadPoolExecutor七大参数:
- corePoolSize:核心线程池大小
- maximumPoolSize:最大线程池大小
- keepAliveTime:空闲线程存活时间
- unit:时间单位
- workQueue:阻塞队列
- threadFactory:线程工厂:创建线程的,一般不用动
- handler:拒绝策略
- new ThreadPoolExecutor.AbortPolicy() // 不执行新任务,直接抛出异常,提示线程池已满
- new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里!由调用线程处理该任务
- new ThreadPoolExecutor.DiscardPolicy() //不执行新任务,也不抛出异常
- new ThreadPoolExecutor.DiscardOldestPolicy() //丢弃队列最前面的任务,然后重新提交被拒绝的任务。
ThreadPoolExecutor构造方法源码
1 | public ThreadPoolExecutor(int corePoolSize, |
2. 线程池的基础属性和方法
在线程池的源码中,会通过一个 AtomicInteger
类型的变量 ctl 来表示线程池的状态和当前线程池中的工作线程数量。
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
一个Integer占4个字节,也就是 32 个bit,线程池有5个状态:
- RUNNING
- SHUTDOWN
- STOP
- TIDYING
- TERMINATED2个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28源码关于5中状态的说明:
/* ******
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*/bit
能表示4种状态, 那5种状态就至少需要3个bit位,比如在线程池的源码中就是这么表示的1
2
3
4
5
6
7
8private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;Integer.SIZE
为 32, 所以COUNT_BITS = 29
, 最终各个状态对应的二进制为:
- RUNNING: 11100000 00000000 00000000 00000000
- SHUTDOWN: 00000000 00000000 00000000 00000000
- STOP: 00100000 00000000 00000000 00000000
- TIDYING: 01000000 00000000 00000000 00000000
- TERMINATED: 01100000 00000000 00000000 00000000
所以,只需要使用一个Integer数字的最高3个bit,就可以表示5种线程池状态,而剩下的29个bit就可以用来表示工作线程数,
比如:假设ctl
为:11100000 00000000 00000000 00001010
就表示线程池的状态为RUNNING
,线程池池目前在工作的线程有10个,
这里说的”在工作”指的是线程活着,要么在执行任务,要么在阻塞等待任务。
同时,线程池中也提供一些方法来获取线程池状态和工作线程数,比如:
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
execute方法
当执行线程池的execute
方法时
1 | public void execute(Runnable command) { |
addWorker方法
addWorker方法是核心方法,用于添加线程的, core参数表示条件的是核心线程还是非核心线程
添加线程:实际上就是开启一个线程,不管是核心线程还是非核心线程都只是一个普通的线程,而核心和非核心的区别在于:
如果要添加核心工作线程,那么就要判断目前的工作线程数是否超过
corePoolSize
a. 如果没超过,则直接开启新的工作线程执行任务
b. 如果超过了,则不会开启新的工作线程,而是把任务进行入队
如果添加非核心线程,那么就要判断目前的工作线程数是否超过
maximumPoolSize
a. 如果没超过,则直接开启新的工作线程执行任务
b. 如果超过了,则拒绝执行任务
所以在addWorker方法中,首先就要判断工作线程有没有超过限制,如果没有超过限制再去开启一个线程。
并且在addWorker方法中,还得判断线程池的状态,如果线程池的状态不是RUNNING状态了,那就没必要要去添加线程了,当然有一种特例,就是线程池的状态是SHUTDOWN,但是队列中有任务,那此时还是需要添加添加一个线程的。
我们前面提到的都是开启新的工作线程,那么工作线程怎么回收呢?不可能开启的工作线程一直活着,因为如果任务由多变少,那也就不需要过多的线程资源,所以线程池中会有机制对开启的工作线程进行回收,如何回收的,后文会提到,我们这里先分析,有没有可能线程池中所有的线程都被回收了,答案的是有的。
首先非核心工作线程被回收是可以理解的,那核心工作线程要不要回收掉呢?其实线程池存在的意义,就是提交生成好线程资源,需要线程的时候直接使用就可以,而不需要临时去开启线程,所以正常情况下,开启的核心工作线程是不用回收掉的,就算暂时没有任务要处理,也不用回收,就让核心工作线程在那等着就可以了。
但是,在线程池中有这么一个参数:allowCoreThreadTimeOut
表示是否允许核心工作线程超时,意思就是是否允许核心工作线程回收。默认这个参数为false,但是我们可以调用allowCoreThreadTimeOut(boolean value)来把这个参数改为true,只要改了,那么核心工作线程也就会被回收了,那这样线程池中的所有工作线程都可能被回收掉,那如果所有工作线程都被回收掉之后,阻塞队列中来了一个任务,这样就形成了特例情况。
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
所以,对于addWorker
方法,核心逻辑就是:
- 先判断工作线程数是否超过了限制
- 修改ctl,使工作线程数 +1
- 构造Work对象,并把它添加到workers集合中
- 启动Work对象对应的工作线程