searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Java 线程池的监控

2023-09-26 03:44:26
12
0

背景

  • 目前线上底层线程池没有很好的监控,因此出现了任务阻塞得不到处理的情况,并且又不能定量分析,所以需要分析一下目前线上用到的线程池,并对其监控起来。

ForkJoinPool 线程池的理解

由于目前线上用到的ForkJoinPool 线程池,因此必须首先对该线程池有一定的理解。

先看概念

  • 这里不再谈什么分而治之的思想,大多数程序员都应该知道这个,forkJoinPool 再牛逼,他也是一种线程池,因为见下图:
  • 但是jdk 肯定不会重复造轮子,他肯定有自己的独特之处,先简单分析一下里面的类。

ForkJoinTask

  • 抽象类
  • 有两个很重要的抽象子类
public abstract class RecursiveAction extends ForkJoinTask<Void>{
    ...
  /**
   * The main computation performed by this task.
   */
  protected abstract void compute();
  ...
}

public abstract class RecursiveTask<V> extends ForkJoinTask<V>{
    ...
  protected abstract V compute();
  ...
}
  • 很容易观察到,这两个子类有一个可以获取返回结果,一个不可以。

实现小 demo (Fibonacci 数列)

package com.tencent.egw.om;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;

/**
 * @author lucxszhang@tencent.com
 * @date 2022/2/4 3:58 下午
 */
public class ForkJoinDemo {

    public static void main(String[] args) {
        int n = 20;

        // 为了追踪子线程名称,需要重写 ForkJoinWorkerThreadFactory 的方法
        final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("my-thread" + worker.getPoolIndex());
            return worker;
        };

        //创建分治任务线程池,可以追踪到线程名称
        ForkJoinPool forkJoinPool = new ForkJoinPool(4, factory, null, false);
        //创建分治任务
        Fibonacci fibonacci = new Fibonacci(n);

        //调用 invoke 方法启动分治任务
        Integer result = forkJoinPool.invoke(fibonacci);
        System.out.println(result);
    }
}

class Fibonacci extends RecursiveTask<Integer> {

    final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    @Override
    public Integer compute() {
        //和递归类似,定义可计算的最小单元
        if (n <= 1) {
            return n;
        }
        // 想查看子线程名称输出的可以打开下面注释
        //log.info(Thread.currentThread().getName());

        Fibonacci f1 = new Fibonacci(n - 1);
        // 拆分成子任务
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        // f1.join 等待子任务执行结果
        f2.fork();
        return f2.join() + f1.join();
    }
}

原理解释

  • 通过上面的小例子,forkJoinPool 基本使用应该没问题了。

  • ThreadPoolExecutor 只有一个队列,比较难处理父子任务依赖层级非常深的场景。例如父任务等待子任务的结果,子任务又不能很快获取到结果。

  • 这个时候就forkJoinPool 就可以发挥其优势了。

  • 有多个任务队列,所以在 ForkJoinPool 中就有一个数组形式的成员变量 WorkQueue[]。那问题又来了(下面的原理很重要,会让你醍醐灌顶,彻底明白forkJoinPool的工作原理)

  • 任务队列有多个,提交的任务放到哪个队列中呢?(上图中的 Router Rule 部分)
    这就需要一套路由规则,从上面的代码 Demo 中可以理解,提交的任务主要有两种:

    • 有外部直接提交的(submission task)

    • 也有任务自己 fork 出来的(worker task)

    • 为了进一步区分这两种 task,Doug Lea 就设计一个简单的路由规则:

      • 将 submission task 放到 WorkQueue 数组的「偶数」下标中
      • 将 worker task 放在 WorkQueue 的「奇数」下标中,并且只有奇数下标才有线程( worker )与之相对
    • WorkQueue 就设计成了一个双端队列:

      • 支持 LIFO(last-in-first-out) 的push(放)和pop(拿)操作——操作 top 端支持
      • FIFO (first-in-first-out) 的 poll (拿)操作——操作 base 端线程(worker)操作自己的 WorkQueue 默认是 LIFO 操作(可选FIFO),当线程(worker)尝试窃取其他 WorkQueue 里的任务时,这个时候执行的是FIFO操作,即从 base 端窃取,用图丰富一下就是这样滴

如何实现forkJoinPool的监控:

