`

第六章 - 使用Fork/Join 框架

阅读更多

Java 7 引入了一个特殊的 executor 使用 Fork/Join 框架。Fork/Join 框架用来解决那些能使用分治算法解决的问题

 

Fork/Join 框架介绍

为了使用分治算法,你必须把一个问题分解成小问题。使用递归方法来重复相同的操作直到问题被细分为能直接被解决的足够小的问题。这些小问题可以用 executor 来解决,但是为了更有效地解决这些小问题,Java 7 引入了 Fork/Join 框架。

这个框架基于 ForkJoinPool 类,它是一个特殊的 executor,它包含有两个操作 fork() 和 join()  (以及它们的变种形式),以及一个名为工作窃取算法的内部算法。

 

Fork/Join 框架的基本特征

使用此框架时,你的主方法代码类似于:

if ( problem.size() > DEFAULT_SIZE) {
       divideTasks();
       executeTask();
       taskResults=joinTasksResult();
       return taskResults;
   } else {
       taskResults=solveBasicProblem();
       return taskResults;
}

 代码中最重要的部分是允许你有效地分解并执行子任务并从这些子任务中获得运行结果用来计算父任务的结果。这个功能由 ForkJoinTask 的以下两个方法提供:

  • fork() 方法:这个方法允许你发送一个子任务给 Fork/Join executor
  • join() 方法:该方法允许你等待一个子任务的完成并返回其运行结果

这两个方法还有其它不同的变形。Fork/Join 框架还有一个关键的特性:决定哪些任务被执行的工作窃取算法。当一个任务使用 join() 方法等待一个子任务完成时,执行此任务的线程会从任务池中取不同的任务运行。通过此方法,Fork/Join executor 的线程总是在执行任务,从而提高了应用的性能。

 

Java 8 提供了 Fork/Join 框架的一个新特性。现在每个 Java 应用程序都有一个默认的名为 ForkJoinPool 的通用池。你可以通过调用静态方法 ForkJoinPool.commonPool() 来获得。默认情况下这个默认的 Fork/Join executor 会被使用,executor 里线程数由运行计算机的可用处理器数来决定。你可以通过改变操作系统值 java.util.concurrent.ForkJoinPool.common.parallelism 来改变其默认行为。

 

一些 Java API 本身也使用 Fork/Join 框架来实现并行操作。例如用并发方式来对数组排序的 Arrays 类的 parallelSort() 方法,以及 Java 8 提供的 parallel streams。

 

Fork/Join 框架的局限性

Fork/Join 框架具有以下局限性:

  • 你不会再细分的基本问题不能太小也不能太大。根据 Java API 文档,它必须具有 100 到 10,000 个基本运算步骤
  • 你不能使用能阻塞的 I/O 操作,例如用户输入或从网络中获取数据。这些操作会造成CPU处理器空闲,从而降低并发处理速度
  • 你不能在任务里抛出checked exceptions。你必须在代码中获取这些 checked exceptions (例如把它们封装成 unchecked RuntimeException),Unchecked exceptions会被特殊处理。

Fork/Join 框架的组件

Fork/Join 框架有以下五个基础类:

  • ForkJoinPool类:该类实现了 Executor 和 ExecutorService 接口,它用来执行 Fork/Join 任务。Java 提供了一个默认的 ForkJoinPool 对象 (通用池),但你可以使用一些构造函数来创建你自己的对象。你必须制定并发级别 (最大的运行并发线程数)。默认情况下,它使用可用的处理器数作为并发级别。
  • ForkJoinTask类:该类是所有的 Fork/Join 任务的基础抽象类。它提供了 fork() 和 join() 方法以及一些其他的变种。同时它也实现了 Future 接口并提供了一些方法来判断一个任务是否正常结束,被取消或是否抛出了 unchecked 异常。RecursiveTask, RecursiveAction 以及 CountedCompleter 类提供了 compute() 这个抽象犯法,你必须在子类中实现它来做一些实际的计算操作。
  • RecursiveTask类:该类继承了 ForkJoinTask 类。它也是一个抽象类,你可以使用它如果你要实现有返回值的 Fork/Join 任务。
  • RecursiveAction类:该类继承了 ForkJoinTask类。它也是一个抽象类,你可以使用它如果你要实现没有返回值的 Fork/Join 任务。
  • CountedCompleter类:该类继承了 ForkJoinTask 类。它是 Java 8 API 的一个新特性。如果任务结束后需要触发其它任务,你可以使用它。

 

public class ForkJoinDemo {
    public static void main(String[] args) {
        int[] ints = IntStream.range(1, 5).toArray();

        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        SumTask sumTask = new SumTask(ints, 0, ints.length, 1);
        long startTime = System.currentTimeMillis();
        int sum = forkJoinPool.invoke(sumTask);
        long endTime = System.currentTimeMillis();
        System.out.println("Fork/join sum: " + sum + " in " + (endTime - startTime) + " ms.");
        forkJoinPool.shutdown();
        try {
            forkJoinPool.awaitTermination(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class SumTask extends RecursiveTask<Integer> {

    private int[] dataList;
    private int from;
    private int end;
    private int divideFactor;

    public SumTask(int[] dataList, int from, int end, int divideFactor) {
        this.dataList = dataList;
        this.from = from;
        this.end = end;
        this.divideFactor = divideFactor;
    }

    @Override
    protected Integer compute() {

        if ((end - from) <= divideFactor) {
            int sum = 0;
            for (int i = from; i < end; i++) {
                sum += dataList[i];
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(String.format("%s : compute %d~%d = %d", Thread.currentThread().getName(), from, end, sum));

            return sum;
        } else {
            int middle = (end + from) / 2;
            System.out.println(String.format("%s : split %d~%d ==> %d~%d, %d~%d", Thread.currentThread().getName(), from, end, from, middle, middle, end));
            SumTask task1 = new SumTask(dataList, from, middle, divideFactor);
            SumTask task2 = new SumTask(dataList, middle, end, divideFactor);

            // 这里我们使用 invokeAll() 方法,如果使用 task1.fork() 和 task2.fork() 方法
            // 会导致当前线程变成监工,不参与子任务的执行,从而降低性能
            invokeAll(task1, task2);

            int sum1 = task1.join();
            int sum2 = task2.join();

            int sum = sum1 + sum2;
            System.out.println(Thread.currentThread().getName() + " : " + "result = " + sum1 + " + " + sum2 + " ==> " + sum);

            return sum1 + sum2;
        }
    }
}

 

Fork/Join 框架

ForkJoinPool 类提供了 execute(),invoke(),submit() 方法来向线程池中提交任务。它们之间的区别是:

  • execute():发送任务给 ForkJoinPool 并立刻返回一个 void 值 (即提交无返回值的任务)
  • invoke():发送任务给 ForkJoinPool,当任务运行结束才返回
  • submit():发送任务给 ForkJoinPool,立刻返回一个 Future 对象用来控制任务的状态并获取结果

我们知道 ForkJoinPool 处理基于 ForkJoinTask 类的任务,但是也能执行基于 Runnable 和 Callable 接口的任务。你可以使用 submit() 方法,该方法可以接收一个 Runnable 对象,一个有返回值的 Runnable 对象,以及一个 Callable 对象。

 

ForkJoinTask:

  • get (long timeout, TimeUnit unit) 来获取任务返回的结果。这个方法在指定时间内等待任务执行完并返回结果。如果指定时间之内任务无法完成,则抛出 TimeoutException 异常。
  • invoke() 方法从语法上类似于 fork(); join() 但它总是尝试在当前线程执行。
  • 名字以 “quiet" 开始的方法没有返回运行结果也不报告异常。这些方法当有多个任务在执行时,结果或异常在运行结束后才被处理是很有用。
  • 一对 fork-join 就像并行递归方法里的 call (fork) 和 return (join)。因此 join 必须从最里面开始,例如, a.fork();b.form();b.join();a.join() 比 a 在 b 之前 join 更有效率。
  • 大小: 127.4 KB
分享到:
评论

相关推荐

    JUC面试知识点手册快速版

    第一章:Java并发简介 1.1 什么是并发编程 1.2 Java中的并发编程模型 1.3 线程的生命周期 第二章:基础同步工具 2.1 synchronized关键字 ...第七章:Fork/Join框架 7.1 ForkJoinPool 等等

    java 并行爬取网页

    3. **Fork/Join框架**:Fork/Join框架的核心是`ForkJoinPool`和`ForkJoinTask`。前者是线程池,后者是任务类,支持任务的分解和合并。使用`fork()`方法分解任务,`join()`方法等待子任务完成并合并结果。 4. **...

    JAVA高质量并发详解,多线程并发深入讲解

    - **第6章:ExecutorService的使用** 深入探讨如何创建和使用`ExecutorService`,以及如何通过它管理线程生命周期。 - **第7章:Runnable与Callable的区别** 对比`Runnable`和`Callable`接口的不同之处,特别是...

    Java相关的技术文档保存

    Java 7引入了新的并发工具,如Fork/Join框架和并发集合。学习这部分内容包括: - 线程基础:创建和管理线程,理解同步和互斥。 - 锁机制:synchronized关键字、ReentrantLock等。 - 高级并发工具:如...

    jdk-7u80-windows-x64.exe

    "jdk-7" 指的是Java SE(标准版)7,是Java的一个重要版本,引入了多个新特性,如动态类型语言支持( invokedynamic 字节码指令)、Fork/Join框架、并发改进等。"jdk" 是Java Development Kit的缩写,是开发Java应用...

    linux-solaris.zip

    - 并发改进:包括Fork/Join框架和Parallel Streams,提升了多线程编程的效率。 2. **Linux Solaris操作系统**: - Linux Solaris是Oracle Solaris操作系统的一个变体,它在Linux内核上运行Solaris用户空间程序,...

    jdk7官方源码工程

    【标签】"jdk7"指代了Java平台的第七个主要版本,它在2011年发布,引入了许多新的特性,如try-with-resources语句、多线程的Fork/Join框架、改进的类型推断(Project Coin)、文件系统API(NIO.2)以及并发工具的...

    JDK7新特性(完整篇)

    Fork/Join框架是Java并行编程的一个重要工具,它基于分治策略,允许开发者将大任务拆分成小任务,然后并行执行,最后合并结果,显著提高了处理大规模计算任务的能力。 6. **JDK7新特性&lt;六&gt; 监听文件系统的更改** ...

    JDK1.7免安装版JAVA

    9. **Fork/Join框架**:Java 7引入了一个新的并行编程模型,Fork/Join框架,它基于分治策略,适用于计算密集型任务,可以充分利用多核处理器的性能。 10. **动态语言支持**:Java 7添加了 invokedynamic 指令,为...

    linux 64位环境jdk1.7安装文件

    Java 7是Oracle公司于2011年发布的版本,引入了多项新特性,如try-with-resources语句、多线程的Fork/Join框架、类型推断、改进的字符串处理等。虽然现在已有更新的Java版本,但一些遗留项目可能仍依赖于Java 7。 *...

    jdk7-linux-x64.tar.gz

    1. **多线程并发控制**:引入了Fork/Join框架,通过工作窃取算法提高并行计算效率,同时提供了`Phaser`、`CountDownLatch` 和 `CyclicBarrier` 等并发工具类。 2. **Strings in Switch**:在switch语句中可以直接...

    jdk-7u80-linux-x64.tar.gz

    4. 并发改进:Fork/Join框架和Parallel Streams的引入,使得并行编程更容易且更有效。 5.钻石操作符:在创建匿名对象时,编译器能够自动推断泛型类型的实参,简化了代码。 压缩包内的"jdk1.7.txt"可能包含的是JDK ...

    包含1.7JDK 64位,1.6JDK64位

    - **Fork/Join框架**:用于并行执行任务的框架,提高了多核处理器上的程序性能。 2. JDK 1.6(Java 6) JDK 1.6,即Java SE 6,是在2006年12月发布的主要版本。相比之前的版本,它包含了大量增强和优化,包括: ...

    jdk-7u79-windows-x64

    1. **多线程并行流**:Java 7引入了Fork/Join框架,允许开发者利用多核处理器并行执行任务,提高了处理大量数据的速度。 2. **钻石操作符**:在创建匿名类实例或使用泛型时,自动推断类型,使得代码更简洁,如`new ...

    bellsoft-jdk8u322+6-windows-i586

    此外,JDK 8还增强了并发处理,如Fork/Join框架和Parallel Streams,提升了多核环境下的性能。 在标签中,“jdk8”和“openjdk8”是关键词,分别代表了Java 8和OpenJDK的第八个主要版本。Java 8是Oracle JDK和...

    java8高级应用与开发课件及案例代码

    本章将讲述线程的创建、同步、死锁避免、线程池的使用,还会涉及Java8中新增的并发工具,如Fork/Join框架和Stream API。 6. **第6章 网络编程**: Java提供了Socket编程接口来实现网络通信。本章会讲解TCP和UDP协议...

    jdk-7u271-linux-x64

    6. **Fork/Join框架**:这个并行计算框架使得开发者能够更容易地利用多核处理器的性能,通过JSR 166实现。 7. **改进的编译器**:JDK 7的编译器(称为Javac)进行了优化,提供了更好的类型推断和更快的编译速度。 ...

    jdk-7u80-windows-x64.zip

    - 并发改进:如Fork/Join框架,用于并行执行任务。 - 文件系统API(NIO.2):提供了更强大的文件操作能力,如异步文件通道和文件属性查询。 - 字符串inswitch语句:允许在switch语句中直接使用字符串。 - try-...

    java7源码-LearnJava7ConcurrencyCook:Java7ConcurrencyCook这本书源代码的学习和注解

    java 7 源码 LearnJava7ConcurrencyCook Java7ConcurrencyCook 这本书源代码的学习和注解 第一章 线程的管理 第二章 线程同步基础 synchronized关键字 ...第六章 并发容器 第七章 定制并发类 第八章 测试并发应用

Global site tag (gtag.js) - Google Analytics