第一部分:概述
线程是比较贵重的资源, 线程池是利用对象复用的原理, 对线程资源加以有效利用
线程池分为以下五类:
singleThreadExecutor
fixedThreadPool
cachedThreadPool
scheduledThreadPool
workStealingPool(区别于上述四类,使用多CPU)
下面分别说明一下每个线程池的用途:
1. singleThreadExecutor 单线程线程池
使用无界阻塞队列 LinkedBlockingQueue
主要用于执行需要顺序执行的任务组, 所有任务按顺序执行
2. fixedThreadPool 固定大小线程池
使用无界阻塞队列 LinkedBlockingQueue
核心线程数和最大线程数一致, 从0慢慢增加到N, 即使线程空闲, 仍然不释放, 一直保持固定数量的线程数
3. cachedThreadPool
使用同步队列 SynchronousQueue
一般意义上的线程池, 线程随着任务数的增加, 达到上限后不再增加, 空闲线程被后续任务重复利用,
超过一定时间, 线程将被回收, 对于执行短生命周期的线程任务, 能够极大提交线程利用效率
4 scheduledThreadPool 定时线程池
使用 延时队列 DelayedWorkQueue
用于延迟,或者周期性的执行某项计划任务
5. workStealingPool
用于执行比较耗时的操作, 当一个线程队列的任务被执行完, 会尝试去拉取其他工作队列的任务来执行
充分利用CPU多核的并行计算概念
第二部分:源码解读
首先:推荐自定义构建线程池(Executors构建使用的都是无界队列, 在任务并发量比较大的情况下会造成OOM)
其实线程池的初始化大同小异, 都有的几个核心参数是 核心线程数, 最大线程数, 线程池队列
差异化主要体现在上面几个参数, 前两个没什么好说的,我们来说说线程池队列
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//上述三个使用的都是ThreadPoolExecutor线程池执行器, 其中1,3使用的是无界链表阻塞队列
//2使用的是同步队列
//使用ScheduledThreadPoolExecutor 执行器(继承自ThreadPoolExecutor)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ForkJoinPool这个线程池比较特殊,直接继承自抽象执行器服务
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
对于最后一个可窃取线程池, 有兴趣的可以看这里
//空参数表示无界队列,不建议使用,有OOM风险
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//指定容量的队列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
线程池主要操作解读
submit方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
//对线程进行包装
RunnableFuture<Void> ftask = newTaskFor(task, null);
//执行线程
execute(ftask);
//返回线程的持有对象
return ftask;
}
execute核心执行方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//工作线程数小于核心线程数,直接加入到工作线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//线程池处于可执行状态,尝试将任务加入任务队列等待执行
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//加入队列失败, 尝试直接执行(如果有空闲线程的话)
else if (!addWorker(command, false))
//无空闲线程,执行拒绝策略
reject(command);
}
reject方法(拒绝策略执行)
final void reject(Runnable command) {
//根据拒绝设定的策略, 执行拒绝操作
handler.rejectedExecution(command, this);
}
到这里就得讲讲拒绝策略,总共分4种
1.AbortPolicy 直接终止策略,直接停止抛出异常
2.DiscardPolicy 丢弃策略, 静默丢弃,无任何反馈
3. DiscardOldestPolicy, 丢弃最早任务策略, 丢弃一个最早加入队列的任务,并尝试将当前任务重新执行
4. CallerRunsPolicy 直接执行策略, 只要线程池未关闭, 直接运行当前线程
后记: 拒绝策略类位于ThreadPoolExecutor.java文件当中, 非常简单, 看一下就知道了
至于使用那种策略, 可以根据实际情况选择