回到最初的起点,各种监控api

  • getPoolSize(): 此方法返回 int 值,它是ForkJoinPool内部线程池的worker线程们的数量。

  • getParallelism(): 此方法返回池的并行的级别。

  • getActiveThreadCount(): 此方法返回当前执行任务的线程的数量。

  • getRunningThreadCount():此方法返回没有被任何同步机制阻塞的正在工作的线程。

    • 统计了workQueue数据的和
  • getQueuedSubmissionCount(): 此方法返回已经提交给池还没有开始他们的执行的任务数。

  • getQueuedTaskCount(): 此方法返回已经提交给池已经开始他们的执行的任务数。

  • hasQueuedSubmissions(): 此方法返回 Boolean 值,表明这个池是否有queued任务还没有开始他们的执行。

  • getStealCount(): 此方法返回 long 值,worker 线程已经从另一个线程偷取到的时间数。

  • isTerminated(): 此方法返回 Boolean 值,表明 fork/join 池是否已经完成执行。

玩起来,在线程池执行完之前,打印

package com.tencent.egw.om;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

/**
 * @author lucxszhang@tencent.com
 * @date 2022/2/4 3:58 下午
 */
public class ForkJoinDemo {

    public static void main(String[] args) throws InterruptedException {
        int n = 20;
        // 为了追踪子线程名称,需要重写 ForkJoinWorkerThreadFactory 的方法
        final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("my-thread" + worker.getPoolIndex());
            return worker;
        };

        //创建分治任务线程池,可以追踪到线程名称
        ForkJoinPool forkJoinPool = new ForkJoinPool(10, factory, null, false);
        //创建分治任务
        Fibonacci fibonacci = new Fibonacci(n);
        forkJoinPool.execute(fibonacci);
        //当任务还未结束执行,调用 showLog() 方法来把 ForkJoinPool 类的状态信息写入,然后让线程休眠一秒。
        while (!fibonacci.isDone()) {
            showLog(forkJoinPool);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //使用 shutdown() 方法关闭pool。
        forkJoinPool.shutdown();
        //使用 awaitTermination() 方法 等待pool的终结。
        forkJoinPool.awaitTermination(1, TimeUnit.DAYS);
        //调用 showLog() 方法写关于 ForkJoinPool 类状态的信息并写一条信息到操控台表明结束程序。
        showLog(forkJoinPool);
        System.out.printf("Main: End of the program.\n");
    }

    //实现 showLog() 方法。它接收 ForkJoinPool 对象作为参数和写关于线程和任务的执行的状态的信息。
    private static void showLog(ForkJoinPool pool) {
        System.out.printf("**********************\n");
        System.out.printf("Main: Fork/Join Pool log\n");
        System.out.printf("Main: Fork/Join Pool: Parallelism:%d\n",
                pool.getParallelism());
        System.out.printf("Main: Fork/Join Pool: Pool Size:%d\n",
                pool.getPoolSize());
        System.out.printf("Main: Fork/Join Pool: Active Thread Count:%d\n",
                pool.getActiveThreadCount());
        System.out.printf("Main: Fork/Join Pool: Running Thread Count:%d\n",
                pool.getRunningThreadCount());
        System.out.printf("Main: Fork/Join Pool: Queued Submission:%d\n",
                pool.getQueuedSubmissionCount());
        System.out.printf("Main: Fork/Join Pool: Queued Tasks:%d\n",
                pool.getQueuedTaskCount());
        System.out.printf("Main: Fork/Join Pool: Queued Submissions:%s\n",
                pool.hasQueuedSubmissions());
        System.out.printf("Main: Fork/Join Pool: Steal Count:%d\n",
                pool.getStealCount());
        System.out.printf("Main: Fork/Join Pool: Terminated :%s\n",
                pool.isTerminated());
        System.out.printf("**********************\n");
    }
}

class Fibonacci extends RecursiveTask<Integer> {

    final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    @Override
    public Integer compute() {
        //和递归类似,定义可计算的最小单元
        if (n <= 1) {
            return n;
        }
        // 想查看子线程名称输出的可以打开下面注释
        //log.info(Thread.currentThread().getName());

        Fibonacci f1 = new Fibonacci(n - 1);
        // 拆分成子任务
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        // f1.join 等待子任务执行结果
        f2.fork();
        return f2.join() + f1.join();
    }
}

  • 执行结果

0条评论
0 / 1000
一个正经的博主
4文章数
0粉丝数
一个正经的博主
4 文章 | 0 粉丝
一个正经的博主
4文章数
0粉丝数
一个正经的博主
4 文章 | 0 粉丝
原创

