`

如何使用 ForkJoinPool 以及原理

 
阅读更多

前言

Java 1.7 引入了一种新的并发框架—— Fork/Join Framework。

本文的主要目的是介绍 ForkJoinPool 的适用场景,实现原理,以及示例代码。

TLDR; 如果觉得文章太长的话,以下就是结论

  • ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。(见 Java Tip: When to use ForkJoinPool vs ExecutorService )
  • ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。
  • ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker

使用

首先介绍的是大家最关心的 Fork/Join Framework 的使用方法,如果对使用方法已经很熟悉的话,可以跳过这一节,直接阅读原理

用一个特别简单的求整数数组所有元素之和来作为我们现在需要解决的问题吧。

问题

计算1至1000的正整数之和。

解决方法

For-loop

最简单的,显然是不使用任何并行编程的手段,只用最直白的 for-loop 来实现。下面就是具体的实现代码。

不过为了便于横向对比,也为了让代码更加 Java Style,首先我们先定义一个 interface。

public interface Calculator {
    long sumUp(long[] numbers);
}

这个 interface 非常简单,只有一个函数 sumUp,就是返回数组内所有元素的和。

再写一个 main 方法。

public class Main {
    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 1000).toArray();
        Calculator calculator = new MyCalculator();
        System.out.println(calculator.sumUp(numbers)); // 打印结果500500
    }
}

接下来就是我们的 Plain Old For-loop Calculator,简称 POFLC 的实现了。(这其实是个段子,和主题完全无关,感兴趣的请见文末的彩蛋

public class ForLoopCalculator implements Calculator {
    public long sumUp(long[] numbers) {
        long total = 0;
        for (long i : numbers) {
            total += i;
        }
        return total;
    }
}

这段代码毫无出奇之处,也就不多解释了,直接跳入下一节——并行计算。

ExecutorService

在 Java 1.5 引入 ExecutorService 之后,基本上已经不推荐直接创建 Thread 对象,而是统一使用 ExecutorService。毕竟从接口的易用程度上来说 ExecutorService 就远胜于原始的 Thread,更不用提 java.util.concurrent 提供的数种线程池,Future 类,Lock 类等各种便利工具。

使用 ExecutorService 的实现

public class ExecutorServiceCalculator implements Calculator {
    private int parallism;
    private ExecutorService pool;

    public ExecutorServiceCalculator() {
        parallism = Runtime.getRuntime().availableProcessors(); // CPU的核心数
        pool = Executors.newFixedThreadPool(parallism);
    }

    private static class SumTask implements Callable<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        public Long call() throws Exception {
            long total = 0;
            for (int i = from; i <= to; i++) {
                total += numbers[i];
            }
            return total;
        }
    }

    @Override
    public long sumUp(long[] numbers) {
        List<Future<Long>> results = new ArrayList<>();

        // 把任务分解为 n 份,交给 n 个线程处理
        int part = numbers.length / parallism;
        for (int i = 0; i < parallism; i++) {
            int from = i * part;
            int to = (i == parallism - 1) ? numbers.length - 1 : (i + 1) * part - 1;
            results.add(pool.submit(new SumTask(numbers, from, to)));
        }

        // 把每个线程的结果相加,得到最终结果
        long total = 0L;
        for (Future<Long> f : results) {
            try {
                total += f.get();
            } catch (Exception ignore) {}
        }

        return total;
    }
}

如果对 ExecutorService 不太熟悉的话,推荐阅读《七天七并发模型》的第二章,对 Java 的多线程编程基础讲解得比较清晰。当然著名的《Java并发编程实战》也是不可多得的好书。

ForkJoinPool

前面花了点时间讲解了 ForkJoinPool 之前的实现方法,主要为了在代码的编写难度上进行一下对比。现在就列出本篇文章的重点——ForkJoinPool 的实现方法。

public class ForkJoinCalculator implements Calculator {
    private ForkJoinPool pool;

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // 当需要计算的数字小于6时,直接计算结果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            // 否则,把任务一分为二,递归计算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle+1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public ForkJoinCalculator() {
        // 也可以使用公用的 ForkJoinPool:
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length-1));
    }
}

可以看出,使用了 ForkJoinPool 的实现逻辑全部集中在了 compute() 这个函数里,仅用了14行就实现了完整的计算过程。特别是,在这段代码里没有显式地“把任务分配给线程”,只是分解了任务,而把具体的任务到线程的映射交给了 ForkJoinPool 来完成。

原理

如果你除了 ForkJoinPool 的用法以外,对 ForkJoinPoll 的原理也感兴趣的话,那么请接着阅读这一节。在这一节中,我会结合 ForkJoinPool 的作者 Doug Lea 的论文——《A Java Fork/Join Framework》,尽可能通俗地解释 Fork/Join Framework 的原理。

我一直以为,要理解一样东西的原理,最好就是自己尝试着去实现一遍。根据上面的示例代码,可以看出 fork() 和 join() 是 Fork/Join Framework “魔法”的关键。我们可以根据函数名假设一下 fork() 和 join() 的作用:

  • fork():开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。
  • join():等待该任务的处理线程处理完毕,获得返回值。

以上模型似乎可以(?)解释 ForkJoinPool 能够多线程执行的事实,但有一个很明显的问题

当任务分解得越来越细时,所需要的线程数就会越来越多,而且大部分线程处于等待状态。

但是如果我们在上面的示例代码加入以下代码

System.out.println(pool.getPoolSize());

这会显示当前线程池的大小,在我的机器上这个值是4,也就是说只有4个工作线程。甚至即使我们在初始化 pool 时指定所使用的线程数为1时,上述程序也没有任何问题——除了变成了一个串行程序以外。

public ForkJoinCalculator() {
    pool = new ForkJoinPool(1);
}

这个矛盾可以导出,我们的假设是错误的,并不是每个 fork() 都会促成一个新线程被创建,而每个 join() 也不是一定会造成线程被阻塞。Fork/Join Framework 的实现算法并不是那么“显然”,而是一个更加复杂的算法——这个算法的名字就叫做 work stealing 算法。

work stealing 算法在 Doung Lea 的论文中有详细的描述,以下是我在结合 Java 1.8 代码的阅读以后——现有代码的实现有一部分相比于论文中的描述发生了变化——得到的相对通俗的解释:

基本思想

  • ForkJoinPool 的每个工作线程都维护着一个工作队列WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务ForkJoinTask)。
  • 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
  • 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
  • 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
  • 在既没有自己的任务,也没有可以窃取的任务时,进入休眠。

下面来介绍一下关键的两个函数:fork() 和 join() 的实现细节,相比来说 fork() 比 join() 简单很多,所以先来介绍 fork()

fork

fork() 做的工作只有一件事,既是把任务推入当前工作线程的工作队列里。可以参看以下的源代码:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

join

join() 的工作则复杂得多,也是 join() 可以使得线程免于被阻塞的原因——不像同名的 Thread.join()

  1. 检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
  2. 查看任务的完成状态,如果已经完成,直接返回结果。
  3. 如果任务尚未完成,但处于自己的工作队列内,则完成它。
  4. 如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务。
  5. 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务。
  6. 递归地执行第5步。

将上述流程画成序列图的话就是这个样子:

以上就是 fork() 和 join() 的原理,这可以解释 ForkJoinPool 在递归过程中的执行逻辑,但还有一个问题

最初的任务是 push 到哪个线程的工作队列里的?

这就涉及到 submit() 函数的实现方法了

submit

其实除了前面介绍过的每个工作线程自己拥有的工作队列以外,ForkJoinPool 自身也拥有工作队列,这些工作队列的作用是用来接收由外部线程(非 ForkJoinThread 线程)提交过来的任务,而这些工作队列被称为 submitting queue 。

submit() 和 fork() 其实没有本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操作)。submitting queue 和其他 work queue 一样,是工作线程”窃取“的对象,因此当其中的任务被一个工作线程成功窃取时,就意味着提交的任务真正开始进入执行阶段。

总结

在了解了 Fork/Join Framework 的工作原理之后,相信很多使用上的注意事项就可以从原理中找到原因。例如:为什么在 ForkJoinTask 里最好不要存在 I/O 等会阻塞线程的行为?,这个我姑且留作思考题吧 :)

还有一些延伸阅读的内容,在此仅提及一下:

  1. ForkJoinPool 有一个 Async Mode ,效果是工作线程在处理本地任务时也使用 FIFO 顺序。这种模式下的 ForkJoinPool 更接近于是一个消息队列,而不是用来处理递归式的任务。
  2. 在需要阻塞工作线程时,可以使用 ManagedBlocker
  3. Java 1.8 新增加的 CompletableFuture 类可以实现类似于 Javascript 的 promise-chain,内部就是使用 ForkJoinPool 来实现的。

rel: http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/#%E5%8E%9F%E7%90%86

分享到:
评论

相关推荐

    13、线程池ForkJoinPool工作原理分析

    此外,理解ForkJoinPool的工作原理并合理地设计任务分解逻辑,是充分发挥其性能的关键。 总结来说,ForkJoinPool是Java并发编程中一种高效的线程池实现,特别适合处理可并行的递归任务。其核心的Work-Stealing算法...

    13、线程池ForkJoinPool实战及其工作原理分析(1).pdf

    **ForkJoinPool实战及工作原理** #### 1.1 ForkJoinPool简介 - **定义**: `ForkJoinPool`是Java 7引入的一种用于执行大量细粒度并行任务的线程池。 - **特点**: 它主要通过`Fork-Join`框架实现,能够有效地管理...

    Java中的Fork,Join框架深度解析

    尽量使用ForkJoinPool.commonPool()来获取默认的线程池,以减少资源消耗。 合理设置任务的分解阈值,以避免过度分解导致的性能下降。 避免在RecursiveTask内部使用ForkJoinPool的invoke方法,而应该直接调用compute...

    fork/join 实例

    Fork/Join框架是Java并发处理的一个重要工具,它基于工作窃取算法,设计...在实际项目中,根据业务需求正确配置和使用ForkJoinPool,以及合理设计RecursiveAction或RecursiveTask,能够有效提升应用程序的并发性能。

    java Fork Join框架及使用

    Fork/Join框架是Java7引入的一种用于并行任务执行的框架,它允许将复杂任务拆分成多个子任务,并行...此外,还有一些在线资源和文章提供了ForkJoinPool以及其原理的内部解析,这些也是学习Fork/Join框架的重要资源。

    java并行排序程序 parallel

    在这个经典例子中,我们将深入探讨Java并行排序的工作原理、实现方式以及如何在实际项目中应用。 首先,Java并行排序是基于`java.util.concurrent.ForkJoinPool`类和`java.util.concurrent.RecursiveAction`类实现...

    designpattern.zip

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    forkjoin.zip

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    disruptor.zip

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    jmm(1).zip

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    ForkJoin并发框架入门示例

    与标准的ExecutorService不同,`ForkJoinPool`使用了一种称为工作窃取(Work Stealing)的算法。每个工作线程都有自己的双端队列,用于存储待执行的任务。当一个线程完成自己的任务后,它会尝试从其他线程的队列中...

    Fork Join框架机制详解.docx

    通过`ForkJoinPool`和`ForkJoinTask`的配合,以及工作窃取算法的使用,可以在多核处理器环境中充分利用硬件资源,显著提升计算性能。在实际开发中,尤其是在大数据计算、分布式数据库查询等场景下,Fork/Join框架...

    Java 1.8 源码

    例如,使用`ForkJoinPool`处理大量数据: ```java ForkJoinPool pool = new ForkJoinPool(); pool.invoke(new RecursiveTask() { // 实现任务逻辑 }); ``` 6. **方法引用**: 方法引用是与Lambda表达式相关...

    关于 java.util.concurrent 您不知道的 5 件事,第 2 部分

    ForkJoinPool是Java 7引入的一个高级线程池,用于执行ForkJoinTask任务,特别适合于可以分解为更小子任务的计算密集型任务。RecursiveTask是一个抽象类,继承自FutureTask,用于表示具有结果的递归计算。通过...

    Java多线程的4种实现方式(源代码)

    - **实现原理**:`ForkJoinPool`提供了一种更高效的任务分解和合并机制,适用于大量细粒度任务的并行处理。 - **示例代码**: ```java public class MyRecursiveTask extends RecursiveTask&lt;Integer&gt; { private ...

    80w字Java面试宝典(非常全)

    ava八股文通常是指Java技术面试中常见的知识点集合,...线程池及并发包:Executors、ForkJoinPool、CountDownLatch、CyclicBarrier等。 并发优化:CAS、无锁编程、AQS原理等。 内容不限于这些,适合中、大厂面试来用!

    详解Java8与Runtime.getRuntime().availableProcessors()

    总结来说,Java 8中的`Runtime.getRuntime().availableProcessors()`提供了系统处理器核心数的信息,这对并行流、`ForkJoinPool`的配置以及多线程程序的性能优化具有重要影响。开发者应理解其工作原理,考虑环境因素...

    Java ForkJoin框架的原理及用法

    第二步:使用ForkJoinPool进行执行,task要通过ForkJoinPool来执行,分割的子任务也会添加到当前工作线程的双端队列中,进入队列的头部。当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务(工作...

Global site tag (gtag.js) - Google Analytics