`

[转]Java7中的ForkJoin并发框架初探(下)—— ForkJoin的应用

阅读更多

详见: http://blog.yemou.net/article/query/info/tytfjhfascvhzxcytp86

 

前两篇文章已经对Fork Join的设计和JDK中源码的简要分析。这篇文章,我们来简单地看看我们在开发中怎么对JDK提供的工具类进行应用,以提高我们的需求处理效率。

Fork Join这东西确实用好了能给我们的任务处理提高效率,也为开发带来方便。但Fork Join不是那么容易用好的,我们先来看几个例子(反例)。

0. 反例错误分析

我们先来看看这篇文章中提供的例子:http://www.iteye.com/topic/643724 (因为是反例,就不提供超链接了,只以普通文本给出URL)

这篇文章是我学习和整理Fork Join时搜索到的一篇文章,其实总的来说这篇文章前面分析得还是比较好的,只是给出的第一个例子(有返回结果的RecursiveTask应用的例子)没有正确地对Fork Join进行应用。为了方便分析,还是贴下这个例子中具体的的代码吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Calculator extends RecursiveTask {
 
    private static final int THRESHOLD = 100;
    private int start;
    private int end;
 
    public Calculator(int start, int end) {
        this.start = start;
        this.end = end;
    }
 
    @Override
    protected Integer compute() {
        int sum = 0;
        if((start - end) < THRESHOLD){
            for(int i = start; i< end;i++){
                sum += i;
            }
        }else{
            int middle = (start + end) /2;
            Calculator left = new Calculator(start, middle);
            Calculator right = new Calculator(middle + 1, end);
            left.fork();
            right.fork();
 
            sum = left.join() + right.join();
        }
        return sum;
    }
 
}

我们看到其中一段已经高亮的代码,显示对两个子任务进行fork()调用,即分别提交给当前线程的任务队列,依次加到末尾。紧接着,又按照调用fork()的顺序执行两个子任务对象的join()方法。

其实,这样就有一个问题,在每次迭代中,第一个子任务会被放到线程队列的倒数第二个位置,第二个子任务是最后一个位置。当执行join()调用的时候,由于第一个子任务不在队列尾而不能通过执行ForkJoinWorkerThread的unpushTask()方法取出任务并执行,线程最终只能挂起阻塞,等待通知。而Fork Join本来的做法是想通过子任务的合理划分,避免过多的阻塞情况出现。这样,这个例子中的操作就违背了Fork Join的初衷,每次子任务的迭代,线程都会因为第一个子任务的join()而阻塞,加大了代码运行的成本,提高了资源开销,不利于提高程序性能。

除此之外,这段程序还是不能进入Fork Join的过程,因为还有一个低级错误。看下第15、16行代码的条件,就清楚了。按照逻辑,start必然是比end小的。这将导致所有任务都将以循环累加的方式完成,而不会执行fork()和join()。

由此可见,Fork Join的使用还是要注意对其本身的理解和对开发过程中细节的把握的。我们看下JDK中RecursiveAction和RecursiveTask这两个类。

1. RecursiveAction分析及应用实例

这两个类都是继承了ForkJoinTask,本身给出的实现逻辑并不多不复杂,在JDK的类文件中,它的注释比源码还要多。我们可以看下它的实现代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;
 
    protected abstract void compute();
 
    public final Void getRawResult() { return null; }
 
    protected final void setRawResult(Void mustBeNull) { }
 
    protected final boolean exec() {
        compute();
        return true;
    }
}

我们看到其中两个方法是关于处理空返回值的方法。而exec方法则是调用了compute(),这个compute就是我们使用Fork Join时需要自己实现的逻辑。

我们可以看下API中给出的一个最简单最具体的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class IncrementTask extends RecursiveAction {
   final long[] array; final int lo; final int hi;
   IncrementTask(long[] array, int lo, int hi) {
     this.array = array; this.lo = lo; this.hi = hi;
   }
   protected void compute() {
     if (hi - lo < THRESHOLD) {
       for (int i = lo; i < hi; ++i)
         array[i]++;
     }
     else {
       int mid = (lo + hi) >>> 1;
       invokeAll(new IncrementTask(array, lo, mid),
                 new IncrementTask(array, mid, hi));
     }
   }
 }

大致的逻辑就是,对给定一个特定数组的某段,进行逐个加1的操作。我们看到else中的代码块,显示取一个lo和hi的中间值,此后分割成两个子任务,并进行invokeAll()调用。我们来看下继承自FutureTask的invokeAll()方法实现。很简单:

1
2
3
4
5
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
    t2.fork();
    t1.invoke();
    t2.join();
}

对于参数中的两个子任务,对第二个子任务进行fork(),即放入线程对应队列的结尾,然后执行第一个子任务,再调用第二个子任务的join(),实际上就是跳转到第二个子任务,进行执行(当然如果不能执行,就需要阻塞等待了)。

其实invokeAll()是个重载方法,同名的还有另外两个,基本逻辑都是一样的,我们拿出一个通用一点的来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void invokeAll(ForkJoinTask<?>... tasks) {
    Throwable ex = null;
    int last = tasks.length - 1;
    for (int i = last; i >= 0; --i) {
        ForkJoinTask<?> t = tasks[i];
        if (t == null) {
            if (ex == null)
                ex = new NullPointerException();
        }
        else if (i != 0)
            t.fork();
        else if (t.doInvoke() < NORMAL && ex == null)
            ex = t.getException();
    }
    for (int i = 1; i <= last; ++i) {
        ForkJoinTask<?> t = tasks[i];
        if (t != null) {
            if (ex != null)
                t.cancel(false);
            else if (t.doJoin() < NORMAL && ex == null)
                ex = t.getException();
        }
    }
    if (ex != null)
        UNSAFE.throwException(ex);
}

我们发现第一个子任务(i==0的情况)没有进行fork,而是直接执行,其余的统统先调用fork()放入任务队列,之后再逐一join()。其实我们注意到一个要点就是第一个任务不要fork()再join(),也就是上面中例子的错误所在,这样会造成阻塞,而不能充分利用Fork Join的特点,也就不能保证任务执行的性能。

Oracle的JavaSE7 API中在RecursiveAction里还有一个更复杂的例子,是计算double数组平方和的,由于代码较长,就不列在这里了。总体思路和上面是一样的,额外增加了动态阈值的判断,感兴趣的想深入理解的可以到这里去参考一下。

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/RecursiveAction.html

2. RecursiveTask简要说明

其实说完了RecursiveAction,RecursiveTask可以用“同理”来解释。实现代码也很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;
 
    V result;
 
    protected abstract V compute();
 
    public final V getRawResult() {
        return result;
    }
 
    protected final void setRawResult(V value) {
        result = value;
    }
 
    protected final boolean exec() {
        result = compute();
        return true;
    }
 
}

我们看到唯一不同的是返回结果的处理,其余都可以和RecursiveAction一样使用。

3. Fork Join应用小结

Fork Join是为我们提供了一个非常好的“分而治之”思想的实现平台,并且在一定程度上实现了“变串行并发为并行”。但Fork Join不是万能的页不完全是通用的,对于可很好分解成子任务的场景,我们可以对其进行应用,更多时候要考虑需

分享到:
评论

相关推荐

    ForkJoin并发框架入门示例

    ForkJoin并发框架是Java 7引入的一种高效并行计算框架,它基于分而治之(Divide and Conquer)的策略,适用于处理大量可分割的任务。这个框架的核心类是`ForkJoinPool`和`ForkJoinTask`,它们为开发者提供了创建和...

    Java并发Fork-Join框架原理

    Java并发Fork-Join框架原理是Java7中提供的一种并行执行任务的框架,旨在提高程序的执行效率和性能。该框架的核心思想是将大任务分割成若干个小任务,并将其分配给不同的线程执行,以充分利用多核CPU的计算能力。 ...

    探索Java并发:Future与ForkJoin框架深度解析

    ForkJoin框架 是Java 7中引入的,旨在进一步提高并发程序的性能。它使用了一种称为“工作窃取”的算法,允许线程动态地重分配任务。ForkJoin的核心思想是将大任务分解为更小的任务,然后并行处理这些任务,最后合并...

    java8中forkjoin和optional框架使用

    Fork/Join 框架是 Java 7 中引入的一种新的并发编程模型,它可以将一个大任务拆分成多个小任务,并将这些小任务分配给多个线程来执行,然后将这些小任务的结果合并起来,形成最终的结果。 Fork/Join 框架的优点是...

    基于JDK的ForkJoin构建一个简单易用的并发组件1

    在Java编程中,线程池和并发任务执行是常见的方式,但在某些场景下,我们可以利用JDK的ForkJoin框架来构建更加高效和易用的并发组件。ForkJoin框架自Java 7引入,它为处理大型任务提供了一种分解成多个子任务并行...

    Java ForkJoin框架的原理及用法

    Java ForkJoin框架是Java 1.7后提供的一种多线并发处理框架,主要思想是分而治之,将复杂的计算按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总。ForkJoin框架的使用可以提高数据的计算速度,但需要...

    Java8集合 CompletableFuture lambda表达式 新的TimeAPI 和ForkJoin Demo包

    ForkJoin框架是Java 7引入的,但Java 8对其进行了优化。它是一种基于工作窃取算法的并行计算模型,适用于分解大型任务为小任务并行处理,然后合并结果。`RecursiveTask`和`RecursiveAction`是ForkJoin框架的基础,...

    Java并发编程之——Amino框架

    书中的内容可能涵盖了如何通过调整系统配置、优化算法和数据结构,以及使用像Amino这样的框架来最大化Java应用程序的性能。 总之,理解和掌握Java并发编程以及相关的工具框架,如Amino,对于开发高效、稳定的分布式...

    高并发Web架构实现思路——java版

    ### 高并发Web架构实现思路——Java版 #### 一、引言 随着互联网的快速发展,Web应用系统面临着巨大的挑战,尤其是如何处理高并发、海量数据的情况。传统的Web架构已经无法满足当前的需求,这就需要我们探索新的...

    67-ForkJoin框架学习笔记1

    ForkJoin框架是Java并发编程中的一个重要工具,它基于分治策略,旨在高效处理大量数据。框架的核心思想是将一个大型任务分解成多个小型任务,然后通过并行执行这些子任务来提高处理效率。ForkJoin框架在Hadoop ...

    Fork Join框架机制详解.docx

    K/Join框架是Java并发处理的一个重要工具,它被设计用于高效地执行那些可以分解为更小子任务的计算密集型任务。这个框架的核心概念是工作窃取算法,它允许并行执行大量的任务,并通过合并子任务的结果来解决原始问题...

    《Java7并发编程实战手册》书中实例代码

    1. **Fork/Join框架**:Java 7引入了Fork/Join框架,这是一个用于并行执行任务的高级框架,基于工作窃取算法。它将大任务拆分为小任务,并在多个线程间分发,提高了计算密集型任务的执行效率。例如,`java.util....

    java高并发秒杀api源码

    在Java开发领域,高并发秒杀API是电商、抢购等场景中不可或缺的一部分。这个"java高并发秒杀api源码"很可能是一个实现这类功能的示例项目,它结合了Spring和MyBatis两大主流框架,以提升系统性能和可维护性。下面,...

    java并发编程实战源码,java并发编程实战pdf,Java

    9. ** Fork/Join框架**:这是一种并行计算模型,适用于那些可以拆分为子任务并进行并行处理的问题。 10. **并发模式**:书中可能还会介绍生产者消费者模式、读写锁模式、双端队列模式等经典的并发设计模式,帮助...

    java7并发编程实战手册+源码

    Java 7在并发方面引入了许多改进,包括Fork/Join框架、新的并发集合、更完善的线程池API等,这些都极大地简化了并发编程的工作。 1. **Fork/Join框架**:这是Java 7中的一个新特性,灵感来源于并行计算的分而治之...

    Java并发实战

    8. 并发控制的高级概念,如fork/join框架,它用于处理可以递归分解的并行任务;以及并发集合类,它们在多线程环境下提供了线程安全的集合操作。 在学习Java并发编程时,除了对上述概念的理解,还需要通过大量的实践...

    Java 7并发编程实战手册

    全书分为9章,涵盖了线程管理、线程同步、线程执行器、fork/join框架、并发集合、定制并发类、测试并发应用等内容。全书通过60多个简单而非常有效的实例,帮助读者快速掌握java 7多线程应用程序的开发技术。学习完...

Global site tag (gtag.js) - Google Analytics