- 浏览: 797322 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (480)
- Spring (12)
- Hibernate (8)
- Struts2 (4)
- Java 基础-JDK-类-接口-URI-专题研究 (27)
- 线程、线程池、多线程高并发高可用、Socket通信 (15)
- Oracle数据库 (20)
- 一般-前端js-html-其它 (25)
- JYSK-互联网金融、金融科技、支付、公司、新闻等等 (8)
- Ajax-jQuery开源框架 (7)
- Json-轻量级的数据交换格式 (14)
- JavaScript (15)
- Jsp、Servlet、Servlet+JSP+JavaBean开发模式(MVC) (18)
- Html-JavaScript-前端-调用接口 (12)
- Sql Server 2005 (6)
- 正则表达式 (2)
- Java tools (18)
- 加签与验签、加密与解密 (3)
- Ajax技术核心-xmlHttpRequest(简称XHR) (6)
- xml-数据交换格式 (3)
- 信息采集 (1)
- Http - Https - HttpClient - httpCore-SSL-TLS (10)
- HtmlParser (2)
- 标签库 (1)
- SMS (2)
- jxl-导入导出 (4)
- poi-导入导出 (2)
- 定时器Timer+Quartz (6)
- 工作流引擎JBPM3.2.3 (4)
- 工作流引擎JBPM4 (0)
- 数据源-JNDI (0)
- tomcat、weblogic等应用服务器 (6)
- 工作流引擎jbpm5 (1)
- 搜索引擎Lucene (1)
- ant (1)
- 大数据-HBase (2)
- bigtable (0)
- 数据库设计 (4)
- jquery tab (0)
- mybatis (5)
- jquery ui 1.10.3 (5)
- Jboss7 (1)
- 规则引擎drools (0)
- 工作流引擎Activiti5 (0)
- 数据库-用户自定义函数 (0)
- 数据库-存储过程 (2)
- 数据库-视图 (0)
- 数据库-触发器 (0)
- 数据库-sql (2)
- highcharts-图表工具 (1)
- sql server 2008 (6)
- 诗词-工作室 (2)
- 数据割接 (1)
- GIS-地理信息系统 (2)
- RS-遥感技术 (1)
- GPS-全球定位系统 (1)
- java整合flex_RIA开发 (3)
- C#编程语言 (5)
- webservice_axis2_cxf_soap_wsdl (2)
- sql语句 (3)
- Flex_WebService_GIS (25)
- PHP编程语言 (0)
- ExtJS4.2 (1)
- Spring mvc (5)
- EasyUI1.4.2 (16)
- 日期时间工具类 (4)
- 随机数 (1)
- Arcgis api for js (0)
- Mysql数据库 (9)
- 移动互联网 java html5/flash socket netty (0)
- API接口 (1)
- AndroidStudio (0)
- Git (2)
- Maven (5)
- IDEA (0)
- 大数据-Hadoop (2)
- JPA (0)
- Spring boot (4)
- JSF (0)
- nginx_lua_module_redis (2)
- Activiti (1)
- bootstrap (1)
- AngularJS (10)
- 数据库-索引 (1)
- Linux及其连接工具SSH (4)
- java日志管理 (2)
- islider滑动控件 (1)
- jquery (1)
- 异常处理Exception (1)
- 秒杀与类秒杀系统 (1)
- 连接数据库、数据库连接池 (4)
- 数据库-临时表 (1)
- 软件设计模式-单例、多例、代理、工厂、观察者 (3)
- 集合框架 (5)
- 人工智能--Artificial intelligence、神经网络算法、机器学习 (1)
- 分布式应用 (1)
- SOA服务-Dubbo框架-Thrift框架 (2)
- Zookeeper分布式服务框架 (2)
- intellij idea (1)
- python编程语言 (0)
- 消息队列_MQ (0)
- 消息队列_RabbitMQ (2)
- 消息队列_ActiveMQ (1)
- 消息队列_Kafka (2)
- 缓存_Redis (4)
- 缓存_Memcache (0)
- 缓存_Ehcache (0)
- ivy-ivyde (1)
- google-protocol buffers (1)
- 正向代理-正向代理服务器 (1)
- 反向代理-反向代理服务器 (1)
- JVM内存模型 (0)
- Thunder框架 (1)
- NIO-非阻塞式IO (0)
- 软件测试、性能测试 (1)
- 序列化、Serializable接口、Externalizable接口 (3)
- 线程池-ExecutorService-ThreadPoolExecutor (1)
- web.xml (1)
- java开发-java工具-实用工具网站 (6)
- 医疗 (1)
- Filter-过滤器 (2)
- Unicode码-双字节字符编码 (1)
- OpenResty (1)
- 计算机网络 (1)
- eclipse_myeclipse_intellij idea (3)
- Enum (1)
- 大数据--Big Data (1)
- 云计算--Cloud computing (1)
- Elastic-Job (1)
- Redis (2)
- 文件流-IO操作 (6)
- 计算机基础知识 (1)
- Hessian-二进制RPC协议 (1)
- String类 (3)
- BigDecimal类 (1)
- java重要接口 (1)
- ReactJS (1)
- 跨域问题 (0)
- Map (1)
- 注解 (1)
- ASCII码-单字节字符编码 (1)
- 微服务、微服务架构 (2)
- RPC协议、RPC服务、RPC框架 (0)
- java反射 (1)
- java项目之classpath (1)
- 经典算法-树 (1)
- listener-监听器 (1)
- Interceptor-拦截器 (1)
- pojo javabean (2)
- 计算机科学与技术-进阶 (1)
- 代码规范与文档编写 (1)
- UML-统一建模语言 (1)
- 对接微信、支付宝 (3)
- 压力测试 (1)
- 办公软件-Excel (1)
- 办公软件-PPT (0)
- UTF8、GBK编码 (1)
- 微服务架构:Spring Cloud架构-Dubbo架构 (6)
- Nginx (1)
- 点滴业务 (1)
- form表单-json数据-转换与接口调用 (1)
- Junit单元测试 (1)
- 大数据-Spark (1)
- 大数据-Storm (1)
- 数据库事务-Spring事务 (0)
- elasticsearch (1)
- windows (1)
最新评论
java异步方式(结合@Async和CompletableFuture)处理批量任务执行和回滚(只回滚执行失败的批次,执行成功的批次不回滚)
@Autowired @Qualifier(value = "customThreadPoolTaskExecutor") private ThreadPoolTaskExecutor customThreadPoolTaskExecutor; @Autowired private Environment environment; @Autowired private StringRedisTemplate redisTemplate; @Autowired private PlatformTransactionManager platformTransactionManager; /** * 测试异步批量审批与回滚 * @param orderList * @param traceId * @throws Exception */ @Async(value = "customThreadPoolTaskExecutor") public void batchAuditDemo(List<Order> orderList, String traceId){ log.info("异步开始...traceId={}", traceId); long start = System.currentTimeMillis(); //批量任务拆分成多个批次执行,每个批次跑固定数量的任务 List<String> orderIdList = new ArrayList<>(); for (Order temp : orderList) { orderIdList.add(temp.getId()); } Integer size = Integer.parseInt(environment.getProperty(Constant.SIZE)); //拆分 Map<Integer, List<String>> map = spiltListFun(orderIdList, size); //批次执行结果 boolean batchResult = false; for (Integer batchNo : map.keySet()) { //单批次异步审批(手动控制事务) batchResult = batchAsyncFun(batchNo, map.get(batchNo),traceId); if(batchResult){ //... log.info("第{}批次执行失败,已回滚", batchNo); } } //... log.info("异步结束, 总耗时:{}", (System.currentTimeMillis() - start)); orderIdList = null; map = null; }
/** * 单批次异步审批(手动控制事务) * @param batchNo * @param orderIdList * @param traceId 本次批量审批的标记 * @return */ public boolean batchAsyncFun(int batchNo, List<String> orderIdList, String traceId){ log.info("本次批量审批第{}批次开始执行:orderIdList={}", batchNo, orderIdList); redisTemplate.delete(currentBatchKey); //本批次是否进行了回滚 final Map<Integer, Boolean> returnMap = new ConcurrentHashMap<>(); CompletableFuture<Void> all = null; final Map<String, TransactionStatus> transactionMap = new ConcurrentHashMap<>(); //用于保存当前批量审批任一批次执行结果的缓存key String currentBatchKey = RedisConstant.LIST_BATCH_APPROVAL + traceId; final CountDownLatch countDownLatch = new CountDownLatch(orderIdList.size()); for (String orderId : orderIdList) { CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> { //1.创建异步操作,支持返回值 log.info("orderId={}, threadName={}", orderId, Thread.currentThread().getName()); //手动控制事务 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); //新发起一个事务 def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); TransactionStatus status = platformTransactionManager.getTransaction(def); transactionMap.put(orderId, status); //审批 boolean eachResult = false; try { //核心业务 //core code...; }catch (Exception e){ log.info("捕捉到异常:batchNo={}, orderId={}", batchNo, orderId); e.printStackTrace(); eachResult =true; } if(eachResult){ redisTemplate.opsForList().leftPush(currentBatchKey, "0"); }else{ redisTemplate.opsForList().leftPush(currentBatchKey, "1"); } //根据eachResult结果判断当前记录是否需要回滚 return eachResult; }, customThreadPoolTaskExecutor).handle((s, t) -> { //2.执行任务完成时对结果的处理(包括异常和非异常结果) //第一个参数s为CompletableFuture 返回的结果, 第二个参数t为抛出的异常 //log.info("子任务完成:batchNo={}, orderId={}, s={}", batchNo, orderId, s); boolean needRollback = false; int size = Integer.valueOf(Long.toString(redisTemplate.opsForList().size(currentBatchKey))); //循环等待所有任务都有结果(考虑引入超时机制) while(true){ //log.info("循环等待...realSize={}", size); if(size == orderIdList.size()){ List<String> ls = redisTemplate.opsForList().range(currentBatchKey, 0, -1); for (String str : ls) { if(str.equals("0")){ needRollback = true; break; } } if(!returnMap.containsKey(batchNo)){ returnMap.put(batchNo, needRollback); } //根据情况,提交或者回滚事务 if(needRollback){ platformTransactionManager.rollback(transactionMap.get(orderId)); }else{ platformTransactionManager.commit(transactionMap.get(orderId)); } countDownLatch.countDown(); break; }else{ size = Integer.valueOf(Long.toString(redisTemplate.opsForList().size(currentBatchKey))); } } return needRollback; }); all = CompletableFuture.allOf(cf); } //CompletableFuture有任务结果返回,但事务不一定提交或者回滚 //log.info("主线程阻塞1..."); all.join(); try { //等待本批次所有任务彻底执行完 //log.info("主线程阻塞2..."); countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } //本批次是否回滚:true - 是; false - 否; return returnMap.get(batchNo); }
/** * 将指定集合进行拆分 * @param list 集合 * @param len 拆分长度,每个集合按照拆分长度进行分割 * @return */ public static Map<Integer, List<String>> spiltListFun(List<String> list, int len){ Map<Integer, List<String>> map = new HashMap<>(); if(null == list || len < 1){ map.put(1, new ArrayList<>()); return map; } int size = list.size(); log.info("size={}, len={}", size, len); if(size <= len){ map.put(1, list); }else{ //分隔后的集合个数 int count = (size + len - 1) / len; for (int i = 0; i < count; i++) { //子集合 List<String> subList = list.subList(i * len, (len * (i + 1) > size ? size : len * (i + 1))); //批次号,子集合 map.put(i+1, subList); } } // log.info("map={}", map.toString()); return map; }
发表评论
-
URI - URL处理
2021-10-30 16:38 228import okhttp3.HttpUrl; impo ... -
linux下设置redis开机自动运行
2020-01-07 19:37 327linux环境: 1、新建文件夹并复制redis.conf文 ... -
restTemplate调用GET/POST接口
2019-07-17 11:21 2032org.springframework.web.client. ... -
配置文件:yml文件和properties文件
2018-12-11 12:22 14471、读取配置文件属性值 方式一: @Autowired ... -
java 递归 获取树形结构数据
2018-10-24 11:00 2996private static void print2() ... -
Emoji 表情符号 处理
2018-08-21 17:56 1279-Emoji 百科 https://baike.baidu.c ... -
Java基础-01
2018-08-10 14:49 4800、Java 基本数据类型 --8种基本类型 http://w ... -
Spring框架之InitializingBean接口、DisposableBean接口
2018-03-26 10:29 705引用参考: --Spring提供的InitializingBe ... -
Spring 使用国际化信息 MessageSource
2018-03-20 15:50 1157引用参考: //第一种 【ResourceBundleMess ... -
java基础-02-JavaBean、Final关键字等等
2017-08-31 10:50 426【JavaBean】 参考博客: ---- JavaBean ... -
Spring
2017-08-29 11:31 384参考博客: --spring加载多个配置文件 http://w ... -
涉及计算机概念-Windows-Linux
2017-08-15 18:57 4171、域控 --windows域 https://baike.b ... -
Spring boot框架、jar启动
2017-07-11 11:35 4831、Spring Boot JDBC 连接数据库 http:/ ... -
IO、集合、多线程等基础框架
2017-07-10 15:04 11721、集合 Java集合总览 http://www.import ... -
Intellij idea使用经验
2017-05-08 18:52 373lombok配置和介绍 @Data @NoArgsConst ... -
form表单
2017-04-11 17:44 3901、postman中 form-data、x-www-form ... -
处理图片
2017-04-11 11:12 4161、import com.sun.image.codec.jp ... -
Spring框架 计划任务Schedule job demo
2017-02-20 10:19 571import java.util.Date; impor ... -
关心的技术
2017-01-23 09:21 409排名前5的编程语言: JAVA 、C、 C++、 C#、 PY ... -
app接口响应定向跳转页面
2017-01-19 16:40 1164http://127.0.0.1:8888/app/goo ...
相关推荐
在SQL Server环境中,批量执行SQL脚本文件是数据库管理员和开发者日常工作中常见的一项任务。本文将深入探讨如何在.NET环境中高效地实现这一功能,并对比使用sqlcmd工具的方法。以下是一些关键知识点: 1. **SQL ...
异步请求是现代Web应用中提升用户体验的关键技术,它允许后台处理任务而不阻塞用户界面。在这个项目中,很可能使用了Ajax(Asynchronous JavaScript and XML)或者基于jQuery的Ajax方法来实现异步通信。当用户触发...
C#的async/await关键字可以帮助实现异步批量导入。 9. **并行处理**:如果系统资源允许,可以考虑使用多线程或异步并行处理,将数据分成多个部分同时导入,进一步提升效率。 在给定的文件列表中,"AddMany.aspx"和...
在这个例子中,`ExecuteSqlCommandsAsync`方法会异步地启动一个任务来为每个数据库连接执行SQL命令,而`ExecuteSqlCommandAsync`则通过Task.Run封装了数据库操作,将其放到后台线程执行,避免阻塞UI。 在实际应用中...
还可以使用`dispatchNow()`立即执行任务,而不是放入队列。 5. **任务失败处理**:Laravel提供了`failed`方法来处理任务失败的情况。可以通过`php artisan queue:retry <job_id>`重试失败的任务,或`...
9. **批量执行SQL脚本**: 对于大量SQL命令,可以考虑使用SqlBulkCopy类进行批量插入,提高效率。但这不适用于DML(INSERT、UPDATE、DELETE)以外的操作,如DDL(CREATE、ALTER、DROP)。 10. **异步执行**: C#...
- **异步调用**:支持异步执行任务,不阻塞主线程,提升用户体验。 - **错误处理**:提供异常处理机制,确保任务失败时能适当回滚或通知用户。 - **可扩展性**:允许开发者自定义线程类和任务处理过程,满足各种复杂...
6. **异步处理**:由于删除大量联系人可能需要较长时间,为了提供良好的用户体验,通常会将此操作放在异步任务(如AsyncTask)中执行,防止UI线程被阻塞。 7. **用户确认**:在执行批量删除前,通常会弹出确认...
在IT行业中,批量处理是一项非常重要的任务,尤其是在大型项目或者工程管理中。...以上就是"工程名称批量替换"所涵盖的主要技术点,理解和掌握这些知识点,将有助于在实际工作中有效地执行类似的任务。
在C#中处理海量数据的批量插入和更新是一项常见的任务,尤其是在大数据应用或者ETL(提取、转换、加载)流程中。尽管ADO.NET在某些方面可能不如JDBC提供那么直观和便捷的批量操作API,但它仍然提供了高效的方法来...
提供了异步执行作业的能力,允许同时处理多个任务,提高系统的并发性和处理效率。这对于处理大量数据或并行执行独立的作业非常有效。 3. **事务管理功能 (BL-03)**: 支持事务管理,确保数据的一致性和完整性。在...
9. **脚本自动化**:结合定时任务服务(如Windows的任务计划程序或Linux的cron),可以定期自动执行文件批量修改任务。 10. **工具软件**:除了编程,也有一些现成的工具如Bulk Rename Utility(批量重命名工具)、...
这些worker进程在后台持续运行,当有新的任务到达时,它们会自动获取并执行任务。Swoole的事件驱动模型确保了高效的任务调度和资源利用。 4. **任务状态跟踪**:Viswoole提供了任务状态监控机制,开发者可以查看...
10. **性能优化**:`psycopg2` 允许用户设置连接和游标的参数以优化性能,如预编译 SQL 语句、批量执行操作等。 综上所述,`psycopg2-2.5.4.tar.gz` 文件包含的是 `psycopg2` 库的源代码,用户可以通过解压并安装这...
- **批量操作支持**:如批量分配、批量完成等,适用于大量相似任务的快速处理场景。 #### 六、REST API集成 - **API文档完善**:官方提供了详细的 RESTful 接口文档,覆盖了几乎所有核心功能模块,方便第三方系统...
在IT行业中,文件管理和自动化处理是一项常见的任务,尤其是在处理大量数据时。本篇文章将深入探讨如何使用.Net平台下的C#编程语言实现一个批量以文件夹名命名文件的工具,这在整理照片、文档等文件时非常实用。我们...
2. 异步处理:为了处理高并发的充值请求,系统往往采用异步处理机制,将充值任务放入队列,后台线程负责执行,从而提高系统的响应速度。 3. 安全支付:集成第三方支付接口,如支付宝、微信支付等,确保交易的安全性...
5. 异步处理:对于不紧急的封包,如统计类数据,可以先放入队列,然后由后台服务异步处理,避免阻塞主线程,保证游戏的流畅性。 实现网络封包与数据库封包关联的装置,可能包括服务器端的处理模块、数据库连接池、...
7. **性能优化**:对于大量数据的读写操作,考虑使用预编译的语句(`SQLPrepare`),批量执行(`SQLSetStmtAttr`,设置`SQL_ATTR_ARRAY_SIZE`)和连接池等技术以提升性能。 8. **连接池**:连接池可以重复使用已...
9. **异步操作**:现代应用程序常常需要并发处理多个数据库请求,瀚高DB驱动可能支持异步API,让应用能够在等待数据库响应的同时执行其他任务。 10. **元数据获取**:驱动程序提供获取数据库表结构、索引、约束等元...