JAVA && Spring && SpringBoot2.x — 学习目录
我们在JDK中,可以使用ThreadPoolExecutor提供线程池服务,相关理论,可以在多线程——线程池ThreadPoolExecutor了解。但是SpringBoot提供了@Async
[鹅神可]
注解,帮助我们更方便的将业务逻辑提交到线程池中异步处理。
1. SpringBoot对线程池的自动装载
源代码:org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration
@Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder() {
TaskExecutionProperties.Pool pool = this.properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix());
builder = builder.customizers(this.taskExecutorCustomizers);
builder = builder.taskDecorator(this.taskDecorator.getIfUnique());
return builder;
}
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}
我们可以在配置文件中配置连接池的相关参数。
2. 自定义线程池
2.1 根据业务配置不同的线程池
我们不推荐一个项目配置一个线程池,这样若是某些业务出现异常时,会影响到整个项目的健壮性。故我们可以根据业务,为不同的业务配置不同参数的数据库连接池。
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
@Bean
public Executor asyncServiceExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new VisiableThreadPoolTaskExecutor();
//核心线程数
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
//最大线程数
threadPoolTaskExecutor.setMaxPoolSize(5);
//配置队列大小
threadPoolTaskExecutor.setQueueCapacity(50);
//配置线程池前缀
threadPoolTaskExecutor.setThreadNamePrefix("async-service-");
//拒绝策略
// threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
threadPoolTaskExecutor.setRejectedExecutionHandler(new PrintingPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Bean
public Executor customServiceExecutor(){
ThreadPoolTaskExecutor threadPoolTaskExecutor=new ThreadPoolTaskExecutor();
//线程核心数目
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
//最大线程数
threadPoolTaskExecutor.setMaxPoolSize(10);
//配置队列大小
threadPoolTaskExecutor.setQueueCapacity(50);
//配置线程池前缀
threadPoolTaskExecutor.setThreadNamePrefix("custom-service-");
//配置拒绝策略
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//数据初始化
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
若是想在使用连接池的时候,打印出连接池的各项参数,应当如何设置:
@Slf4j
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
//打印队列的详细信息
private void showThreadPoolInfo(String prefix){
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if(null==threadPoolExecutor){
return;
}
log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
2.2 如何使用连接池
在业务方法中使用@Async注解,并且可以选择使用的连接池。来启动一个异步任务。
- 若是想获取到任务返回值,可创建Callable任务
//带返回值的任务
@Async("asyncServiceExecutor")
public Future<String> doTask1() throws InterruptedException{
log.info("Task1 started.");
long start = System.currentTimeMillis();
Thread.sleep(5000);
long end = System.currentTimeMillis();
log.info("Task1 finished, time elapsed: {} ms.", end-start);
return new AsyncResult<>("Task1 accomplished!");
}
@Async("customServiceExecutor")
public Future<String> doTask2() throws InterruptedException{
log.info("Task2 started.");
long start = System.currentTimeMillis();
Thread.sleep(3000);
long end = System.currentTimeMillis();
log.info("Task2 finished, time elapsed: {} ms.", end-start);
return new AsyncResult<>("Task2 accomplished!");
}
- 若是创建的Runnable的异步任务
//创建的是Runnable的任务
@Async("asyncServiceExecutor")
public void executeAsync() {
log.info("start executeAsync");
try{
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
}
log.info("end executeAsync");
}
2.3 如何获取任务的返回值
若是我们使用线程池,来并发的执行任务,首先需要考虑的是,如何等待最后一个任务执行完毕,对任务结果进行汇总处理。
方法一:使用自旋操作,等待任务结果返回。
@RequestMapping("/helloFuture")
@ResponseBody
public String helloFuture() {
try {
Future<String> future1 = serviceImpl.doTask1();
Future<String> future2 = serviceImpl.doTask2();
//自旋锁,停止等待
while (true) {
if (future1.isDone() && future2.isDone()) {
log.info("Task1 result:{}", future1.get());
log.info("Task2 result:{}", future2.get());
break;
}
Thread.sleep(1000);
}
log.info("All tasks finished.");
return "S";
} catch (InterruptedException e) {
log.error("错误信息1", e);
return "F";
} catch (ExecutionException e) {
log.error("错误信息2", e);
return "F";
}
}
方法二:使用CountDownLatch计数器
相关理论可以参考:多线程——CountDownLatch详解
("/helloFuture2")
public String helloFuture2() {
try {
CountDownLatch latch=new CountDownLatch(2);
Future<String> future1 = serviceImpl.doTask1(latch);
Future<String> future2 = serviceImpl.doTask2(latch);
//等待两个线程执行完毕
latch.await();
log.info("All tasks finished!");
String result1 = future1.get();
String result2 = future2.get();
log.info(result1+"--"+result2);
return "S";
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "F";
}
每个任务执行完毕,只需要调用latch.countDown();使得计数器-1。
//带返回值的任务
@Async("asyncServiceExecutor")
public Future<String> doTask1(CountDownLatch latch) throws InterruptedException{
log.info("Task1 started.");
long start = System.currentTimeMillis();
Thread.sleep(5000);
long end = System.currentTimeMillis();
log.info("Task1 finished, time elapsed: {} ms.", end-start);
latch.countDown();
return new AsyncResult<>("Task1 accomplished!");
}
@Async("customServiceExecutor")
public Future<String> doTask2(CountDownLatch latch) throws InterruptedException{
log.info("Task2 started.");
long start = System.currentTimeMillis();
Thread.sleep(3000);
long end = System.currentTimeMillis();
log.info("Task2 finished, time elapsed: {} ms.", end-start);
latch.countDown();
return new AsyncResult<>("Task2 accomplished!");
}
方式三:使用Future的get方法的阻塞特性
("/helloFuture2")
public String helloFuture2() {
try {
List<Future<String>> tasks = new ArrayList<>();
List<String> results = new ArrayList<>();
tasks.add(serviceImpl.doTask1());
tasks.add(serviceImpl.doTask2());
//各个任务执行完毕
for (Future<String> task : tasks) {
//每个任务都会再在此阻塞。
results.add(task.get());
}
log.info("All tasks finished!");
log.info("执行结果:{}", JSON.toJSONString(results));
return "S";
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "F";
}
2.4 Runnable异常处理
该配置可与线程池配置在一起,若异步线程抛出异常,会由该类打印。
@Configuration
public class ExecutorConfig implements AsyncConfigurer {
//配置异常处理机制
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex,method,params)->{
log.error("异步线程执行失败。方法:[{}],异常信息[{}] : ", method, ex.getMessage(),ex);
};
}
}
效果图:
2019-12-25 19:14:09.851 ERROR [] --- [async-service-1] c.g.Config.threadPool.ExecutorConfig : 异步线程执行失败。方法:[public void com.galax.bussiness.account.impl.AccountServiceImpl.getAccInfoByTime(java.lang.String,java.lang.String)],异常信息[/ by zero] :
java.lang.ArithmeticException: / by zero
at com.galax.bussiness.account.impl.AccountServiceImpl.getAccInfoByTime(AccountServiceImpl.java:308)
at com.galax.bussiness.account.impl.AccountServiceImpl$$FastClassBySpringCGLIB$$4e0db2a2.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:93)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
彩蛋——Future<T>使用lambda表达式
public void sendMail(Map<String, Object> model, String title, String templateName, String toMail, String[] ccMail, long timeout) throws Exception {
Future<String> submit;
submit = emailServiceExecutor.submit(() ->{
try {
return "s";
} catch (Exception e) {
return "F";
}
});
}
彩蛋——若自定义实现线程池,如何获取到各个任务的结果
若是我们自己实现线程池,可以使用
java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection<? extends java.util.concurrent.Callable<T>>)
方法进行定时任务的批量处理,返回值其实是List<Future<T>>
,我们可以循环遍历该List,最终拿到各个任务的执行结果。
@Test
public void test() throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(9));
//需要执行的任务
List<Account> students= new ArrayList<>(5);
//将任务转换为Callable对象
List<Callable<Integer>> callables = new ArrayList<>();
//保存返回结果
List<Integer> results=new ArrayList<>();
//开启线程,lambda表达式
for (Student student : students) {
callables.add(()->{
//插入操作,并发执行
log.info(JSON.toJSONString(student ));
//表示异步操作
int save = serviceImpl.getStu(student);
//返回值
return save ;
});
}
//获取到所有任务的处理结果
List<Future<Integer>> futures = executor.invokeAll(callables);
//遍历每个任务的执行结果,每次future.get()只有在任务执行完毕后,才会继续循环操作,否则会阻塞,等待线程执行结束
for (Future<Integer> future:futures){
try {
results.add(future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown(); //关闭线程池
log.info("数据执行完毕!{}",JSON.toJSONString(results));
}
推荐阅读
作者:小胖学编程
链接:https://www.jianshu.com/p/3d875dd9d5db
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
相关推荐
基于springboot 2.x+vue+mybatisPlus+Oauth2.0+mysql+redis 纯净的前后端分离后台管理系统源码.zip 基于springboot 2.x+vue+mybatisPlus+Oauth2.0+mysql+redis 纯净的前后端分离后台管理系统源码.zip 基于springboot...
基于`SpringBoot2.X + Lettuce`实现Redis集成和多库实时切换是一个非常实用的项目。以下是对该项目的一些重点说明: 1. **项目概述** 该项目的主要目标是在`SpringBoot`应用中集成`Lettuce`作为Redis客户端,实现对...
在本项目中,我们将探讨如何将Spring Boot 1.x与Elasticsearch 5.x进行整合,尽管Spring Boot 1.x官方文档中只提供了与Elasticsearch 2.x的集成指南。 首先,要整合Spring Boot 1.x和Elasticsearch 5.x,我们需要...
分享一套关于酒店管理系统开发的教程——基于SpringBoot3.x+Vue3.x整合从0到1一步一步实现酒店管理系统,学完本课程,您将收获:增加项目实战经验;学习SpringBoot项目应用中遇到各种问题;学会使用前后端分离开发! ...
一款 Java 语言基于 SpringBoot2.x、MybatisPlus、Vue2.x、ElementUI、MySQL等框架精心打造的一款前后端分离框架,致力于实现模块化、组件化、可插拔的前后端分离架构敏捷开发框架,可用于快速搭建前后端分离后台...
2. **WebFlux**:SpringBoot 2.x 引入了对Reactive编程的支持,通过WebFlux模块,可以构建非阻塞式的Web应用,提高系统性能和可伸缩性。 3. **Actuator增强**:Actuator是用于监控和管理SpringBoot应用的组件,在2....
工作流项目源码
**SpringBoot2.X详解** SpringBoot是由Pivotal团队提供的全新框架,其设计目标是用来简化新Spring应用的初始搭建以及开发过程。它集成了大量常用的第三方库配置,如Redis、MongoDB、JPA、RabbitMQ、Quartz等,只...
基于springboot 2.x+vue+mybatisPlus+Oauth2.0+mysql+redis 纯净的前后端分离后台管理系统源码.zip基于springboot 2.x+vue+mybatisPlus+Oauth2.0+mysql+redis 纯净的前后端分离后台管理系统源码.zip基于springboot 2...
基于Vue2.x和SpringBoot2.x的追风考试系统设计源码,该项目包含214个文件,主要文件类型有95个java源文件,以及25个png图像文件。此外,还包括25个javascript文件,23个vue前端文件,以及17个scss样式文件。该项目是...
本文介绍了消息队列的基本概念和作用,重点讲解了ActiveMQ5.x的主要特性和SpringBoot2.x中如何整合及实现点对点消息的实战案例。通过对这些知识点的理解和实践,开发者能够更好地理解和应用消息队列技术,提升系统...
基于Java+Springboot2.x+MyBatis-Plus+MySQL 8.x+Vue的培训机构(高中)排课系统源码+项目说明.zip基于Java+Springboot2.x+MyBatis-Plus+MySQL 8.x+Vue的培训机构(高中)排课系统源码+项目说明.zip基于Java+...
本项目聚焦于SpringBoot 2.x版本,通过整合多种常见的开发工具,旨在提升开发效率和系统的整体性能。让我们详细了解一下如何在SpringBoot 2.x中集成Redis、MyBatisPlus和RocketMQ。 首先,**Redis** 是一个高性能的...
网上购买的SpringBoot2.x整合微信支付在线教育网站项目实战
SpringBoot 2.x 整合 Prometheus+Grafana 实践指南 在本文中,我们将探讨如何将 SpringBoot 2.x 与 Prometheus 和 Grafana 集成,以实现应用程序的监控和可视化。 Prometheus 介绍 Prometheus 是一个开源的系统...
本主题将深入探讨如何在SpringBoot 2.x框架中集成LCN(Local Transaction Coordinator for MySQL)分布式事务管理器,以解决在升级到SpringBoot 2.x版本后可能出现的兼容性问题。 LCN是一个专为MySQL设计的轻量级...
本源码为基于SpringBoot2.x的Free-Fs开源文件管理系统设计,共包含240个文件,其中gif文件75个,java文件57个,js文件34个,png文件29个,css文件10个,html文件8个,yml文件4个,xml文件3个,eot文件2个,svg文件2...
SpringBoot2.x版本在1.x的基础上进行了诸多改进和优化,增强了性能和易用性。下面将详细探讨SpringBoot2.x的核心概念、关键特性以及源码分析。 一、核心概念 1. 自动配置:SpringBoot的一大特色就是自动配置。它...
微信点餐系统-SpringBoot开发.zip微信点餐系统-SpringBoot开发.zip微信点餐系统-SpringBoot开发.zip微信点餐系统-SpringBoot开发.zip微信点餐系统-SpringBoot开发.zip微信点餐系统-SpringBoot开发.zip微信点餐系统-...