Java 线程池的监控

2023-09-26 03:44:26
12
0

背景

  • 目前线上底层线程池没有很好的监控,因此出现了任务阻塞得不到处理的情况,并且又不能定量分析,所以需要分析一下目前线上用到的线程池,并对其监控起来。

ForkJoinPool 线程池的理解

由于目前线上用到的ForkJoinPool 线程池,因此必须首先对该线程池有一定的理解。

先看概念

  • 这里不再谈什么分而治之的思想,大多数程序员都应该知道这个,forkJoinPool 再牛逼,他也是一种线程池,因为见下图:
  • 但是jdk 肯定不会重复造轮子,他肯定有自己的独特之处,先简单分析一下里面的类。

ForkJoinTask

  • 抽象类
  • 有两个很重要的抽象子类
public abstract class RecursiveAction extends ForkJoinTask<Void>{
    ...
  /**
   * The main computation performed by this task.
   */
  protected abstract void compute();
  ...
}

public abstract class RecursiveTask<V> extends ForkJoinTask<V>{
    ...
  protected abstract V compute();
  ...
}
  • 很容易观察到,这两个子类有一个可以获取返回结果,一个不可以。

实现小 demo (Fibonacci 数列)

package com.tencent.egw.om;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;

/**
 * @author lucxszhang@tencent.com
 * @date 2022/2/4 3:58 下午
 */
public class ForkJoinDemo {

    public static void main(String[] args) {
        int n = 20;

        // 为了追踪子线程名称,需要重写 ForkJoinWorkerThreadFactory 的方法
        final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("my-thread" + worker.getPoolIndex());
            return worker;
        };

        //创建分治任务线程池,可以追踪到线程名称
        ForkJoinPool forkJoinPool = new ForkJoinPool(4, factory, null, false);
        //创建分治任务
        Fibonacci fibonacci = new Fibonacci(n);

        //调用 invoke 方法启动分治任务
        Integer result = forkJoinPool.invoke(fibonacci);
        System.out.println(result);
    }
}

class Fibonacci extends RecursiveTask<Integer> {

    final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    @Override
    public Integer compute() {
        //和递归类似,定义可计算的最小单元
        if (n <= 1) {
            return n;
        }
        // 想查看子线程名称输出的可以打开下面注释
        //log.info(Thread.currentThread().getName());

        Fibonacci f1 = new Fibonacci(n - 1);
        // 拆分成子任务
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        // f1.join 等待子任务执行结果
        f2.fork();
        return f2.join() + f1.join();
    }
}

原理解释

  • 通过上面的小例子,forkJoinPool 基本使用应该没问题了。

  • ThreadPoolExecutor 只有一个队列,比较难处理父子任务依赖层级非常深的场景。例如父任务等待子任务的结果,子任务又不能很快获取到结果。

  • 这个时候就forkJoinPool 就可以发挥其优势了。

  • 有多个任务队列,所以在 ForkJoinPool 中就有一个数组形式的成员变量 WorkQueue[]。那问题又来了(下面的原理很重要,会让你醍醐灌顶,彻底明白forkJoinPool的工作原理)

  • 任务队列有多个,提交的任务放到哪个队列中呢?(上图中的 Router Rule 部分)
    这就需要一套路由规则,从上面的代码 Demo 中可以理解,提交的任务主要有两种:

    • 有外部直接提交的(submission task)

    • 也有任务自己 fork 出来的(worker task)

    • 为了进一步区分这两种 task,Doug Lea 就设计一个简单的路由规则:

      • 将 submission task 放到 WorkQueue 数组的「偶数」下标中
      • 将 worker task 放在 WorkQueue 的「奇数」下标中,并且只有奇数下标才有线程( worker )与之相对
    • WorkQueue 就设计成了一个双端队列:

      • 支持 LIFO(last-in-first-out) 的push(放)和pop(拿)操作——操作 top 端支持
      • FIFO (first-in-first-out) 的 poll (拿)操作——操作 base 端线程(worker)操作自己的 WorkQueue 默认是 LIFO 操作(可选FIFO),当线程(worker)尝试窃取其他 WorkQueue 里的任务时,这个时候执行的是FIFO操作,即从 base 端窃取,用图丰富一下就是这样滴

如何实现forkJoinPool的监控:

