在JDK1.7中,JDK的并发包下新增了一个框架:ForkJoin框架(它只是一种思想,别一听框架就又懵了)。Fork在英文中是叉子的意思,Join是最后归集的意思,我们可以想象一下,你用叉子(ForkJoin框架)去拿一块牛排,分叉的部分合力将牛排叉住(共同处理任务),把儿的部分(Join)可以让我们用它来拿起牛排放到嘴里(最后的结果)。。。。
上面描述了一个过程,我们把这个过程叫做:分而治之。见下图
接触过大数据的童鞋应该都知道,分而治之方法是一个非常有效的处理大量数据的方法。大数据中MapReduce就是采用了这种方法,将任务分拆,使用多个节点去处理部分任务,然后再用归集节点处理最后的结果。
所有好的思想都是通用的,它们来源于生活,抽象于生活(都是聪明人,想着怎么高效怎么来)。
JDK1.7中提供了一种新的线程池:ForkJoinPool。我们使用前面ThreadPoolExecutor的分析方法,来分析下这个线程池。
一、ForkJoinPool的核心构造函数
ForkJoinPool核心构造函数,无一例外的还是最后一个最长的。我们看下它们源码。
public ForkJoinPool() { this(Runtime.getRuntime().availableProcessors(), defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } /** * Creates a {@code ForkJoinPool} with the given parameters. * * @param parallelism the parallelism level. For default value, * use {@link java.lang.Runtime#availableProcessors}. * @param factory the factory for creating new threads. For default value, * use {@link #defaultForkJoinWorkerThreadFactory}. * @param handler the handler for internal worker threads that * terminate due to unrecoverable errors encountered while executing * tasks. For default value, use {@code null}. * @param asyncMode if true, * establishes local first-in-first-out scheduling mode for forked * tasks that are never joined. This mode may be more appropriate * than default locally stack-based mode in applications in which * worker threads only process event-style asynchronous tasks. * For default value, use {@code false}. * @throws IllegalArgumentException if parallelism less than or * equal to zero, or greater than implementation limit * @throws NullPointerException if the factory is null * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link * java.lang.RuntimePermission}{@code ("modifyThread")} */ public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode) { checkPermission(); if (factory == null) throw new NullPointerException(); if (parallelism <= 0 || parallelism > MAX_ID) throw new IllegalArgumentException(); this.parallelism = parallelism; this.factory = factory; this.ueh = handler; //设置每个处理线程内任务队列的处理方式,FIFO or LIFO this.locallyFifo = asyncMode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); //子任务的队列初始大小为INITIAL_QUEUE_CAPACITY,容量为8 this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize workers array with room for 2*parallelism if possible int n = parallelism << 1; if (n >= MAX_ID) n = MAX_ID; else { // See Hackers Delight, sec 3.2, where n < (1 << 16) n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; } //设置线程池中工作线程初始大小默认为16个,n经过运算后为15 workers = new ForkJoinWorkerThread[n + 1]; this.submissionLock = new ReentrantLock(); this.termination = submissionLock.newCondition(); StringBuilder sb = new StringBuilder("ForkJoinPool-"); sb.append(poolNumberGenerator.incrementAndGet()); sb.append("-worker-"); this.workerNamePrefix = sb.toString(); }
重要参数解释(我们还是结合英文注释来看):
①parallelism:并行度( the parallelism level),默认情况下跟我们机器的cpu个数保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我们机器运行时可用的CPU个数(For default value,use {@link java.lang.Runtime#availableProcessors})
②factory:创建新线程的工厂( the factory for creating new threads)。默认情况下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory( For default value,use {@link #defaultForkJoinWorkerThreadFactory})
③handler:线程异常情况下的处理器(Thread.UncaughtExceptionHandler handler),该处理器在线程执行任务时由于某些无法预料到的错误而导致任务线程中断时进行一些处理,默认情况为null。( the handler for internal worker threads that terminate due to unrecoverable errors encountered while executing tasks. For default value, use {@code null}.)
④asyncMode:这个参数要注意,在ForkJoinPool中,每一个工作线程都有一个独立的任务队列,asyncMode表示工作线程内的任务队列是采用何种方式进行调度,可以是先进先出FIFO,也可以是后进先出LIFO。如果为true,则线程池中的工作线程则使用先进先出方式进行任务调度,默认情况下是false。( if true,establishes local first-in-first-out scheduling mode for forked tasks that are never joined. This mode may be more appropriate than default locally stack-based mode in applications in which worker threads only process event-style asynchronous tasks. For default value, use {@code false}.)
二、核心源码解析
在ForkJoinPool中,我们常会使用到3个方法:submit()方法负责大任务提交,fork()负责将任务分拆,join()负责将任务执行结果归集。
以上三个方法都涉及到了任务,在ForkJoinPool中,任务的类型可以是普遍使用的ForkJoinTask,也可以是RecursiveAction和RecursiveTask<V>这两个ForkJoinTask的子类去执行特定类型(Recursive,递归调用)的任务。RecursiveAction表示没有返回值的任务,RecursiveTask<V>表示携带返回值result的任务(The result of the computation.)。
我们以一个示例开始深入理解ForkJoinPool的工作原理。
示例:计算1-5000的求和。
package com.sitech.threadPool; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; public class ForkJoinPoolDemo { private static final Integer THRESHOLD = 200; static class CountTask extends RecursiveTask<Integer> { private static final long serialVersionUID = 1L; //用start和end分别表示子任务计算开始和结束的值 private int start; private int end; public CountTask(int start , int end) { this.start = start; this.end = end; } @Override protected Integer compute() { //为了防止子任务划分层次太深而造成系统资源耗尽,需要加一个阈值 if(end - start < THRESHOLD) { System.out.println("子任务区间达到阈值,开始计算的区间:[" + start + "," + end + "]"); Integer sum = 0; for(int i = start ; i <= end ; i++) { sum += i; } return sum; }else { //否则再进行任务拆分,拆分成两个任务 CountTask left = new CountTask(start, (start + end) / 2); System.out.println("左半部分子任务,开始计算的区间:[" + start + "," + (start + end) / 2 + "]"); left.fork(); CountTask right = new CountTask((start + end) / 2 + 1 , end); System.out.println("右半部分子任务,开始计算的区间:[" + ((start + end) / 2 + 1) + "," + end + "]"); right.fork(); //左右半任务结果合并 return left.join() + right.join(); } } } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Integer> taskFuture = pool.submit(new CountTask(1,5000)); try { Integer result = taskFuture.get(); System.out.println("result = " + result); } catch (Exception e) { e.printStackTrace(); } } }
上述示例是将一个大的计算任务【1,5000】分拆成两个子任务:左半部分子任务【1,2500】和右半部分子任务【2501,5000】。因为在计算时候没有达到阈值,所以继续分拆:
①左半部分子任务【1,2500】继续拆分成【1,1250】和【1251,2500】
②右半部分子任务【2501,5000】继续拆分成【2501,3750】和【3751,5000】
............
一直持续拆分,直到拆分的子任务区间end-start在阈值(200)范围之内,开始使用循环进行计算,得到结果,然后两个左半部分的子任务结果不断join(最后得到【1,2500】的计算结果),右半部分的子任务结果不断join,得到【2501,5000】的计算结果,然后两部分再做一次join,就得到了【1,5000】的计算结果。
这里面涉及到4个重要方法,我们分别看下源码:
1、ForkJoinPool中的submit方法,源码如下:
在submit方法中,我们提交了大任务task,计算区间【1,5000】
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); forkOrSubmit(task); return task; }在submit方法中,调用了forkOrSubmit方法,我们看下执行逻辑。这里有一个判断,首先判断当前线程是不是ForkJoinWorkerThread的实例,我们当前提交大任务的线程是main线程Thread[main,5,main],见程序执行图。所以判断条件不成立,走方法addSubmission(task);
private <T> void forkOrSubmit(ForkJoinTask<T> task) { ForkJoinWorkerThread w; Thread t = Thread.currentThread(); if (shutdown) throw new RejectedExecutionException(); if ((t instanceof ForkJoinWorkerThread) && (w = (ForkJoinWorkerThread)t).pool == this) w.pushTask(task); else addSubmission(task); }
forkOrSubmit程序执行图如下:
在addSubmission方法中,进行子任务添加,在判断条件if ((q = submissionQueue) != null)中,因为submissionQueue在初始化ForkJoinPool时,在构造函数中就已经赋值( this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];),所以肯定成立。这里要弄明白UNSAFE.putOrderedObject(q, u, t)方法,它的作用是按照顺序将t值(我们提交的任务)按照偏移量u放在q中(submissionQueue).初始情况下queueBase和queueTop都在0位置.这里任务插入,queueTop需要移位+1,表示现在submissionQueue中有queueTop-queueBase个任务(当queueTop到达队列最后一个位置时,这里就要进行扩容,也就是调用方法growSubmissionQueue(),源码可以大家自己看).当任务进入submissionQueue后,就需要唤醒空闲线程进行处理(如果没有就需要新建,这里的线程就是ForkJoinPool中的工作线程:ForkJoinWorkerThread).
private void addSubmission(ForkJoinTask<?> t) { final ReentrantLock lock = this.submissionLock; lock.lock(); try { ForkJoinTask<?>[] q; int s, m; if ((q = submissionQueue) != null) { // ignore if queue removed long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE; UNSAFE.putOrderedObject(q, u, t); queueTop = s + 1; if (s - queueBase == m) growSubmissionQueue(); } } finally { lock.unlock(); } signalWork(); }我们看下signalWork的源码实现:signalWork的作用就是注释中描述的---- The while condition is true if: (there is are too few total workers OR there is at least one waiter) AND (there are too few active workers OR the pool is terminating). The value of e distinguishes the remaining cases: zero (no waiters)for create, negative if terminating (in which case donothing), else release a waiter.如果条件满足其中之一,工作线程的总数很少或者有至少有一个工作线程等待唤醒(当然前提是线程池没有中断),那么创建一个工作线程或者唤醒一个工作线程(Wakes up or creates a worker.).
/* * Wakes up or creates a worker. */ final void signalWork() { /* * The while condition is true if: (there is are too few total * workers OR there is at least one waiter) AND (there are too * few active workers OR the pool is terminating). The value * of e distinguishes the remaining cases: zero (no waiters) * for create, negative if terminating (in which case do * nothing), else release a waiter. The secondary checks for * release (non-null array etc) can fail if the pool begins * terminating after the test, and don't impose any added cost * because JVMs must perform null and bounds checks anyway. */ long c; int e, u; while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) & (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) { if (e > 0) { // release a waiting worker int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws; if ((ws = workers) == null || (i = ~e & SMASK) >= ws.length || (w = ws[i]) == null) break; long nc = (((long)(w.nextWait & E_MASK)) | ((long)(u + UAC_UNIT) << 32)); if (w.eventCount == e && UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { w.eventCount = (e + EC_UNIT) & E_MASK; if (w.parked) UNSAFE.unpark(w); break; } } else if (UNSAFE.compareAndSwapLong (this, ctlOffset, c, (long)(((u + UTC_UNIT) & UTC_MASK) | ((u + UAC_UNIT) & UAC_MASK)) << 32)) { addWorker(); break; } } }当然,我们现在还没有工作线程,因此需要创建一个,所以执行步骤走到了addWorker()方法。我们看下addWorker方法的源码,它的作用是在当前线程池对象上(this指java.util.concurrent.ForkJoinPool@1aba308[Running, parallelism = 4, size = 1, active = 1, running = 1, steals = 0, tasks = 0, submissions = 1])创建一个工作线程(注意:ForkJoinWorkerThread继承了Thread),表明这个工作线程工作在这个线程池中(the pool this thread works in)。
相关推荐
以下是RC滤波、LC滤波、CRC滤波、CLC滤波、DLC滤波、LCL滤波的概述: RC滤波 原理:利用电阻(R)和电容(C)对不同频率信号的阻抗变化来实现滤波。低频信号下,电容充电和放电较慢,对信号形成阻碍;高频信号下,电容能够快速充放电,对信号的阻碍较小。 类型: 低通RC滤波器:允许低频信号通过,抑制高频信号。当信号频率升高时,电容器充放电速度加快,使得高频信号在电阻两端产生压降,从而降低输出信号的幅度。 高通RC滤波器:允许高频信号通过,抑制低频信号。在低频时,电容器相当于开路,电路的大部分信号都会被电阻所吸收;在高频时,电容器相当于短路,输入信号能较完整地传到输出端。 优点:电路简单,成本低廉,易于设计和实现。 缺点:滤波效果相对较弱,对高频噪声的抑制能力有限。 应用:常用于简单的信号处理、去噪、音频系统中的低通和高通滤波等。 LC滤波 原理:基于电感(L)和电容(C)元件对频率的响应差异。电感对高频信号呈现高阻抗(近似短路),对低频信号呈现低阻抗(近似开路);电容则相反,对低频信号呈现高阻抗(近似开路),对高频信号呈现低阻抗(近似短路)。 类型: 低通滤波器:允许低频信号通过
校园服务系统 免费JAVA毕业设计 2024成品源码+论文+录屏+启动教程 启动教程:https://www.bilibili.com/video/BV1jKDjYrEz1 项目讲解视频:https://www.bilibili.com/video/BV1Tb421n72S 二次开发教程:https://www.bilibili.com/video/BV18i421i7Dx
**快速进阶:西门子PLC编程高手养成记** 这个标题涵盖了您提供的文字中的关键信息,包括“西门子PLC编程”、“高手养成”等元素,同时也保持了简洁明了的风格。,如何短时间内成为西门子PLC编程高手 看这里:码垛搬运模型 【功能块】码垛搬运功能块 【品牌】西门子 【PLC】1200 【编程软件】博图v16 【编程语言】scl 【特色】以设定的上限和下限为范围,生成随机数。 可以用作模拟量仿真,方便调试程序; 学习用SCL语言编程; 作为数据源演示给领导或客户看; 可无限复制使用。 【说明】:程序不要把时间用来造轮子,这里有的你拿走,保留精力用来创造优质的功能快让你在工作中事半功倍factory Io和博途软件进行联合仿真,码垛搬运层数可以自定义设置,最大层数3,有报警显示功能,位置监视,复位,停止功能。 程序通俗易懂,规范模块化,可以随意增加新功能。 物品有,Factory IO仿真模型+博途v16安装包+博途码垛程序+HMI程序+factory IO安装包2.50版本。 ,关键词
,电机控制器,IGBT结温估算(算法+模型)国际大厂机密算法,多年实际应用,准确度良好…… 能够同时对IGBT内部6个三极管和6个二极管温度进行估计,并输出其中最热的管子对应温度。 可用于温度保护,降额,提高产品性能。 simulink模型除仿真外亦可生成代码…… 提供直流、交流两个仿真模型 提供底层算法模型库(开源,带数据 ) 提供说明文档
"COMSOL模拟:双层多孔介质中油类物质地下渗透扩散现象的时空演变研究",comsol模拟油往地下渗透现象,考虑两层多孔介质,结果显示出油随着时间逐渐向下扩散。 ,comsol模拟;油渗透;两层多孔介质;时间扩散;结果展示,COMSOL模拟两层多孔介质中油渗透扩散现象。
4b076399e3f709dc8990bd0e12720254.part7
基于深度学习的钢轨病害检测算法研究.pdf
西门子Smart200PLC与多台台达变频器实现Modbus轮询通讯:读写参数、控制启停、设置频率及电流监控实用指南,西门子smart200plc与4台台达变频器modbus轮询通讯 VFD-EL小型矢量变频器 1,读写变频器的内部参数 2,控制变频器启停,读频率电流 3,设置变频器输出频率 4,有彩色接线图,和参数设置说明, 有详细注释,简单易懂,可以学习可用项目, ,西门子Smart200PLC; Modbus轮询通讯; 变频器控制; 读写参数; 输出频率设置; 彩色接线图; 参数设置说明; 简单易懂注释。,西门子PLC与台达变频器Modbus轮询通讯项目指南
EI复现:碳减排背景下综合能源服务商合作策略的纳什谈判理论与自适应交替方向乘子法求解,EI复现: 《考虑碳减排的综合能源服务商合作运行优化策略》 纯手工复现,主要通过纳什谈判理论进行博弈,并采用自适应交替方向乘子法进行分布式求解 ,核心关键词:EI复现; 碳减排; 综合能源服务商; 合作运行优化策略; 纳什谈判理论; 博弈; 自适应交替方向乘子法; 分布式求解,EI复现:纳什谈判理论下的碳减排能源服务商合作运行优化策略
"扬子YD9850A耐压仪的LabVIEW通讯源码解析与应用",扬子YD9850A耐压仪labVIEW通讯源码 ,扬子YD9850A; 耐压仪; labVIEW通讯; 源码,扬子YD9850A耐压仪LabVIEW通讯源码
全覆盖与随机碰撞路径规划——AGV避障技术在扫地机器人移动仿真中的应用与对比,AGV全覆盖移动避障路径规划 扫地机器人路径规划 第一类算法 全覆盖智能算法 %% 基于深度优先搜索算法的路径规划—扫地机器人移动仿真 % 返回深度优先搜索实现全覆盖的运行次数 % 将栅格模型的每一个栅格看成一个点 % 实际中栅格模型是连续的,在计算机处理时看作离散的 % 将栅格模型抽象为标识矩阵,矩阵对应位置的标记表示栅格对应位置的状态 第二对比算法 %% 随机碰撞的路径规划—扫地机器人移动仿真 % 返回深度优先搜索实现全覆盖的运行次数 % 将栅格模型的每一个栅格看成一个点 % 实际中栅格模型是连续的,在计算机处理时看作离散的 % 将栅格模型抽象为标识矩阵,矩阵对应位置的标记表示栅格对应位置的状态 ,核心关键词: AGV全覆盖移动避障; 扫地机器人路径规划; 全覆盖智能算法; 深度优先搜索算法; 栅格模型; 标识矩阵。,基于全覆盖智能算法的AGV避障路径规划
"基于Matlab仿真的15kW三相离网逆变器在不对称负载下的正负序控制策略研究及其实验验证",15kW三相离网逆变器在不对称负载下的正负序控制matlab仿真 【1】卖家的研究方向,可提供简单,提供参考文献。 【2】不对称控制包括: 正序分量处理+负序分量处理+正序控制环+负序控制环; 【3】正序控制路与负序控制路都采用dq轴上的电容电压外环+电感电流内环控制; 【4】直流电压Vdc=700V,总功率15kW,LC滤波,阻性负载; 【5】轻重负载切+不对称负载投切均可稳定运行,具体波形如图所示; ,1. 15kW三相离网逆变器; 2. 不对称负载下的正负序控制; 3. MATLAB仿真; 4. 正负序分量处理; 5. 环路控制; 6. dq轴控制; 7. LC滤波; 8. 阻性负载; 9. 轻重负载切换; 10. 不对称负载投切稳定运行。,15kW三相离网逆变器的不对称负载控制Matlab仿真研究
电影数据分析及可视化系统 免费Python毕业设计 2024成品源码+论文+录屏+启动教程 启动教程:https://www.bilibili.com/video/BV1jKDjYrEz1 项目讲解视频:https://www.bilibili.com/video/BV1Tb421n72S 二次开发教程:https://www.bilibili.com/video/BV18i421i7Dx
"COMSOL光学模型解析:点光源与平面波穿越透镜的动态演变过程",COMSOL光学模型演示:点光源和平面波穿过透镜动态过程 ,COMSOL光学模型;点光源;平面波;透镜;动态过程,COMSOL透镜中光波动态传播模型演示
"基于CEEMD-GWO-SVM算法的时间序列预测:风电、光伏、负荷预测通用解决方案",基于CEEMD+GWO+SVM的时间序列预测,风电,光伏,负荷预测,替数据就可以使用。 ,CEEMD; GWO; SVM; 时间序列预测; 风电; 光伏; 负荷预测; 替换数据,基于CEEMD-GWO-SVM算法的能源时间序列预测模型
基于85三菱组态王PLC的药片装瓶自动控制系统的设计与实现,85三菱组态王基于PLC的药片装瓶自动控制系统 ,基于该内容,核心关键词可以是:85三菱组态王;PLC;药片装瓶;自动控制系统。这些关键词用分号分隔的结果为:85三菱组态王; PLC; 药片装瓶; 自动控制系统。,基于PLC的85三菱组态王药片装瓶自动控制系统
《CARSIM与Simulink联合仿真:实现变道及复杂路径规划的MPC轨迹跟踪算法》,carsim+simulink联合仿真实现变道 包含路径规划算法+mpc轨迹跟踪算法 可选simulink版本和c++版本算法(价格一样,如需要2个版本多加30元) 可以适用于弯道道路,弯道车道保持,弯道变道 carsim内规划轨迹可视化 Carsim2020.0 Matlab2017b (可安装包) ,汽车仿真联合;变道与轨迹规划;MPC轨迹跟踪算法;路径规划算法;Carsim2020.0版使用。,"Carsim与Simulink联合仿真:变道与轨迹跟踪算法实现"
在tf.Keras中使用Scikit-Learn优化模型
基于EEMD-PCA-LSTM的优化模型:特征处理与预测效果提升的新方法,EEMD-PCA-LSTM(集合经验模态分解-主成分分析-长短期记忆网络) 将输入特征进行EEMD分解后,通过KPCA判定分解分解累计贡献率,将大于98%的作为新的输入特征同预测序列导入到LSTM进行预测。 与LSTM、EEMD-LSTM进行对比,预测效果获得提升。 该模型可提升度高。 ,EEMD; PCA; LSTM; 特征处理; 预测效果提升; 模型可提升度高,EEMD-PCA-LSTM混合模型:预测效果提升显著的可提升模型
shopping_basket.xlsx