searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

ThreadPoolExecutor实现分析

2023-08-01 12:32:49
5
0

ThreadPoolExecutor是一个ExecutorService,实现了Executor接口,是一个线程池服务的实现。通常通过Executors类的工厂方法来创建。

一、实现原理

1、关键要素

要理解线程池的实现原理,主要理解以下的关键要素:

• 线程池的状态:提供了线程池生命周期中所处的不同状态,分别为以下状态(分别用0,1,2,3表示):
①RUNNING: 可以接收新任务和可以处理队列中的任务;
②SHUTDOWN: 不接收新任务,可以继续处理队列中的任务;
③STOP: 不接收新任务,也不处理队列任务,中断在执行的任务;
④TERMINATE: 类似STOP,所有的线程都会停止。
• 线程数量控制参数:用于控制何时创建线程,何时不能创建线程和任务逻辑控制。
①corePoolSize: 池中核心线程的数量;
②maximunPoolSize: 池中最大的线程数量;
③poolSize: 当前池中的线程数量;
• 工作者线程和工作队列:工作者线程从工作队列中获取任务并执行,如果设置了超时参数,工作者线程如果没有任务时,会被回收。工作队列使用的都是阻塞BlockingQueue。
• 线程工厂:ThreadFactory,用于创建新线程,可以自定义线程的一些属性,比如名称等,如果在初始化是没有配置,就会使用默认的ThreadFactory。
• 保持活动的时间:keepAliveTime,当线程池中有多于corePoolSize的线程时,多出的线程空闲时间超过keepAliveTime时将会被清理。
• 拒绝策略:当线程池已经关闭或线程数量达到最大数或工作队列已经饱和时,将拒绝执行新的任务, 主要实现了以下4种拒绝策略:
①AbortPolicy:中断策略,默认的策略,拒绝任务并抛出运行时异常RejectedExecutionException;
②CallerRunsPolicy:使用调用者线程执行任务,除非线程池已经shutdown。
③DiscardPolicy:不能执行的任务将删除;
④DiscardOldestPolicy:如果执行程序未关闭,位于队列头的任务被删除,然后重试执行程序(失败则重复此过程)。

以上的线程池关键要素都会在构造方法中初始化,并根据初始化配置的不同,将产生不同特点的线程池。比如
FixedThreadPool,CachedThreadPool,SingleThreadPool等。

2、执行任务

怎么执行提交到池中的任务是线程池的核心,是由execute方法实现的,执行的过程如下:
①判断核心线程是否都在执行任务,如果不是,则创建新线程来执行任务,如果核心线程都在执行任务,则进入②;
②判断工作队列是否已经满,如果不满,则把任务加入工作队列,如果已经满,则进入步骤③;
③判断线程池的线程是否都处于工作状态,如果没有,则创建一个新线程来执行任务,否则执行拒绝策略。

核心的execute代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果核心线程数小于当前池中的线程数,则创建新线程执行任务
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如果当前池中的线程数大于等于核心线程数,或者创建线程失败,则加入工作队列
if (runState == RUNNING && workQueue.offer(command)) {
//如果线程池不再运行或为第一个核心线程执行时
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//如果线程池不在运行状态或者加入工作队列失败
//则尝试在最大线程数下创建新线程执行任务
else if (!addIfUnderMaximumPoolSize(command))
//再失败,则拒绝任务
reject(command); // is shutdown or saturated
}
}

从代码看出,可以分为以下情况:
①当前的线程数小于核心线程数,则可以继续创建新线程来执行任务;
②如果运行的线程多于corePoolSize,则把任务加入workQueue队列;
③如果无法将任务加入队列(队列已满),则创建新线程执行任务;
④如果创建新线程后的线程数大于maximunPoolSize,则创建失败,并通过拒绝策略拒绝执行新任务;

把任务交给worker线程处理,addThread方法实现了这个逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Thread addThread(Runnable firstTask) {
//构造一个worker任务
Worker w = new Worker(firstTask);
//产生一个新线程,把woker封装进去
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}

3、worker线程实现

worker线程是真正实现执行任务的线程,是通过内部类Worker来实现的,Worker通过在run方法中循环从

workQueue队列中获取任务来执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
//从阻塞队列中获取任务执行
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}