回到最初的起点,各种监控api

  • getPoolSize(): 此方法返回 int 值,它是ForkJoinPool内部线程池的worker线程们的数量。

  • getParallelism(): 此方法返回池的并行的级别。

  • getActiveThreadCount(): 此方法返回当前执行任务的线程的数量。

  • getRunningThreadCount():此方法返回没有被任何同步机制阻塞的正在工作的线程。

    • 统计了workQueue数据的和
  • getQueuedSubmissionCount(): 此方法返回已经提交给池还没有开始他们的执行的任务数。

  • getQueuedTaskCount(): 此方法返回已经提交给池已经开始他们的执行的任务数。

  • hasQueuedSubmissions(): 此方法返回 Boolean 值,表明这个池是否有queued任务还没有开始他们的执行。

  • getStealCount(): 此方法返回 long 值,worker 线程已经从另一个线程偷取到的时间数。

  • isTerminated(): 此方法返回 Boolean 值,表明 fork/join 池是否已经完成执行。

玩起来,在线程池执行完之前,打印

package com.tencent.egw.om;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

/**
 * @author lucxszhang@tencent.com
 * @date 2022/2/4 3:58 下午
 */
public class ForkJoinDemo {

    public static void main(String[] args) throws InterruptedException {
        int n = 20;
        // 为了追踪子线程名称,需要重写 ForkJoinWorkerThreadFactory 的方法
        final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("my-thread" + worker.getPoolIndex());
            return worker;
        };

        //创建分治任务线程池,可以追踪到线程名称
        ForkJoinPool forkJoinPool = new ForkJoinPool(10, factory, null, false);
        //创建分治任务
        Fibonacci fibonacci = new Fibonacci(n);
        forkJoinPool.execute(fibonacci);
        //当任务还未结束执行,调用 showLog() 方法来把 ForkJoinPool 类的状态信息写入,然后让线程休眠一秒。
        while (!fibonacci.isDone()) {
            showLog(forkJoinPool);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //使用 shutdown() 方法关闭pool。
        forkJoinPool.shutdown();
        //使用 awaitTermination() 方法 等待pool的终结。
        forkJoinPool.awaitTermination(1, TimeUnit.DAYS);
        //调用 showLog() 方法写关于 ForkJoinPool 类状态的信息并写一条信息到操控台表明结束程序。
        showLog(forkJoinPool);
        System.out.printf("Main: End of the program.\n");
    }

    //实现 showLog() 方法。它接收 ForkJoinPool 对象作为参数和写关于线程和任务的执行的状态的信息。
    private static void showLog(ForkJoinPool pool) {
        System.out.printf("**********************\n");
        System.out.printf("Main: Fork/Join Pool log\n");
        System.out.printf("Main: Fork/Join Pool: Parallelism:%d\n",
                pool.getParallelism());
        System.out.printf("Main: Fork/Join Pool: Pool Size:%d\n",
                pool.getPoolSize());
        System.out.printf("Main: Fork/Join Pool: Active Thread Count:%d\n",
                pool.getActiveThreadCount());
        System.out.printf("Main: Fork/Join Pool: Running Thread Count:%d\n",
                pool.getRunningThreadCount());
        System.out.printf("Main: Fork/Join Pool: Queued Submission:%d\n",
                pool.getQueuedSubmissionCount());
        System.out.printf("Main: Fork/Join Pool: Queued Tasks:%d\n",
                pool.getQueuedTaskCount());
        System.out.printf("Main: Fork/Join Pool: Queued Submissions:%s\n",
                pool.hasQueuedSubmissions());
        System.out.printf("Main: Fork/Join Pool: Steal Count:%d\n",
                pool.getStealCount());
        System.out.printf("Main: Fork/Join Pool: Terminated :%s\n",
                pool.isTerminated());
        System.out.printf("**********************\n");
    }
}

class Fibonacci extends RecursiveTask<Integer> {

    final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    @Override
    public Integer compute() {
        //和递归类似,定义可计算的最小单元
        if (n <= 1) {
            return n;
        }
        // 想查看子线程名称输出的可以打开下面注释
        //log.info(Thread.currentThread().getName());

        Fibonacci f1 = new Fibonacci(n - 1);
        // 拆分成子任务
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        // f1.join 等待子任务执行结果
        f2.fork();
        return f2.join() + f1.join();
    }
}

  • 执行结果

文章来自个人专栏
java 知识整理
2 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0