`
raymond.chen
  • 浏览: 1436882 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Fork/Join框架的使用

 
阅读更多

Fork/Join框架的使用

        Java7提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。ForkJoinPool类是Fork/Join框架的核心,它和ThreadPoolExecutor一样也是ExecutorService接口的实现类。

 

        ForkJoinPool最大的特殊之处就在于其实现了工作窃取(work-stealing)。所谓工作窃取,即当前线程的Task已经全被执行完毕,则自动从其他线程的任务池末尾取出任务继续执行。

 

        一个fork/join框架之下的任务由ForkJoinTask类表示。ForkJoinTask实现了Future接口,可以按照Future接口的方式来使用。在ForkJoinTask类中之重要的两个方法fork和join。fork方法用异步方式启动任务的执行,join方法则等待任务完成并返回结果。

 

        在创建任务时,通常不直接使用ForkJoinTask类,是使用它的两个抽象子类:

               RecursiveAction:没有返回值的任务

               RecursiveTask:有返回值的任务

 

对RecursiveAction抽象类的进一步封装:

public abstract class AbstractRecursiveAction<T> extends RecursiveAction {
	private T[] array; //数据数组
	private int start; //要处理的数据段的开始索引
	private int middle; //要处理的数据段的中间索引
	private int end; //要处理的数据段的结束索引
	private int threshold; //临界值:每个数据段的最大长度
    
    public AbstractRecursiveAction(T[] array, int start, int end, int threshold) {
        super();
        this.array = array;
        this.start = start;
        this.end = end;
        this.threshold = threshold;
        this.middle = start + (end - start) / 2;
    }
    
	@Override
	protected void compute() {
		if(notNeedRecursion()){
			handleFinalAction();
        }else{
        	handleRecursiveAction();
        }
	}
	
	public abstract void handleFinalAction();
	
	public abstract void handleRecursiveAction();
	
	/**
	 * 不需要递归拆分任务
	 */
	protected boolean notNeedRecursion(){
		return (end - start + 1) <= threshold;
	}

	protected Integer[] getLeftData() {
		Integer[] leftArray = new Integer[getMiddle() - getStart() + 1];
    	Arrays.stream(getArray(), getStart(), getMiddle()+1).collect(Collectors.toList()).toArray(leftArray);
        System.out.println(Arrays.toString(leftArray));
		return leftArray;
	}

	protected Integer[] getRightData() {
		Integer[] rightArray = new Integer[getEnd() - getMiddle()];
    	Arrays.stream(getArray(), getMiddle()+1, getEnd()+1).collect(Collectors.toList()).toArray(rightArray);
        System.out.println(Arrays.toString(rightArray));
		return rightArray;
	}

	public T[] getArray() {
		return array;
	}

	public int getStart() {
		return start;
	}

	public int getMiddle() {
		return middle;
	}

	public int getEnd() {
		return end;
	}

	public int getThreshold() {
		return threshold;
	}
}

 

对RecursiveTask抽象类的进一步封装:

public abstract class AbstractRecursiveTask<T, V> extends RecursiveTask<V> {
	private T[] array; //数据数组
	private int start; //要处理的数据段的开始索引
	private int middle; //要处理的数据段的中间索引
	private int end; //要处理的数据段的结束索引
	private int threshold; //临界值:每个数据段的最大长度
    
    public AbstractRecursiveTask(T[] array, int start, int end, int threshold) {
        super();
        this.array = array;
        this.start = start;
        this.end = end;
        this.threshold = threshold;
        this.middle = start + (end - start) / 2;
    }
    
	@Override
	protected V compute() {
		if(notNeedRecursion()){
			return handleFinalTask();
        }else{
        	return handleRecursiveTask();
        }
	}
	
	public abstract V handleFinalTask();
	
	public abstract V handleRecursiveTask();
	
	/**
	 * 不需要递归拆分任务
	 */
	protected boolean notNeedRecursion(){
		return (end - start + 1) <= threshold;
	}

	protected Integer[] getLeftData() {
		Integer[] leftArray = new Integer[getMiddle() - getStart() + 1];
    	Arrays.stream(getArray(), getStart(), getMiddle()+1).collect(Collectors.toList()).toArray(leftArray);
        System.out.println(Arrays.toString(leftArray));
		return leftArray;
	}

	protected Integer[] getRightData() {
		Integer[] rightArray = new Integer[getEnd() - getMiddle()];
    	Arrays.stream(getArray(), getMiddle()+1, getEnd()+1).collect(Collectors.toList()).toArray(rightArray);
        System.out.println(Arrays.toString(rightArray));
		return rightArray;
	}

	public T[] getArray() {
		return array;
	}

	public int getStart() {
		return start;
	}

	public int getMiddle() {
		return middle;
	}

	public int getEnd() {
		return end;
	}

	public int getThreshold() {
		return threshold;
	}
}

 

创建用于并行计算总和的RecursiveTask子类:

public class SumTask extends AbstractRecursiveTask<Integer, Integer> {
	public SumTask(Integer array[], int start, int end, int threshold){
        super(array, start, end, threshold);
	}
	
	@Override
	public Integer handleFinalTask() {
		int sum = 0;
		try{
			sum = Arrays.stream(getArray(), getStart(), getEnd()+1)
					.mapToInt(Integer::intValue)
					.sum();
			System.out.println(Thread.currentThread().getName() + " sum=" + sum);
		}catch(Exception ex){
			completeExceptionally(ex);
		}
		return sum;
	}
	
	@Override
	public Integer handleRecursiveTask() {
    	//left
    	Integer[] leftArray = getLeftData();
    	SumTask left = new SumTask(leftArray, 0, leftArray.length-1, getThreshold());
    	
    	//right
    	Integer[] rightArray = getRightData();
    	SumTask right = new SumTask(rightArray, 0, rightArray.length-1, getThreshold());
        
        //第一种用法
//    	right.fork(); //fork:将任务放到线程池中异步调度
//    	Integer leftValue = left.compute(); //compute:在当前线程执行任务
//    	Integer rightValue = right.join();  //join:等待并获取任务结果
//    	return leftValue + rightValue;
    	
    	//第二种用法
        //并行执行两个小任务
    	invokeAll(left, right);
    	return left.join() + right.join();
	}
}

 

测试代码:

private static Integer[] getData(int length){
	Integer[] array = new Integer[length];
	IntStream.rangeClosed(1, length)
		.boxed()
		.collect(Collectors.toList())
		.toArray(array);
	return array;
}

private static void handleSum(){
//	ForkJoinPool pool = ForkJoinPool.commonPool(); //最大工作线程数等于CPU核数-1
	ForkJoinPool pool = new ForkJoinPool(4); //指定处理任务的线程数
	Integer sum = 0;
	
	int length = 10;
	Integer[] array = getData(length);
	
	SumTask task = new SumTask(array, 0, array.length-1, 5);
	Future<Integer> future = pool.submit(task);
	
	sum = 0;
	try {
		sum = future.get();
	} catch (InterruptedException|ExecutionException e) {
		System.out.println(task.getException().toString());
	}

	System.out.println("sum=" + sum);
	
	pool.shutdown();
}

 

分享到:
评论

相关推荐

    java Fork Join框架及使用

    设计和实现使用Fork/Join框架的应用程序需要仔细考虑任务的拆分和合并策略,以及如何有效地避免任务窃取引起的竞争和数据一致性问题。 在实际应用中,Fork/Join框架可以极大地提高应用程序的性能,尤其是在运行在...

    Fork/Join例子

    在Java编程领域,Fork/Join框架是一种并行计算模型,设计用于高效处理大量数据,尤其是在多核处理器系统上。这个框架是Java 7引入的一个重要特性,它基于分而治之(Divide and Conquer)策略,将复杂任务拆分为更小...

    Fork/Join框架Package jsr166y

    Fork/Join框架Package jsr166y是Java 7并行编程类的的初步版本(Preliminary versions of classes targeted for Java 7.)

    fork/join 实例

    Fork/Join框架是Java并发处理的一个重要工具,它基于工作窃取算法,设计用于高效地执行并行计算任务。这个框架是Java 7引入的,位于`java.util.concurrent.fork/join`包中,目的是简化多核处理器环境下大规模数据...

    Java中的Fork,Join框架深度解析

    在使用Fork/Join框架时,应该注意以下几点: 尽量使用ForkJoinPool.commonPool()来获取默认的线程池,以减少资源消耗。 合理设置任务的分解阈值,以避免过度分解导致的性能下降。 避免在RecursiveTask内部使用...

    Java中的Fork/Join框架

     fork/join框架与其它ExecutorService的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃取任务执行。  fork/join...

    java NIO用法及java fork/join 用法源码工程

    结合这两个技术,你可以创建高效的并发服务器,比如一个服务器端使用NIO监听和处理来自多个客户端的连接,而每个客户端请求的处理则可以利用Fork/Join框架进行并行计算。在实际项目中,`nioSample`工程可能包含这些...

    Java并发Fork and join

    快速排序是一个经典的适合使用Fork/Join框架的例子。首先创建一个`RecursiveTask`来表示排序任务,任务会检查数组的大小,如果数组元素少于某个阈值,就直接排序(递归结束条件);否则,将数组一分为二,分别创建两...

    Fork Join框架机制详解.docx

    Fork/Join框架基于分治策略,即将一个复杂的大任务分解成两个或更多个较小的任务,直到这些任务小到可以直接计算结果,然后将这些结果合并得到最终答案。在Java中,Fork/Join框架主要由`ForkJoinPool`线程池和`...

    Java Fork/Join框架

    在使用Fork/Join框架时,开发者首先需要定义一个`ForkJoinTask`的子类,实现任务的分解和合并逻辑。一般来说,任务分解的过程是递归的,如果任务足够小,就直接执行;否则,将其拆分为更小的任务,然后调用`fork()`...

    35 拆分你的任务—学习使用Fork-Join框架.pdf

    然而,需要注意的是,不是所有任务都适合使用Fork/Join框架,对于那些任务间的依赖性强或者无法轻易拆分的问题,使用传统的线程池可能更为合适。因此,在实际应用中,需要根据问题的特点来选择合适的并发策略。

    译文:Fork and Join: Java Can Excel at Painless Parallel Programming Too!

    总结来说,Fork/Join框架是Java并发编程的重要进步,它通过提供一种易于理解和使用的并行编程模型,降低了编写高效并发程序的难度。结合java.util.concurrent包中的其他工具,开发者可以构建出既安全又高效的并发...

    浅谈Java Fork/Join并行框架

    一个最简单的例子是使用 Fork/Join 框架来求一个数组中的最大/最小值,这个任务就可以拆成很多小任务,大任务就是寻找一个大数组中的最大/最小值,我们可以将一个大数组拆成很多小数组,然后分别求解每个小数组中的...

    Fork:join框架与CompleteableFuture源码解析.pptx

    全网第一篇通过图文介绍Fork/Join框架与CompleteableFuture的PPT

    java fork-join框架介绍

    fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。

    ForkJoinUtil.java,一个分而治之的框架工具类

    Fork/Join框架的测试demo,含源代码。 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 我们再通过Fork和Join这两个...

    Java通过Fork/Join优化并行计算

    Java的Fork/Join框架是Java 7引入的一个并行计算工具,它是基于分而治之(Divide and Conquer)策略的。该框架旨在简化并行编程,尤其是在多核处理器环境中提高性能。Fork/Join框架的核心类包括`ForkJoinPool`和`...

    java8中forkjoin和optional框架使用

    Java 8 中的 Fork/Join 框架和 Optional 框架使用 Java 8 中的 Fork/Join 框架和 Optional 框架是两个非常重要的框架,它们在多线程编程和数据处理中发挥着重要作用。下面我们将详细介绍这两个框架的使用和原理。 ...

Global site tag (gtag.js) - Google Analytics