run方法通过getTask()方法不断从阻塞队列中获取任务,getTask实现过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Runnable getTask() {
for (;;) {
try {
int state = runState;
//线程池不在运行状态
if (state > SHUTDOWN)
return null;
Runnable r;
//关闭状态,把任务poll出
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
//超时版本的poll
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
//带有阻塞的take,如果队列为空则阻塞
r = workQueue.take();
if (r != null)
return r;
//如果worker线程能够退出
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
//清理空闲的worker
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}

Worker类中的runTask方法是实际执行任务的方法,其实现过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
//加锁
runLock.lock();
try {
//检查线程池状态和线程状态
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
boolean ran = false;
//执行之前可以插入的操作
beforeExecute(thread, task);
try {
task.run();
ran = true;
//执行之后
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}

可以看到,在任务执行的前后都可以插入一些代码,由子类来实现监控或其它功能。run方法退出前会执行workDown方法来维护线程池的一些状态信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//增加完成的任务计数
completedTaskCount += w.completedTasks;
//从worker队列移出
workers.remove(w);
//如果当前的线程数为0,则尝试停止线程池
if (--poolSize == 0)
tryTerminate();
} finally {
mainLock.unlock();
}
}

至此,线程池执行任务的相关逻辑结束。

4、关闭线程池

关闭线程池可以通过shutdown()方法或shutdownNow方法来实现,在关闭前都需要进行安全检查,以防恶意代码执行。两个方法不同的地方在于中断worker线程不一样,shutdown调用的是interruptIfIdle方法,而shutdownNow比较粗暴,直接interrupt线程。但shutdownNow会把当前队列中的任务拿出来返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void shutdown() { 

SecurityManager security = System.getSecurityManager();
//检查是否有权限
if (security != null)
security.checkPermission(shutdownPerm);

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//再次检查调用者是否有权限修改线程
if (security != null) { // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
}
//修改线程池状态
int state = runState;
if (state < SHUTDOWN)
runState = SHUTDOWN;

try {
//中断所有空闲worker线程
for (Worker w : workers) {
w.interruptIfIdle();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
//关闭线程池
tryTerminate(); // Terminate now if pool and queue empty
} finally {
mainLock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
//尝试加锁
if (runLock.tryLock()) {
try {
//如果worker中的线程不是当前调用线程
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}

5、并发控制

ThreadPoolExecutor在实现中使用到了ReentrantLock来进行并发控制。核心的表示线程数量的int变量都使用了volatile来保证它的可见性。在Worker实现中也用到了ReentrantLock来控制任务的执行。ThreadPoolExecutor的主要方法都是在lock的控制下进行,比如创建线程时,关闭线程池时。所以,为了避免锁带来的问题,在execute方法中,有这样一个判断:如果当前线程数大于核心线程时,将会把任务加入队列中,这个过程是不需要加锁的,这样就提升了线程池的性能。当线程池预热创建线程数达到核心线程数后都将执行此逻辑。

二、ThreadPoolExecutor的使用

1、创建线程池

一般不推荐直接通过构造函数来创建,应该通过Executors类的工厂方法来创建线程池。因为如果配置的参数不当,线程池的性能可能会受到很大影响。
创建线程池需要指定如下参数:
• corePoolSize: 核心线程数;可以通过prestartAllCoreThreads方法来预热创建核心线程;
• runnableTaskQueue: 任务队列,是阻塞队列blockingQueue的各种实现类。
• maximunPoolSize:池中允许的最大线程数。
• threadFactory: 线程工厂类
• RejectExecutionHandler:拒绝策略处理类。

2、提交任务

提交任务可以使用execute方法或者submit方法,后者会返回一个异步计算结果。

3、关闭线程池

可以通过调用shutdown或shutdownNow方法来关闭线程池。

4、根据任务来配置线程池

任务性质:CPU密集型 、IO密集型、混合型
任务的优先级:高、中、低
任务的执行时间:长、中、短
任务的依赖性:是否依赖外部资源,如数据库。

CPU密集型任务可以配置少些线程,IO密集型可以配置多些线程,优先级不同可以用PriorityBlockingQueue来作为工作队列。对于依赖外部资源的任务,因为空闲线程可能较多,可以配置多些线程来利用CPU资源 。

0条评论
作者已关闭评论
chuoo
13文章数
0粉丝数
chuoo
13 文章 | 0 粉丝
原创

ThreadPoolExecutor实现分析

2023-08-01 12:32:49
5
0

ThreadPoolExecutor是一个ExecutorService,实现了Executor接口,是一个线程池服务的实现。通常通过Executors类的工厂方法来创建。

一、实现原理

1、关键要素

要理解线程池的实现原理,主要理解以下的关键要素:

• 线程池的状态:提供了线程池生命周期中所处的不同状态,分别为以下状态(分别用0,1,2,3表示):
①RUNNING: 可以接收新任务和可以处理队列中的任务;
②SHUTDOWN: 不接收新任务,可以继续处理队列中的任务;
③STOP: 不接收新任务,也不处理队列任务,中断在执行的任务;
④TERMINATE: 类似STOP,所有的线程都会停止。
• 线程数量控制参数:用于控制何时创建线程,何时不能创建线程和任务逻辑控制。
①corePoolSize: 池中核心线程的数量;
②maximunPoolSize: 池中最大的线程数量;
③poolSize: 当前池中的线程数量;
• 工作者线程和工作队列:工作者线程从工作队列中获取任务并执行,如果设置了超时参数,工作者线程如果没有任务时,会被回收。工作队列使用的都是阻塞BlockingQueue。
• 线程工厂:ThreadFactory,用于创建新线程,可以自定义线程的一些属性,比如名称等,如果在初始化是没有配置,就会使用默认的ThreadFactory。
• 保持活动的时间:keepAliveTime,当线程池中有多于corePoolSize的线程时,多出的线程空闲时间超过keepAliveTime时将会被清理。
• 拒绝策略:当线程池已经关闭或线程数量达到最大数或工作队列已经饱和时,将拒绝执行新的任务, 主要实现了以下4种拒绝策略:
①AbortPolicy:中断策略,默认的策略,拒绝任务并抛出运行时异常RejectedExecutionException;
②CallerRunsPolicy:使用调用者线程执行任务,除非线程池已经shutdown。
③DiscardPolicy:不能执行的任务将删除;
④DiscardOldestPolicy:如果执行程序未关闭,位于队列头的任务被删除,然后重试执行程序(失败则重复此过程)。

以上的线程池关键要素都会在构造方法中初始化,并根据初始化配置的不同,将产生不同特点的线程池。比如
FixedThreadPool,CachedThreadPool,SingleThreadPool等。

2、执行任务

怎么执行提交到池中的任务是线程池的核心,是由execute方法实现的,执行的过程如下:
①判断核心线程是否都在执行任务,如果不是,则创建新线程来执行任务,如果核心线程都在执行任务,则进入②;
②判断工作队列是否已经满,如果不满,则把任务加入工作队列,如果已经满,则进入步骤③;
③判断线程池的线程是否都处于工作状态,如果没有,则创建一个新线程来执行任务,否则执行拒绝策略。

核心的execute代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果核心线程数小于当前池中的线程数,则创建新线程执行任务
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如果当前池中的线程数大于等于核心线程数,或者创建线程失败,则加入工作队列
if (runState == RUNNING && workQueue.offer(command)) {
//如果线程池不再运行或为第一个核心线程执行时
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//如果线程池不在运行状态或者加入工作队列失败
//则尝试在最大线程数下创建新线程执行任务
else if (!addIfUnderMaximumPoolSize(command))
//再失败,则拒绝任务
reject(command); // is shutdown or saturated
}
}

从代码看出,可以分为以下情况:
①当前的线程数小于核心线程数,则可以继续创建新线程来执行任务;
②如果运行的线程多于corePoolSize,则把任务加入workQueue队列;
③如果无法将任务加入队列(队列已满),则创建新线程执行任务;
④如果创建新线程后的线程数大于maximunPoolSize,则创建失败,并通过拒绝策略拒绝执行新任务;

把任务交给worker线程处理,addThread方法实现了这个逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Thread addThread(Runnable firstTask) {
//构造一个worker任务
Worker w = new Worker(firstTask);
//产生一个新线程,把woker封装进去
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}

3、worker线程实现

worker线程是真正实现执行任务的线程,是通过内部类Worker来实现的,Worker通过在run方法中循环从

workQueue队列中获取任务来执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
//从阻塞队列中获取任务执行
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}

run方法通过getTask()方法不断从阻塞队列中获取任务,getTask实现过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Runnable getTask() {
for (;;) {
try {
int state = runState;
//线程池不在运行状态
if (state > SHUTDOWN)
return null;
Runnable r;
//关闭状态,把任务poll出
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
//超时版本的poll
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
//带有阻塞的take,如果队列为空则阻塞
r = workQueue.take();
if (r != null)
return r;
//如果worker线程能够退出
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
//清理空闲的worker
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}

Worker类中的runTask方法是实际执行任务的方法,其实现过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
//加锁
runLock.lock();
try {
//检查线程池状态和线程状态
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
boolean ran = false;
//执行之前可以插入的操作
beforeExecute(thread, task);
try {
task.run();
ran = true;
//执行之后
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}

可以看到,在任务执行的前后都可以插入一些代码,由子类来实现监控或其它功能。run方法退出前会执行workDown方法来维护线程池的一些状态信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//增加完成的任务计数
completedTaskCount += w.completedTasks;
//从worker队列移出
workers.remove(w);
//如果当前的线程数为0,则尝试停止线程池
if (--poolSize == 0)
tryTerminate();
} finally {
mainLock.unlock();
}
}

至此,线程池执行任务的相关逻辑结束。

4、关闭线程池

关闭线程池可以通过shutdown()方法或shutdownNow方法来实现,在关闭前都需要进行安全检查,以防恶意代码执行。两个方法不同的地方在于中断worker线程不一样,shutdown调用的是interruptIfIdle方法,而shutdownNow比较粗暴,直接interrupt线程。但shutdownNow会把当前队列中的任务拿出来返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void shutdown() { 

SecurityManager security = System.getSecurityManager();
//检查是否有权限
if (security != null)
security.checkPermission(shutdownPerm);

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//再次检查调用者是否有权限修改线程
if (security != null) { // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
}
//修改线程池状态
int state = runState;
if (state < SHUTDOWN)
runState = SHUTDOWN;

try {
//中断所有空闲worker线程
for (Worker w : workers) {
w.interruptIfIdle();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
//关闭线程池
tryTerminate(); // Terminate now if pool and queue empty
} finally {
mainLock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
//尝试加锁
if (runLock.tryLock()) {
try {
//如果worker中的线程不是当前调用线程
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}

5、并发控制

ThreadPoolExecutor在实现中使用到了ReentrantLock来进行并发控制。核心的表示线程数量的int变量都使用了volatile来保证它的可见性。在Worker实现中也用到了ReentrantLock来控制任务的执行。ThreadPoolExecutor的主要方法都是在lock的控制下进行,比如创建线程时,关闭线程池时。所以,为了避免锁带来的问题,在execute方法中,有这样一个判断:如果当前线程数大于核心线程时,将会把任务加入队列中,这个过程是不需要加锁的,这样就提升了线程池的性能。当线程池预热创建线程数达到核心线程数后都将执行此逻辑。

二、ThreadPoolExecutor的使用

1、创建线程池

一般不推荐直接通过构造函数来创建,应该通过Executors类的工厂方法来创建线程池。因为如果配置的参数不当,线程池的性能可能会受到很大影响。
创建线程池需要指定如下参数:
• corePoolSize: 核心线程数;可以通过prestartAllCoreThreads方法来预热创建核心线程;
• runnableTaskQueue: 任务队列,是阻塞队列blockingQueue的各种实现类。
• maximunPoolSize:池中允许的最大线程数。
• threadFactory: 线程工厂类
• RejectExecutionHandler:拒绝策略处理类。

2、提交任务

提交任务可以使用execute方法或者submit方法,后者会返回一个异步计算结果。

3、关闭线程池

可以通过调用shutdown或shutdownNow方法来关闭线程池。

4、根据任务来配置线程池

任务性质:CPU密集型 、IO密集型、混合型
任务的优先级:高、中、低
任务的执行时间:长、中、短
任务的依赖性:是否依赖外部资源,如数据库。

CPU密集型任务可以配置少些线程,IO密集型可以配置多些线程,优先级不同可以用PriorityBlockingQueue来作为工作队列。对于依赖外部资源的任务,因为空闲线程可能较多,可以配置多些线程来利用CPU资源 。

文章来自个人专栏
容器
13 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0