`
zhaohaolin
  • 浏览: 1017755 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Java 7 Fork/Join 并行计算框架概览

    博客分类:
  • JAVA
阅读更多

Java 7 Fork/Join 并行计算框架概览

文章分类:Java编程

应用程序并行计算遇到的问题

    当硬件处理能力不能按摩尔定律垂直发展的时候,选择了水平发展。多核处理器已广泛应用,未来处理器的核心数将进一步发布,甚至达到上百上千的数量。而现在 很多的应用程序在运行在多核心的处理器上并不能得到很好的性能提升,因为应用程序的并发处理能力不强,不能够合理有效地的利用计算资源。线性的计算只能利 用n分之一的计算支援。
    要提高应用程序在多核处理器上的执行效率,只能想办法提高应用程序的本身的并行能力。常规的做法就是使用多线程,让更多的任务同时处理,或者让一部分操作 异步执行,这种简单的多线程处理方式在处理器核心数比较少的情况下能够有效地利用处理资源,因为在处理器核心比较少的情况下,让不多的几个任务并行执行即 可。但是当处理器核心数发展很大的数目,上百上千的时候,这种按任务的并发处理方法也不能充分利用处理资源,因为一般的应用程序没有那么多的并发处理任务 (服务器程序是个例外)。所以,只能考虑把一个任务拆分为多个单元,每个单元分别得执行最后合并每个单元的结果。一个任务的并行拆分,一种方法就是寄希望 于硬件平台或者操作系统,但是目前这个领域还没有很好的结果。另一种方案就是还是只有依靠应用程序本身对任务经行拆封执行。
Fork/Join框架

    依靠应用程序本身并行拆封任务,如果使用简单的多线程程序的方法,复杂度必然很大。这就需要一个更好的范式或者工具来代程序员处理这类问题。Java 7也意识到了这个问题,才标准库中集成了由Doug Lea开发的Fork/Join并行计算框架。通过使用 Fork/Join 模式,软件开发人员能够方便地利用多核平台的计算能力。尽管还没有做到对软件开发人员完全透明,Fork/Join 模式已经极大地简化了编写并发程序的琐碎工作。对于符合 Fork/Join 模式的应用,软件开发人员不再需要处理各种并行相关事务,例如同步、通信等,以难以调试而闻名的死锁和 data race 等错误也就不会出现,提升了思考问题的层次。你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer 策略,仅仅关注如何划分任务和组合中间结果,将剩下的事情丢给 Fork/Join 框架。但是Fork/Join并行计算框架,并不是银弹,并不能解决所有应用程序在超多核心处理器上的并发问题。
    如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决。其原理如下图。

    应用程序开发者需要做的就是拆分任务并组合每个子任务的中间结果,而不用再考虑线程和锁的问题。
一个简单的例子

我们首先看一个简单的Fork/Join的任务定义。

Java代码  收藏代码
  1. public   class  Calculator  extends  RecursiveTask<Integer> {    
  2.     
  3.     private   static   final   int  THRESHOLD =  100 ;    
  4.     private   int  start;    
  5.     private   int  end;    
  6.     
  7.     public  Calculator( int  start,  int  end) {    
  8.         this .start = start;    
  9.         this .end = end;    
  10.     }    
  11.     
  12.     @Override     
  13.     protected  Integer compute() {    
  14.         int  sum =  0 ;    
  15.         if ((start - end) < THRESHOLD){    
  16.             for ( int  i = start; i< end;i++){    
  17.                 sum += i;    
  18.             }    
  19.         }else {    
  20.             int  middle = (start + end) / 2 ;    
  21.             Calculator left = new  Calculator(start, middle);    
  22.             Calculator right = new  Calculator(middle +  1 , end);    
  23.             left.fork();    
  24.             right.fork();    
  25.     
  26.             sum = left.join() + right.join();    
  27.         }    
  28.         return  sum;    
  29.     }    
  30.     
  31. }    
public class Calculator extends RecursiveTask<Integer> {  
  
    private static final int THRESHOLD = 100;  
    private int start;  
    private int end;  
  
    public Calculator(int start, int end) {  
        this.start = start;  
        this.end = end;  
    }  
  
    @Override  
    protected Integer compute() {  
        int sum = 0;  
        if((start - end) < THRESHOLD){  
            for(int i = start; i< end;i++){  
                sum += i;  
            }  
        }else{  
            int middle = (start + end) /2;  
            Calculator left = new Calculator(start, middle);  
            Calculator right = new Calculator(middle + 1, end);  
            left.fork();  
            right.fork();  
  
            sum = left.join() + right.join();  
        }  
        return sum;  
    }  
  
}  


    这段代码中,定义了一个累加的任务,在compute方法中,判断当前的计算范围是否小于一个值,如果是则计算,如果没有,就把任务拆分为连个子任务,并合并连个子任务的中间结果。程序递归的完成了任务拆分和计算。
    任务定义之后就是执行任务,Fork/Join提供一个和Executor框架 的扩展线程池来执行任务。
Java代码  收藏代码
  1. @Test     
  2. public   void  run()  throws  Exception{    
  3.     ForkJoinPool forkJoinPool = new  ForkJoinPool();    
  4.     Future<Integer> result = forkJoinPool.submit(new  Calculator( 0 10000 ));    
  5.     
  6.     assertEquals(new  Integer( 49995000 ), result.get());    
  7. }    
@Test  
public void run() throws Exception{  
    ForkJoinPool forkJoinPool = new ForkJoinPool();  
    Future<Integer> result = forkJoinPool.submit(new Calculator(0, 10000));  
  
    assertEquals(new Integer(49995000), result.get());  
}  


Fork/Join框架的主要类



RecursiveAction供不需要返回值的任务继续。
RecursiveTask通过泛型参数设置计算的返回值类型。
ForkJoinPool提供了一系列的submit方法,计算任务。ForkJoinPool默认的线程数通过Runtime.availableProcessors()获得,因为在计算密集型的任务中,获得多于处理性核心数的线程并不能获得更多性能提升。
Java代码  收藏代码
  1. public  <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {  
  2.     doSubmit(task);  
  3.     return  task;  
  4. }  
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    doSubmit(task);
    return task;
}

sumit方法返回了task本身,ForkJoinTask实现了Future接口,所以可以通过它等待获得结果。
另一例子

这个例子并行排序数组,不需要返回结果,所以继承了RecursiveAction。

Java代码  收藏代码
  1. public   class  SortTask  extends  RecursiveAction {    
  2.     final   long [] array;    
  3.     final   int  start;    
  4.     final   int  end;    
  5.     private   int  THRESHOLD =  100 //For demo only     
  6.     
  7.     public  SortTask( long [] array) {    
  8.         this .array = array;    
  9.         this .start =  0 ;    
  10.         this .end = array.length -  1 ;    
  11.     }    
  12.     
  13.     public  SortTask( long [] array,  int  start,  int  end) {    
  14.         this .array = array;    
  15.         this .start = start;    
  16.         this .end = end;    
  17.     }    
  18.     
  19.     protected   void  compute() {    
  20.         if  (end - start < THRESHOLD)    
  21.             sequentiallySort(array, start, end);    
  22.         else  {    
  23.             int  pivot = partition(array, start, end);    
  24.             new  SortTask(array, start, pivot -  1 ).fork();    
  25.             new  SortTask(array, pivot +  1 , end).fork();    
  26.         }    
  27.     }    
  28.     
  29.     private   int  partition( long [] array,  int  start,  int  end) {    
  30.         long  x = array[end];    
  31.         int  i = start -  1 ;    
  32.         for  ( int  j = start; j < end; j++) {    
  33.             if  (array[j] <= x) {    
  34.                 i++;    
  35.                 swap(array, i, j);    
  36.             }    
  37.         }    
  38.         swap(array, i + 1 , end);    
  39.         return  i +  1 ;    
  40.     }    
  41.     
  42.     private   void  swap( long [] array,  int  i,  int  j) {    
  43.         if  (i != j) {    
  44.             long  temp = array[i];    
  45.             array[i] = array[j];    
  46.             array[j] = temp;    
  47.         }    
  48.     }    
  49.     
  50.     private   void  sequentiallySort( long [] array,  int  lo,  int  hi) {    
  51.         Arrays.sort(array, lo, hi + 1 );    
  52.     }    
  53. }   
public class SortTask extends RecursiveAction {  
    final long[] array;  
    final int start;  
    final int end;  
    private int THRESHOLD = 100; //For demo only  
  
    public SortTask(long[] array) {  
        this.array = array;  
        this.start = 0;  
        this.end = array.length - 1;  
    }  
  
    public SortTask(long[] array, int start, int end) {  
        this.array = array;  
        this.start = start;  
        this.end = end;  
    }  
  
    protected void compute() {  
        if (end - start < THRESHOLD)  
            sequentiallySort(array, start, end);  
        else {  
            int pivot = partition(array, start, end);  
            new SortTask(array, start, pivot - 1).fork();  
            new SortTask(array, pivot + 1, end).fork();  
        }  
    }  
  
    private int partition(long[] array, int start, int end) {  
        long x = array[end];  
        int i = start - 1;  
        for (int j = start; j < end; j++) {  
            if (array[j] <= x) {  
                i++;  
                swap(array, i, j);  
            }  
        }  
        swap(array, i + 1, end);  
        return i + 1;  
    }  
  
    private void swap(long[] array, int i, int j) {  
        if (i != j) {  
            long temp = array[i];  
            array[i] = array[j];  
            array[j] = temp;  
        }  
    }  
  
    private void sequentiallySort(long[] array, int lo, int hi) {  
        Arrays.sort(array, lo, hi + 1);  
    }  
} 



Java代码  收藏代码
  1. @Test     
  2. public   void  run()  throws  InterruptedException {    
  3.     ForkJoinPool forkJoinPool = new  ForkJoinPool();    
  4.     Random rnd = new  Random();    
  5.     long [] array =  new   long [SIZE];    
  6.     for  ( int  i =  0 ; i < SIZE; i++) {    
  7.         array[i] = rnd.nextInt();    
  8.     }    
  9.     forkJoinPool.submit(new  SortTask(array));    
  10.     
  11.     forkJoinPool.shutdown();    
  12.     forkJoinPool.awaitTermination(1000 , TimeUnit.SECONDS);    
  13.     
  14.     for  ( int  i =  1 ; i < SIZE; i++) {    
  15.         assertTrue(array[i - 1 ] < array[i]);    
  16.     }    
  17. }    
@Test  
public void run() throws InterruptedException {  
    ForkJoinPool forkJoinPool = new ForkJoinPool();  
    Random rnd = new Random();  
    long[] array = new long[SIZE];  
    for (int i = 0; i < SIZE; i++) {  
        array[i] = rnd.nextInt();  
    }  
    forkJoinPool.submit(new SortTask(array));  
  
    forkJoinPool.shutdown();  
    forkJoinPool.awaitTermination(1000, TimeUnit.SECONDS);  
  
    for (int i = 1; i < SIZE; i++) {  
        assertTrue(array[i - 1] < array[i]);  
    }  
}  


动手尝试

Fork/Join框架的代码已经整合到了最新的JDK7的Binary Snapshot Releases中,可以通过这个地址 下载。
本文中的代码见附件。

分享到:
评论

相关推荐

    Packt.Java.7.Concurrency.Cookbook

    - **Fork/Join框架**:这是Java 7中最引人注目的新特性之一。Fork/Join框架为分治算法(divide-and-conquer algorithms)提供了一种高效实现方式,它可以将一个大任务分解成若干个小任务,并在多核处理器上并行执行...

    Beginning Java 7

    Fork/Join 框架是一种新的并行计算框架,它简化了多线程编程模型。这个框架鼓励开发者将任务分解成更小的任务,然后将结果合并。这种方式非常适合于需要进行大量并行处理的应用场景。 ##### 2. Objects 类 Java 7 ...

    风辰的CUDA入门教程

    - **定义**:Fork/Join是一种并行计算框架,用于高效地分配任务并在多核处理器上执行。 - **CUDA与Fork/Join**:在CUDA中,可以通过类似Fork/Join的方式来实现数据并行处理,但具体实现方式有所不同,更多依赖于CUDA...

    深入浅出Java多线程.pdf

    Java 8 Stream 并行计算原理** - **Stream API**:Java 8 引入的新特性,支持函数式编程风格。 - **并行流**:利用多核处理器的优势,将数据分割为多个部分并行处理。 - **并行流与串行流对比**:并行流适用于数据...

    图灵图书:图解JAVA多线程设计模式

    - Fork/Join框架介绍 - CompletableFuture的使用案例 - Stream API与并行流 - 并发安全的集合类 #### Java多线程基础 **线程的概念与创建方式:** - **概念:** 在计算机科学中,线程是操作系统能够进行运算...

    Java 并发编程实战.pdf

    又如,在数据分析场景下,利用`Fork/Join`框架进行并行计算,可以大幅减少数据处理时间。 综上所述,“Java并发编程实战.pdf”这一文档为读者提供了丰富的理论基础和技术实践指导,帮助开发人员更好地理解和掌握...

    Java Concurrency In Practice.pdf

    书中介绍了如何使用Fork/Join框架来实现高效的并行递归算法。 以上内容概述了《Java Concurrency In Practice》这本书中所涉及的关键知识点和技术细节,希望能为读者提供有价值的参考和启示。通过深入学习这些内容...

    Verilog HDL 可综合语法总结

    - **fork/join**:并行执行控制结构。 - **initial**:初始块。 - **delays**:显式延时。 - **UDP**:用户定义的原语。 - **wait**:等待条件。 这些结构通常不被综合工具所支持,因为它们主要用于测试和验证目的...

    SystemVerilog与功能验证文前

    - **并发控制结构**: 如`fork...join`结构,支持高效的并行处理。 - **通信机制**: 如`mailbox`、`semaphore`和`event`等,用于协调并发进程间的通信。 - **虚接口**: 用于连接设计模块和验证环境,简化了接口的复杂...

    线程与并发:Ruby并行世界的探索之旅

    ### 线程与并发:Ruby并行世界的探索之旅 #### Ruby 语言概览 Ruby 是一种高级的、面向对象的编程语言,由日本开发者松本行弘(Yukihiro "Matz" Matsumoto)于 1995 年创建。其设计初衷旨在实现简单、自然且强大的...

    IEEE standard for SystemVerilog

    - **高级并发控制**:SystemVerilog提供了多种并发控制机制,包括fork/join、wait、@等操作符,使得开发者能够更容易地处理并发事件。 - **接口和封装**:为了提高代码的可复用性,SystemVerilog支持接口和封装的...

    A trip through the Graphics Pipeline 2011

    像素处理分为分叉阶段(Fork Phase)和合并阶段(Join Phase),这两个阶段处理像素的着色和其他计算任务。像素着色器(Pixel Shader)或片段着色器(Fragment Shader)在这个过程中计算像素的颜色和其他属性。 ...

    UNIX环境高级编程第二版

    - **并行计算**:利用多线程或多进程实现高效的数据处理任务。 - **系统服务开发**:构建后台运行的服务程序,例如守护进程(daemon)。 #### 6. 总结 《UNIX环境高级编程(第2版)》不仅是一本理论性强的技术书籍...

    uml之活动图

    9. **分叉和连接点(Fork and Join Nodes)**:支持并发处理,分叉点启动多个线程,连接点等待所有线程完成后再继续后续流程。 10. **扩展域(Expansion Region)**:一种特殊的区域,允许活动的重复执行,如迭代或...

    伯克利硬件名师的大作Kluwer Verilog Quickstart 3Rd Edition

    - **并行执行**:探讨了如何使用`fork-join`结构来编写并行执行的代码,这对于提高系统性能有着重要作用。 #### 六、系统任务 - **显示结果**:介绍了如何使用系统任务(system tasks)来显示仿真结果,包括常用的`$...

    testbench编写技巧

    为了实现更复杂的控制流,可以在 `initial` 块内部嵌套循环语句(如 `forever`),或者使用 `fork` 和 `join` 来并行执行多个任务,提高仿真效率。 ##### 遍历测试中的for循环应用 对于具有多种工作模式的设计,...

    多线程编程指南 linux

    #### 知识点概览 本文档旨在提供一个全面且深入的理解关于Linux环境下的多线程编程技术。主要内容涵盖了多线程的基础概念、线程编程的基本方法、线程属性的配置以及同步机制的使用等关键领域。以下是根据文档标题、...

Global site tag (gtag.js) - Google Analytics