文章原创,转载请注明出处:http://abc08010051.iteye.com/blog/2409693
后面会再修改一下,让文章读起来更好读,现在的版本还比较粗糙
CompletableFuture是java 1.8提供的一个新类,是对Future的增强,吸收了guava异步线程的特点,可以实现一系列的异步线程操作,很多常规的用法网上有很多博客,这里说说部分代码的实现:
这是CompletableFuture的基本结构
CompletableFuture基本属性方法
volatile Object result; // Either the result or boxed AltResult volatile Completion stack; // Top of Treiber stack of dependent actions
result用来存放线程返回的结果
stack 行为上就是一个栈的功能,先进后出,用来存放要执行的动作,这个在单个异步线程返回时是没用的,多个线程等待的时候才排上用场
Completion基本属性方法
volatile Completion next; // Treiber stack link abstract CompletableFuture<?> tryFire(int mode);
next链式结构,存放下一个
tryFire 方法,主要返回一个依赖的Completion
名词解释:
dependent:依赖,多个线程操作时,如何等待每个线程都完成才返回,主要是依靠这个依赖,没处理完就会调用依赖的postComplete()方法向上传递
source:源,用户自定义的线程的CompletableFuture
1 单一使用
/** * Returns a new CompletableFuture that is asynchronously completed * by a task running in the {@link ForkJoinPool#commonPool()} with * the value obtained by calling the given Supplier. * * @param supplier a function returning the value to be used * to complete the returned CompletableFuture * @param <U> the function's return type * @return the new CompletableFuture */ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) { if (f == null) throw new NullPointerException(); CompletableFuture<U> d = new CompletableFuture<U>(); e.execute(new AsyncSupply<U>(d, f)); return d; } static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { CompletableFuture<T> dep; Supplier<T> fn; AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { this.dep = dep; this.fn = fn; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) {} public final boolean exec() { run(); return true; } public void run() { //CompletableFuture句柄,把Supplier的返回值放到CompletableFuture的result属性中,当前线程的执行是在默认的线程池中执行,在外部可以获取 CompletableFuture<T> d; Supplier<T> f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { d.completeValue(f.get()); } catch (Throwable ex) { d.completeThrowable(ex); } } d.postComplete(); } } } /** * Pops and tries to trigger all reachable dependents. Call only * when known to be done. */ final void postComplete() { /* * On each step, variable f holds current dependents to pop * and run. It is extended along only one path at a time, * pushing others to avoid unbounded recursion. */ CompletableFuture<?> f = this; Completion h; //循环遍历CompletableFuture的stack属性,Completion是一个链式的操作,如果有下一个,触发下一个Completion的tryFire方法 while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t; if (f.casStack(h, t = h.next)) { if (t != null) { if (f != this) { pushStack(h); continue; } h.next = null; // detach } f = (d = h.tryFire(NESTED)) == null ? this : d; } } }
等待获取结果:
public T get() throws InterruptedException, ExecutionException { Object r; return reportGet((r = result) == null ? waitingGet(true) : r); } private Object waitingGet(boolean interruptible) { Signaller q = null; boolean queued = false; int spins = -1; Object r; //循环获取result属性,判断是否为空,不为空获取到结果,跳出while循环 while ((r = result) == null) { if (spins < 0) //多个线程在允许,就给spins赋值256,然后循环递减,如果此时还没有返回值,则走下面的else分支 spins = (Runtime.getRuntime().availableProcessors() > 1) ? 1 << 8 : 0; // Use brief spin-wait on multiprocessors else if (spins > 0) { if (ThreadLocalRandom.nextSecondarySeed() >= 0) --spins; } else if (q == null) //创建等待信号线程 q = new Signaller(interruptible, 0L, 0L); else if (!queued) //替换stack属性,把替换是否成功的结果赋值给queued queued = tryPushStack(q); else if (interruptible && q.interruptControl < 0) {//允许中断,并且q.interruptControl = 1,不会走此分支, 下面的循环出现出现线程中断会走此分支 q.thread = null; cleanStack(); return null; } else if (q.thread != null && result == null) {//如果结果没有返回,会进入当前分支 try { //循环判断q是否释放,等待一直到满足Signaller释放条件(主要判断是否超时),上面Signaller的构造方法中,deadline为0, 不会因为超时释放,只有线程中断的时候才会释放 ForkJoinPool.managedBlock(q); } catch (InterruptedException ie) {//如果发生线程中断,把Signaller的interruptControl置为-1,等到下一个循环使用 q.interruptControl = -1; } } } if (q != null) {//信号线程不为null, 如果Signaller的中断控制标记位小于0,则返回null或者线程中断 q.thread = null; if (q.interruptControl < 0) { if (interruptible) r = null; // report interruption else Thread.currentThread().interrupt(); } } //传递给下一个Completion,没有则不执行 postComplete(); return r; } private static <T> T reportGet(Object r) throws InterruptedException, ExecutionException { //根据不同的情况做返回值的包装 if (r == null) // by convention below, null means interrupted throw new InterruptedException(); if (r instanceof AltResult) { Throwable x, cause; if ((x = ((AltResult)r).ex) == null) return null; if (x instanceof CancellationException) throw (CancellationException)x; if ((x instanceof CompletionException) && (cause = x.getCause()) != null) x = cause; throw new ExecutionException(x); } @SuppressWarnings("unchecked") T t = (T) r; return t; }
2 等待多个线程执行完成再做返回
//demo , stageRunnable是一个实现Runnable类型的变量 CompletableFuture future = CompletableFuture.allOf(CompletableFuture.runAsync(stageRunnable), CompletableFuture.runAsync(stageRunnable), CompletableFuture.runAsync(stageRunnable)); System.out.println(JSON.toJSONString(future)); future.get(); public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { return andTree(cfs, 0, cfs.length - 1); } //此方法是一个递归方法, 二分法把两个任务执行一个等待,每次二分都会创建一个CompletableFuture的depency static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, int lo, int hi) { CompletableFuture<Void> d = new CompletableFuture<Void>(); if (lo > hi) // empty d.result = NIL; else { CompletableFuture<?> a, b; int mid = (lo + hi) >>> 1; if ((a = (lo == mid ? cfs[lo] : andTree(cfs, lo, mid))) == null || (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : andTree(cfs, mid+1, hi))) == null) throw new NullPointerException(); if (!d.biRelay(a, b)) {//a,b两个子任务没有全部完成,走此分支 BiRelay<?,?> c = new BiRelay<>(d, a, b);//创建一个Completion,一个依赖,两个source a.bipush(b, c);//把c推送到a,b的stack属性当中去 c.tryFire(SYNC);//BiRelay触发实际操作 } } return d; } //根据方法名直译的意思:是否两个传播都已经完成;两个任务有任何一个未完成,则返回false, 只有全部完成的时候才会返回true boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) { Object r, s; Throwable x; if (a == null || (r = a.result) == null || b == null || (s = b.result) == null) return false; if (result == null) { if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) completeThrowable(x, r); else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) completeThrowable(x, s); else completeNull(); } return true; }
相关推荐
Lua 源码赏析.pdf, 其中有着非常棒的lua源代码资源可供学习。下载文件为该书的百度云连接地址。
103协议源码赏析
这个资源包包含了“Lua中文教程”和“Lua源码赏析”两部分,旨在帮助初学者在短短两小时内快速掌握Lua编程基础,并通过源码分析深入理解其内部机制。 “Lua中文教程”可能涵盖以下内容: 1. **基础语法**:Lua的...
Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 Spark的缓存,变量,shuffle数据等清理及...PIDController源码赏析及 back pressure 实现思路 Streaming Context重点摘要 checkpoint 必知必会
programming in lua 4th,lua程序设计第4版,我学第一版的时候是200多页,现在2016年出的第四版还是200多页,更新了一些内容。还有国内大神云峰编写的《lua源码鉴赏》,分享出来一块学习进步吧。
在《Lua 源码欣赏》这篇文章中,作者云风详细地介绍了Lua语言的内部实现机制。Lua是一种小巧、高效的脚本语言,广泛应用于嵌入式系统和游戏开发中。Lua的源码对于理解和优化程序性能,以及设计新语言都有很大的帮助...
skynet源代码详细解析,让你快速了解SKYNET的全貌。
SSCOM源码 DELPHI 源码 绝对源码!欢迎下载
本资源包含2000套微信小程序的源码,对于开发者来说是一份宝贵的参考资料,可以用来学习、研究或者作为开发新项目的起点。 源码下载是开发者获取程序原始代码的方式,对于学习和理解编程逻辑至关重要。这些微信小...
【借贷公司源码 网贷平台源码 php借贷源码】这个标题揭示了我们要讨论的核心内容,即一套用于建立在线借贷或网贷平台的源代码,该源代码是基于PHP编程语言实现的。PHP是一种广泛使用的开源服务器端脚本语言,尤其在...
订餐网,外卖网源码,带积分商城,商家系统,外卖网站建设! 系统特点: 周密策划、项目为先 "项目指导技术,技术服从项目",这是我们一贯秉承的原则,也是我们与其他系统开发商、网站建设公司的本质区别所在!我们...
易语言源码就是用这种语言编写的程序代码,通过阅读和理解这些源码,开发者可以学习到如何利用易语言来实现特定功能,比如变速齿轮。 在易语言中实现变速齿轮功能,主要涉及到以下几个关键知识点: 1. **系统时间...
提供的源码资源涵盖了安卓应用、小程序、Python应用和Java应用等多个领域,每个领域都包含了丰富的实例和项目。这些源码都是基于各自平台的最新技术和标准编写,确保了在对应环境下能够无缝运行。同时,源码中配备了...
8. **并发改进**:`java.util.concurrent` 包下,Java 8 对并发工具类进行了增强,例如 `ForkJoinPool` 和 `CompletableFuture`,提供了更高级的并发编程模型。 9. **反射与注解处理**:Sun 包源码中包含了 `sun....
ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh...
《cocos creator完整麻将源码解析与开发指南》 cocos Creator是一款强大的2D游戏开发引擎,被广泛应用于游戏开发,尤其是休闲娱乐类游戏,如麻将。本篇将深入探讨"麻将源码"这一主题,结合cocos Creator的特性,为...
源码01 销售管理系统 源码02 彩票分析系统 源码03 餐饮管理系统 源码04 C#点名程序 源码05 象棋游戏 源码06 变色球游戏 源码07 多功能计算器 源码08 记事本 源码09 简易画图程序 源码10 成绩管理系统 源码11 BBS论坛...
移动医疗APP源码是开发医疗健康应用的核心组成部分,它包含了应用程序的所有逻辑和界面设计。在Android平台上,这种源码通常是用Java或Kotlin语言编写的,并使用Android Studio作为集成开发环境(IDE)。在这个案例...
0001-2科技发展有限公司升级版源码 0001科技发展有限公司修正版源码 0002机械配件制造销售公司修正版源码 0003家具地板公司修正版源码 0004-1机械有限公司修正版源码 0004机械有限公司修正版源码 0005机械产品公司...
net-tools arp源码 ifconfig源码 route源码