Fork/Join框架的使用
Java7提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。ForkJoinPool类是Fork/Join框架的核心,它和ThreadPoolExecutor一样也是ExecutorService接口的实现类。
ForkJoinPool最大的特殊之处就在于其实现了工作窃取(work-stealing)。所谓工作窃取,即当前线程的Task已经全被执行完毕,则自动从其他线程的任务池末尾取出任务继续执行。
一个fork/join框架之下的任务由ForkJoinTask类表示。ForkJoinTask实现了Future接口,可以按照Future接口的方式来使用。在ForkJoinTask类中之重要的两个方法fork和join。fork方法用异步方式启动任务的执行,join方法则等待任务完成并返回结果。
在创建任务时,通常不直接使用ForkJoinTask类,是使用它的两个抽象子类:
RecursiveAction:没有返回值的任务
RecursiveTask:有返回值的任务
对RecursiveAction抽象类的进一步封装:
public abstract class AbstractRecursiveAction<T> extends RecursiveAction { private T[] array; //数据数组 private int start; //要处理的数据段的开始索引 private int middle; //要处理的数据段的中间索引 private int end; //要处理的数据段的结束索引 private int threshold; //临界值:每个数据段的最大长度 public AbstractRecursiveAction(T[] array, int start, int end, int threshold) { super(); this.array = array; this.start = start; this.end = end; this.threshold = threshold; this.middle = start + (end - start) / 2; } @Override protected void compute() { if(notNeedRecursion()){ handleFinalAction(); }else{ handleRecursiveAction(); } } public abstract void handleFinalAction(); public abstract void handleRecursiveAction(); /** * 不需要递归拆分任务 */ protected boolean notNeedRecursion(){ return (end - start + 1) <= threshold; } protected Integer[] getLeftData() { Integer[] leftArray = new Integer[getMiddle() - getStart() + 1]; Arrays.stream(getArray(), getStart(), getMiddle()+1).collect(Collectors.toList()).toArray(leftArray); System.out.println(Arrays.toString(leftArray)); return leftArray; } protected Integer[] getRightData() { Integer[] rightArray = new Integer[getEnd() - getMiddle()]; Arrays.stream(getArray(), getMiddle()+1, getEnd()+1).collect(Collectors.toList()).toArray(rightArray); System.out.println(Arrays.toString(rightArray)); return rightArray; } public T[] getArray() { return array; } public int getStart() { return start; } public int getMiddle() { return middle; } public int getEnd() { return end; } public int getThreshold() { return threshold; } }
对RecursiveTask抽象类的进一步封装:
public abstract class AbstractRecursiveTask<T, V> extends RecursiveTask<V> { private T[] array; //数据数组 private int start; //要处理的数据段的开始索引 private int middle; //要处理的数据段的中间索引 private int end; //要处理的数据段的结束索引 private int threshold; //临界值:每个数据段的最大长度 public AbstractRecursiveTask(T[] array, int start, int end, int threshold) { super(); this.array = array; this.start = start; this.end = end; this.threshold = threshold; this.middle = start + (end - start) / 2; } @Override protected V compute() { if(notNeedRecursion()){ return handleFinalTask(); }else{ return handleRecursiveTask(); } } public abstract V handleFinalTask(); public abstract V handleRecursiveTask(); /** * 不需要递归拆分任务 */ protected boolean notNeedRecursion(){ return (end - start + 1) <= threshold; } protected Integer[] getLeftData() { Integer[] leftArray = new Integer[getMiddle() - getStart() + 1]; Arrays.stream(getArray(), getStart(), getMiddle()+1).collect(Collectors.toList()).toArray(leftArray); System.out.println(Arrays.toString(leftArray)); return leftArray; } protected Integer[] getRightData() { Integer[] rightArray = new Integer[getEnd() - getMiddle()]; Arrays.stream(getArray(), getMiddle()+1, getEnd()+1).collect(Collectors.toList()).toArray(rightArray); System.out.println(Arrays.toString(rightArray)); return rightArray; } public T[] getArray() { return array; } public int getStart() { return start; } public int getMiddle() { return middle; } public int getEnd() { return end; } public int getThreshold() { return threshold; } }
创建用于并行计算总和的RecursiveTask子类:
public class SumTask extends AbstractRecursiveTask<Integer, Integer> { public SumTask(Integer array[], int start, int end, int threshold){ super(array, start, end, threshold); } @Override public Integer handleFinalTask() { int sum = 0; try{ sum = Arrays.stream(getArray(), getStart(), getEnd()+1) .mapToInt(Integer::intValue) .sum(); System.out.println(Thread.currentThread().getName() + " sum=" + sum); }catch(Exception ex){ completeExceptionally(ex); } return sum; } @Override public Integer handleRecursiveTask() { //left Integer[] leftArray = getLeftData(); SumTask left = new SumTask(leftArray, 0, leftArray.length-1, getThreshold()); //right Integer[] rightArray = getRightData(); SumTask right = new SumTask(rightArray, 0, rightArray.length-1, getThreshold()); //第一种用法 // right.fork(); //fork:将任务放到线程池中异步调度 // Integer leftValue = left.compute(); //compute:在当前线程执行任务 // Integer rightValue = right.join(); //join:等待并获取任务结果 // return leftValue + rightValue; //第二种用法 //并行执行两个小任务 invokeAll(left, right); return left.join() + right.join(); } }
测试代码:
private static Integer[] getData(int length){ Integer[] array = new Integer[length]; IntStream.rangeClosed(1, length) .boxed() .collect(Collectors.toList()) .toArray(array); return array; } private static void handleSum(){ // ForkJoinPool pool = ForkJoinPool.commonPool(); //最大工作线程数等于CPU核数-1 ForkJoinPool pool = new ForkJoinPool(4); //指定处理任务的线程数 Integer sum = 0; int length = 10; Integer[] array = getData(length); SumTask task = new SumTask(array, 0, array.length-1, 5); Future<Integer> future = pool.submit(task); sum = 0; try { sum = future.get(); } catch (InterruptedException|ExecutionException e) { System.out.println(task.getException().toString()); } System.out.println("sum=" + sum); pool.shutdown(); }
相关推荐
设计和实现使用Fork/Join框架的应用程序需要仔细考虑任务的拆分和合并策略,以及如何有效地避免任务窃取引起的竞争和数据一致性问题。 在实际应用中,Fork/Join框架可以极大地提高应用程序的性能,尤其是在运行在...
在Java编程领域,Fork/Join框架是一种并行计算模型,设计用于高效处理大量数据,尤其是在多核处理器系统上。这个框架是Java 7引入的一个重要特性,它基于分而治之(Divide and Conquer)策略,将复杂任务拆分为更小...
Fork/Join框架Package jsr166y是Java 7并行编程类的的初步版本(Preliminary versions of classes targeted for Java 7.)
Fork/Join框架是Java并发处理的一个重要工具,它基于工作窃取算法,设计用于高效地执行并行计算任务。这个框架是Java 7引入的,位于`java.util.concurrent.fork/join`包中,目的是简化多核处理器环境下大规模数据...
在使用Fork/Join框架时,应该注意以下几点: 尽量使用ForkJoinPool.commonPool()来获取默认的线程池,以减少资源消耗。 合理设置任务的分解阈值,以避免过度分解导致的性能下降。 避免在RecursiveTask内部使用...
fork/join框架与其它ExecutorService的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃取任务执行。 fork/join...
结合这两个技术,你可以创建高效的并发服务器,比如一个服务器端使用NIO监听和处理来自多个客户端的连接,而每个客户端请求的处理则可以利用Fork/Join框架进行并行计算。在实际项目中,`nioSample`工程可能包含这些...
快速排序是一个经典的适合使用Fork/Join框架的例子。首先创建一个`RecursiveTask`来表示排序任务,任务会检查数组的大小,如果数组元素少于某个阈值,就直接排序(递归结束条件);否则,将数组一分为二,分别创建两...
Fork/Join框架基于分治策略,即将一个复杂的大任务分解成两个或更多个较小的任务,直到这些任务小到可以直接计算结果,然后将这些结果合并得到最终答案。在Java中,Fork/Join框架主要由`ForkJoinPool`线程池和`...
在使用Fork/Join框架时,开发者首先需要定义一个`ForkJoinTask`的子类,实现任务的分解和合并逻辑。一般来说,任务分解的过程是递归的,如果任务足够小,就直接执行;否则,将其拆分为更小的任务,然后调用`fork()`...
然而,需要注意的是,不是所有任务都适合使用Fork/Join框架,对于那些任务间的依赖性强或者无法轻易拆分的问题,使用传统的线程池可能更为合适。因此,在实际应用中,需要根据问题的特点来选择合适的并发策略。
总结来说,Fork/Join框架是Java并发编程的重要进步,它通过提供一种易于理解和使用的并行编程模型,降低了编写高效并发程序的难度。结合java.util.concurrent包中的其他工具,开发者可以构建出既安全又高效的并发...
一个最简单的例子是使用 Fork/Join 框架来求一个数组中的最大/最小值,这个任务就可以拆成很多小任务,大任务就是寻找一个大数组中的最大/最小值,我们可以将一个大数组拆成很多小数组,然后分别求解每个小数组中的...
全网第一篇通过图文介绍Fork/Join框架与CompleteableFuture的PPT
fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。
Fork/Join框架的测试demo,含源代码。 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 我们再通过Fork和Join这两个...
Java的Fork/Join框架是Java 7引入的一个并行计算工具,它是基于分而治之(Divide and Conquer)策略的。该框架旨在简化并行编程,尤其是在多核处理器环境中提高性能。Fork/Join框架的核心类包括`ForkJoinPool`和`...
Java 8 中的 Fork/Join 框架和 Optional 框架使用 Java 8 中的 Fork/Join 框架和 Optional 框架是两个非常重要的框架,它们在多线程编程和数据处理中发挥着重要作用。下面我们将详细介绍这两个框架的使用和原理。 ...