`
不爱不见
  • 浏览: 281572 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Fork/Join 型线程池与 Work-Stealing 算法

    博客分类:
  • JDK
 
阅读更多
JDK 1.7 时,标准类库添加了 ForkJoinPool,作为对 Fork/Join 型线程池的实现。Fork 在英文中有 分叉 的意思,而 Join 有 合并 的意思。ForkJoinPool 的功能也是如此:Fork 将大任务分叉为多个小任务,然后让小任务执行,Join 是获得小任务的结果,然后进行合并,将合并的结果作为大任务的结果 —— 并且这会是一个递归的过程 —— 因为任务如果足够大,可以将任务多级分叉直到任务足够小。



由此可见,ForkJoinPool 可以满足 并行 地实现 分治算法(Divide-and-Conquer) 的需要。

ForkJoinPool 的类图如下:


可以看到 ForkJoinPool 实现了 ExecutorService 接口,所以首先 ForkJoinPool 也是一个 线程池。因而 Runnable 和 Callable 类型的任务,ForkJoinPool 也可以通过 submit、invokeAll 和 invokeAny 等方法来执行。但是标准类库还为 ForkJoinPool 定义了一种新的任务,它就是 ForkJoinTask<V>。

ForkJoinTask 相关类图:


ForkJoinTask<V> 用来专门定义 Fork/Join 型任务 —— 完成将大任务分割为小任务以及合并结果的工作。一般我们不需要直接继承 ForkJoinTask<V>,而是继承它的子类 RecursiveAction 和 RecursiveTask 并实现对应的抽象方法 —— compute 。其中,RecursiveAction 是不带返回值的 Fork/Join 型任务,所以使用此类任务并不产生结果,也就不涉及到结果的合并;而 RecursiveTask 是带返回值的 Fork/Join 型任务,使用此类任务需要我们进行结果的合并。通过 fork 方法,我们可以产生子任务并执行;通过 join 方法,我们可以获得子任务的结果。

ForkJoinPool 用三种方法用来执行 ForkJoinTask:

invoke 方法:


invoke 方法用来执行一个带返回值的任务(通常继承自RecursiveTask),并且该方法是阻塞的,直到任务执行完毕,该方法才会停止阻塞并返回任务的执行结果。

submit 方法:


除了从 ExecutorService 继承的 submit 方法外,ForkJoinPool 还定义了用来执行 ForkJoinTask 的 submit 方法 —— 一般该 submit 方法用来执行带返回值的ForkJoinTask(通常继承自RecursiveTask)。该方法是非阻塞的,调用之后将任务提交给 ForkJoinPool 去执行便立即返回,返回的便是已经提交到 ForkJoinPool 去执行的 task —— 由类图可知 ForkJoinTask 实现了 Future 接口,所以可以直接通过 task 来和已经提交的任务进行交互。

execute 方法:


除了从 Executor 获得的 execute 方法外,ForkJoinPool 也定义了用来执行ForkJoinTask 的 execute 方法 —— 一般该 execute 方法用来执行不带返回值的ForkJoinTask(通常继承自RecursiveAction) ,该方法同样是非阻塞的。

现在让我们来实践下 ForkJoinPool 的功能:计算 π 的值。
计算 π 的值有一个通过多项式方法,即:
π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……)
多项式的项数越多,计算出的 π 的值越精确。

首先我们定义用来估算 π 的 PiEstimateTask:

static class PiEstimateTask extends RecursiveTask<Double> {

    private final long begin;
    private final long end;
    private final long threshold; // 分割任务的临界值

    public PiEstimateTask(long begin, long end, long threshold) {
        this.begin = begin;
        this.end = end;
        this.threshold = threshold;
    }

    @Override
    protected Double compute() {
        if (end - begin <= threshold) {

            int sign = 1; // 符号,取 1 或者 -1
            double result = 0.0;
            for (long i = begin; i < end; i++) {
                result += sign / (i * 2.0 + 1);
                sign = -sign;
            }

            return result * 4;
        }

        // 分割任务
        long middle = (begin + end) / 2;
        PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold);
        PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold);

        leftTask.fork();  // 异步执行 leftTask
        rightTask.fork(); // 异步执行 rightTask

        double leftResult = leftTask.join();   // 阻塞,直到 leftTask 执行完毕返回结果
        double rightResult = rightTask.join(); // 阻塞,直到 rightTask 执行完毕返回结果

        return leftResult + rightResult; // 合并结果
    }

}
然后我们使用 ForkJoinPool 的 invoke 执行 PiEstimateTask:

public class ForkJoinPoolTest {

    public static void main(String[] args) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
   
        // 计算 10 亿项,分割任务的临界值为 1 千万
        PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000);
   
        double pi = forkJoinPool.invoke(task); // 阻塞,直到任务执行完毕返回结果
   
        System.out.println("π 的值:" + pi);
       
        forkJoinPool.shutdown(); // 向线程池发送关闭的指令
    }
}
运行结果:


我们也可以使用 submit 方法异步的执行任务(此处 submit 方法返回的 future 指向的对象即提交任务时的 task):

public static void main(String[] args) throws Exception {
    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000);
    Future<Double> future = forkJoinPool.submit(task); // 不阻塞
   
    double pi = future.get();
    System.out.println("π 的值:" + pi);
    System.out.println("future 指向的对象是 task 吗:" + (future == task));
   
    forkJoinPool.shutdown(); // 向线程池发送关闭的指令
}
运行结果:


值得注意的是,选取一个合适的分割任务的临界值,对 ForkJoinPool 执行任务的效率有着至关重要的影响。临界值选取过大,任务分割的不够细,则不能充分利用 CPU;临界值选取过小,则任务分割过多,可能产生过多的子任务,导致过多的线程间的切换和加重 GC 的负担从而影响了效率。所以,需要根据实际的应用场景选择一个合适的分割任务的临界值。

ForkJoinPool 相比于 ThreadPoolExecutor,还有一个非常重要的特点(优点)在于,ForkJoinPool具有 Work-Stealing (工作窃取)的能力。所谓 Work-Stealing,在 ForkJoinPool 中的实现为:线程池中每个线程都有一个互不影响的任务队列(双端队列),线程每次都从自己的任务队列的队头中取出一个任务来运行;如果某个线程对应的队列已空并且处于空闲状态,而其他线程的队列中还有任务需要处理但是该线程处于工作状态,那么空闲的线程可以从其他线程的队列的队尾取一个任务来帮忙运行 —— 感觉就像是空闲的线程去偷人家的任务来运行一样,所以叫 “工作窃取”。

Work-Stealing 的适用场景是不同的任务的耗时相差比较大,即某些任务需要运行较长时间,而某些任务会很快的运行完成,这种情况下用 Work-Stealing 很合适;但是如果任务的耗时很平均,则此时 Work-Stealing 并不适合,因为窃取任务时也是需要抢占锁的,这会造成额外的时间消耗,而且每个线程维护双端队列也会造成更大的内存消耗。所以 ForkJoinPool 并不是 ThreadPoolExecutor 的替代品,而是作为对 ThreadPoolExecutor 的补充。

总结:
ForkJoinPool 和 ThreadPoolExecutor 都是 ExecutorService(线程池),但ForkJoinPool 的独特点在于:

ThreadPoolExecutor 只能执行 Runnable 和 Callable 任务,而 ForkJoinPool 不仅可以执行 Runnable 和 Callable 任务,还可以执行 Fork/Join 型任务 —— ForkJoinTask —— 从而满足并行地实现分治算法的需要;
ThreadPoolExecutor 中任务的执行顺序是按照其在共享队列中的顺序来执行的,所以后面的任务需要等待前面任务执行完毕后才能执行,而 ForkJoinPool 每个线程有自己的任务队列,并在此基础上实现了 Work-Stealing 的功能,使得在某些情况下 ForkJoinPool 能更大程度的提高并发效率。

转自:https://segmentfault.com/a/1190000008140126
分享到:
评论

相关推荐

    Java Fork/Join框架

    Fork/Join框架基于工作窃取算法(Work-Stealing Algorithm),这种算法确保了任务的均衡分配和高效的资源利用。 工作窃取算法的工作原理是,当一个工作线程完成自己的任务后,它会去检查其他工作线程的队列,如果...

    JDK7中的ForkJoin模式

    Fork/Join 模式的优势在于它可以自动管理任务的调度和并行度,以及提供一种优雅的方式来处理工作窃取(work-stealing)策略,即当一个工作线程完成自己的任务后,会尝试从其他工作线程的队列中窃取任务来执行,从而...

    azureblob-1.5.5.zip

    Fork/Join框架的核心概念包括工作窃取(Work Stealing)算法和分而治之(Divide and Conquer)策略。在测试场景下,这意味着测试用例可以被并行化执行,加快整个测试套件的运行速度。"fork-join-junit-extensions-...

    ForkJoin并发框架入门示例

    与标准的ExecutorService不同,`ForkJoinPool`使用了一种称为工作窃取(Work Stealing)的算法。每个工作线程都有自己的双端队列,用于存储待执行的任务。当一个线程完成自己的任务后,它会尝试从其他线程的队列中...

    Stream流式计算、ForkJoin和异步回调.md

    ForkJoin框架的核心思想之一是“工作窃取”(Work Stealing)算法。这种算法的基本思路是当一个线程没有任务可做时,它可以从其他线程的队列中“窃取”任务来执行,这样可以充分利用多核处理器的能力。 #### 2.3 ...

    高性能并发业务.pdf

    以ForkJoinPool为例,它是Java并发工具中的一种,适用于计算密集型的任务,它通过Work-Stealing算法高效地管理和执行任务。 Java并发的内置实现涉及到了Thread、Runnable、Callable等基础构建,以及ExecuterService...

    调度算法_多核_调度算法_correctlyq2j_多核调度_多核操作系统_

    首先,"Real-time semi-partitioned scheduling of fork-join tasks using work-stealing.pdf"探讨了基于工作窃取的实时半分区调度方法。Fork-join模型是一种并行计算模型,其中任务可以分解为子任务,然后合并结果...

    simple-fork-join:ForkJoin的简单示例

    在实际开发中,ForkJoin框架常用于大规模数据处理、矩阵运算、排序算法(如快速排序)等场景。`simple-fork-join`项目提供的示例代码可以帮助初学者理解如何在Java中使用ForkJoin框架来编写高效的并行计算程序。通过...

    fj-demo:ForkJoin可视化

    ForkJoin框架的设计理念是通过工作窃取(Work-Stealing)算法来确保所有可用的处理器资源都被充分利用。 1. **ForkJoinPool**: ForkJoinPool是ForkJoin框架的核心,它负责管理线程和任务。与标准的ExecutorService...

    Java多线程ForkJoinPool实例详解

    ForkJoinPool是ExecutorService接口的实现,它管理工作窃取算法(Work-Stealing Algorithm)实现高效的任务执行和线程管理。 ForkJoinPool的主要组成部分有两个类:ForkJoinPool和ForkJoinTask。ForkJoinPool类实现...

    java 线程池的实现方法

    5. **newWorkStealingPool(int parallelism)**:这是Java 8引入的新特性,基于Fork/Join框架的线程池,使用Work-Stealing算法。每个工作线程都有自己的双端队列,任务会被随机分配给工作线程,这样可以提高并行度,...

    JAVA高质量并发详解,多线程并发深入讲解

    - **Fork/Join框架:** 一种基于工作窃取(Work Stealing)算法的并发框架,能够有效地利用多核处理器的优势。 #### 三、并发编程中常见的问题及其解决方案 - **死锁:** 当两个或更多的线程无限期地等待彼此持有...

    Java_JDK_1.7 64位

    3. **Fork/Join框架**:这是并行编程的一个新框架,通过Work-Stealing算法提高多核处理器环境下程序的执行速度。 4. **字符串改进**:JDK1.7中,字符串拼接优化了StringBuilder,提高了性能;另外,还引入了`String...

    JavaMagic:我在哪里进行有关Java新功能的研究

    JavaMagicwhere i doing some research about java new featuresKMP算法实现JDK8新特性:lambda表达式和stream流式编程以及函数参数forkjoin的work-stealing模式,类似于CPU内核级别的hadoop,充分利用CPU资源,相比...

    jdk api 1.8_China.zip

    它通过WorkStealing算法自动平衡子任务,减少了锁的使用。 2. 并发集合:ConcurrentHashMap在1.8中进行了优化,提升了并发性能。另外,新增了ConcurrentLinkedQueue和ConcurrentSkipListMap等高效并发集合。 七、 ...

    13、线程池ForkJoinPool实战及其工作原理分析(1).pdf

    - **特点**: 它主要通过`Fork-Join`框架实现,能够有效地管理并行任务,特别是适用于那些可以被细分为更小任务的任务类型。 #### 1.2 ForkJoinPool的工作原理 - **任务的分割与合并**: 在`ForkJoinPool`中,任务...

    Java分支与合并框架详解.pdf

    它的工作线程池采用了一种叫做工作窃取算法(Work-Stealing Algorithm),可以高效地处理大量的微小任务,避免了线程阻塞等待新任务的情况。当一个工作线程完成任务后,它会尝试从其他工作线程的队列中窃取任务来...

    深入了解Java语言中的并发性选项有何不同

    ForkJoinPool是线程池实现,WorkStealing算法保证了任务的高效分配和执行。 函数式编程风格在处理并发时具有天然优势,因为它鼓励使用不可变数据和无副作用的函数,减少了并发编程中的许多挑战。Scala和Groovy等...

Global site tag (gtag.js) - Google Analytics