背景
- 目前线上底层线程池没有很好的监控,因此出现了任务阻塞得不到处理的情况,并且又不能定量分析,所以需要分析一下目前线上用到的线程池,并对其监控起来。
ForkJoinPool 线程池的理解
由于目前线上用到的ForkJoinPool 线程池,因此必须首先对该线程池有一定的理解。
先看概念
- 这里不再谈什么分而治之的思想,大多数程序员都应该知道这个,forkJoinPool 再牛逼,他也是一种线程池,因为见下图:
- 但是jdk 肯定不会重复造轮子,他肯定有自己的独特之处,先简单分析一下里面的类。
ForkJoinTask
- 抽象类
- 有两个很重要的抽象子类
public abstract class RecursiveAction extends ForkJoinTask<Void>{
...
/**
* The main computation performed by this task.
*/
protected abstract void compute();
...
}
public abstract class RecursiveTask<V> extends ForkJoinTask<V>{
...
protected abstract V compute();
...
}
- 很容易观察到,这两个子类有一个可以获取返回结果,一个不可以。
实现小 demo (Fibonacci 数列)
package com.tencent.egw.om;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;
/**
* @author lucxszhang@tencent.com
* @date 2022/2/4 3:58 下午
*/
public class ForkJoinDemo {
public static void main(String[] args) {
int n = 20;
// 为了追踪子线程名称,需要重写 ForkJoinWorkerThreadFactory 的方法
final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("my-thread" + worker.getPoolIndex());
return worker;
};
//创建分治任务线程池,可以追踪到线程名称
ForkJoinPool forkJoinPool = new ForkJoinPool(4, factory, null, false);
//创建分治任务
Fibonacci fibonacci = new Fibonacci(n);
//调用 invoke 方法启动分治任务
Integer result = forkJoinPool.invoke(fibonacci);
System.out.println(result);
}
}
class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) {
this.n = n;
}
@Override
public Integer compute() {
//和递归类似,定义可计算的最小单元
if (n <= 1) {
return n;
}
// 想查看子线程名称输出的可以打开下面注释
//log.info(Thread.currentThread().getName());
Fibonacci f1 = new Fibonacci(n - 1);
// 拆分成子任务
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
// f1.join 等待子任务执行结果
f2.fork();
return f2.join() + f1.join();
}
}
原理解释
-
通过上面的小例子,forkJoinPool 基本使用应该没问题了。
-
ThreadPoolExecutor 只有一个队列,比较难处理父子任务依赖层级非常深的场景。例如父任务等待子任务的结果,子任务又不能很快获取到结果。
-
这个时候就forkJoinPool 就可以发挥其优势了。
-
有多个任务队列,所以在 ForkJoinPool 中就有一个数组形式的成员变量 WorkQueue[]。那问题又来了(下面的原理很重要,会让你醍醐灌顶,彻底明白forkJoinPool的工作原理)
-
任务队列有多个,提交的任务放到哪个队列中呢?(上图中的 Router Rule 部分)
这就需要一套路由规则,从上面的代码 Demo 中可以理解,提交的任务主要有两种:-
有外部直接提交的(submission task)
-
也有任务自己 fork 出来的(worker task)
-
为了进一步区分这两种 task,Doug Lea 就设计一个简单的路由规则:
- 将 submission task 放到 WorkQueue 数组的「偶数」下标中
- 将 worker task 放在 WorkQueue 的「奇数」下标中,并且只有奇数下标才有线程( worker )与之相对
-
WorkQueue 就设计成了一个双端队列:
- 支持 LIFO(last-in-first-out) 的push(放)和pop(拿)操作——操作 top 端支持
- FIFO (first-in-first-out) 的 poll (拿)操作——操作 base 端线程(worker)操作自己的 WorkQueue 默认是 LIFO 操作(可选FIFO),当线程(worker)尝试窃取其他 WorkQueue 里的任务时,这个时候执行的是FIFO操作,即从 base 端窃取,用图丰富一下就是这样滴:
-
如何实现forkJoinPool的监控:
回到最初的起点,各种监控api
-
getPoolSize(): 此方法返回 int 值,它是ForkJoinPool内部线程池的worker线程们的数量。
-
getParallelism(): 此方法返回池的并行的级别。
-
getActiveThreadCount(): 此方法返回当前执行任务的线程的数量。
-
getRunningThreadCount():此方法返回没有被任何同步机制阻塞的正在工作的线程。
- 统计了workQueue数据的和
-
getQueuedSubmissionCount(): 此方法返回已经提交给池还没有开始他们的执行的任务数。
-
getQueuedTaskCount(): 此方法返回已经提交给池已经开始他们的执行的任务数。
-
hasQueuedSubmissions(): 此方法返回 Boolean 值,表明这个池是否有queued任务还没有开始他们的执行。
-
getStealCount(): 此方法返回 long 值,worker 线程已经从另一个线程偷取到的时间数。
-
isTerminated(): 此方法返回 Boolean 值,表明 fork/join 池是否已经完成执行。
玩起来,在线程池执行完之前,打印
package com.tencent.egw.om;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
/**
* @author lucxszhang@tencent.com
* @date 2022/2/4 3:58 下午
*/
public class ForkJoinDemo {
public static void main(String[] args) throws InterruptedException {
int n = 20;
// 为了追踪子线程名称,需要重写 ForkJoinWorkerThreadFactory 的方法
final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("my-thread" + worker.getPoolIndex());
return worker;
};
//创建分治任务线程池,可以追踪到线程名称
ForkJoinPool forkJoinPool = new ForkJoinPool(10, factory, null, false);
//创建分治任务
Fibonacci fibonacci = new Fibonacci(n);
forkJoinPool.execute(fibonacci);
//当任务还未结束执行,调用 showLog() 方法来把 ForkJoinPool 类的状态信息写入,然后让线程休眠一秒。
while (!fibonacci.isDone()) {
showLog(forkJoinPool);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//使用 shutdown() 方法关闭pool。
forkJoinPool.shutdown();
//使用 awaitTermination() 方法 等待pool的终结。
forkJoinPool.awaitTermination(1, TimeUnit.DAYS);
//调用 showLog() 方法写关于 ForkJoinPool 类状态的信息并写一条信息到操控台表明结束程序。
showLog(forkJoinPool);
System.out.printf("Main: End of the program.\n");
}
//实现 showLog() 方法。它接收 ForkJoinPool 对象作为参数和写关于线程和任务的执行的状态的信息。
private static void showLog(ForkJoinPool pool) {
System.out.printf("**********************\n");
System.out.printf("Main: Fork/Join Pool log\n");
System.out.printf("Main: Fork/Join Pool: Parallelism:%d\n",
pool.getParallelism());
System.out.printf("Main: Fork/Join Pool: Pool Size:%d\n",
pool.getPoolSize());
System.out.printf("Main: Fork/Join Pool: Active Thread Count:%d\n",
pool.getActiveThreadCount());
System.out.printf("Main: Fork/Join Pool: Running Thread Count:%d\n",
pool.getRunningThreadCount());
System.out.printf("Main: Fork/Join Pool: Queued Submission:%d\n",
pool.getQueuedSubmissionCount());
System.out.printf("Main: Fork/Join Pool: Queued Tasks:%d\n",
pool.getQueuedTaskCount());
System.out.printf("Main: Fork/Join Pool: Queued Submissions:%s\n",
pool.hasQueuedSubmissions());
System.out.printf("Main: Fork/Join Pool: Steal Count:%d\n",
pool.getStealCount());
System.out.printf("Main: Fork/Join Pool: Terminated :%s\n",
pool.isTerminated());
System.out.printf("**********************\n");
}
}
class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) {
this.n = n;
}
@Override
public Integer compute() {
//和递归类似,定义可计算的最小单元
if (n <= 1) {
return n;
}
// 想查看子线程名称输出的可以打开下面注释
//log.info(Thread.currentThread().getName());
Fibonacci f1 = new Fibonacci(n - 1);
// 拆分成子任务
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
// f1.join 等待子任务执行结果
f2.fork();
return f2.join() + f1.join();
}
}
- 执行结果