`
QING____
  • 浏览: 2253228 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

JAVA Fork/Join框架

    博客分类:
  • JAVA
 
阅读更多

一、简述

    1、并发模型



 

 

     主线程可以将待执行(计算)任务,分解为多个(通常是2个,易于递归合并)子任务,并等待子任务的执行结果进而合并;所有任务,都会并行执行。

 

    2、伪代码

solve(problem):
    if problem is small enough:
        solve problem directly (sequential algorithm)
    else:
        for part in subdivide(problem)
            fork subtask to solve(part)
        join all subtasks spawned in previous loop
        return combined results

 

    任何任务都可以fork多个子任务,并等待和合并子任务结果(join);如果任务执行复杂度足够小,可直接执行。

 

    Fork产出的子任务,与当前任务并行执行(单独的子线程),在join阶段同步等待直到fork的子任务执行结束;join就像一个barrier,不过join之后当前任务线程任然会继续执行。

 

    3、工作窃取(work-stealing)



  

二、ForkJoin示例

 

    public static void main(String[] args) throws Exception {
        long[] source = new long[100];
        for (int i = 0; i < 100; i++) {
            source[i] = i;
        }

        FJTask submission = new FJTask(source,0,99);
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(submission);

        //
        System.out.println(submission.get());//等待结果
    }

 

public class FJTask extends RecursiveTask<Long> {

    private final long[] source;
    private int start;
    private int end;//不包含

    public FJTask(long[] source,int start,int end) {
        this.source = source;
        this.start = start;
        this.end = end;
    }


    @Override
    protected Long compute() {
        int length = end  - start;
        if (length <= 32) {
            return directCompute();
        }
        //二分
        int m = start + (end - start) / 2;
        FJTask t1 = new FJTask(source,start,m);
        t1.fork();

        FJTask t2 = new FJTask(source,m + 1,end);
        t2.fork();

        return t1.join() + t2.join();

    }

    private long directCompute() {
        int result = 0;
        for (int i = start; i <= end; i++) {
            result += source[i];
        }
        return result;
    }
}

  

三、JAVA Fork/Join框架API介绍

     ForkJoin框架主要有ForkJoinPool、ForkJoinTask(以及其子类)组成;ForJoinPool继承自ExecutorService,由此可见它也是线程池任务执行容器,实现模型遵循Fork/Join,且执行Work-Stealing设计模式。ForkJoin框架在实际业务中,使用的场景并没有其他并发框架或者线程池常见,不过对于解决比如与分治策略有关的“多个小文件数据清洗或计算”、“并发迁移数据”、“流式数据计算”等比较有帮助;有关Fork/Join的原理分析,请参看下文。

 

    ForkJoinPool主要API

    1、ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode)

    parallelism即为并发级别,也是线程池中workerThread的最大数量,默认为系统CPU个数,考虑到Fork/Join执行的任务通常为计算密集型,所以并发级别与CPU个数一致是合理的。最大并发级别为32767。

 

    factory即为worker线程的工厂,基本要求与我们常见的ThreadPoolFactory一样,facotry只有一个方法newThread(ForkJoinPool),此方法用于当线程池需要创建新的worker线程时调用,生成ForkJoinWorkerThread,此后Pool将新的线程加入到容器并启动。你可以自定义factory用于构建自定义的ForkJoinWorkerThread实例。

 

    UncaughtExceptionHandler为异常处理器,用于设定Thread实例,当线程异常退出时(主要是run方法抛出异常退出时),由此异常处理器执行。

 

    2、submit(ForkJoinTask):提交任务,任务将会被添加到队列,并有worker线程执行。此方法与ExecutorService保持一致,不会等待任务执行结果。

    3、execute(ForkJoinTask):同submit,只是submit会返回Task本身,而execute为void。

    4、invoke(ForkJoinTask):提交任务,并阻塞等待任务执行完毕。同submit,只是会额外封装了task.join(),返回值为任务执行结果(如果有异常,则抛出)。

 

    ForkJoinTask是个抽象类,扩展子Future,其本质与RunnableFuture没有太大区别,表示一个可被ForkJoinPool执行的任务单元,但是具有Fork/Join的特性。其主要实现类有:

    1、RecursiveTask抽象类,主要抽象方法为complete(),此方法返回执行的结果。(即Future.get可以返回值)

    2、RecursiveAction:抽象类,主要抽象方法为complete(),此方法没有返回值,即在泛型为Void。

 

    1、fork():分叉,派生;将当前Task实例添加到当前work线程的队列中,等到执行。(异步)

    2、join():交叉;阻塞等待当前Task执行完毕,并返回结果。

    3、invoke():阻塞等待任务执行完毕并返回结果(全同步)。invoke方法与join的执行过程稍有区别,join是优先检查当前task是否位于当前work线程任务队列的顶端,如果是,则直接执行,否则就阻塞等待work线程的调度(可能在当前work线程,也可能被其他work线程steal)和执行的结果;invoke方法,则是在当前work线程中直接执行。如果此Task在invoke之前,已经执行过fork,那么其作用同join。

    4、invokeAll(ForkJoinTask t1,t2):最常用方法,fork + join的过程整合,将t2任务fork异步执行,t1直接在当前work线程中执行。

@Override
protected Long compute() {
    int length = end  - start;
    if (length <= 32) {
        return directCompute();
    }
    //二分
    int m = start + (end - start) / 2;
    FJTask t1 = new FJTask(source,start,m);
    FJTask t2 = new FJTask(source,m + 1,end);
    invokeAll(t1,t2);
    return t1.join() + t2.join();
}

 

四、ForkJoin框架原理分析

    ForkJoin主要适用于:计算密集型任务,且任务可被分解执行;在此前提下,并发执行任务和分解的子任务,可以提高执行的效率,充分利用CPU和内存资源。无论如何ForkJoin中不建议使用Block IO以及额外的同步、锁等,这种阻塞操作,会导致ForkJoin的并发能力受限,而且严重时会导致框架的线程池调度失效。

    1、ForkJoinPool内部仍为线程池模式,且没有调度线程。

    2、Pool中在存取任务、worker线程执行和stealing任务时,几乎没有使用到任何的显式锁,其内部主要使用Unsafe + CAS;所有标记状态、计数等可能被并发访问的字段,都经过CAS操作。尽管JDK中已经存在很多并发API,比如ArrayBlockingQueue等,但是ForkJoinPool并没有直接使用它们,而是继续CAS重建了相关语义。

    此外,因为ForkJoin中没有调度线程,比如均衡分配任务、唤醒等Master线程;所以所有的提交任务的线程、Worker线程,都会在各自的时机上,承担一定的调度职能,worker线程的stealing机制潜在引入并发问题,ForkJoinPool的设计巧妙之处,就是在没有使用Lock的情况下,能够达到并发安全。

 

    通过源码发现,Pool中很多实现,都是刻意避免并发(线程的join),而不是lock来兼容和支持并发。

 

    3、ForkJoinPool框架中,并没有太多的、复杂的数据结构,用于保存比如线程池状态、任务列表和状态等。而是使用简单类型的字段,通过位运算来实现很多的状态标记,通过使用CAS原子性更新实现类似于锁状态(qlock)。

 

 

    ForkJoinPool:

    1、内部使用使用一个可以按需扩容的数据表示线程池:WorkQueue[] workerQueues,此数据总是2倍扩容,其实际最大容量与“ parallelism”保持一致。WorkQueue实例中持有一个Worker线程和Task队列,可以简单认为一个WorkQueue就是一个工作线程单元;

    1)在任务提交时(submit/execute),如果workQueues尚未达到“并发级别”,即当前线程池未满,则直接创建新的WorkerQueue(worker线程)并由其执行当前任务。此时还有workerQueues扩容问题。

    2)如果workerQueues容量达到阈值(即线程池满),将随机(ThreadLocalRandom)从workerQueues中找个workQueue并将此task添加到其任务队列中,之所以随机查找workQueues,也是为了避免并发提交时,submission线程并发操作workQueues的概率;有一种特殊的情况,就是随机找到的这个workQueue可能为null,因为此workQueue持有的工作线程因为各种原因(比如空闲超时,异常退出)被注销了(deregister),此时将会创建一个新的WorkQueue实例并补偿在此位置。

    3)ForkJoinPool内部并没有scheduler线程。

    4)我们可以通过shutdown/shutdownNow来关闭Pool,此后Pool将不会接收新的submission;此时将会触发所有worker线程的退出操作(deregister):修改线程状态、唤醒阻塞、清空本地任务列表等。

   

    WorkQueue:

    1、逻辑上的worker线程,内部持有实际的ForkJoinWorkerThread和Task列表。同时,还持有pool的引用。

    2、ForkJoinWorkerThread同线程池中的worker线程。

    3、Task列表,作为WorkQueue的本地任务队列,此任务队列,是一个Deque,但是并没有使用JDK中已有的LinkedBlockDeque,而是继续ArrayBlockingQueue的思想(循环数组) + CAS实现的简单双端队列,最终由WorkQueue支持三种操作:poll、push、pop用于操作此Task列表;其中poll和push有当前ForkJoinWorkerThread操作,poll使用FIFO方式弹出任务并执行,push有submission线程添加任务到队列尾部,此外当前线程窃取操作也会执行push;对于pop,将有其他worker线程窃取任务时,从队列尾部窃取(LIFO)。

    4、WorkQueue对象,将有submission线程提交任务时创建。此外,Fork子任务时,优先将子任务添加到本地队列,同时也会触发线程池容量检测,如果workQueues容量未满,也会创建新的worker线程和workQueue对象。(参见signalWorker())

 

    ForkJoinWorkerThread:

    1、同ExecutorService中的worker线程,守护线程。有WorkQueue引用并持有,同时也持有workQueue和pool的引用。

    2、在submission线程提交任务时,如果线程池容量尚未达到并发级别上限,将会优先创建新的worker线程并由其执行task。在其他worker线程中fork操作时,优先将子任务添加到本地任务列表,并检测线程池容量(即workQueues大小,当然源码中有单独的便捷字段来控制),如果未达到上线,也将会触发创建新的worker线程。

    3、worker线程启动后,将会优先检测其workQueue中的本地队列是否有亟待执行的任务,如果有则逐个执行。直到任务队列全部执行完毕,开始steal。

    4、steal操作,也是ForkJoinPool的核心特性;当worker线程的本地任务队列执行完毕后,触发stealing;从Pool的workQueues的随机位置开始,按照一定的节奏推进(跳)(随机 + 跳,主要目的就是避免与其他worker线程的并发冲突,因为stealing操作可能在多个worker线程同时发生),检测遇到的每个workQueue的本地队列,如果有剩余任务,则使用workQueue.pop窃取一个;直到遍历完workQueues,如果还没有steal到任何任务,这说明整个线程池中任务数量相对较少(饥饿),为了避免整个线程池都处于盲目的、无休止的steal,此时worker线程将会被“钝化”(park)一段时间,并等待submission线程或者其他worker线程fork操作唤醒;如果一个worker线程IDLE一段时间之后(2秒),将会被释放销毁。(scan(),tryRelease())。

 

    5、worker线程在执行任何任务时,都会尝试捕捉异常,并将异常作为Futrue的结果,在使用ForkJoinTask.get()时重新抛出。

    6、由此可见,我们不能再ForkJoinTask中使用时间不可控的block操作,假如所有的ForkJoinTask的work线程都阻塞,整个线程池也将完全失效(即不能fork新任务触发唤醒,worker线程也无法触发其他线程的唤醒)。

 

 

     参考文档:

     1)http://gee.cs.oswego.edu/dl/papers/fj.pdf

     2)https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html

     3)https://en.wikipedia.org/wiki/Work_stealing

     4)https://en.wikipedia.org/wiki/Fork%E2%80%93join_model

  • 大小: 19.5 KB
  • 大小: 12.8 KB
分享到:
评论

相关推荐

    java Fork Join框架及使用

    Fork/Join框架是Java7引入的一种用于并行任务执行的框架,它允许将复杂任务拆分成多个子任务,并行执行,然后通过join操作将结果聚合。Fork/Join框架特别适合处理可以递归拆分的计算密集型任务,比如大数据集的搜索...

    Java Fork/Join框架

    Java Fork/Join框架是Java 7引入的一种并行计算模型,设计目的是为了高效地处理大量数据,尤其是在多核处理器环境中。该框架的核心理念是通过将复杂的大任务分解为多个小任务,然后并行执行这些小任务,从而加速计算...

    java NIO用法及java fork/join 用法源码工程

    现在转向Java的Fork/Join框架,它是Java 7引入的一个并行计算模型,基于分而治之的策略。Fork/Join框架主要由以下几个关键组件组成: 1. **ForkJoinPool**:这是执行ForkJoinTask的线程池。它不同于普通的...

    Fork/Join例子

    在Java编程领域,Fork/Join框架是一种并行计算模型,设计用于高效处理大量数据,尤其是在多核处理器系统上。这个框架是Java 7引入的一个重要特性,它基于分而治之(Divide and Conquer)策略,将复杂任务拆分为更小...

    Fork/Join框架Package jsr166y

    Fork/Join框架Package jsr166y是Java 7并行编程类的的初步版本(Preliminary versions of classes targeted for Java 7.)

    Java中的Fork,Join框架深度解析

    Java的Fork/Join框架是一种用于并行计算的框架,它基于分治法的原理,将大任务分解成小任务并行执行,最后再将结果合并。这种框架特别适合于可以分解为多个子任务且子任务可以并行处理的场景。本文将详细介绍Fork/...

    fork/join 实例

    Fork/Join框架是Java并发处理的一个重要工具,它基于工作窃取算法,设计用于高效地执行并行计算任务。这个框架是Java 7引入的,位于`java.util.concurrent.fork/join`包中,目的是简化多核处理器环境下大规模数据...

    Java中的Fork/Join框架

     fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。  fork/join框架与...

    浅谈Java Fork/Join并行框架

    Java Fork/Join 并行框架 Java Fork/Join 并行框架是 Java 7 中引入的一个并行任务框架,可以将任务分割成足够小的小任务,然后让不同的线程来做这些分割出来的小事情,然后完成之后再进行 join,将小任务的结果...

    Java并发Fork and join

    Fork/Join框架是Java并发库中的一部分,自Java 7开始引入,它为开发者提供了一种高效的处理大规模计算任务的方法。这个框架基于分治策略,将大任务分解成若干小任务,然后并行执行这些小任务,最后再将结果合并。...

    java fork-join框架介绍

    fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。

    Fork Join框架机制详解.docx

    在Java中,Fork/Join框架主要由`ForkJoinPool`线程池和`ForkJoinTask`任务类组成。 1. `ForkJoinPool`:这是Fork/Join框架的工作线程池。它维护着一组工作线程,用于执行`ForkJoinTask`。线程池中的每个工作线程都...

    译文:Fork and Join: Java Can Excel at Painless Parallel Programming Too!

    本文将简要回顾Java中的并发编程基础知识,介绍java.util.concurrent包提供的高级并发原语,并深入探讨Fork/Join框架及其在Java SE 7中的应用。 首先,让我们回顾一下Java中基本的并发机制。自Java早期版本起,线程...

    ForkJoinUtil.java,一个分而治之的框架工具类

    Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是...

    Java通过Fork/Join优化并行计算

    Java的Fork/Join框架是Java 7引入的一个并行计算工具,它是基于分而治之(Divide and Conquer)策略的。该框架旨在简化并行编程,尤其是在多核处理器环境中提高性能。Fork/Join框架的核心类包括`ForkJoinPool`和`...

    Java并发Fork-Join框架原理

    Java并发Fork-Join框架原理 Java并发Fork-Join框架原理是Java7中提供的一种并行执行任务的框架,旨在提高程序的执行效率和性能。该框架的核心思想是将大任务分割成若干个小任务,并将其分配给不同的线程执行,以...

    35 拆分你的任务—学习使用Fork-Join框架.pdf

    在Java并发编程中,Fork/Join框架是一个强大的工具,尤其在处理大量数据时能显著提升性能。这个框架从Java 7开始引入,是ExecutorService的一个实现,它基于分而治之的策略,将大任务分解成多个小任务,然后并行地...

    Java ForkJoin框架的原理及用法

    Java ForkJoin框架的原理及用法 Java ForkJoin框架是Java 1.7后提供的一种多线并发处理框架,主要思想是分而治之,将复杂的计算按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总。ForkJoin框架的使用...

Global site tag (gtag.js) - Google Analytics