在JDK1.7中,JDK的并发包下新增了一个框架:ForkJoin框架(它只是一种思想,别一听框架就又懵了)。Fork在英文中是叉子的意思,Join是最后归集的意思,我们可以想象一下,你用叉子(ForkJoin框架)去拿一块牛排,分叉的部分合力将牛排叉住(共同处理任务),把儿的部分(Join)可以让我们用它来拿起牛排放到嘴里(最后的结果)。。。。
上面描述了一个过程,我们把这个过程叫做:分而治之。见下图
接触过大数据的童鞋应该都知道,分而治之方法是一个非常有效的处理大量数据的方法。大数据中MapReduce就是采用了这种方法,将任务分拆,使用多个节点去处理部分任务,然后再用归集节点处理最后的结果。
所有好的思想都是通用的,它们来源于生活,抽象于生活(都是聪明人,想着怎么高效怎么来)。
JDK1.7中提供了一种新的线程池:ForkJoinPool。我们使用前面ThreadPoolExecutor的分析方法,来分析下这个线程池。
一、ForkJoinPool的核心构造函数
ForkJoinPool核心构造函数,无一例外的还是最后一个最长的。我们看下它们源码。
public ForkJoinPool() { this(Runtime.getRuntime().availableProcessors(), defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } /** * Creates a {@code ForkJoinPool} with the given parameters. * * @param parallelism the parallelism level. For default value, * use {@link java.lang.Runtime#availableProcessors}. * @param factory the factory for creating new threads. For default value, * use {@link #defaultForkJoinWorkerThreadFactory}. * @param handler the handler for internal worker threads that * terminate due to unrecoverable errors encountered while executing * tasks. For default value, use {@code null}. * @param asyncMode if true, * establishes local first-in-first-out scheduling mode for forked * tasks that are never joined. This mode may be more appropriate * than default locally stack-based mode in applications in which * worker threads only process event-style asynchronous tasks. * For default value, use {@code false}. * @throws IllegalArgumentException if parallelism less than or * equal to zero, or greater than implementation limit * @throws NullPointerException if the factory is null * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link * java.lang.RuntimePermission}{@code ("modifyThread")} */ public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode) { checkPermission(); if (factory == null) throw new NullPointerException(); if (parallelism <= 0 || parallelism > MAX_ID) throw new IllegalArgumentException(); this.parallelism = parallelism; this.factory = factory; this.ueh = handler; //设置每个处理线程内任务队列的处理方式,FIFO or LIFO this.locallyFifo = asyncMode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); //子任务的队列初始大小为INITIAL_QUEUE_CAPACITY,容量为8 this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize workers array with room for 2*parallelism if possible int n = parallelism << 1; if (n >= MAX_ID) n = MAX_ID; else { // See Hackers Delight, sec 3.2, where n < (1 << 16) n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; } //设置线程池中工作线程初始大小默认为16个,n经过运算后为15 workers = new ForkJoinWorkerThread[n + 1]; this.submissionLock = new ReentrantLock(); this.termination = submissionLock.newCondition(); StringBuilder sb = new StringBuilder("ForkJoinPool-"); sb.append(poolNumberGenerator.incrementAndGet()); sb.append("-worker-"); this.workerNamePrefix = sb.toString(); }
重要参数解释(我们还是结合英文注释来看):
①parallelism:并行度( the parallelism level),默认情况下跟我们机器的cpu个数保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我们机器运行时可用的CPU个数(For default value,use {@link java.lang.Runtime#availableProcessors})
②factory:创建新线程的工厂( the factory for creating new threads)。默认情况下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory( For default value,use {@link #defaultForkJoinWorkerThreadFactory})
③handler:线程异常情况下的处理器(Thread.UncaughtExceptionHandler handler),该处理器在线程执行任务时由于某些无法预料到的错误而导致任务线程中断时进行一些处理,默认情况为null。( the handler for internal worker threads that terminate due to unrecoverable errors encountered while executing tasks. For default value, use {@code null}.)
④asyncMode:这个参数要注意,在ForkJoinPool中,每一个工作线程都有一个独立的任务队列,asyncMode表示工作线程内的任务队列是采用何种方式进行调度,可以是先进先出FIFO,也可以是后进先出LIFO。如果为true,则线程池中的工作线程则使用先进先出方式进行任务调度,默认情况下是false。( if true,establishes local first-in-first-out scheduling mode for forked tasks that are never joined. This mode may be more appropriate than default locally stack-based mode in applications in which worker threads only process event-style asynchronous tasks. For default value, use {@code false}.)
二、核心源码解析
在ForkJoinPool中,我们常会使用到3个方法:submit()方法负责大任务提交,fork()负责将任务分拆,join()负责将任务执行结果归集。
以上三个方法都涉及到了任务,在ForkJoinPool中,任务的类型可以是普遍使用的ForkJoinTask,也可以是RecursiveAction和RecursiveTask<V>这两个ForkJoinTask的子类去执行特定类型(Recursive,递归调用)的任务。RecursiveAction表示没有返回值的任务,RecursiveTask<V>表示携带返回值result的任务(The result of the computation.)。
我们以一个示例开始深入理解ForkJoinPool的工作原理。
示例:计算1-5000的求和。
package com.sitech.threadPool; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; public class ForkJoinPoolDemo { private static final Integer THRESHOLD = 200; static class CountTask extends RecursiveTask<Integer> { private static final long serialVersionUID = 1L; //用start和end分别表示子任务计算开始和结束的值 private int start; private int end; public CountTask(int start , int end) { this.start = start; this.end = end; } @Override protected Integer compute() { //为了防止子任务划分层次太深而造成系统资源耗尽,需要加一个阈值 if(end - start < THRESHOLD) { System.out.println("子任务区间达到阈值,开始计算的区间:[" + start + "," + end + "]"); Integer sum = 0; for(int i = start ; i <= end ; i++) { sum += i; } return sum; }else { //否则再进行任务拆分,拆分成两个任务 CountTask left = new CountTask(start, (start + end) / 2); System.out.println("左半部分子任务,开始计算的区间:[" + start + "," + (start + end) / 2 + "]"); left.fork(); CountTask right = new CountTask((start + end) / 2 + 1 , end); System.out.println("右半部分子任务,开始计算的区间:[" + ((start + end) / 2 + 1) + "," + end + "]"); right.fork(); //左右半任务结果合并 return left.join() + right.join(); } } } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Integer> taskFuture = pool.submit(new CountTask(1,5000)); try { Integer result = taskFuture.get(); System.out.println("result = " + result); } catch (Exception e) { e.printStackTrace(); } } }
上述示例是将一个大的计算任务【1,5000】分拆成两个子任务:左半部分子任务【1,2500】和右半部分子任务【2501,5000】。因为在计算时候没有达到阈值,所以继续分拆:
①左半部分子任务【1,2500】继续拆分成【1,1250】和【1251,2500】
②右半部分子任务【2501,5000】继续拆分成【2501,3750】和【3751,5000】
............
一直持续拆分,直到拆分的子任务区间end-start在阈值(200)范围之内,开始使用循环进行计算,得到结果,然后两个左半部分的子任务结果不断join(最后得到【1,2500】的计算结果),右半部分的子任务结果不断join,得到【2501,5000】的计算结果,然后两部分再做一次join,就得到了【1,5000】的计算结果。
这里面涉及到4个重要方法,我们分别看下源码:
1、ForkJoinPool中的submit方法,源码如下:
在submit方法中,我们提交了大任务task,计算区间【1,5000】
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); forkOrSubmit(task); return task; }在submit方法中,调用了forkOrSubmit方法,我们看下执行逻辑。这里有一个判断,首先判断当前线程是不是ForkJoinWorkerThread的实例,我们当前提交大任务的线程是main线程Thread[main,5,main],见程序执行图。所以判断条件不成立,走方法addSubmission(task);
private <T> void forkOrSubmit(ForkJoinTask<T> task) { ForkJoinWorkerThread w; Thread t = Thread.currentThread(); if (shutdown) throw new RejectedExecutionException(); if ((t instanceof ForkJoinWorkerThread) && (w = (ForkJoinWorkerThread)t).pool == this) w.pushTask(task); else addSubmission(task); }
forkOrSubmit程序执行图如下:
在addSubmission方法中,进行子任务添加,在判断条件if ((q = submissionQueue) != null)中,因为submissionQueue在初始化ForkJoinPool时,在构造函数中就已经赋值( this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];),所以肯定成立。这里要弄明白UNSAFE.putOrderedObject(q, u, t)方法,它的作用是按照顺序将t值(我们提交的任务)按照偏移量u放在q中(submissionQueue).初始情况下queueBase和queueTop都在0位置.这里任务插入,queueTop需要移位+1,表示现在submissionQueue中有queueTop-queueBase个任务(当queueTop到达队列最后一个位置时,这里就要进行扩容,也就是调用方法growSubmissionQueue(),源码可以大家自己看).当任务进入submissionQueue后,就需要唤醒空闲线程进行处理(如果没有就需要新建,这里的线程就是ForkJoinPool中的工作线程:ForkJoinWorkerThread).
private void addSubmission(ForkJoinTask<?> t) { final ReentrantLock lock = this.submissionLock; lock.lock(); try { ForkJoinTask<?>[] q; int s, m; if ((q = submissionQueue) != null) { // ignore if queue removed long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE; UNSAFE.putOrderedObject(q, u, t); queueTop = s + 1; if (s - queueBase == m) growSubmissionQueue(); } } finally { lock.unlock(); } signalWork(); }我们看下signalWork的源码实现:signalWork的作用就是注释中描述的---- The while condition is true if: (there is are too few total workers OR there is at least one waiter) AND (there are too few active workers OR the pool is terminating). The value of e distinguishes the remaining cases: zero (no waiters)for create, negative if terminating (in which case donothing), else release a waiter.如果条件满足其中之一,工作线程的总数很少或者有至少有一个工作线程等待唤醒(当然前提是线程池没有中断),那么创建一个工作线程或者唤醒一个工作线程(Wakes up or creates a worker.).
/* * Wakes up or creates a worker. */ final void signalWork() { /* * The while condition is true if: (there is are too few total * workers OR there is at least one waiter) AND (there are too * few active workers OR the pool is terminating). The value * of e distinguishes the remaining cases: zero (no waiters) * for create, negative if terminating (in which case do * nothing), else release a waiter. The secondary checks for * release (non-null array etc) can fail if the pool begins * terminating after the test, and don't impose any added cost * because JVMs must perform null and bounds checks anyway. */ long c; int e, u; while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) & (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) { if (e > 0) { // release a waiting worker int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws; if ((ws = workers) == null || (i = ~e & SMASK) >= ws.length || (w = ws[i]) == null) break; long nc = (((long)(w.nextWait & E_MASK)) | ((long)(u + UAC_UNIT) << 32)); if (w.eventCount == e && UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { w.eventCount = (e + EC_UNIT) & E_MASK; if (w.parked) UNSAFE.unpark(w); break; } } else if (UNSAFE.compareAndSwapLong (this, ctlOffset, c, (long)(((u + UTC_UNIT) & UTC_MASK) | ((u + UAC_UNIT) & UAC_MASK)) << 32)) { addWorker(); break; } } }当然,我们现在还没有工作线程,因此需要创建一个,所以执行步骤走到了addWorker()方法。我们看下addWorker方法的源码,它的作用是在当前线程池对象上(this指java.util.concurrent.ForkJoinPool@1aba308[Running, parallelism = 4, size = 1, active = 1, running = 1, steals = 0, tasks = 0, submissions = 1])创建一个工作线程(注意:ForkJoinWorkerThread继承了Thread),表明这个工作线程工作在这个线程池中(the pool this thread works in)。
相关推荐
Fork/Join框架是Java7引入的一种用于并行任务执行的框架,它允许将复杂任务拆分成多个子任务,并行执行,然后通过join操作将结果聚合。Fork/Join框架特别适合处理可以递归拆分的计算密集型任务,比如大数据集的搜索...
张孝祥Java多线程与并发库高级应用学习笔记,很经典的学习多线程和并发的资料。张孝祥Java多线程讲义笔记由张孝祥亲自整理,很实用的。
Java并发Fork-Join框架原理是Java7中提供的一种并行执行任务的框架,旨在提高程序的执行效率和性能。该框架的核心思想是将大任务分割成若干个小任务,并将其分配给不同的线程执行,以充分利用多核CPU的计算能力。 ...
ForkJoin并发框架是Java 7引入的一种高效并行计算框架,它基于分而治之(Divide and Conquer)的策略,适用于处理大量可分割的任务。这个框架的核心类是`ForkJoinPool`和`ForkJoinTask`,它们为开发者提供了创建和...
在Java编程中,多线程并发是提升程序执行效率、充分利用多核处理器资源的重要手段。本文将基于"java 多线程并发实例"这个主题,深入探讨Java中的多线程并发概念及其应用。 首先,我们要了解Java中的线程。线程是...
ForkJoin框架 是Java 7中引入的,旨在进一步提高并发程序的性能。它使用了一种称为“工作窃取”的算法,允许线程动态地重分配任务。ForkJoin的核心思想是将大任务分解为更小的任务,然后并行处理这些任务,最后合并...
《Java多线程编程实战指南-核心篇》是一本深入探讨Java并发编程的书籍,旨在帮助读者掌握在Java环境中创建、管理和同步线程的核心技术。Java的多线程能力是其强大之处,使得开发者能够在同一时间执行多个任务,提高...
黑马+传智 Java入门到精通视频教程+课件+代码,30套Java开发项目代码,Java多线程与并发库高级应用视频教程,及电子书,面试题,开发工具等
Java多线程实战精讲是Java开发者必备的技能之一,特别是在处理高并发场景时,它的重要性不言而喻。本文将深入探讨Java多线程的相关知识点,帮助你全面理解并掌握这一核心概念。 1. **线程基础** - **线程定义**:...
### Java多线程与并发库高级应用 #### 一、Java多线程基础 在深入探讨Java多线程与并发库的高级应用之前,我们首先需要回顾一下Java多线程的基础概念和技术要点。 ##### 1.1 线程的概念 在计算机科学中,线程是...
### 张孝祥Java多线程与并发库高级应用笔记概览 #### 一、Java多线程技术的重要性与挑战 Java线程技术是软件工程领域不可或缺的一部分,尤其在底层编程、Android应用开发以及游戏开发中,其重要性不言而喻。然而,...
Java多线程ForkJoinPool实例详解 Java多线程编程中的ForkJoinPool实例详解是Java 7中...ForkJoinPool是Java多线程编程中的一个高效的并发框架,可以高效地执行任务,并且提供了丰富的API来管理和监控任务的执行过程。
Java多线程与并发编程是Java语言中用于处理多任务执行的关键技术,它能够帮助开发者设计出能够有效应对高并发请求的应用程序。在现代的线上(Online)和离线(Offline)应用中,合理利用多线程技术可以大幅提高系统...
Java多线程与并发是Java开发中的重要领域,尤其在现代高性能应用中,对多核处理器的充分利用和高效系统设计离不开并发技术。本主题主要基于《Java多线程编程核心技术》和《Java+7并发编程实战手册》两本书籍的核心...
本篇文章将深入探讨Java中的多线程并发机制,并通过具体的示例来帮助读者更好地理解和掌握这一重要概念。 #### 二、为什么需要多线程? 多线程技术的存在主要解决了计算机系统中资源利用率低下的问题。在没有多...
ForkJoin框架自Java 7引入,它为处理大型任务提供了一种分解成多个子任务并行执行的机制,然后合并子任务的结果。这种机制特别适合处理可以被分解的问题,例如计算、搜索等。 首先,让我们回顾一下传统的并发实现...
ForkJoin框架是Java并发编程中的一个重要工具,它基于分治策略,旨在高效处理大量数据。框架的核心思想是将一个大型任务分解成多个小型任务,然后通过并行执行这些子任务来提高处理效率。ForkJoin框架在Hadoop ...
在Java编程领域,多线程和并发库是核心且复杂的一部分,它们对于构建高效、响应迅速的系统至关重要。本教程的焦点在于“张孝祥Java多线程与并发库高级应用视频教程”的实践代码,旨在帮助开发者深入理解并熟练掌握...
ForkJoinPool 线程泄漏 我的输出: Iteration 0: 3 threads Iteration 111: 118 threads Iteration 222: 229 threads Iteration 333: 340 threads Iteration 444: 451 threads Iteration 555: 562 threads ...