`
raymond.chen
  • 浏览: 1426269 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Fork/Join框架的使用

 
阅读更多

Fork/Join框架的使用

        Java7提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。ForkJoinPool类是Fork/Join框架的核心,它和ThreadPoolExecutor一样也是ExecutorService接口的实现类。

 

        ForkJoinPool最大的特殊之处就在于其实现了工作窃取(work-stealing)。所谓工作窃取,即当前线程的Task已经全被执行完毕,则自动从其他线程的任务池末尾取出任务继续执行。

 

        一个fork/join框架之下的任务由ForkJoinTask类表示。ForkJoinTask实现了Future接口,可以按照Future接口的方式来使用。在ForkJoinTask类中之重要的两个方法fork和join。fork方法用异步方式启动任务的执行,join方法则等待任务完成并返回结果。

 

        在创建任务时,通常不直接使用ForkJoinTask类,是使用它的两个抽象子类:

               RecursiveAction:没有返回值的任务

               RecursiveTask:有返回值的任务

 

对RecursiveAction抽象类的进一步封装:

public abstract class AbstractRecursiveAction<T> extends RecursiveAction {
	private T[] array; //数据数组
	private int start; //要处理的数据段的开始索引
	private int middle; //要处理的数据段的中间索引
	private int end; //要处理的数据段的结束索引
	private int threshold; //临界值:每个数据段的最大长度
    
    public AbstractRecursiveAction(T[] array, int start, int end, int threshold) {
        super();
        this.array = array;
        this.start = start;
        this.end = end;
        this.threshold = threshold;
        this.middle = start + (end - start) / 2;
    }
    
	@Override
	protected void compute() {
		if(notNeedRecursion()){
			handleFinalAction();
        }else{
        	handleRecursiveAction();
        }
	}
	
	public abstract void handleFinalAction();
	
	public abstract void handleRecursiveAction();
	
	/**
	 * 不需要递归拆分任务
	 */
	protected boolean notNeedRecursion(){
		return (end - start + 1) <= threshold;
	}

	protected Integer[] getLeftData() {
		Integer[] leftArray = new Integer[getMiddle() - getStart() + 1];
    	Arrays.stream(getArray(), getStart(), getMiddle()+1).collect(Collectors.toList()).toArray(leftArray);
        System.out.println(Arrays.toString(leftArray));
		return leftArray;
	}

	protected Integer[] getRightData() {
		Integer[] rightArray = new Integer[getEnd() - getMiddle()];
    	Arrays.stream(getArray(), getMiddle()+1, getEnd()+1).collect(Collectors.toList()).toArray(rightArray);
        System.out.println(Arrays.toString(rightArray));
		return rightArray;
	}

	public T[] getArray() {
		return array;
	}

	public int getStart() {
		return start;
	}

	public int getMiddle() {
		return middle;
	}

	public int getEnd() {
		return end;
	}

	public int getThreshold() {
		return threshold;
	}
}

 

对RecursiveTask抽象类的进一步封装:

public abstract class AbstractRecursiveTask<T, V> extends RecursiveTask<V> {
	private T[] array; //数据数组
	private int start; //要处理的数据段的开始索引
	private int middle; //要处理的数据段的中间索引
	private int end; //要处理的数据段的结束索引
	private int threshold; //临界值:每个数据段的最大长度
    
    public AbstractRecursiveTask(T[] array, int start, int end, int threshold) {
        super();
        this.array = array;
        this.start = start;
        this.end = end;
        this.threshold = threshold;
        this.middle = start + (end - start) / 2;
    }
    
	@Override
	protected V compute() {
		if(notNeedRecursion()){
			return handleFinalTask();
        }else{
        	return handleRecursiveTask();
        }
	}
	
	public abstract V handleFinalTask();
	
	public abstract V handleRecursiveTask();
	
	/**
	 * 不需要递归拆分任务
	 */
	protected boolean notNeedRecursion(){
		return (end - start + 1) <= threshold;
	}

	protected Integer[] getLeftData() {
		Integer[] leftArray = new Integer[getMiddle() - getStart() + 1];
    	Arrays.stream(getArray(), getStart(), getMiddle()+1).collect(Collectors.toList()).toArray(leftArray);
        System.out.println(Arrays.toString(leftArray));
		return leftArray;
	}

	protected Integer[] getRightData() {
		Integer[] rightArray = new Integer[getEnd() - getMiddle()];
    	Arrays.stream(getArray(), getMiddle()+1, getEnd()+1).collect(Collectors.toList()).toArray(rightArray);
        System.out.println(Arrays.toString(rightArray));
		return rightArray;
	}

	public T[] getArray() {
		return array;
	}

	public int getStart() {
		return start;
	}

	public int getMiddle() {
		return middle;
	}

	public int getEnd() {
		return end;
	}

	public int getThreshold() {
		return threshold;
	}
}

 

创建用于并行计算总和的RecursiveTask子类:

public class SumTask extends AbstractRecursiveTask<Integer, Integer> {
	public SumTask(Integer array[], int start, int end, int threshold){
        super(array, start, end, threshold);
	}
	
	@Override
	public Integer handleFinalTask() {
		int sum = 0;
		try{
			sum = Arrays.stream(getArray(), getStart(), getEnd()+1)
					.mapToInt(Integer::intValue)
					.sum();
			System.out.println(Thread.currentThread().getName() + " sum=" + sum);
		}catch(Exception ex){
			completeExceptionally(ex);
		}
		return sum;
	}
	
	@Override
	public Integer handleRecursiveTask() {
    	//left
    	Integer[] leftArray = getLeftData();
    	SumTask left = new SumTask(leftArray, 0, leftArray.length-1, getThreshold());
    	
    	//right
    	Integer[] rightArray = getRightData();
    	SumTask right = new SumTask(rightArray, 0, rightArray.length-1, getThreshold());
        
        //第一种用法
//    	right.fork(); //fork:将任务放到线程池中异步调度
//    	Integer leftValue = left.compute(); //compute:在当前线程执行任务
//    	Integer rightValue = right.join();  //join:等待并获取任务结果
//    	return leftValue + rightValue;
    	
    	//第二种用法
        //并行执行两个小任务
    	invokeAll(left, right);
    	return left.join() + right.join();
	}
}

 

测试代码:

private static Integer[] getData(int length){
	Integer[] array = new Integer[length];
	IntStream.rangeClosed(1, length)
		.boxed()
		.collect(Collectors.toList())
		.toArray(array);
	return array;
}

private static void handleSum(){
//	ForkJoinPool pool = ForkJoinPool.commonPool(); //最大工作线程数等于CPU核数-1
	ForkJoinPool pool = new ForkJoinPool(4); //指定处理任务的线程数
	Integer sum = 0;
	
	int length = 10;
	Integer[] array = getData(length);
	
	SumTask task = new SumTask(array, 0, array.length-1, 5);
	Future<Integer> future = pool.submit(task);
	
	sum = 0;
	try {
		sum = future.get();
	} catch (InterruptedException|ExecutionException e) {
		System.out.println(task.getException().toString());
	}

	System.out.println("sum=" + sum);
	
	pool.shutdown();
}

 

分享到:
评论

相关推荐

    Fork/Join例子

    在Java编程领域,Fork/Join框架是一种并行计算模型,设计用于高效处理大量数据,尤其是在多核处理器系统上。这个框架是Java 7引入的一个重要特性,它基于分而治之(Divide and Conquer)策略,将复杂任务拆分为更小...

    java Fork Join框架及使用

    java Fork Join框架及使用,java自带的多线程框架,来处理多线程的问题

    Fork/Join框架Package jsr166y

    Fork/Join框架Package jsr166y是Java 7并行编程类的的初步版本(Preliminary versions of classes targeted for Java 7.)

    fork/join 实例

    Fork/Join框架是Java并发处理的一个重要工具,它基于工作窃取算法,设计用于高效地执行并行计算任务。这个框架是Java 7引入的,位于`java.util.concurrent.fork/join`包中,目的是简化多核处理器环境下大规模数据...

    Java中的Fork/Join框架

     fork/join框架与其它ExecutorService的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃取任务执行。  fork/join...

    java NIO用法及java fork/join 用法源码工程

    结合这两个技术,你可以创建高效的并发服务器,比如一个服务器端使用NIO监听和处理来自多个客户端的连接,而每个客户端请求的处理则可以利用Fork/Join框架进行并行计算。在实际项目中,`nioSample`工程可能包含这些...

    Java并发Fork and join

    快速排序是一个经典的适合使用Fork/Join框架的例子。首先创建一个`RecursiveTask`来表示排序任务,任务会检查数组的大小,如果数组元素少于某个阈值,就直接排序(递归结束条件);否则,将数组一分为二,分别创建两...

    Java Fork/Join框架

    在使用Fork/Join框架时,开发者首先需要定义一个`ForkJoinTask`的子类,实现任务的分解和合并逻辑。一般来说,任务分解的过程是递归的,如果任务足够小,就直接执行;否则,将其拆分为更小的任务,然后调用`fork()`...

    浅谈Java Fork/Join并行框架

    一个最简单的例子是使用 Fork/Join 框架来求一个数组中的最大/最小值,这个任务就可以拆成很多小任务,大任务就是寻找一个大数组中的最大/最小值,我们可以将一个大数组拆成很多小数组,然后分别求解每个小数组中的...

    Fork:join框架与CompleteableFuture源码解析.pptx

    全网第一篇通过图文介绍Fork/Join框架与CompleteableFuture的PPT

    java fork-join框架介绍

    fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。

    ForkJoinUtil.java,一个分而治之的框架工具类

    Fork/Join框架的测试demo,含源代码。 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 我们再通过Fork和Join这两个...

    Java通过Fork/Join优化并行计算

    Java的Fork/Join框架是Java 7引入的一个并行计算工具,它是基于分而治之(Divide and Conquer)策略的。该框架旨在简化并行编程,尤其是在多核处理器环境中提高性能。Fork/Join框架的核心类包括`ForkJoinPool`和`...

    java8中forkjoin和optional框架使用

    Java 8 中的 Fork/Join 框架和 Optional 框架使用 Java 8 中的 Fork/Join 框架和 Optional 框架是两个非常重要的框架,它们在多线程编程和数据处理中发挥着重要作用。下面我们将详细介绍这两个框架的使用和原理。 ...

    Java并发Fork-Join框架原理

    Java并发Fork-Join框架原理 Java并发Fork-Join框架原理是Java7中提供的一种并行执行任务的框架,旨在...在实际应用中,Fork-Join框架和并行流可以组合使用,以充分利用多核CPU的计算能力和提高程序的执行效率和性能。

    Fork Join框架机制详解.docx

    Java提供Fork/Join框架用于并行执行任务,核心的思想就是将一个大任务切分成多个小任务,然后汇总每个小任务的执行结果得到这个大任务的最终结果。 这种机制策略在分布式数据库中非常常见,数据分布在不同的数据库的...

    Java ForkJoin框架的原理及用法

    ForkJoin框架的使用可以提高数据的计算速度,但需要注意数据共享问题。 ForkJoin的使用可以分为两步: 第一步:创建ForkJoin的任务,需要继承ForkJoinTask抽象类的其中一个,并实现compute方法。例如, SumTask...

    java-fork-join-example

    与ExecutorService其他实现不同,Fork / Join框架使用工作窃取算法( ),该算法可最大程度地利用线程,并提供了一种更简单的方式来处理产生其他任务的任务(称为子任务)。 以下列出的所有代码都可以在以下位置...

    Fork-Join框架演示

    Java Fork-Join 论文 排序

    ForkJoin并发框架入门示例

    `ForkJoin入门.ppt`是PPT文件,里面详细介绍了并发与并行的概念以及ForkJoin框架的使用方法,包括如何创建和执行ForkJoin任务。`FileSize.java`可能包含了一个实际的ForkJoinTask示例,用于计算文件大小或其他类似的...

Global site tag (gtag.js) - Google Analytics