`
BrokenDreams
  • 浏览: 254908 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:100501
社区版块
存档分类
最新评论

Jdk1.7 JUC源码增量解析(2)-ForkJoin-框架概览

阅读更多

Jdk1.7 JUC源码增量解析(2)-ForkJoin-框架概览

作者:大飞

 

  • ForkJoin框架是什么?
       ForkJoin框架是jdk1.7提供的一个并行计算框架。

  • ForkJoin框架能干什么?
       首先ForkJoin框架是针对一些符合ForkJoin模型的任务而设计的,那什么是ForkJoin模型呢?看个图先:
 
       ForkJoin模型是指一些任务在执行过程中会分裂出一些子任务(或子过程),而且这些子任务是可以并行执行的(相互没有制约),子任务们执行完毕后合并到主任务中,主任务继续执行。这种模型本身也体现了分而治之的思想。
 
       能体现ForkJoin模型的典型栗子就是归并排序,归并排序这个老黄历就不解释了,还是看个图吧: 


         注:图片来之http://www.cnblogs.com/horizonice/p/4102553.html
 
       可见在归并过程中,会不断分裂出许多子任务,而同一层级子合并任务之间不会相互制约,可以并行执行。
       ForkJoin框架就是针对这种模型,提供的一套并行计算框架,可以很好的利用多核的能力来挖掘任务的并行性。目的是缩短任务的总体执行时间,同时也能充分的利用计算机资源。
       注:ForkJoin是一个单机框架,类似的分布式的框架有Hadoop这类的,它们的计算模型是MapReduce,体现了和ForkJoin一样的思想-分而治之。
 
  • 能不能给个ForkJoin框架的使用示例?
       为了更关注于框架本身,这里给一个最简单的示例,计算整数1到n的累加结果。(假设我们不知道高斯求和公式...)
       首先需要定义一个任务:
public class SumTask extends RecursiveTask<Long>{
	private static final int THRESHOLD = 10;
	
	private long start;
	private long end;
	
	public SumTask(long n) {
		this(1,n);
	}
	
	private SumTask(long start, long end) {
		this.start = start;
		this.end = end;
	}
	@Override
	protected Long compute() {
		long sum = 0;
		if((end - start) <= THRESHOLD){
			for(long l = start; l <= end; l++){
				sum += l;
			}
		}else{
			long mid = (start + end) >>> 1;
		    SumTask left = new SumTask(start, mid);
		    SumTask right = new SumTask(mid + 1, end);
			left.fork();
			right.fork();
			sum = left.join() + right.join();
		}
		return sum;
	}
	private static final long serialVersionUID = 1L;
}
       可见我们定义的任务首先要继承RecursiveTask类。简单说明一下RecursiveTask,它表示一种有返回值的ForkJoin任务,对应的无返回值(比如给数组排序这种任务)的版本是RecursiveAction。
       代码中定义了一个阀值10,也就是说我们会把1-n拆分成1-10、10-20...这样的长度小于10的分段(代码中fork出子任务),然后分别计算这些分段和,然后再不断汇总(通过join来获取子任务的结果),得到最终的结果。
 
       任务定义好了,下面我们来执行这个任务: 
	public static void main(String[] args) {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		//计算1-100的和。
		SumTask sumTask = new SumTask(100);
		ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit(sumTask);
		try {
			long result = forkJoinTask.get();
			System.out.println("result="+result);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
	}
       输出结果: 
result=5050
       通过执行代码可见,我们会将任务提交到ForkJoinPool中,等待任务被执行完毕后,获取执行结果。是不是发现这种使用方式和JDK线程池的使用方式有些类似,没错,ForkJoinPool也是ExecutorService的一种实现并且ForkJoinTask也是Future的一种实现。
       注:JDK1.8中的Arrays提供了并行排序(parallelSort),就是利用ForkJoin框架来做的。
 
  • ForkJoin框架内部组件及工作原理
       ForkJoin框架包括下面这些类:
              ForkJoinPool
              ForkJoinWorkerThread
              ForkJoinTask
              RecursiveTask
              RecursiveAction
 
       ForkJoinPool充当ForkJoin工作线程池的角色,管理和维护ForkJoin的工作线程,也管理一部分任务。
       ForkJoinWorkerThread就是执行ForkJoin任务的工作线程。
       ForkJoinTask是ForkJoin任务,它定义ForkJoin任务的一些基础行为。
       RecursiveTask和RecursiveAction都继承自ForkJoinTask,分别表示有返回值和无返回值的任务。
 
      ForkJoin框架的主要逻辑功能都实现在ForkJoinPool、ForkJoinWorkerThread和ForkJoinTask中,RecursiveTask和RecursiveAction中只是简单的做了返回值的逻辑,然后开放了compute方法交由子类去实现具体的计算逻辑。
 
       ForkJoin框架内部采用了work-stealing(工作窃取)模式:每个工作线程都有自己的任务队列(一般是双端队列),工作线程首先会处理自己任务队列里面的任务,如果自己任务队列中没有任务了,会从其他工作线程的任务队列中窃取任务来执行。
       注:关于工作窃取的理论可以看下《多处理器编程的艺术》中的第16章相关内容,另外本书包含了JUC包实现的很多理论基础,虽然比较枯燥,但很值得一读。
 
  • ForkJoin框架内部重要类概览
       下面对ForkJoinPool、ForkJoinWorkerThread和ForkJoinTask这三个类做下概览,后续的源码分析也主要针对这三个类进行。
 
       首先看下ForkJoinPool,这个类是整个ForkJoin框架的主体,简单看下内部一些重要的域: 
    /**
     * ForkJoinPool的总控制信息,包含在一个long里面:
     * AC: 表示当前活动的工作线程的数量减去并行度得到的数值。(16 bits)
     * TC: 表示全部工作线程的数量减去并行度得到的数值。(16bits)
     * ST: 表示当前ForkJoinPool是否正在关闭。(1 bit)
     * EC: 表示Treiber stack顶端的等待工作线程的等待次数。(15 bits)
     * ID: Treiber stack顶端的等待工作线程的下标取反(16 bits)
     *
     * 1111111111111111 1111111111111111  1  111111111111111 1111111111111111
     * AC               TC                ST EC              ID 
     *
     * 如果AC为负数,说明没有足够的活动工作线程。
     * 如果TC为负数,说明工作线程数量没达到最大工作线程数量。
     * 如果ID为负数,说明至少有一个等待的工作线程。
     * 如果(int)ctl为负数,说明ForkJoinPool正在关闭。
     */
    volatile long ctl;
    // bit positions/shifts for fields
    private static final int  AC_SHIFT   = 48;
    private static final int  TC_SHIFT   = 32;
    private static final int  ST_SHIFT   = 31;
    private static final int  EC_SHIFT   = 16;
    // bounds
    private static final int  MAX_ID     = 0x7fff;  // max poolIndex
    private static final int  SMASK      = 0xffff;  // mask short bits
    private static final int  SHORT_SIGN = 1 << 15;
    private static final int  INT_SIGN   = 1 << 31;
    // masks
    private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
    private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
    private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;
    // units for incrementing and decrementing
    private static final long TC_UNIT    = 1L << TC_SHIFT;
    private static final long AC_UNIT    = 1L << AC_SHIFT;
    // masks and units for dealing with u = (int)(ctl >>> 32)
    private static final int  UAC_SHIFT  = AC_SHIFT - 32;
    private static final int  UTC_SHIFT  = TC_SHIFT - 32;
    private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
    private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
    private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
    private static final int  UTC_UNIT   = 1 << UTC_SHIFT;
    // masks and units for dealing with e = (int)ctl
    private static final int  E_MASK     = 0x7fffffff; // no STOP_BIT
    private static final int  EC_UNIT    = 1 << EC_SHIFT;
       ctl是ForkJoinPool中最重要的,也是设计最精密的域,它是整个ForkJoinPool的总控信息。所有信息包含在一个long(64bit)中,这些信息包括:当前活动的工作线程数量、当前总的工作线程数量、ForkJoinPool的关闭标志、在Treiber stack(由全部等待工作线程组成的一个链)顶端等待的工作线程的等待次数、Treiber stack(由全部等待工作线程组成的一个链)顶端等待的工作线程的ID信息(工作线程的下标取反)。
       ctl还有一个相对不重要的作用就是,某些非volatile域会依赖ctl来保证可见性。 
 
    ForkJoinWorkerThread[] workers;
       workers是ForkJoinPool中保存工作线程的数组,它的更新会由一个锁(scanGuard)来保护。 
 
    volatile int scanGuard;
       scanGuard是另外一个比较重要的域,它有两个作用:1.作为更新工作线程数组是使用的(顺序)锁;2.作为扫描工作线程数组时使用的边界值来避免一些没用的扫描(当数组过大时)。 
 
    private static final int INITIAL_QUEUE_CAPACITY = 8;

    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

    private ForkJoinTask<?>[] submissionQueue;

    private final ReentrantLock submissionLock;
       ForkJoinPool中也有一个队列-submissionQueue,这个队列里存放的是有外部(非ForkJoin工作线程)提交到ForkJoinPool中的任务。
    volatile int queueBase;

    int queueTop;
       这两个域分别表示submissionQueue的顶部和底部。  
 
    final boolean locallyFifo;
       locallyFifo域也比较重要,它有ForkJoinPool的构造方法的参数asyncMode来指定。如果locallyFifo为true,表示内部将才用FIFO的方式来调度任务队列中的任务,而且这些任务可以分裂(fork),但最好不要合并(join),这种模式很适合来处理事件形式(event-style)的异步任务。默认locallyFifo为false。
 
       ForkJoinPool中一些比较重要的域就简单介绍到这里,后续的源码解析中还会详细分析。
 
       再看下ForkJoinWorkerThread,这个类是ForkJoin框架中负责执行具体任务的工作线程,简单看下内部一些重要的域: 
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

    ForkJoinTask<?>[] queue;

    int queueTop;

    volatile int queueBase;
       queue就是ForkJoinWorkerThread中的任务队列,当从其他工作线程中窃取任务时,就是从这个队列中进行窃取。 
 
    final int poolIndex;
       工作线程在ForkJoinPool中工作线程数组中的下标。
 
    int stealHint;
       stealHint保存了最近的窃取者(来窃取任务的工作线程)的下标(poolIndex)。注意这个值不准确,因为可能同时有很多窃取者来窃取任务,这个值只能记录其中之一。  
 
    int nextWait;
       nextWait算是比较难理解的一个域。首先所有的等待工作线程组成了一个隐式的单链(代码中也叫Treiber stack,由于行为类似于栈),链顶端的等待工作线程的信息保存在Pool的ctl中,新来的等待工作线程会将ctl中之前的等待工作线程信息保存到nextWait上,然后将自己的信息设置到ctl上。 
 
    final boolean locallyFifo;
       这个和Pool中的locallyFifo一致。 
 
    ForkJoinTask<?> currentSteal;
       当前工作线程最新窃取的任务。注意可以从其他工作线程的任务队列或者从Pool中的提交任务队列(submissionQueue)中窃取任务。 
 
    ForkJoinTask<?> currentJoin;
       当前工作线程正在合并的任务。
 
       最后看下ForkJoinTask,这个类表示ForkJoin框架中执行的任务,简单看下内部域: 
    /** The run status of this task */
    volatile int status; // accessed directly by pool and workers
    private static final int NORMAL      = -1;
    private static final int CANCELLED   = -2;
    private static final int EXCEPTIONAL = -3;
    private static final int SIGNAL      =  1;
       ForkJoinTask中只有一个表示运行状态的域。初始为0;1表示在等待被唤醒;负数都表示执行完毕,-1表示正常完成、-2表示被取消、-3表示异常结束。
 
  • 最后
       本篇的内容到此结束,由于本系列侧重于源码分析,所以这里只是简单的介绍了一下ForkJoin框架。
       后续的文章会进行源码细节的分析,ForkJoin框架为了性能的提升,源码中充斥着耦合、内联、位操作、细节优化等情况,再加上框架本身比较精密的设计,所以源码读起来会有一点点蛋疼,所以请务必了解下本篇介绍的ForkJoinPool大体的工作模式和组件类中一些重要域的作用。
       后续分析文章不会按照类为单位来分析(由于太耦合,这样分析会屎),而是按照一些运行过程来分析,请先了解下大体的执行过程。 
 
 
  • 大小: 44.6 KB
  • 大小: 14.1 KB
分享到:
评论

相关推荐

    jdk1.7官网 jdk-7u80-linux-x64.tar.gz.zip

    1. **Fork/Join框架**:这是一个并行计算的框架,用于将大任务拆分为小任务,利用多核处理器的优势提高性能。 2. **动态类型语言支持**:JDK 7通过JSR 292(invokedynamic指令)增强了对动态语言的支持,使得Java...

    java-jdk1.7-jdk-7u80-windows-x64.zip

    7. **Fork/Join框架**:这个并行计算框架允许开发者将大任务分解为小任务,并行执行,从而提高程序的运行效率。 安装Java JDK 1.7 on Windows x64的步骤非常简单,只需双击下载的“jdk-7u80-windows-x64.exe”文件...

    jdk1.7 64位官方版 jdk-7u79-linux-x64.tar.gz

    2. 使用解压命令(如tar -zxvf jdk-7u79-linux-x64.tar.gz)将内容解压到指定目录。 3. 设置环境变量,例如在~/.bashrc或~/.bash_profile文件中添加以下行: ```bash export JAVA_HOME=/path/to/your/jdk1.7.0_79 ...

    jdk-1.7-windows-32-1

    4部分: jdk-1.7-windows-32-1 jdk-1.7-windows-32-2 jdk-1.7-windows-32-3 jdk-1.7-windows-32-4

    jdk1.7版本-----------

    在并发处理方面,JDK 1.7引入了Fork/Join框架,这是一个并行计算的框架,适用于那些可以分解为较小子任务的问题。它基于工作窃取算法,能够有效利用多核处理器的优势。 在文件系统API方面,Java 7引入了新的NIO.2...

    jdk-1.7-windows-64-02

    三部分: jdk-1.7-windows-64-01 jdk-1.7-windows-64-02 jdk-1.7-windows-64-03

    jdk1.7 64位官方正式版 jdk-7u71-macosx-x64.dmg

    jdk1.7 64位官方正式版 jdk-7u71-macosx-x64.dmg

    java jdk1.7源码包,用于centos7使用jdk1.7编译openjdk1.8的 1.7版本

    7. **并发编程改进**:`Fork/Join`框架引入,用于高效地执行并行任务,以及`ConcurrentHashMap`的性能提升。 8. **动态语言支持**:JDK 7引入了`invokedynamic`指令,为Java虚拟机提供更灵活的调用机制,便于动态...

    jdk-1.7-linux-32-1

    三部分: jdk-1.7-linux-32-1 jdk-1.7-linux-32-2 jdk-1.7-linux-32-3

    jdk-1.7-linux-64-03

    三部分: jdk-1.7-linux-64-01 jdk-1.7-linux-64-02 jdk-1.7-linux-64-03

    jdk1.7 64位官方正式版 jdk-7u79-linux-x64

    - 解压下载的`jdk-7u79-linux-x64`压缩包到适当目录。 - 配置环境变量`JAVA_HOME`,`PATH`和`CLASSPATH`,使系统能够找到JDK。 - 使用`java -version`命令检查安装是否成功。 5. **开发和运行Java程序**: - ...

    jdk1.7 64位 Linux版 jdk-7u79-linux-x64.tar.gz

    标题"jdk1.7 64位 Linux版 jdk-7u79-linux-x64.tar.gz"明确指出我们讨论的是Java Development Kit (JDK) 的1.7版本,专为64位Linux操作系统设计。文件名"jdk-7u79-linux-x64.tar.gz"表明这是一个压缩文件,采用tar...

    springboot+mybatis+jdk1.7

    - JDK 1.7引入了一些重要的特性,如try-with-resources语句(自动关闭资源)、钻石操作符()用于泛型实例化、多路返回值(Fork/Join框架)、类型推断增强(钻石运算符)等。 - 虽然JDK 1.7已经较旧,但其稳定性和...

    jdk-1.7-linux-64-01

    三部分: jdk-1.7-linux-64-01 jdk-1.7-linux-64-02 jdk-1.7-linux-64-03

    jdk-1.7-windows-32-2

    4部分: jdk-1.7-windows-32-1 jdk-1.7-windows-32-2 jdk-1.7-windows-32-3 jdk-1.7-windows-32-4

    JDK1.7 64位 官方正版 jdk-7u80-macosx-x64.dmg

    文件比较大,给了百度云连接,直接下载 JDK1.7 64位 官方正版 jdk-7u80-macosx-x64.dmg 官方网站版本 苹果64位操作系统JDK

    jdk1.7 官方正式版32位下载(jdk-7u45-windows-i586.exe)

    jdk1.7 官方正式版32位下载 JDK详细介绍 JDK(Java Development Kit) 是 Java 语言的软件开发工具包(SDK)。 SE(J2SE),standard edition,标准版,是我们通常用的一个版本,从JDK 5.0开始,改名为Java SE。 EE(J2EE)...

    jdk-1.7-java-7-openjdk-amd64.zip

    标题中的"jdk-1.7-java-7-openjdk-amd64.zip"表明这是一个Java开发工具包(JDK)的压缩文件,版本为1.7,适用于AMD64架构的Linux系统。OpenJDK是Java Development Kit的一个开源实现,由Oracle公司支持并维护。这个...

    jdk1.7 32位官方正式版 jdk-7u79-windows-i586 下载

    jdk-7u79-windows-i586.exe JDK7 稳定版 源官方下载地址: http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html

Global site tag (gtag.js) - Google Analytics