线程池可以用于解决单线程干某件事情比较慢的问题
AsyncConfigurer:通过实现AsyncConfigurer自定义线程池,包含异常处理
实现AsyncConfigurer接口对异常线程池更加细粒度的控制
步骤:*
a) 创建线程自己的线程池
b) 对void方法抛出的异常处理的类AsyncUncaughtExceptionHandler
demo
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author
* @version 1.0
* <p>
* 配置类实现AsyncConfigurer接口并重写getAsyncExecutor方法,并返回ThreadPoolTaskExecutor,
* 这样我们就获得了一个基于线程池TaskExecutor
*/
@Configuration
@ComponentScan("com.demo") //扫描一定要包含执行类
@EnableAsync
@PropertySource(value = "classpath:application.properties", ignoreResourceNotFound = true)//手动配置目录,解决property配置找不到的问题
public class ThreadConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
taskExecutor.setCorePoolSize(20);
//队列最大长度
taskExecutor.setQueueCapacity(1000);
//最大线程数
taskExecutor.setMaxPoolSize(50);
//线程池维护线程所允许的空闲时间(单位秒)
taskExecutor.setKeepAliveSeconds(10);
//设置名称
taskExecutor.setThreadNamePrefix("demo-thread");
/*
* 线程池对拒绝任务(无线程可用)的处理策略
* ThreadPoolExecutor.CallerRunsPolicy策略,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
taskExecutor.initialize();
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SpringAsyncExceptionHandler();
}
class SpringAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
System.out.println("线程池错误!" + throwable.getMessage());
}
}
}
线程池批量调用待执行任务对任务进行并发执行:
ExecutorService pool = Executors.newFixedThreadPool(webProcessInfo.getMaxThread());
List<Callable<Integer>> callers = new ArrayList<>();
callers.add(()->{
crawlParseService.getChildUrls(linkurl, projectId, batchId, baseUrl, webProcessInfo, uncheckedUrlSet);
return null;
});
//用线程池来执行callers中的批量任务
try {
pool.invokeAll(callers);
pool.shutdown();
}catch (InterruptedException e){
e.printStackTrace();
}