在JAVA7之前,并行处理数据非常麻烦。第一,你得明确把包含数据的数据结构分成若干份。第二,你要将每个子部分分配给一个独立的线程。第三,你要在恰当的时候对它们进行同步避免不希望的竞争条件,等待所有线程完成,最后把这些部分结果合并起来。在Java 7引入了分支/合并框架,让这些操作更稳定、更不容易出错。
分支/合并框架的目的是以递归的方式将可以并行的任务拆分为更小的任务,然后将每个子任务的结果合并起来生成整体结果。要把子任务提交到ForkJoinPool必须创建RecursiveTask<R>的子类。需要实现它唯一的抽象方法 protected abstract R compute(); 在这个方法中定义了将任务拆分成子任务的逻辑,以及无法拆分时生成单个子任务结果的逻辑。
计算1到10000000的和
/** * Desc:Fork/Join框架的目的是以递归方式将可以并行的任务拆分为更小的任务,然后将每个子任务的结果合并起来生成一个整体结果。 * 要把任务提交到ForkJoinPool必须创建RecursiveTask<T> 的一个子类 * * @author wei.zw * @since 2016年7月6日 下午9:27:56 * @version v 0.1 */ public class ForkJoinSumCalculator extends RecursiveTask<Long> { /** */ private static final long serialVersionUID = -8013303660374621470L; private final long[] numbers; private final int start; private final int end; private static final long THRESHOLD = 1000; /** * @param numbers * @param start * @param end */ public ForkJoinSumCalculator(long[] numbers, int start, int end) { super(); this.numbers = numbers; this.start = start; this.end = end; } /** * @param numbers */ public ForkJoinSumCalculator(long[] numbers) { super(); this.numbers = numbers; this.start = 0; this.end = numbers.length; } /** * @see java.util.concurrent.RecursiveTask#compute() */ @Override protected Long compute() { int length = end - start; if (length <= THRESHOLD) { return computeSequentially(); } ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); leftTask.fork(); ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); rightTask.fork(); Long rightResult = 0L; try { rightResult = rightTask.get(); } catch (Exception e) { } Long leftResult = leftTask.join(); return leftResult + rightResult; } /** * * @return * @author wei.zw */ private Long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } public static void main(String[] args) { long[] numbers = LongStream.rangeClosed(1, 10000000).toArray(); long start = System.currentTimeMillis(); System.out.println(new ForkJoinPool().invoke(new ForkJoinSumCalculator(numbers)) + " 耗时:" + (System.currentTimeMillis() - start)); } }
结果是:50000005000000 耗时:37
优化后的
/** * @see java.util.concurrent.RecursiveTask#compute() */ @Override protected Long compute() { int length = end - start; if (length <= THRESHOLD) { return computeSequentially(); } ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); leftTask.fork(); ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); Long rightResult = rightTask.compute(); Long leftResult = leftTask.join(); return leftResult + rightResult; }
计算结果是:50000005000000 耗时:25
使用Fork/Join框架的最佳做法:
- 对一个任务调用join方法会阻塞调用方,直到该任务作出结果。因此,又必须要在两个子任务的计算都开始之后再调用它。
- 不应该在RecursiveTask内部使用ForkJoinPool的invoke方法,应该直接调用compute或者fork方法
- 对子任务调用fork方法可以将这个子任务排进ForkJoinPool。同时对左右两边的子任务都调用似乎很自然,但是这样做的效率比直接对其中一个调用compute方法低。这样做可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。
相关推荐
Java线程池ForkJoinPool实例解析 Java线程池ForkJoinPool实例解析是Java并发编程中的一种高级主题,ForkJoinPool是Java 7中引入的一种新的线程池实现,它可以充分利用多CPU和多核CPU的优势,使得并发编程变得更加...
- **任务的分割与合并**: 在`ForkJoinPool`中,任务通过`ForkJoinTask`接口表示,并可以通过`ForkJoinPool`的`invoke()`方法提交。当一个任务被提交时,它会被分解成更小的任务,直到达到一定的阈值(通常称为基础...
客户端代码中,首先创建一个ForkJoinPool实例,通常我们会使用ForkJoinPool.commonPool()来获取一个默认的线程池。然后,提交一个Task实例,ForkJoinPool会自动处理任务的拆分和结果的合并。最后,通过调用Task的get...
- `ForkJoinPool.go`: 实现了`ForkJoinPool`结构体和相关方法,如`NewForkJoinPool()`用于创建一个新的pool,`Submit()`用于提交任务,以及`Start()`和`Shutdown()`用于控制pool的生命周期。 - `ForkJoinTask.go`: ...
Java NIO(非阻塞I/O)是一种在Java中处理I/O操作的新方式,相比于传统的BIO(阻塞I/O)...在实际项目中,`nioSample`工程可能包含这些技术的示例代码,帮助开发者理解和学习如何在Java中有效利用NIO和Fork/Join框架。
在Java 8中,`ForkJoinPool`是默认的并行流执行器,它会根据CPU核心数动态调整线程数量,以充分利用硬件资源。 在实际应用中,为了更好地利用并行计算,需要注意以下几点: 1. **任务粒度**:并行计算的优势在于...
在IT行业中,尤其是在Java编程领域...通过学习并实践这个"BigNums"示例,开发者能够掌握如何在Java中利用并行编程和多线程技术来优化大数搜索,从而提升程序性能。这将对解决大规模数据处理和高性能计算问题大有裨益。
默认情况下,Dubbo 使用 JBoss 的 ForkJoinPool 作为工作线程池。设置 `executes` 参数可以避免服务因过多并发请求而耗尽资源,起到流量控制的作用。例如,如果将 `executes` 设置为 100,那么服务端在同一时刻最多...
1. **ForkJoinPool**:这是执行ForkJoinTask的线程池。每个工作线程维护一个双端工作队列,用于存放待处理的任务。当一个工作线程的任务完成后,它会尝试从其他工作线程的工作队列尾部“窃取”任务,这种机制称为**...
`ForkJoinPool`和`CompletableFuture`在JDK 9中得到优化,提供了更好的性能和资源管理。 以上只是JDK 9新特性的一部分,还有其他如改进的Javadoc、新的`var`关键字(仅限局部变量)等。通过这些示例代码,您可以更...
Java 8提供了新的并发工具,如ForkJoinPool和CompletableFuture,它们是基于Java Fork/Join框架的。这些工具使得多线程编程更加高效且易于理解。例如,ForkJoinPool用于执行分治策略的任务,CompletableFuture则支持...
`ScheduledThreadPoolExecutor`允许设定延时或周期性的任务执行,而`ForkJoinPool`是Java 7引入的,适用于递归计算任务,它利用工作窃取来提高并行效率。 博客中可能还会涉及线程池的性能调优,如根据系统资源和...
java8 源码 GH-Demo 1、jOOR源码研习,并优化对Class、...9、学习ForkJoinPool并发编程方式 10、学习Android中的StateMachine的层次状态机,并编写测试用例 11、学习LeadkCanary中对内存泄漏的检测方法--WeakReference
Java并发库包括了许多类和接口,如`ExecutorService`, `Future`, `Callable`, `Semaphore`, `CyclicBarrier`, `CountDownLatch`, `ThreadPoolExecutor`, `ForkJoinPool`以及各种原子类(如`AtomicInteger`, `...
1. **ForkJoinPool**: ForkJoinPool是ForkJoin框架的核心,它负责管理线程和任务。与标准的ExecutorService不同,ForkJoinPool专为ForkJoinTask设计,能更高效地处理递归任务和短生命周期的任务。它的工作线程会尝试...
`ForkJoinPool`基于工作窃取算法,而`CompletableFuture`则支持异步编程和复杂的依赖关系。 **9. Type接口和TypeToken** 类型接口(Type Interface)和TypeToken为泛型类型提供了运行时反射支持,特别是在Gson库中...
并行流底层使用了`ForkJoinPool`,这是一种工作窃取算法的线程池,能够有效地利用多核处理器资源,提高并行计算性能。 #### 2.2 `ConcurrentHashMap`的改进 `ConcurrentHashMap`在1.8中进行了优化,提供了更好的...
8. **并行和并发改进**:Java8对并行和并发进行了优化,比如`ForkJoinPool`和`Parallel Streams`,使得并行计算更加简单高效。 9. **改进的类型推断**:Java8增强了类型推断,使得编译器能够更好地理解代码,减少...
1. **多线程并行流(Fork/Join框架)**:Java 7引入了ForkJoinPool和RecursiveTask/RecursiveAction类,用于实现高效的并行计算,通过分治策略将大任务拆分为小任务执行。 2. **try-with-resources语句**:这是一个...
10. **并发改进**: 并发包`java.util.concurrent`也得到了增强,例如`ForkJoinPool`和`CompletableFuture`,提供了一种更有效率的异步编程模型。 **示例代码的价值** 这些官方示例代码是学习和掌握Java 8新特性的...