ThreadPoolExecutor是一个ExecutorService,实现了Executor接口,是一个线程池服务的实现。通常通过Executors类的工厂方法来创建。
一、实现原理
1、关键要素
要理解线程池的实现原理,主要理解以下的关键要素:
• 线程池的状态:提供了线程池生命周期中所处的不同状态,分别为以下状态(分别用0,1,2,3表示):
①RUNNING: 可以接收新任务和可以处理队列中的任务;
②SHUTDOWN: 不接收新任务,可以继续处理队列中的任务;
③STOP: 不接收新任务,也不处理队列任务,中断在执行的任务;
④TERMINATE: 类似STOP,所有的线程都会停止。
• 线程数量控制参数:用于控制何时创建线程,何时不能创建线程和任务逻辑控制。
①corePoolSize: 池中核心线程的数量;
②maximunPoolSize: 池中最大的线程数量;
③poolSize: 当前池中的线程数量;
• 工作者线程和工作队列:工作者线程从工作队列中获取任务并执行,如果设置了超时参数,工作者线程如果没有任务时,会被回收。工作队列使用的都是阻塞BlockingQueue。
• 线程工厂:ThreadFactory,用于创建新线程,可以自定义线程的一些属性,比如名称等,如果在初始化是没有配置,就会使用默认的ThreadFactory。
• 保持活动的时间:keepAliveTime,当线程池中有多于corePoolSize的线程时,多出的线程空闲时间超过keepAliveTime时将会被清理。
• 拒绝策略:当线程池已经关闭或线程数量达到最大数或工作队列已经饱和时,将拒绝执行新的任务, 主要实现了以下4种拒绝策略:
①AbortPolicy:中断策略,默认的策略,拒绝任务并抛出运行时异常RejectedExecutionException;
②CallerRunsPolicy:使用调用者线程执行任务,除非线程池已经shutdown。
③DiscardPolicy:不能执行的任务将删除;
④DiscardOldestPolicy:如果执行程序未关闭,位于队列头的任务被删除,然后重试执行程序(失败则重复此过程)。
以上的线程池关键要素都会在构造方法中初始化,并根据初始化配置的不同,将产生不同特点的线程池。比如
FixedThreadPool,CachedThreadPool,SingleThreadPool等。
2、执行任务
怎么执行提交到池中的任务是线程池的核心,是由execute方法实现的,执行的过程如下:
①判断核心线程是否都在执行任务,如果不是,则创建新线程来执行任务,如果核心线程都在执行任务,则进入②;
②判断工作队列是否已经满,如果不满,则把任务加入工作队列,如果已经满,则进入步骤③;
③判断线程池的线程是否都处于工作状态,如果没有,则创建一个新线程来执行任务,否则执行拒绝策略。
核心的execute代码如下:
1 |
public void execute(Runnable command) { |
从代码看出,可以分为以下情况:
①当前的线程数小于核心线程数,则可以继续创建新线程来执行任务;
②如果运行的线程多于corePoolSize,则把任务加入workQueue队列;
③如果无法将任务加入队列(队列已满),则创建新线程执行任务;
④如果创建新线程后的线程数大于maximunPoolSize,则创建失败,并通过拒绝策略拒绝执行新任务;
把任务交给worker线程处理,addThread方法实现了这个逻辑:
1 |
private Thread addThread(Runnable firstTask) { |
3、worker线程实现
worker线程是真正实现执行任务的线程,是通过内部类Worker来实现的,Worker通过在run方法中循环从
workQueue队列中获取任务来执行:
1 |
public void run() { |
run方法通过getTask()方法不断从阻塞队列中获取任务,getTask实现过程如下:
1 |
Runnable getTask() { |
Worker类中的runTask方法是实际执行任务的方法,其实现过程如下:
1 |
private void runTask(Runnable task) { |
可以看到,在任务执行的前后都可以插入一些代码,由子类来实现监控或其它功能。run方法退出前会执行workDown方法来维护线程池的一些状态信息:
1 |
void workerDone(Worker w) { |
至此,线程池执行任务的相关逻辑结束。
4、关闭线程池
关闭线程池可以通过shutdown()方法或shutdownNow方法来实现,在关闭前都需要进行安全检查,以防恶意代码执行。两个方法不同的地方在于中断worker线程不一样,shutdown调用的是interruptIfIdle方法,而shutdownNow比较粗暴,直接interrupt线程。但shutdownNow会把当前队列中的任务拿出来返回。
1 |
public void shutdown() { |
1 |
void interruptIfIdle() { |
5、并发控制
ThreadPoolExecutor在实现中使用到了ReentrantLock来进行并发控制。核心的表示线程数量的int变量都使用了volatile来保证它的可见性。在Worker实现中也用到了ReentrantLock来控制任务的执行。ThreadPoolExecutor的主要方法都是在lock的控制下进行,比如创建线程时,关闭线程池时。所以,为了避免锁带来的问题,在execute方法中,有这样一个判断:如果当前线程数大于核心线程时,将会把任务加入队列中,这个过程是不需要加锁的,这样就提升了线程池的性能。当线程池预热创建线程数达到核心线程数后都将执行此逻辑。
二、ThreadPoolExecutor的使用
1、创建线程池
一般不推荐直接通过构造函数来创建,应该通过Executors类的工厂方法来创建线程池。因为如果配置的参数不当,线程池的性能可能会受到很大影响。
创建线程池需要指定如下参数:
• corePoolSize: 核心线程数;可以通过prestartAllCoreThreads方法来预热创建核心线程;
• runnableTaskQueue: 任务队列,是阻塞队列blockingQueue的各种实现类。
• maximunPoolSize:池中允许的最大线程数。
• threadFactory: 线程工厂类
• RejectExecutionHandler:拒绝策略处理类。
2、提交任务
提交任务可以使用execute方法或者submit方法,后者会返回一个异步计算结果。
3、关闭线程池
可以通过调用shutdown或shutdownNow方法来关闭线程池。
4、根据任务来配置线程池
任务性质:CPU密集型 、IO密集型、混合型
任务的优先级:高、中、低
任务的执行时间:长、中、短
任务的依赖性:是否依赖外部资源,如数据库。
CPU密集型任务可以配置少些线程,IO密集型可以配置多些线程,优先级不同可以用PriorityBlockingQueue来作为工作队列。对于依赖外部资源的任务,因为空闲线程可能较多,可以配置多些线程来利用CPU资源 。