`
Franciswmf
  • 浏览: 796975 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

异步批量执行任务与回滚

 
阅读更多
java异步方式(结合@Async和CompletableFuture)处理批量任务执行和回滚(只回滚执行失败的批次,执行成功的批次不回滚)

    @Autowired
    @Qualifier(value = "customThreadPoolTaskExecutor")
    private ThreadPoolTaskExecutor customThreadPoolTaskExecutor;
    @Autowired
    private Environment environment;
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private PlatformTransactionManager platformTransactionManager;
    /**
     * 测试异步批量审批与回滚
     * @param orderList
     * @param traceId
     * @throws Exception
     */
    @Async(value = "customThreadPoolTaskExecutor")
    public void batchAuditDemo(List<Order> orderList, String traceId){
        log.info("异步开始...traceId={}", traceId);
        long start = System.currentTimeMillis();
        //批量任务拆分成多个批次执行,每个批次跑固定数量的任务
        List<String> orderIdList = new ArrayList<>();
        for (Order temp : orderList) {
            orderIdList.add(temp.getId());
        }
        Integer size = Integer.parseInt(environment.getProperty(Constant.SIZE));
        //拆分
        Map<Integer, List<String>> map = spiltListFun(orderIdList, size);
        //批次执行结果
        boolean batchResult = false;
        for (Integer batchNo : map.keySet()) {
            //单批次异步审批(手动控制事务)
            batchResult = batchAsyncFun(batchNo, map.get(batchNo),traceId);
            if(batchResult){
                //...
                log.info("第{}批次执行失败,已回滚", batchNo);
            }
        }
        //...
        log.info("异步结束, 总耗时:{}", (System.currentTimeMillis() - start));
        orderIdList = null;
        map = null;
    }


    /**
     * 单批次异步审批(手动控制事务)
     * @param batchNo
     * @param orderIdList
     * @param traceId 本次批量审批的标记
     * @return
     */
    public boolean batchAsyncFun(int batchNo, List<String> orderIdList, String traceId){
        log.info("本次批量审批第{}批次开始执行:orderIdList={}", batchNo, orderIdList);
		redisTemplate.delete(currentBatchKey);
		//本批次是否进行了回滚
        final Map<Integer, Boolean> returnMap = new ConcurrentHashMap<>();
        CompletableFuture<Void> all = null;
        final Map<String, TransactionStatus> transactionMap = new ConcurrentHashMap<>();
        //用于保存当前批量审批任一批次执行结果的缓存key
        String currentBatchKey = RedisConstant.LIST_BATCH_APPROVAL + traceId;
        final CountDownLatch countDownLatch = new CountDownLatch(orderIdList.size());
        for (String orderId : orderIdList) {
            CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> {
                //1.创建异步操作,支持返回值
                log.info("orderId={}, threadName={}", orderId, Thread.currentThread().getName());
                //手动控制事务
                DefaultTransactionDefinition def = new DefaultTransactionDefinition();
                //新发起一个事务
                def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
                TransactionStatus status = platformTransactionManager.getTransaction(def);
                transactionMap.put(orderId, status);
                //审批
                boolean eachResult = false;
                try {
                    //核心业务
                    //core code...;
                }catch (Exception e){
                    log.info("捕捉到异常:batchNo={}, orderId={}", batchNo, orderId);
                    e.printStackTrace();
                    eachResult =true;
                }
                if(eachResult){
                    redisTemplate.opsForList().leftPush(currentBatchKey, "0");
                }else{
                    redisTemplate.opsForList().leftPush(currentBatchKey, "1");
                }
                //根据eachResult结果判断当前记录是否需要回滚
                return eachResult;
            }, customThreadPoolTaskExecutor).handle((s, t) -> {
                //2.执行任务完成时对结果的处理(包括异常和非异常结果)
                //第一个参数s为CompletableFuture 返回的结果, 第二个参数t为抛出的异常
                //log.info("子任务完成:batchNo={}, orderId={}, s={}", batchNo, orderId, s);
                boolean needRollback = false;
                int size = Integer.valueOf(Long.toString(redisTemplate.opsForList().size(currentBatchKey)));
                //循环等待所有任务都有结果(考虑引入超时机制)
                while(true){
                    //log.info("循环等待...realSize={}", size);
                    if(size == orderIdList.size()){
                        List<String> ls = redisTemplate.opsForList().range(currentBatchKey, 0, -1);
                        for (String str : ls) {
                            if(str.equals("0")){
                                needRollback = true;
                                break;
                            }
                        }
                        if(!returnMap.containsKey(batchNo)){
                            returnMap.put(batchNo, needRollback);
                        }
                        //根据情况,提交或者回滚事务
                        if(needRollback){
                            platformTransactionManager.rollback(transactionMap.get(orderId));
                        }else{
                            platformTransactionManager.commit(transactionMap.get(orderId));
                        }
                        countDownLatch.countDown();
                        break;
                    }else{
                        size = Integer.valueOf(Long.toString(redisTemplate.opsForList().size(currentBatchKey)));
                    }
                }
                return needRollback;
            });
            all = CompletableFuture.allOf(cf);
        }
        //CompletableFuture有任务结果返回,但事务不一定提交或者回滚
        //log.info("主线程阻塞1...");
        all.join();
        
        try {
		    //等待本批次所有任务彻底执行完
            //log.info("主线程阻塞2...");
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //本批次是否回滚:true - 是; false - 否;
        return returnMap.get(batchNo);
    }


    /**
     * 将指定集合进行拆分
     * @param list 集合
     * @param len 拆分长度,每个集合按照拆分长度进行分割
     * @return
     */
    public static Map<Integer, List<String>> spiltListFun(List<String> list, int len){
        Map<Integer, List<String>> map = new HashMap<>();
        if(null == list || len < 1){
            map.put(1, new ArrayList<>());
            return map;
        }
        int size = list.size();
        log.info("size={}, len={}", size, len);
        if(size <= len){
            map.put(1, list);
        }else{
            //分隔后的集合个数
            int count = (size + len - 1) / len;
            for (int i = 0; i < count; i++) {
                //子集合
                List<String> subList = list.subList(i * len, (len * (i + 1) > size ? size : len * (i + 1)));
                //批次号,子集合
                map.put(i+1, subList);
            }
        }
//        log.info("map={}", map.toString());
        return map;
    }
分享到:
评论

相关推荐

    sqlserver批量执行脚本文件

    在SQL Server环境中,批量执行SQL脚本文件是数据库管理员和开发者日常工作中常见的一项任务。本文将深入探讨如何在.NET环境中高效地实现这一功能,并对比使用sqlcmd工具的方法。以下是一些关键知识点: 1. **SQL ...

    springmvc+mysql增删改查批量删除异步请求全选反选

    异步请求是现代Web应用中提升用户体验的关键技术,它允许后台处理任务而不阻塞用户界面。在这个项目中,很可能使用了Ajax(Asynchronous JavaScript and XML)或者基于jQuery的Ajax方法来实现异步通信。当用户触发...

    C#批量导入数据

    C#的async/await关键字可以帮助实现异步批量导入。 9. **并行处理**:如果系统资源允许,可以考虑使用多线程或异步并行处理,将数据分成多个部分同时导入,进一步提升效率。 在给定的文件列表中,"AddMany.aspx"和...

    WPF mvvm 同时连接多个sqlserver执行指定sql命令

    在这个例子中,`ExecuteSqlCommandsAsync`方法会异步地启动一个任务来为每个数据库连接执行SQL命令,而`ExecuteSqlCommandAsync`则通过Task.Run封装了数据库操作,将其放到后台线程执行,避免阻塞UI。 在实际应用中...

    Laravel开发-laravel-queue-manager

    还可以使用`dispatchNow()`立即执行任务,而不是放入队列。 5. **任务失败处理**:Laravel提供了`failed`方法来处理任务失败的情况。可以通过`php artisan queue:retry &lt;job_id&gt;`重试失败的任务,或`...

    C# 执行SQL脚本

    9. **批量执行SQL脚本**: 对于大量SQL命令,可以考虑使用SqlBulkCopy类进行批量插入,提高效率。但这不适用于DML(INSERT、UPDATE、DELETE)以外的操作,如DDL(CREATE、ALTER、DROP)。 10. **异步执行**: C#...

    ThreadProV2.1

    - **异步调用**:支持异步执行任务,不阻塞主线程,提升用户体验。 - **错误处理**:提供异常处理机制,确保任务失败时能适当回滚或通知用户。 - **可扩展性**:允许开发者自定义线程类和任务处理过程,满足各种复杂...

    Android 一个批量删除联系人的Demo.rar

    6. **异步处理**:由于删除大量联系人可能需要较长时间,为了提供良好的用户体验,通常会将此操作放在异步任务(如AsyncTask)中执行,防止UI线程被阻塞。 7. **用户确认**:在执行批量删除前,通常会弹出确认...

    工程名称批量替换

    在IT行业中,批量处理是一项非常重要的任务,尤其是在大型项目或者工程管理中。...以上就是"工程名称批量替换"所涵盖的主要技术点,理解和掌握这些知识点,将有助于在实际工作中有效地执行类似的任务。

    C#中海量数据的批量插入和更新[顶].pdf

    在C#中处理海量数据的批量插入和更新是一项常见的任务,尤其是在大数据应用或者ETL(提取、转换、加载)流程中。尽管ADO.NET在某些方面可能不如JDBC提供那么直观和便捷的批量操作API,但它仍然提供了高效的方法来...

    TERASOLUNA Batch Framework for Java 機能説明書1

    提供了异步执行作业的能力,允许同时处理多个任务,提高系统的并发性和处理效率。这对于处理大量数据或并行执行独立的作业非常有效。 3. **事务管理功能 (BL-03)**: 支持事务管理,确保数据的一致性和完整性。在...

    文件海量修改

    9. **脚本自动化**:结合定时任务服务(如Windows的任务计划程序或Linux的cron),可以定期自动执行文件批量修改任务。 10. **工具软件**:除了编程,也有一些现成的工具如Bulk Rename Utility(批量重命名工具)、...

    Viswoole:基于swoole扩展开发的队列服务

    这些worker进程在后台持续运行,当有新的任务到达时,它们会自动获取并执行任务。Swoole的事件驱动模型确保了高效的任务调度和资源利用。 4. **任务状态跟踪**:Viswoole提供了任务状态监控机制,开发者可以查看...

    psycopg2-2.5.4.tar.gz

    10. **性能优化**:`psycopg2` 允许用户设置连接和游标的参数以优化性能,如预编译 SQL 语句、批量执行操作等。 综上所述,`psycopg2-2.5.4.tar.gz` 文件包含的是 `psycopg2` 库的源代码,用户可以通过解压并安装这...

    Activiti 5.16

    - **批量操作支持**:如批量分配、批量完成等,适用于大量相似任务的快速处理场景。 #### 六、REST API集成 - **API文档完善**:官方提供了详细的 RESTful 接口文档,覆盖了几乎所有核心功能模块,方便第三方系统...

    批量以文件夹名命名文件_照片批量命名_文件夹名命名文件_批量命名文件_以文件夹名批量命名文件_

    在IT行业中,文件管理和自动化处理是一项常见的任务,尤其是在处理大量数据时。本篇文章将深入探讨如何使用.Net平台下的C#编程语言实现一个批量以文件夹名命名文件的工具,这在整理照片、文档等文件时非常实用。我们...

    话费充值系统/话费直充/快充慢充系统/话费直充系统

    2. 异步处理:为了处理高并发的充值请求,系统往往采用异步处理机制,将充值任务放入队列,后台线程负责执行,从而提高系统的响应速度。 3. 安全支付:集成第三方支付接口,如支付宝、微信支付等,确保交易的安全性...

    网络游戏-网络封包与数据库封包关联方法、装置和实现装置.zip

    5. 异步处理:对于不紧急的封包,如统计类数据,可以先放入队列,然后由后台服务异步处理,避免阻塞主线程,保证游戏的流畅性。 实现网络封包与数据库封包关联的装置,可能包括服务器端的处理模块、数据库连接池、...

    c++连接远程sqlserver

    7. **性能优化**:对于大量数据的读写操作,考虑使用预编译的语句(`SQLPrepare`),批量执行(`SQLSetStmtAttr`,设置`SQL_ATTR_ARRAY_SIZE`)和连接池等技术以提升性能。 8. **连接池**:连接池可以重复使用已...

    瀚高DB驱动.zip

    9. **异步操作**:现代应用程序常常需要并发处理多个数据库请求,瀚高DB驱动可能支持异步API,让应用能够在等待数据库响应的同时执行其他任务。 10. **元数据获取**:驱动程序提供获取数据库表结构、索引、约束等元...

Global site tag (gtag.js) - Google Analytics