JSR-166: The Java fork/join Framework

JSR-166 is the set of concurrency utilities that was included in Java 5.  The fork/join framework was a piece of it that didn’t make it into Java 5.  After all this time, the fork/join framework is finally making it into JDK 7.  What surprised me about the framework is how easy it is to use.

The fork/join framework is designed to make divide-and-conquer algorithms easy to parallelize; more specifically, recursive algorithms where the control path branches into a few sub-calls that each process an equal share of the data set.  The typical setup is to create a new class that extends either RecursiveAction or RecursiveTask.  The parameters that were passed into the recursive function become member variables of the newly defined class, and the recursive calls are replaced by a call to invokeAll(…) rather than calls to the function itself.
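To make that pattern concrete before the larger example, here is a minimal sketch of it using a hypothetical SumTask that adds up the elements of an array; the class name, threshold, and fork/join calls are my own illustration, not code from the framework’s documentation.  On JDK 7 the classes live in java.util.concurrent; on Java 6 the same classes come from the jsr166y package mentioned at the end of this post.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: the recursive parameters (array, lo, hi) become
// member variables, and the recursive calls become subtasks.
class SumTask extends RecursiveTask<Long> {

    private static final int THRESHOLD = 1000; // below this, just compute directly

    private final long[] a;
    private final int lo, hi;

    SumTask(long[] a, int lo, int hi) {
        this.a = a;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {               // small range: sum sequentially
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += a[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(a, lo, mid);
        SumTask right = new SumTask(a, mid, hi);
        left.fork();                              // run the left half asynchronously
        return right.compute() + left.join();     // do the right half, then wait for the left
    }
}

Invoking it would look something like new ForkJoinPool().invoke(new SumTask(data, 0, data.length)).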

In writing this post, I kept going back and forth on whether I should use Fibonacci numbers as an example or something with more meat to it.  The computation done by each recursive call of a Fibonacci algorithm is too small to matter; not only that, but there are much better non-parallel algorithms for computing Fibonacci numbers.  In the end, I decided on showing a merge sort.  It is used as the example in the fork/join documentation, but this will be a more complete example showing both the sequential algorithm and the changes made for the parallel version.  You’ll see that it’s not that hard.

First let me start by showing the source code for a typical MergeSort:

public class MergeSort {

    private static final int SIZE_THRESHOLD = 16;

    public static void sort(Comparable[] a) {
        sort(a, 0, a.length-1);
    }

    public static void sort(Comparable[] a, int lo, int hi) {
        if (hi - lo < SIZE_THRESHOLD) {
            insertionsort(a, lo, hi);
            return;
        }

        Comparable[] tmp = new Comparable[((hi - lo) / 2) + 1];
        mergeSort(a, tmp, lo, hi);
    }

    private static void mergeSort(Comparable[] a, Comparable[] tmp, int lo, int hi) {
        if (hi - lo < SIZE_THRESHOLD) {
            insertionsort(a, lo, hi);
            return;
        }

        int m = (lo + hi) / 2;
        mergeSort(a, tmp, lo, m);
        mergeSort(a, tmp, m + 1, hi);
        merge(a, tmp, lo, m, hi);
    }

    private static void merge(Comparable[] a, Comparable[] b, int lo, int m, int hi) {
        if (a[m].compareTo(a[m+1]) <= 0)
            return;

        System.arraycopy(a, lo, b, 0, m-lo+1);

        int i = 0;
        int j = m+1;
        int k = lo;

        // copy back next-greatest element at each time
        while (k < j && j <= hi) {
            if (b[i].compareTo(a[j]) <= 0) {
                a[k++] = b[i++];
            } else {
                a[k++] = a[j++];
            }
        }

        // copy back remaining elements of first half (if any)
        System.arraycopy(b, i, a, k, j-k);

    }

    private static void insertionsort(Comparable[] a, int lo, int hi) {
        for (int i = lo+1; i <= hi; i++) {
            int j = i;
            Comparable t = a[j];
            while (j > lo && t.compareTo(a[j - 1]) < 0) {
                a[j] = a[j - 1];
                --j;
            }
            a[j] = t;
        }
    }
}
Now here is the code for the parallel version of MergeSort:

// On JDK 7 these classes are in java.util.concurrent; on Java 6 the same
// classes come from the jsr166y package mentioned at the end of this post.
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ParallelMergeSort {

    private static final ForkJoinPool threadPool = new ForkJoinPool();
    private static final int SIZE_THRESHOLD = 16;

    public static void sort(Comparable[] a) {
        sort(a, 0, a.length-1);
    }

    public static void sort(Comparable[] a, int lo, int hi) {
        if (hi - lo < SIZE_THRESHOLD) {
            insertionsort(a, lo, hi);
            return;
        }

        Comparable[] tmp = new Comparable[a.length];
        threadPool.invoke(new SortTask(a, tmp, lo, hi));
    }

    /**
     * This class replaces the recursive function that was
     * previously here.
     */
    static class SortTask extends RecursiveAction {
        Comparable[] a;
        Comparable[] tmp;
        int lo, hi;
        public SortTask(Comparable[] a, Comparable[] tmp, int lo, int hi) {
            this.a = a;
            this.lo = lo;
            this.hi = hi;
            this.tmp = tmp;
        }

        @Override
        protected void compute() {
            if (hi - lo < SIZE_THRESHOLD) {
                insertionsort(a, lo, hi);
                return;
            }

            int m = (lo + hi) / 2;
            // the two recursive calls are replaced by a call to invokeAll
            invokeAll(new SortTask(a, tmp, lo, m), new SortTask(a, tmp, m+1, hi));
            merge(a, tmp, lo, m, hi);
        }
    }

    private static void merge(Comparable[] a, Comparable[] b, int lo, int m, int hi) {
        if (a[m].compareTo(a[m+1]) <= 0)
            return;

        System.arraycopy(a, lo, b, lo, m-lo+1);

        int i = lo;
        int j = m+1;
        int k = lo;

        // copy back next-greatest element at each time
        while (k < j && j <= hi) {
            if (b[i].compareTo(a[j]) <= 0) {
                a[k++] = b[i++];
            } else {
                a[k++] = a[j++];
            }
        }

        // copy back remaining elements of first half (if any)
        System.arraycopy(b, i, a, k, j-k);

    }

    private static void insertionsort(Comparable[] a, int lo, int hi) {
        for (int i = lo+1; i <= hi; i++) {
            int j = i;
            Comparable t = a[j];
            while (j > lo && t.compareTo(a[j - 1]) < 0) {
                a[j] = a[j - 1];
                --j;
            }
            a[j] = t;
        }
    }
}
As you can see, the majority of the algorithm has remained intact.  As stated above, a new class is created that extends RecursiveAction, and the parameters of the function are passed into that class when it is constructed.  One thing to take note of is that previously only half the size of the original array was allocated as secondary storage.  Now the temporary array is the full length of the original; this keeps different threads from writing to the same region of the scratch array at the same time.
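As a quick illustration of how the two classes above might be exercised (the demo class name and array size here are just assumptions for this sketch):

import java.util.Arrays;
import java.util.Random;

public class SortDemo {
    public static void main(String[] args) {
        Random rnd = new Random(42);
        Integer[] a = new Integer[1000000];
        for (int i = 0; i < a.length; i++) {
            a[i] = rnd.nextInt();
        }
        Integer[] b = a.clone();

        MergeSort.sort(a);          // sequential version from above
        ParallelMergeSort.sort(b);  // fork/join version from above

        // both versions should produce the same ordering
        System.out.println(Arrays.equals(a, b));
    }
}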

Changes to the algorithm may be needed, but the framework definitely makes it easier to move to parallel processing.  One other thing to note is the presence of the ForkJoinPool.  Its default constructor looks at the number of available processors and picks an appropriate level of parallelism for the pool.

I have a quad-core CPU, so the ForkJoinPool will spawn four threads if necessary. That said, I’ve seen cases where only two threads were spawned because more than that was not necessary for the given task. The ForkJoinPool spawns more threads as it deems necessary rather than starting right at the maximum.
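If you want to see or set that level yourself, the pool exposes it directly. A small sketch (the class name and the explicit value of 2 are just examples of mine):

import java.util.concurrent.ForkJoinPool;

public class PoolInfo {
    public static void main(String[] args) {
        // Default constructor: parallelism defaults to the number of available processors.
        ForkJoinPool defaultPool = new ForkJoinPool();
        System.out.println("default parallelism = " + defaultPool.getParallelism());

        // An explicit level can also be requested at construction time.
        ForkJoinPool smallPool = new ForkJoinPool(2);
        System.out.println("explicit parallelism = " + smallPool.getParallelism());
    }
}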

A complete API for the fork/join framework can be found at the Concurrency JSR-166 Interest Site.  All that is needed for Java 6 is the jsr166y package.

Some other algorithms suited to parallelism that I’ve been thinking about are graph-searching algorithms such as depth-first and breadth-first search.  Whether they are run on a tree or on a general graph determines how much the underlying data structure needs to change to support the parallelism.  I also plan to look at making a parallel version of the quicksort algorithm using this framework; a rough sketch of what that could look like is below.  Most divide-and-conquer algorithms can be adapted fairly easily to be multi-threaded using this method, but remember that for a performance benefit to be seen, the task must be sufficiently large.
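A quicksort maps onto the same pattern as the merge sort above.  Here is a rough, hedged sketch using the same SIZE_THRESHOLD idea; the partition routine and the threshold value are my own assumptions, not code from the framework or from this post.

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ParallelQuickSort {

    private static final ForkJoinPool threadPool = new ForkJoinPool();
    private static final int SIZE_THRESHOLD = 1024; // arbitrary cutoff for this sketch

    public static void sort(int[] a) {
        threadPool.invoke(new SortTask(a, 0, a.length - 1));
    }

    static class SortTask extends RecursiveAction {
        final int[] a;
        final int lo, hi;

        SortTask(int[] a, int lo, int hi) {
            this.a = a;
            this.lo = lo;
            this.hi = hi;
        }

        @Override
        protected void compute() {
            if (hi - lo < SIZE_THRESHOLD) {
                Arrays.sort(a, lo, hi + 1);   // small ranges: fall back to a sequential sort
                return;
            }
            int p = partition(a, lo, hi);
            // the two recursive calls become two subtasks, just like the merge sort above
            invokeAll(new SortTask(a, lo, p - 1), new SortTask(a, p + 1, hi));
        }
    }

    // simple Lomuto-style partition around the last element
    private static int partition(int[] a, int lo, int hi) {
        int pivot = a[hi];
        int i = lo;
        for (int j = lo; j < hi; j++) {
            if (a[j] < pivot) {
                int t = a[i]; a[i] = a[j]; a[j] = t;
                i++;
            }
        }
        int t = a[i]; a[i] = a[hi]; a[hi] = t;
        return i;
    }
}

Unlike merge sort, the partition step here is sequential, so the speedup near the top of the recursion is more limited.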


Posted on March 9, 2010 at 10:53 pm by Joe
In: Uncategorized · Tagged with: java, sorting, threading
4 Responses

Written by Riccardo
on April 27, 2010 at 12:12 pm
Good performance: I compared your parallel merge sort to the standard Java Arrays.sort() and the time needed is about half with two processors.

Optimal!

Written by Prabh
on July 15, 2010 at 5:47 pm
I wrote a program to check the speedup from the ParallelArray class’s predefined sort() method. However, it runs even slower than the sequential algorithm. Am I missing something in my code? If not, then what’s the point of having a sort function in the ParallelArray class as opposed to Arrays.sort()? Actually, Arrays.sort() is even faster than the parallel code written using the Java 7 invoke() method. Here’s the code:

import java.util.Arrays;

import jsr166y.*;
import extra166y.*;

public class TestParallelArray {
    public static void main(String[] args) {

        int size = 15000000;
        Integer[] data = new Integer[size];
        int[] temp = new int[size];
        int[] temp2 = new int[size];

        for (int i = 0; i < size; i++) {
            data[i] = new Integer((int) (Math.random() * 10000000));
            temp[i] = data[i];
            temp2[i] = data[i];
        }

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ParallelArray parallelArray = ParallelArray.createFromCopy(data, forkJoinPool);

        TestParallelArray testParallelArray = new TestParallelArray();

        System.out.println("Sorting now");
        long start = System.currentTimeMillis();
        testParallelArray.shuttleSort(temp2, temp, 0, size);
        long finish = System.currentTimeMillis();
        System.out.println("Time taken = " + (finish - start) + " milliseconds (Using sequential)");

        start = System.currentTimeMillis();
        forkJoinPool.invoke(testParallelArray.new SortTask(temp2, temp, 0, size));
        finish = System.currentTimeMillis();
        System.out.println("Time taken = " + (finish - start) + " milliseconds (Parallel using invoke command)");

        start = System.currentTimeMillis();
        parallelArray = parallelArray.sort(); // Even if I convert this line to "parallelArray.sort()" clock time almost remains same
        finish = System.currentTimeMillis();
        System.out.println("Time taken = " + (finish - start) + " milliseconds (Using Parallel Sort)");

        start = System.currentTimeMillis();
        Arrays.sort(temp);
        finish = System.currentTimeMillis();
        System.out.println("Time taken = " + (finish - start) + " milliseconds (Using Arrays.sort())");
    }

    private void shuttleSort(int[] from, int[] to, int low, int high) {
        if (high - low == 1) {
            return;
        }
        int middle = (low + high) / 2;
        shuttleSort(to, from, low, middle);
        shuttleSort(to, from, middle, high);

        int p = low;
        int q = middle;

        if (high - low >= 4 && from[middle - 1] <= from[middle]) {
            for (int i = low; i < high; i++) {
                to[i] = from[i];
            }
            return;
        }

        for (int i = low; i < high; i++) {
            if (q >= high || (p < middle && from[p] <= from[q])) {
                to[i] = from[p++];
            } else {
                to[i] = from[q++];
            }
        }
    }
}

Written by Joe
on July 15, 2010 at 6:35 pm
You’re not doing an apples to apples comparison.

The problem lies here:
Integer[] data = new Integer[size];
int[] temp = new int[size];
int[] temp2 = new int[size];

It should be:
Integer[] data = new Integer[size];
Integer[] temp = new Integer[size];
Integer[] temp2 = new Integer[size];

You’re comparing an array of objects to an array of ints. Java operates faster on primitive types.

Also, when benchmarking it’s a good idea to take the average of several runs.
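A minimal sketch of what that could look like; the helper name, run count, and warm-up pass here are my own choices, not from the comment above:

import java.util.Random;

public class SortBenchmark {

    // Time ParallelMergeSort several times and average, discarding a warm-up run.
    static long averageSortMillis(Integer[] source, int runs) {
        long total = 0;
        for (int r = 0; r <= runs; r++) {          // iteration 0 is JIT warm-up
            Integer[] copy = source.clone();       // sort a fresh copy each run
            long start = System.currentTimeMillis();
            ParallelMergeSort.sort(copy);
            if (r > 0) {                           // skip the warm-up measurement
                total += System.currentTimeMillis() - start;
            }
        }
        return total / runs;
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        Integer[] data = new Integer[2000000];
        for (int i = 0; i < data.length; i++) {
            data[i] = rnd.nextInt();
        }
        System.out.println("average ms: " + averageSortMillis(data, 5));
    }
}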