`
richard_lee
  • 浏览: 16449 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

java异步计算场景应用

阅读更多
最近项目中遇到一个业务场景:
将当期数据库中的表迁移到另外一个数据库中,为满足迁移效率需要进行并发数据迁移。对每一数据表可以启动不同的线程同时迁移数据。迁移完成后,同步更新对应该迁移任务的状态字段。
最先想到的是使用java中并发工具类:同步屏障CyclicBarrier。
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
一、通过CyclicBarrier实现迁移任务代码:
package com.future.test;

import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VerticaTransfer extends DataTransfer<DataInfo>{
    int threadCount = 10;
    //线程调度
    ExecutorService executor = null;
    CyclicBarrier barrier;
    //计算结果集
    protected void doBefore(DataInfo entity){
        //线程池
        executor = Executors.newFixedThreadPool(threadCount);
        //CyclicBarrier可以用于多线程计算数据,最后处理结果的场景
        barrier = new CyclicBarrier(threadCount,new DoAfter(this,entity));  
    }
    protected void doJob(DataInfo entity){
        //并行计算
        List<Product> ps = entity.getProducts();
        for (Product product : ps) {
            executor.execute(new VerticaTransferTask(barrier,product));
        }

    }
    @Override
    protected void doAfter(DataInfo entity) {
    }

}
/**
 * 合并计算处理
 * @author Administrator
 *
 */
class DoAfter implements Runnable {   
    private VerticaTransfer verticaTransfer;   
    private DataInfo entity;
    DoAfter(VerticaTransfer verticaTransfer,DataInfo entity) {   
        this.verticaTransfer = verticaTransfer;   
        this.entity = entity;
    }   
    public void run() {
        System.out.println("迁移完成。共迁移:" + entity.getProducts().size());
    }   
}
 
业务处理代码:
package com.future.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 数据迁移执行任务
 * @author 
 *
 */
public class VerticaTransferTask implements Runnable{

    private CyclicBarrier barrier; 
    private Product product;
    VerticaTransferTask(Product product){
        this.product = product;
    }
    VerticaTransferTask(CyclicBarrier barrier,Product product){
        this.barrier = barrier;
        this.product = product;
    }


    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            System.out.println("进行迁移 :" + product.getId());
            Thread.sleep(1000);
        }catch(Exception e){
            e.printStackTrace();
        } finally {
            try {
                barrier.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

测试入口:
package com.future.test;

import java.util.ArrayList;
import java.util.List;

public class VerticaTransferTest{
    public static void main(String[] args) {
            VerticaTransfer transfer = new VerticaTransfer(); //
            DataInfo data = new DataInfo();
            List<Product> ps = new ArrayList<Product>();
            int tmp = 0;
            for(int i = 0; i < 10;i++){
                Product p = new Product();
                p.setId(i + "");
                p.setPurchase_price(10);
                p.setSalse_price(10 + i);
                ps.add(p);
                tmp += i;
            }

            data.setProducts(ps);
            transfer.execute(data);
    }

}

通过上述实现步骤,完全可以实现业务场景。
增强业务场景:在上述场景基础上,对每次迁移的结果进行最终的汇总。多少迁移成功,多少迁移失败。也就是对每个线程处理结果进行汇总。
这个就涉及到线程间通信的问题。在现有处理的基础上,添加一个公共List变量,在迁移VerticaTransferTask run()方法中将迁移结果synchronized放在List
中即可。
但是,有没有更好的实现方式呢?
Future接口
描述:从Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
这就表示我们可以通过Future获取每个线程的执行结果。我以下通过并行计算产品利润的方式简单实现需求。
二、通过Future实现并行处理任务代码:
package com.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VerticaTransfer extends DataTransfer<DataInfo>{
    int threadCount = 10;
    //线程调度
    ExecutorService executor = null;
    //计算结果集
    List<Future<ResultInfo>> results = new ArrayList<Future<ResultInfo>>();
    protected void doBefore(DataInfo entity){
        //线程池
        executor = Executors.newFixedThreadPool(threadCount);

    }
    protected void doJob(DataInfo entity){
        //并行计算
        List<Product> ps = entity.getProducts();
        for (Product product : ps) {
            Future<ResultInfo> res = executor.submit(new VerticaTransferTask(product));
            results.add(res);
        }

    }
    @Override
    protected void doAfter(DataInfo entity) {
        double total = 0;
        List<Future<ResultInfo>> rs = this.results;
        for (Future<ResultInfo> future : rs) {
            try {
                ResultInfo info = future.get();
                total += info.getPrice();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println("产品总利润:" + total);
    }

}

业务处理代码:
package com.test;

import java.util.concurrent.Callable;

/**
 * 数据迁移执行任务
 * @author
 *
 */
public class VerticaTransferTask implements Callable<ResultInfo>{

    private Product product;
    VerticaTransferTask(Product product){
        this.product = product;
    }


    @Override
    public ResultInfo call() throws Exception {
        // TODO Auto-generated method stub
        ResultInfo res = null;
        try {
            double money = product.getSalse_price() - product.getPurchase_price();
            res = new ResultInfo();
            res.setPrice(money);
            res.setProductId(product.getId());
            Thread.sleep(1000);
        }catch(Exception e){
            e.printStackTrace();
        } 
        return res;
    }
}

很简单我们就实现了并行计算并合并结果集。
那我能不能两者一起使用呢,我在CyclicBarrier处理结果DoAfter类中获取Future结果进行统计。
这样不就可以满足需求了吗。设想处理如下:
public class VerticaTransfer extends DataTransfer<DataInfo>{
    int threadCount = 10;
    //线程调度
    ExecutorService executor = null;
    CyclicBarrier barrier;
    //计算结果集
    List<Future<ResultInfo>> results = new ArrayList<Future<ResultInfo>>();
    protected void doBefore(DataInfo entity){
        //线程池
        executor = Executors.newFixedThreadPool(threadCount);
        //CyclicBarrier可以用于多线程计算数据,最后处理结果的场景
        barrier = new CyclicBarrier(threadCount,new DoAfter(this,entity));  
    }
    protected void doJob(DataInfo entity){
        //并行计算
        List<Product> ps = entity.getProducts();
        for (Product product : ps) {
            Future<ResultInfo> res = executor.submit(new VerticaTransferTask(product));
            results.add(res);
        }

    }
    @Override
    protected void doAfter(DataInfo entity) {

    }

}
/**
 * 合并计算处理
 * @author Administrator
 *
 */
class DoAfter implements Runnable {   
    private VerticaTransfer verticaTransfer;   
    private DataInfo entity;
    DoAfter(VerticaTransfer verticaTransfer,DataInfo entity) {   
        this.verticaTransfer = verticaTransfer;   
        this.entity = entity;
    }   
    public void run() {
        double total = 0;
        List<Future<ResultInfo>> rs = verticaTransfer.results;
        for (Future<ResultInfo> future : rs) {
            try {
                ResultInfo info = future.get();
                total += info.getPrice();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println("产品总利润:" + total);
    }   
} 
 
业务处理VerticaTransferTask:
public class VerticaTransferTask implements Callable<ResultInfo>{

    private CyclicBarrier barrier; 
    private Product product;
    VerticaTransferTask(Product product){
        this.product = product;
    }
    VerticaTransferTask(CyclicBarrier barrier,Product product){
        this.barrier = barrier;
        this.product = product;
    }


    @Override
    public ResultInfo call() {
        // TODO Auto-generated method stub
        ResultInfo res = null;
        try {
            double money = product.getSalse_price() - product.getPurchase_price();
            res = new ResultInfo();
            res.setPrice(money);
            res.setProductId(product.getId());
            Thread.sleep(1000);
        }catch(Exception e){
            e.printStackTrace();
        } finally {
            try {
                barrier.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return res;
    }
}

运行后发现死锁啦,原因是什么呢?
查了一下CyclicBarrier资料,注意这一点:
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrier-Action),用于在线程到达屏障时,优先执行barrierAction。
也就是在barrier.await()执行之后会优先执行DoAfter类中的run, 而这时run中的 future.get()阻塞等待VerticaTransferTask call运行结果,形成了资源相互
抢占,造成了死锁。
这样我们就大概了解了在java中有两种实现并行计算的方式,那么具体遇到问题的时候如何选择呢?
我们还是要清楚两者的概念:
CyclicBarrier在到达屏障之后线程并没有处理结束,而是被阻塞等待,等有优先处理barrierAction完成后,被signalAll唤醒继续运行。
CyclicBarrier中的源代码:
   private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

而Future是等待线程运行完成之后才获取结果,否则一直阻塞等待。
附该Demo代码



















0
0
分享到:
评论
3 楼 richard_lee 2016-02-17  
smallbug_vip 写道
spiniper 写道
我只想问一句楼主,你使用的多线程处理和你单线程的处理真的有解决性能问题么?你的cpu只有一个,程序占用的cpu时间片段无论是单线程还是多线程应该都是一样,数据需要的内存空间也不会因为减少。单线程没有额外的线程资源和内存开销,不需要锁来维护共享数据的脏读问题,不需要仔细设计线程之间的协同问题只要保证逻辑的争取即结果正确,没有线程之间通信的额外开销和一些因为多线程而多出来的额外逻辑处理模块。
这就好像程序开发管理理论上的说明,多人处理一个问题真的会比一个人处理更快么?更何况在成本有限的情况下。
所以一般遇到性能问题,不要首先想到多线程来处理,这是不合理的思维,而是要以逻辑本身是否存在步骤或者算法上的优化为基础,因为事实证明,大多数性能问题都是业务设计的问题和不合适的算法造成的。


现在计算机早已进入了多核时代,一颗CPU照样可以并行运行任务。再者VerticaTransferTask类中复写的call方法中有一句Thread.sleep(1000);线程间状态切换所消耗的时间片绝对远远小于1000毫秒。所以即使是单核CPU开启多线程照样可以节省时间开销。虽然我并不知道为什么来一句Thread.sleep(1000),难道是模拟业务带来的时间开销,如果真实业务在这1000毫秒内线程不存在线程阻塞,一直在努力工作,那么单核CPU是不应该在多线程方面考虑解决方案了。

关于并发这块你说的对。即使是单核处理器也支持多线程执行代码,CPU通过给每个线程分配CPU时间片来实现这个机制。时间片是CPU分配给各个线程的时间,因为时间片非常短,所以CPU通过不停地切换线程执行,让我们感觉多个线程是同时执行的,时间片一般是几十毫秒(ms)。到多核时代,并发编程就更显示他的优势。更不用说对现在互联网时代高并发的业务支撑了。
至于Thread.sleep(1000);这块是我加的模拟业务时间(当然实际时间不止1s)
2 楼 smallbug_vip 2016-02-17  
spiniper 写道
我只想问一句楼主,你使用的多线程处理和你单线程的处理真的有解决性能问题么?你的cpu只有一个,程序占用的cpu时间片段无论是单线程还是多线程应该都是一样,数据需要的内存空间也不会因为减少。单线程没有额外的线程资源和内存开销,不需要锁来维护共享数据的脏读问题,不需要仔细设计线程之间的协同问题只要保证逻辑的争取即结果正确,没有线程之间通信的额外开销和一些因为多线程而多出来的额外逻辑处理模块。
这就好像程序开发管理理论上的说明,多人处理一个问题真的会比一个人处理更快么?更何况在成本有限的情况下。
所以一般遇到性能问题,不要首先想到多线程来处理,这是不合理的思维,而是要以逻辑本身是否存在步骤或者算法上的优化为基础,因为事实证明,大多数性能问题都是业务设计的问题和不合适的算法造成的。


现在计算机早已进入了多核时代,一颗CPU照样可以并行运行任务。再者VerticaTransferTask类中复写的call方法中有一句Thread.sleep(1000);线程间状态切换所消耗的时间片绝对远远小于1000毫秒。所以即使是单核CPU开启多线程照样可以节省时间开销。虽然我并不知道为什么来一句Thread.sleep(1000),难道是模拟业务带来的时间开销,如果真实业务在这1000毫秒内线程不存在线程阻塞,一直在努力工作,那么单核CPU是不应该在多线程方面考虑解决方案了。
1 楼 spiniper 2016-02-16  
我只想问一句楼主,你使用的多线程处理和你单线程的处理真的有解决性能问题么?你的cpu只有一个,程序占用的cpu时间片段无论是单线程还是多线程应该都是一样,数据需要的内存空间也不会因为减少。单线程没有额外的线程资源和内存开销,不需要锁来维护共享数据的脏读问题,不需要仔细设计线程之间的协同问题只要保证逻辑的争取即结果正确,没有线程之间通信的额外开销和一些因为多线程而多出来的额外逻辑处理模块。
这就好像程序开发管理理论上的说明,多人处理一个问题真的会比一个人处理更快么?更何况在成本有限的情况下。
所以一般遇到性能问题,不要首先想到多线程来处理,这是不合理的思维,而是要以逻辑本身是否存在步骤或者算法上的优化为基础,因为事实证明,大多数性能问题都是业务设计的问题和不合适的算法造成的。

相关推荐

    Java异步技术原理和实践.rar_Java异步开发

    总结,Java异步技术是提升系统效率的关键,从NIO的非阻塞I/O到高阶的`CompletableFuture`,再到事件驱动的Reactor模式,Java提供了丰富的工具和框架来应对各种异步编程场景。熟练掌握这些技术,能够帮助开发者构建出...

    java多线程异步性

    在Java中,通常通过Future和Callable接口实现异步计算,或者使用ExecutorService和CompletionService来管理和控制异步任务。 三、ExecutorService与ThreadPoolExecutor ExecutorService是Java并发框架中的核心接口...

    使用Java异步编程实现一个简单的网络请求.txt

    ### 使用Java异步编程实现简单网络请求的知识点详解 #### 一、概述 本文将详细介绍一个使用Java异步编程模型实现简单网络请求的例子。通过这个示例,我们可以了解到如何利用Java中的多线程机制和异步处理技术来...

    java同步异步知识

    - **作用**:表示异步计算的结果。 - **原理**:Future对象代表了一个尚未完成的任务的结果,可以通过get()方法获取结果,如果任务未完成,则会阻塞等待。 - **使用场景**:适用于需要异步获取计算结果的场景。 ...

    Java异步技术原理和实践.zip_异步

    总之,Java异步技术是一个深奥且强大的工具箱,它包含了一系列的机制和模式,旨在优化应用性能和用户体验。通过深入学习和实践,开发者可以更好地应对高并发场景,构建高效、响应迅速的应用程序。

    用JAVA写的一个异步多线程批处理的组件

    在IT行业中,尤其是在Java开发领域,异步多线程批处理是一种常见的技术手段,用于高效地处理大量数据。本文将详细解析标题为“用JAVA写的一个异步多线程批处理的组件”的核心知识点,以及如何利用这个组件来优化大...

    Java异步编程最佳实践_.docx

    Java异步编程是一种提高应用程序性能和响应速度的关键技术,特别是在处理I/O密集型任务时。在Java中,异步编程允许程序在等待某些操作完成(如网络请求、文件读写等)的同时,继续执行其他任务,从而提高了整体的...

    java android 贝塞尔曲线计算

    4. 多线程处理:可能使用了`AsyncTask`或者其他线程模型来异步计算贝塞尔曲线。 学习和掌握贝塞尔曲线计算以及在Android中的应用,不仅可以提升图形界面设计的能力,还能为游戏开发、动画制作等复杂场景提供支持。...

    JAVA实现异步调用实例代码

    在Java平台上,异步调用是一种重要的编程模式,它允许程序在执行耗时操作时不会阻塞主线程,从而提高应用程序的响应速度和用户体验。在Java中实现异步调用通常涉及多线程和回调机制。以下是基于给定实例代码的详细...

    异步调用

    异步调用是编程中的一个重要概念,特别是在高性能和高并发的应用场景中,它能显著提升系统的响应速度和用户体验。在本文中,我们将深入探讨异步调用的核心原理、使用场景以及如何在实践中应用。 首先,我们需要理解...

    分布式Java应用基础与实践pdf

    2. **RMI**:远程方法调用是Java中实现分布式计算的基础,允许一个Java对象调用另一个位于不同JVM上的对象的方法。RMI包括两个主要部分:服务器端(暴露远程接口和服务)和客户端(通过代理访问远程服务)。 3. **...

    JAVA实现模拟导入数据/上传文件进度条

    在文件上传进度条场景下,Servlet主要负责接收上传文件的请求,处理文件,并将文件的处理进度反馈给客户端。 接着,JSP(JavaServer Pages)是用来动态生成HTML页面的,它可以嵌入Java代码,使得页面和业务逻辑结合...

    分布式Java应用基础与实践源码.zip

    通过上述内容,我们可以看到分布式Java应用涉及的领域广泛,涵盖了从通信、数据存储到计算和事务处理等多个方面。源码分析是深入理解这些技术的关键,它可以帮助我们更好地掌握分布式系统的设计和实现。在实践中,...

    java9分布式计算

    6. **大数据处理**:分布式计算在处理海量数据时尤其有用,例如在大数据分析、流处理和实时计算场景。Apache Hadoop是典型的大数据处理框架,通过MapReduce和HDFS(Hadoop分布式文件系统)实现数据的分布式存储和...

    java网络编程与分布式计算

    Java网络编程是Java开发中的重要领域,它涵盖了网络通信的基础概念...通过深入学习"Java网络编程与分布式计算",开发者可以掌握构建大规模、高性能网络应用和分布式系统的技能,为现代互联网和云计算环境打下坚实基础。

    Java多线程之异步Future机制的原理和实现共5页.p

    在Java中,`java.util.concurrent.Future`接口代表一个异步计算的结果。它提供了检查计算是否完成、获取结果或取消任务的能力。但请注意,Future对象本身并不执行任何计算,而是与一个对应的`java.util.concurrent....

    《Java中间件技术及其应用开发》-李华飚-源代码

    书中的源代码可能涉及以上这些中间件技术的具体实现和应用场景,通过阅读和理解这些代码,读者可以深入学习Java中间件技术的工作原理,并提升实际开发能力。对于想从事Java后端开发或者对Java中间件感兴趣的开发者来...

    java的应用领域 操作符

    EJB(Enterprise JavaBeans)和JMS(Java Message Service)是其中的关键组件,用于处理事务管理、安全性以及异步通信。 3. **移动应用**:Android操作系统广泛采用Java作为其主要的开发语言。Android SDK允许...

    处理java异步事件的阻塞和非阻塞方法分析

    Java 异步事件处理方法分析 处理 Java 异步事件的阻塞和非阻塞方法分析是 Java 编程中非常重要的一部分。文中通过示例代码介绍了非常...了解这些方法的区别和应用场景,可以帮助您更好地编写高效、可靠的 Java 程序。

Global site tag (gtag.js) - Google Analytics