内容概要
CompletionService的优点在于能够解耦任务提交与结果获取,有效的整合线程池与阻塞队列,实现任务完成顺序的处理,提升系统吞吐量,它简化了多线程编程的复杂性,使开发者能够更专注于业务逻辑,而不必过多关注线程管理细节。
核心概念
CompletionService
是一个结合了Executor
和BlockingQueue
功能的服务,它主要用于解决异步任务执行中的两个问题:
- 任务管理和结果收集:当有一组并行或异步执行的任务,并且想要以它们完成的顺序(而不是启动的顺序)收集结果时,
CompletionService
就非常适合,它提交任务给Executor
去执行,并在任务完成时,将结果放入一个阻塞队列中,这样可以从队列中取出已完成的任务结果,而不需要知道具体是哪个任务先完成。 - 提高资源利用率:在传统的多线程任务执行中,如果某些任务执行得比其他任务快得多,那么等待所有任务完成可能会浪费资源(如CPU的时间),使用
CompletionService
,可以立即处理已完成的任务,而不必等待其他较慢的任务,这种即时的任务处理方式可以减少资源空闲时间,提高整体系统的吞吐量和响应速度。
CompletionService
主要用来解决异步任务执行中的任务管理和结果收集问题,以及通过优化任务处理顺序来提高资源利用率,这使得它非常适合于处理大量并行任务,并且需要按照完成顺序处理结果的场景。
官方文档:https:///jdk11/api/java.base/java/util/concurrent/CompletionService.html
代码案例
import java.util.concurrent.*;
// 任务类,实现了Callable接口,用于异步计算
class ComputationTask implements Callable<Integer> {
private final int number;
public ComputationTask(int number) {
this.number = number;
}
@Override
public Integer call() throws Exception {
// 模拟耗时计算,比如计算一个数的平方
int result = number * number;
// 让线程睡眠一段时间来模拟不同的任务执行时间
Thread.sleep(number * 100);
return result;
}
}
public class CompletionServiceDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
// 创建一个CompletionService,用于管理异步任务的完成
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
// 提交一些异步任务到CompletionService
for (int i = 1; i <= 5; i++) {
completionService.submit(new ComputationTask(i));
}
// 从CompletionService中获取并处理已完成的任务结果
for (int i = 0; i < 5; i++) {
// take() 方法会阻塞,直到有任务完成
Future<Integer> future = completionService.take();
// 获取任务的结果
Integer result = future.get();
// 输出结果
System.out.println("Result: " + result);
}
// 关闭线程池
executor.shutdown();
// 等待所有任务完成
executor.awaitTermination(1, TimeUnit.MINUTES);
}
}
在上面代码中,ComputationTask
是一个简单的任务类,它实现Callable
接口来计算一个数的平方,并通过Thread.sleep
模拟了不同的任务执行时间。
在main
方法中,创建了一个固定大小的线程池和一个CompletionService
,然后,我们提交了5个ComputationTask
到CompletionService
,接着,使用一个循环来从CompletionService
中获取已完成的任务,并输出它们的结果,注意,completionService.take()
会阻塞直到有一个任务完成。
运行代码会输出如下内容:
Result: 1
Result: 4
Result: 9
Result: 16
Result: 25
注意:CompletionService
确保了可以按照任务完成的顺序来处理结果,而不是按照任务提交的顺序。
核心API
CompletionService
结合了Executor
和BlockingQueue
的功能,用于处理异步任务的执行和结果的收集,CompletionService
将生产新异步任务和消费已完成任务的结果分离开来,使得可以以任务完成的顺序而不是提交的顺序来获取结果,下面是CompletionService
中主要方法的含义:
1、Future<V> take() throws InterruptedException
:这个方法从完成队列中移除并返回已完成的Future
,如果当前没有已完成的Future
,则此方法会阻塞直到有结果可用。
2、Future<V> poll()
:这个方法尝试从完成队列中获取并移除一个已完成的Future
,如果当前没有已完成的Future
,则此方法会立即返回null
而不会阻塞。
3、Future<V> poll(long timeout, TimeUnit unit)
:这个方法尝试在给定的超时时间内从完成队列中获取并移除一个已完成的Future
,如果在超时时间内没有可用的结果,则返回null
。
4、void submit(Callable<T> task)
:提交一个Callable
任务用于执行,并返回一个表示该任务的未决结果的Future
,这个任务是由关联的Executor
来执行的。
5、Future<Void> submit(Runnable task, V result)
:提交一个Runnable
任务用于执行,并返回一个表示该任务的未决结果的Future
,这个方法还允许你为Runnable
任务提供一个结果,这个结果将在Future.get()
方法被调用时返回,这个任务是由关联的Executor
来执行的。
6、Future<Void> submit(Runnable task)
:提交一个Runnable
任务用于执行,并返回一个表示该任务的未决结果的Future
,因为这个任务是Runnable
,所以Future.get()
方法返回null
,这个任务是由关联的Executor
来执行的。
CompletionService
只是一个接口,本身并不直接实现这些方法,而是通过具体的实现类(如ExecutorCompletionService
)来提供这些方法的具体实现。ExecutorCompletionService
将Executor
和BlockingQueue
结合在一起,可以将Callable
或Runnable
任务提交给Executor
执行,并通过CompletionService
按照它们完成的顺序来检索结果。
核心总结
CompletionService结合了Executor和BlockingQueue的功能,可以异步地提交任务并获取它们的结果,它最大的优点就是任务完成顺序与提交顺序无关,先完成的任务可以先获取结果,这大大提高了处理效率。
此外,它还能很好地处理异常,确保我们在获取结果时不会因某个任务的异常而阻塞,但是,它在某些场景下可能略显复杂,例如,业务要求严格按照任务提交的顺序来获取结果,CompletionService就不那么适用了。
此外,它并不能直接取消已经提交但尚未开始执行的任务,这在某些需要灵活控制任务执行的场景中可能会带来不便。