详见: http://blog.yemou.net/article/query/info/tytfjhfascvhzxcytp85
根据前文描述的Doug Lea的理论基础,在JDK1.7中已经给出了Fork Join的实现。在Java SE 7的API中,多了ForkJoinTask、ForkJoinPool、ForkJoinWorkerThread、RecursiveAction、RecursiveTask这样5个类。本文就对JDK1.7中增加这5个工具类实现做简要分析。
0. JDK中ForkJoin实现概述
在JavaSE7的API和JDK1.7中,分别集成了支持ForkJoin的五个类:
- ForkJoinPool 实现ForkJoin的线程池
- ForkJoinWorkerThread 实现ForkJoin的线程
- ForkJoinTask<V> 一个描述ForkJoin的抽象类
- RecursiveAction 无返回结果的ForkJoinTask实现
- RecursiveTask<V> 有返回结果的ForkJoinTask实现
ForkJoinPool维护了多个线程构成的数组,维护了任务提交队列,给出了多个线程之间工作窃取的实现。给出了任务类型适配,和提交任务逻辑的实现。需要和线程紧密配合。
而ForkJoinWorkerThread则继承了java.lang.Thread类,维护了线程自己的队列,同一个任务fork()操作原则上会添加到同一个线程队列中。而这个线程类需要和ForkJoinPool紧密合作,有指向对应ForkJoinPool对象的引用。
ForkJoinTask则实现了Future接口,除了对接口的实现外,主要是fork()和join()操作。注意,貌似fork()只有ForkJoinWorkerThread 中才能执行。
两个子类RecursiveAction和RecursiveTask则实现比较简单,区别就在于返回值的处理不同。
1. ForkJoinPool
ForkJoinPool是实现了 Fork Join 的线程池。看JDK源码我们知道ForkJoinPool是extends AbstractExecutorService的,也就是说间接地实现了Executor和ExecutorService接口。实际上也就意味着ForkJoinPool是继ThreadPoolExecutor后的又一个Executor(Service)的具体实现。
1.1. 构建初始化
我们先看ForkJoinPool的构造方法,一共有3个重载的实现。有一个单参数的默认实现,通常我们使用这个就足够了,这最终会以默认的参数调用3参数的构造方法。我们再来看3个参数的构造方法实现。其中:
- int parallelism 第一个参数是并行度,这个参数简介影响着(会额外做一些运算)这个ForkJoinPool的ForkJoinWorkerThread 线程数。默认情况下,这个参数是任务运行环境的处理器个数,比如系统提供的处理器数目为4,初始化线程池会开启16个线程。
- ForkJoinWorkerThreadFactory factory 这个是ForkJoinPool构建新线程ForkJoinWorkerThread 对象的工厂,类似于ThreadPoolExecutor中用到的ThreadFactory。
- Thread.UncaughtExceptionHandler handler 这个前面并发的文章页提到过,是线程异常处理器,这里不多说了。
1.2. 任务提交
前面已经提到,ForkJoinPool也是Executor(Service)的实现,那么execute()和submit()这样向ThreadPoolExecutor提交任务的方法对于ForkJoinPool来说也是一样有效的。
需要说明的是,除了增加支持ForkJoinTask对象参数的重载实现外,还在Runnable和Callable参数的方法中对原始的Runnable和Callable对象做了到ForkJoinTask的适配,使用的分别是ForkJoinTask的静态内部类AdaptedRunnable和AdaptedCallable的对象。而这两个类型参数对应的方法最终都会调用ForkJoinTask参数的方法:
1
2
3
4
5
6
|
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null )
throw new NullPointerException();
forkOrSubmit(task);
return task;
} |
我们接下来再看下任务提交中被调用到的forkOrSubmit()方法:
1
2
3
4
5
6
7
8
9
10
11
|
private <T> void forkOrSubmit(ForkJoinTask<T> task) {
ForkJoinWorkerThread w;
Thread t = Thread.currentThread();
if (shutdown)
throw new RejectedExecutionException();
if ((t instanceof ForkJoinWorkerThread) &&
(w = (ForkJoinWorkerThread)t).pool == this )
w.pushTask(task);
else
addSubmission(task);
} |
逻辑很容易理解,先判断ForkJoinPool的状态,若已停止,则抛异常返回。之后如果当前线程是ForkJoinWorkerThread类型的,则将任务追加到ForkJoinWorkerThread对象中维护的队列上,否则将新的任务放入ForkJoinPool的提交队列中,并通知线程工作。
1.3. 线程的启动和工作
前面已经强调过,ForkJoinPool和ForkJoinWorkerThread是紧密相关,耦合在一起的。Thread的start()会调用run(),而ForkJoinWorkerThread类重写了run()方法,会调用对应的线程池ForkJoinPool对象的work()方法。
我们来看一下work()方法的实现。
1
2
3
4
5
6
7
8
9
10
11
|
final void work(ForkJoinWorkerThread w) {
boolean swept = false ; // true on empty scans
long c;
while (!w.terminate && ( int )(c = ctl) >= 0 ) {
int a; // active count
if (!swept && (a = ( int )(c >> AC_SHIFT)) <= 0 )
swept = scan(w, a);
else if (tryAwaitWork(w, c))
swept = false ;
}
} |
里面主要是一个while循环体,只要当前的线程和线程池不是处于终止状态,则这个循环一直执行。执行的内容则是这样的,如果能够根据scan()方法得到任务,并执行,否则进入阻塞状态。
我们来看一下scan()方法的实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
private boolean scan(ForkJoinWorkerThread w, int a) {
int g = scanGuard; // mask 0 avoids useless scans if only one active
int m = (parallelism == 1 - a && blockedCount == 0 ) ? 0 : g & SMASK;
ForkJoinWorkerThread[] ws = workers;
if (ws == null || ws.length <= m) // staleness check
return false ;
for ( int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
ForkJoinWorkerThread v = ws[k & m];
if (v != null && (b = v.queueBase) != v.queueTop &&
(q = v.queue) != null && (i = (q.length - 1 ) & b) >= 0 ) {
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null )) {
int d = (v.queueBase = b + 1 ) - v.queueTop;
v.stealHint = w.poolIndex;
if (d != 0 )
signalWork(); // propagate if nonempty
w.execTask(t);
}
r ^= r << 13 ; r ^= r >>> 17 ; w.seed = r ^ (r << 5 );
return false ; // store next seed
}
else if (j < 0 ) { // xorshift
r ^= r << 13 ; r ^= r >>> 17 ; k = r ^= r << 5 ;
}
else
++k;
}
if (scanGuard != g) // staleness check
return false ;
else { // try to take submission
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
if ((b = queueBase) != queueTop &&
(q = submissionQueue) != null &&
(i = (q.length - 1 ) & b) >= 0 ) {
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null )) {
queueBase = b + 1 ;
w.execTask(t);
}
return false ;
}
return true ; // all queues empty
}
} |
看起来很复杂,实际的原理则很简单,就是先尝试做任务窃取( Work Stealing ),如果不满足条件则到提交队列中获取任务。而ForkJoinWorkerThread线程本身也维护了线程内fork和join任务操作得到的队列,结合起来,总体执行任务的顺序就是:
- 线程会先执行ForkJoinWorkerThread对象内维护的任务队列中的任务,即ForkJoinWorkerThread的execTask()方法中的循环实现。通常是LIFO,即去最新的任务。也有特殊情况,这个根据变量locallyFifo的值来判断。
- 之后会尝试做任务窃取,尝试从其他线程中获取任务
- 任务窃取条件不满足时,到提交队列中获取提交的任务
1.4. ForkJoinPool的其它属性
除了上述提到的操作,ForkJoin中还维护了
- 线程数组和提交任务的队列,这是最基本的
- 操作相关的锁和条件对象
- volatile long ctl; 等线程池ForkJoinPool状态的属性
- static final Random workerSeedGenerator; 等和任务窃取策略相关的一系列属性
- private volatile long stealCount; 等数据统计相关属性
等数据属性。
2. ForkJoinWorkerThread
ForkJoinWorkerThread扩展于Thread类,但提供了很多支持ForkJoin的特性。
上文在介绍ForkJoinPool的时候已经对这个类做了很多描述,也强调过线程类ForkJoinWorkerThread和ForkJoinPool相互依赖,放在一起才有意义。实际上,还要提到描述Fork Join任务的类ForkJoinTask。
除了上面提到的以外,对于ForkJoinWorkerThread这个类,再稍微提一下这样几个点:
- ForkJoinTask<?>[] queue; 这是维护和ForkJoin相关的(子)任务队列,还有queueTop和queueBase属性,分别标记队列的尾部和头部
- final ForkJoinPool pool; 指向线程池的引用,需要注意的是,这个属性被final修饰
- 和ForkJoinTask的fork()和join()方法相关的方法——pushTask()和unpushTask(),分别负责在当前ForkJoinWorkerThread对象维护的队列中新增和取回任务
- 其它与状态和统计相关的属性
3. ForkJoinTask及两个抽象子类
ForkJoinTask是ForkJoin框架中的主体,是ForkJoin中任务的体现。这个类实现了Future和Serializable接口。除了Futrue接口要满足的方法外,我想有这样3个方法是有必要知道的,分别是fork()、join()和exec()。
对于fork(),这个也许大家都很熟悉了,在这里也就是分解出子任务的执行。这个在实现上很简单那,就是在当前线程ForkJoinWorkerThread对象维护的队列中加入新的子任务。实现如下:
1
2
3
4
5
|
public final ForkJoinTask fork() {
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask( this );
return this ;
} |
需要注意的是fork()方法的调用是在当前线程对象为ForkJoinWorkerThread的条件下。
我们再来看看对应的join()实现:
1
2
3
4
5
6
|
public final V join() {
if (doJoin() != NORMAL)
return reportResult();
else
return getRawResult();
} |
显然,它有调用了doJoin()方法,我们再来深入了解下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
private int doJoin() {
Thread t; ForkJoinWorkerThread w; int s; boolean completed;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
if ((s = status) < 0 )
return s;
if ((w = (ForkJoinWorkerThread)t).unpushTask( this )) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
return setCompletion(NORMAL);
}
return w.joinTask( this );
}
else
return externalAwaitDone();
} |
大概的逻辑是这样的,在当前线程对象为ForkJoinWorkerThread的条件下,从队列中取回当前任务ForkJoinTask对象,并尝试在调用线程对其直接执行,否则当前线程调用wait()阻塞等待。更深入的理解可续继续查阅源码。
最后,我们再来看看exec()方法,这个是在ForkJoinTask中是没有给出实现的。
在JDK中,有ForkJoinTask的两个抽象子类RecursiveAction和RecursiveTask,他们分别给出了exec()的实现,这也是这两个子类主要做的事情,实际上是调用了各自的compute()方法,而在RecursiveAction和RecursiveTask中compute()又是未给出实现的。
实际上,compute()方法就是Fork Join要执行的内容,是Fork Join任务的实质,需要开发者给出。
而RecursiveAction和RecursiveTask就是方便开发者使用Fork Join的,RecursiveAction和RecursiveTask这两个类的区别仅仅是返回结果的情况不同。而这个compute()方法就是留给开发者继承扩展使用的。这个会在下篇文章详细讲述。
相关推荐
ForkJoin并发框架是Java 7引入的一种高效并行计算框架,它基于分而治之(Divide and Conquer)的策略,适用于处理大量可分割的任务。这个框架的核心类是`ForkJoinPool`和`ForkJoinTask`,它们为开发者提供了创建和...
Fork/Join框架是Java7引入的一种用于并行任务执行的框架,它允许将复杂任务拆分成多个子任务,并行执行,然后通过join操作将结果聚合。Fork/Join框架特别适合处理可以递归拆分的计算密集型任务,比如大数据集的搜索...
良葛格————JavaJDK5.0学良葛格————JavaJDK5.0学习笔记PDF.rar习笔记PDF.rar良葛格良葛格————JavaJDK5.0学习笔记PDF.rar————JavaJDK5.0学习笔记PDF.rar良葛格————JavaJDK5.0学习笔记PDF.rar良...
在Java编程中,线程池和并发任务执行是常见的方式,但在某些场景下,我们可以利用JDK的ForkJoin框架来构建更加高效和易用的并发组件。ForkJoin框架自Java 7引入,它为处理大型任务提供了一种分解成多个子任务并行...
在本资源中,我们主要探讨的是“Java程序设计——基于JDK 6和NetBeans实现”的主题。这是一份与书本配套的源代码,旨在帮助读者深入理解Java编程,并通过实际操作提升技能。让我们详细地了解这个主题涵盖的知识点。 ...
《精通JAVA——JDK》是一本深度探讨Java编程语言及其开发工具集JDK的专业教程,旨在帮助Java开发者提升技能,实现技术的精进。作为Java的基石,JDK(Java Development Kit)是学习和开发Java应用程序必不可少的工具...
JDK 7 中引入的 Fork/Join 模式是一种基于分治策略的并行编程模型,旨在简化在多核处理器环境下实现高效的并行计算。这一模式的核心思想是将复杂的大任务拆分成一系列小任务,然后将这些小任务并行执行,最后再合并...
8. **Java开发工具**:除了基本的JVM和编译器,JDK还提供了如JConsole、JVisualVM等工具,帮助开发者监控和分析Java应用程序的性能。 9. **安全性增强**:JDK 1.8在安全性方面也有加强,例如对SSL/TLS协议的升级,...
10. **并发改进**:Java 8在并发编程方面也有优化,如Fork/Join框架的改进,以及并发集合类的增强,如ConcurrentHashMap的性能提升。 以上就是Java JDK 1.8的关键知识点,无论是64位还是32位版本,都为开发者提供了...
Java JDK 1.8.0_101是Oracle公司发布的Java Development Kit的一个版本,它包含了Java编程语言的运行环境和开发工具。这个版本对于Java开发者来说至关重要,因为它提供了编译、调试和运行Java应用程序所需的所有组件...
这是一个集成了jre,tomcat,mysql的绿色运行环境,解压之后就可以直接运行web(只要会点鼠标),不需要用户自己安装jre、tomcat、mysql,一键到位,看起来像是桌面程序的web应用.
此外,还增强了并发编程的工具,如Fork/Join框架,提高了多线程处理任务的效率。 安装JDK 7u65的过程通常包括以下步骤: 1. 下载适用于64位Windows系统的jdk-7u65-windows-x64.exe文件。 2. 双击执行安装程序,按照...
3. **多线程改进**:JDK 7提供了`Fork/Join`框架,这是一种基于工作窃取算法的并行编程模型,用于高效地执行大量可分解的任务。 4. **try-with-resources语句**:这个新特性允许自动关闭实现了`AutoCloseable`接口...
在IT行业中,JavaJDK是Java开发工具集的简称,它是Java编程语言的基础。这个压缩包文件的主题聚焦于“精通JavaJDK、数据库系统开发以及Web开发程序源文件”,这暗示了它包含了一系列用于学习和实践这三个核心领域的...
6. **改进的并发工具**: 如Fork/Join框架,用于并行执行任务,以及`java.util.concurrent`包中新增的`CompletableFuture`类,方便处理异步计算结果。 7. **try-with-resources**: 这个新特性确保了资源(如文件、...
3. **多线程并行流**:Java 7引入了Fork/Join框架和并行流API,提供了处理大量数据的并行计算能力,提高了数组和集合操作的效率,如并行排序。 4. **钻石运算符**:在创建匿名内部类或使用泛型时,可以省略掉类型...
在多线程处理上,JDK 1.7增强了Fork/Join框架,这是一个并行计算模型,用于分割大型任务并行执行,然后合并结果。它通过RecursiveTask和RecursiveAction类实现,对于大数据处理和计算密集型应用非常有用。 此外,...
7. **Fork/Join框架**:这个并行计算框架允许开发者将大任务分解为小任务,并行执行,从而提高程序的运行效率。 安装Java JDK 1.7 on Windows x64的步骤非常简单,只需双击下载的“jdk-7u80-windows-x64.exe”文件...
例如,动态语言支持、Fork/Join框架、并发工具的增强以及新的编译器——CTW(Compartment Type-Wise)等。这些更新显著提高了开发效率和程序性能。 针对Aarch64(也称为ARM64或AArch64)架构,这是一种64位指令集...
在并发编程方面,JDK 1.7引入了Fork/Join框架,这是一个用于并行计算的高级框架,它基于分治策略,可以有效地利用多核处理器的计算能力。`java.util.concurrent.ForkJoinPool`和`java.util.concurrent.RecursiveTask...