`
qindongliang1922
  • 浏览: 2184636 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117554
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125937
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59935
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71310
社区版块
存档分类
最新评论

Java7之线程池ForkJoinPool

阅读更多
许多情况下,在一个程序中使用多线程是有益处的,可以大大提高程序的效率,多线程主要有以下3个优点1,资源利用率更好2,程序设计在某些情况下更简单3,程序响应更快。当然凡事有利就有弊,多线程也会使程序的开发,维护及调试更加复杂,当然如果我们能够扬长避短,在正确的场合下使用多线程,那么它将成为我们程序中开发的利器。


Java一直以来,对多线程的开发支持比较良好,特别在JDK5,6后引入的java.util.concurrent包,使用多线程的开发变的更加容易,这个包里面大部分的API都是更进一步的封装,作为开发者,我们只需熟悉它怎么使用,就能够很轻松的上手,当然如果你想完全搞懂它,那么就需要读读那些优秀的源码了。

ForkJoinPool这个类是JDK7后新增的线程池,很适合在单机多核的PC上部署多线程程序,ForkJoinPool使用的分而治之的思想,这一点与当前很火的大数据处理框架Hadoop的map/reduce思想非常类似,但是他们的使用场合却不一样,ForkJoinPool适合在一台PC多核CPU上运行,而hadoop则适合在分布式环境中进行大规模集群部署。

Fork/Join 模式有自己的适用范围。如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决.

ForkJoinPool使用的工作窃取的方式能够在最大方式上充分利用CPU的资源,一般流程是fork分解,join结合。本质是将一个任务分解成多个子任务,每个子任务用单独的线程去处理,主要几个常用方法有
fork( ForkJoinTask)异步执行一个线程
join( ForkJoinTask)等待任务完成并返回执行结果
execute( ForkJoinTask)执行不带返回值的任务
submit( ForkJoinTask)执行带返回值的任务
invoke( ForkJoinTask)执行指定的任务,等待完成,返回结果。
invokeAll(ForkJoinTask)执行指定的任务,等待完成,返回结果。
shutdown()执行此方法之后,ForkJoinPool 不再接受新的任务,但是已经提交的任务可以继续执行。如果希望立刻停止所有的任务,可以尝试 shutdownNow() 方法。
awaitTermination(int, TimeUnit.SECONDS)阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束。
compute()执行任务的具体方法
Runtime.getRuntime().availableProcessors()获取CPU个数的方法

关于上表中的ForkJoinTask是一个抽象类,代表一个可以fork与join的任务. ,它还有2个抽象子类:RecursiveAcion和RecursiveTask.其中RecursiveTask代表有泛型返回值的任务.而RecursiveAction代表没有返回值.


下面给出测试代码

package com.demo;

import java.util.concurrent.RecursiveAction;

/**
 * 
 * 继承RecursiveAction来实现可分解的任务
 * 注意无返回值
 * 
 * **/

public class PrintTask extends RecursiveAction {
 
	//每个小任务,最多只打印50个数
	private static final int threshold=50;
	//打印任务的开始
	private int start;
	//打印任务的结束
	private int end;
	
	public PrintTask() {
		// TODO Auto-generated constructor stub
	}
	
	
	
	//打印从start到end之间的任务
	public PrintTask(int start, int end) {
		super();
		this.start = start;
		this.end = end;
	}




	@Override
	protected void compute() {
		
		if(end-start<threshold){
			for(int i=start;i<end;i++){
				
				System.out.println(Thread.currentThread().getName()+"i的值:"+i);
			}
		}else{
			//当end与start之间的差大于threshold,及打印的数超过50个时,
			//将大任务分解成2个小任务
			int middle=(start+end)/2;
			PrintTask left=new PrintTask(start, middle);
			PrintTask  right=new PrintTask(middle, end);
			//并行执行两个小任务
			left.fork();
			right.fork();
			
		}
		
		
	}
	
	

}

package com.demo;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
/**
 * 测试打印
 * 
 * */
public class Test {

	
	public static void main(String[] args)throws Exception {
		ForkJoinPool pool=new ForkJoinPool();
		//提交可分解的任务
		pool.submit(new PrintTask(0,101));
		
		//阻塞等待所有任务完成 
		pool.awaitTermination(2, TimeUnit.SECONDS);
		pool.shutdown();//关闭线程池
		
	}
}

有返回值的demo
package com.demo;

import java.util.concurrent.RecursiveTask;
/***
 * 有返回值
 * 
 * */
public class CalTask  extends RecursiveTask<Integer>{

	
	
	//将每个小任务,最多只能累加20个数
	private static final int threshold=20;
	private int arr[];
	private int start;//开始
	private int end;//
	//累加从start到end之间的数
	
	public CalTask() {
		// TODO Auto-generated constructor stub
	}
	
	
	//累加从start到end的数组元素
	public CalTask(int[] arr, int start, int end) {
		super();
		this.arr = arr;
		this.start = start;
		this.end = end;
	}



	@Override
	protected Integer compute() {
		int sum=0;
		//当end与start之间的差小于threshold,开始进行累加
		if(end-start<threshold){
			for(int i=start;i<end;i++){
				sum+=arr[i];
			}
			return sum;
			
		}else{
			
			//当end与start之间的差大于threshold,要计算的数超过20个时,
			//将大任务分解成两个小任务
			
			int middle=(start+end)/2;
			CalTask left=new CalTask(arr, start, middle);
			CalTask right=new CalTask(arr, middle, end);
			//并行执行2个小任务
			left.fork();
			right.fork();
			//把2个小任务,累加的结果合并起来
			return left.join()+right.join();
		}
		 
		 
	}
	

}


package com.demo;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class Sum {
	
	
	public static void main(String[] args)throws Exception {
		
		int []arr=new int[1100];
		Random rand=new Random();
		int total=0;
		//初始化100个数字
		for(int i=0;i<arr.length;i++){
			int tmp=rand.nextInt(20);
			//对元素赋值,并将数组元素的值添加到total总和中
			total+=(arr[i]=tmp);
			
			
		}
		System.out.println("正确的total:"+total);
		ForkJoinPool pool=new ForkJoinPool();
		//提交可分解的CalTask任务
		Future<Integer> future=pool.submit(new CalTask(arr, 0, arr.length));
		System.out.println(future.get());
		//关闭线程池
		
		pool.shutdown();
	}

}



分享到:
评论
1 楼 ksisn 2014-11-22  
楼主您好,很荣幸能拜读到您的博客,我是刚毕业的程序猿,用您这篇文章的例子进行一些测试,发现比单线程方式在时间上慢了很多倍,那么像这种ForkJoinPool 适用的场景又是什么呢,一直在拜读您的例子中,这个没弄懂,特地请教您。

相关推荐

    13、线程池ForkJoinPool工作原理分析

    线程池ForkJoinPool是Java并发编程中的一个重要工具,它是Java 7引入的一个新特性,主要用于优化并行计算,特别是在处理大量可分任务时,如递归算法。ForkJoinPool是基于工作窃取(Work-Stealing)算法的线程池,...

    Java线程池ForkJoinPool实例解析

    Java线程池ForkJoinPool实例解析是Java并发编程中的一种高级主题,ForkJoinPool是Java 7中引入的一种新的线程池实现,它可以充分利用多CPU和多核CPU的优势,使得并发编程变得更加高效。 ForkJoinPool的优势在于,...

    13、线程池ForkJoinPool实战及其工作原理分析(1).pdf

    - **定义**: `ForkJoinPool`是Java 7引入的一种用于执行大量细粒度并行任务的线程池。 - **特点**: 它主要通过`Fork-Join`框架实现,能够有效地管理并行任务,特别是适用于那些可以被细分为更小任务的任务类型。 ##...

    Java新的线程持池ForkJoinPool.pdf

    Java中的ForkJoinPool是Java 7引入的一种新的线程池实现,它是为了解决大量并行计算场景下的效率问题而设计的。ForkJoinPool的设计理念基于分治策略(Divide and Conquer),适用于那些可以拆分成多个子任务的任务,...

    Java8并行流中自定义线程池操作示例

    知识点:可以使用ForkJoinPool的构造方法并设定并行级别来创建一个自定义的线程池。 4. 总结 我们简要地看了一下,如何使用一个自定义的Thread Pool运行并行流。只要在正确的环境中配置了合适的平行级别,就能在...

    Java多线程ForkJoinPool实例详解

    Java多线程编程中的ForkJoinPool实例详解是Java 7中引入的一种高效的并发编程框架。ForkJoinPool是ExecutorService接口的实现,它管理工作窃取算法(Work-Stealing Algorithm)实现高效的任务执行和线程管理。 ...

    parallel-stream-fork-join-pool

    所有并行流执行都使用相同的(单例)线程池:ForkJoinPool.commonPool()。 这就是为什么在并行流中执行 IO(更常见的是阻塞调用)非常糟糕的原因:被阻塞的线程无法被 JVM 中的所有并行流使用。 为此,您必须改用 ...

    designpattern.zip

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    forkjoin.zip

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    disruptor.zip

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    jmm(1).zip

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    JAVA集中常用的线程池比较共8页.pdf.zip

    `ForkJoinPool`是Java 7引入的,用于支持`Fork/Join`框架,适合进行大量计算任务,如并行排序。它有自己的工作窃取算法,线程会主动去寻找其他已完成或未开始的任务来执行,提高并行性。 线程池的选择应根据实际...

    Java中的线程与线程池.pptx

    5. ForkJoinPool:用于执行Fork/Join框架的任务,适用于分割任务并行执行的场景。 ThreadPoolExecutor是线程池的核心实现,其构造函数的参数定义了线程池的行为: - corePoolSize:核心线程数,即使无任务,这些...

    java7 并发编程forjoinpool

    Java 7 引入了一种新的并发编程框架——ForkJoinPool,它是基于分而治之(Divide and Conquer)策略的并行计算模型。ForkJoinPool 和与其配合使用的 RecursiveAction 和 RecursiveTask 类,为开发者提供了更高效地...

    Java线程池介绍Java开发Java经验技巧共8页.pd

    - 对于短生命周期的任务,可选择`ForkJoinPool`或`ThreadPoolExecutor`配合`WorkStealingPool`。 - 定期分析线程池状态,根据实际情况调整参数。 理解并熟练运用Java线程池,不仅可以提升程序性能,还能有效控制...

    35 拆分你的任务—学习使用Fork-Join框架.pdf

    客户端代码中,首先创建一个ForkJoinPool实例,通常我们会使用ForkJoinPool.commonPool()来获取一个默认的线程池。然后,提交一个Task实例,ForkJoinPool会自动处理任务的拆分和结果的合并。最后,通过调用Task的get...

    常用多线程模板与鱼刺类多线程线程池应用小例子

    import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; public class SumCalculator extends RecursiveAction { private int[] array; private int start; private int end;...

    聊聊并发(3)Java线程池的分析和使用Java开发Jav

    例如,`Executors`类提供了几个预设的线程池构造方法,如`newFixedThreadPool`(固定大小线程池)、`newSingleThreadExecutor`(单线程线程池)和`newWorkStealingPool`(ForkJoinPool,适用于并行计算)。...

    java7并发编程实战手册 书中源码

    在Java 7中,`ForkJoinPool`和`RecursiveTask`或`RecursiveAction`是新增的重要特性,用于执行可拆分的计算任务,尤其适合大数据处理。`ForkJoinPool`使用工作窃取算法,使得线程能有效地平衡负载,提高并行性。 接...

Global site tag (gtag.js) - Google Analytics