`

CompletableFuture源码赏析

 
阅读更多

文章原创,转载请注明出处:http://abc08010051.iteye.com/blog/2409693

后面会再修改一下,让文章读起来更好读,现在的版本还比较粗糙

 

 

CompletableFuture是java 1.8提供的一个新类,是对Future的增强,吸收了guava异步线程的特点,可以实现一系列的异步线程操作,很多常规的用法网上有很多博客,这里说说部分代码的实现:



  

这是CompletableFuture的基本结构

 

CompletableFuture基本属性方法

    volatile Object result;       // Either the result or boxed AltResult
    volatile Completion stack;    // Top of Treiber stack of dependent actions

result用来存放线程返回的结果

stack 行为上就是一个栈的功能,先进后出,用来存放要执行的动作,这个在单个异步线程返回时是没用的,多个线程等待的时候才排上用场

 

Completion基本属性方法

        volatile Completion next;      // Treiber stack link

        abstract CompletableFuture<?> tryFire(int mode);

 next链式结构,存放下一个

 tryFire 方法,主要返回一个依赖的Completion

 

名词解释:

dependent:依赖,多个线程操作时,如何等待每个线程都完成才返回,主要是依靠这个依赖,没处理完就会调用依赖的postComplete()方法向上传递

source:源,用户自定义的线程的CompletableFuture

 

 

1 单一使用

  

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} with
     * the value obtained by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param <U> the function's return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }

    static final class AsyncSupply<T> extends ForkJoinTask<Void>
            implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<T> dep; Supplier<T> fn;
        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
            this.dep = dep; this.fn = fn;
        }

        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) {}
        public final boolean exec() { run(); return true; }

        public void run() {
            //CompletableFuture句柄,把Supplier的返回值放到CompletableFuture的result属性中,当前线程的执行是在默认的线程池中执行,在外部可以获取
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }
    }

  /**
     * Pops and tries to trigger all reachable dependents.  Call only
     * when known to be done.
     */
    final void postComplete() {
        /*
         * On each step, variable f holds current dependents to pop
         * and run.  It is extended along only one path at a time,
         * pushing others to avoid unbounded recursion.
         */
        CompletableFuture<?> f = this; Completion h;
        //循环遍历CompletableFuture的stack属性,Completion是一个链式的操作,如果有下一个,触发下一个Completion的tryFire方法
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture<?> d; Completion t;
            if (f.casStack(h, t = h.next)) {
                if (t != null) {
                    if (f != this) {
                        pushStack(h);
                        continue;
                    }
                    h.next = null;    // detach
                }
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }

    

   等待获取结果

    

    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }


private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        //循环获取result属性,判断是否为空,不为空获取到结果,跳出while循环
        while ((r = result) == null) {
            if (spins < 0)
                //多个线程在允许,就给spins赋值256,然后循环递减,如果此时还没有返回值,则走下面的else分支
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if (q == null)
                //创建等待信号线程
                q = new Signaller(interruptible, 0L, 0L);
            else if (!queued)
                //替换stack属性,把替换是否成功的结果赋值给queued
                queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {//允许中断,并且q.interruptControl = 1,不会走此分支, 下面的循环出现出现线程中断会走此分支
                q.thread = null;
                cleanStack();
                return null;
            }
            else if (q.thread != null && result == null) {//如果结果没有返回,会进入当前分支
                try {
                    //循环判断q是否释放,等待一直到满足Signaller释放条件(主要判断是否超时),上面Signaller的构造方法中,deadline为0, 不会因为超时释放,只有线程中断的时候才会释放
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {//如果发生线程中断,把Signaller的interruptControl置为-1,等到下一个循环使用
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {//信号线程不为null, 如果Signaller的中断控制标记位小于0,则返回null或者线程中断
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        //传递给下一个Completion,没有则不执行
        postComplete();
        return r;
    }


    private static <T> T reportGet(Object r)
        throws InterruptedException, ExecutionException {
        //根据不同的情况做返回值的包装
        if (r == null) // by convention below, null means interrupted
            throw new InterruptedException();
        if (r instanceof AltResult) {
            Throwable x, cause;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if ((x instanceof CompletionException) &&
                (cause = x.getCause()) != null)
                x = cause;
            throw new ExecutionException(x);
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }
 

 

 2 等待多个线程执行完成再做返回

   

//demo , stageRunnable是一个实现Runnable类型的变量
CompletableFuture future = CompletableFuture.allOf(CompletableFuture.runAsync(stageRunnable),
            CompletableFuture.runAsync(stageRunnable), CompletableFuture.runAsync(stageRunnable));
        System.out.println(JSON.toJSONString(future));
        future.get();


    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }

    //此方法是一个递归方法, 二分法把两个任务执行一个等待,每次二分都会创建一个CompletableFuture的depency
    static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
                                           int lo, int hi) {
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (lo > hi) // empty
            d.result = NIL;
        else {
            CompletableFuture<?> a, b;
            int mid = (lo + hi) >>> 1;
            if ((a = (lo == mid ? cfs[lo] :
                      andTree(cfs, lo, mid))) == null ||
                (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                      andTree(cfs, mid+1, hi)))  == null)
                throw new NullPointerException();
            if (!d.biRelay(a, b)) {//a,b两个子任务没有全部完成,走此分支
                BiRelay<?,?> c = new BiRelay<>(d, a, b);//创建一个Completion,一个依赖,两个source
                a.bipush(b, c);//把c推送到a,b的stack属性当中去
                c.tryFire(SYNC);//BiRelay触发实际操作
            }
        }
        return d;
    }

    //根据方法名直译的意思:是否两个传播都已经完成;两个任务有任何一个未完成,则返回false, 只有全部完成的时候才会返回true
    boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
        Object r, s; Throwable x;
        if (a == null || (r = a.result) == null ||
            b == null || (s = b.result) == null)
            return false;
        if (result == null) {
            if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
                completeThrowable(x, r);
            else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
                completeThrowable(x, s);
            else
                completeNull();
        }
        return true;
    }
    
 
   等待多个线程结束是怎么等待的呢?
 
    //关于这个方法上面每个步骤都有注释,这里只写关键部分
    private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L);
            else if (!queued)
                //创建一个信号线程,并推送到Completable的stack属性中,等到线程执行完的时候会执行CompletableFuture这个依赖的Completion
                //即当前的Signaller类型的q,请看下面源码signal的tryFire方法实现
                queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {
                q.thread = null;
                cleanStack();
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    //调用Signaller的block和isReleasable方法,无法获取到结果的时候会被阻塞,看下面block具体的代码
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        postComplete();
        return r;
    }
      
       //Signaller的block方法
       public boolean block() {
            if (isReleasable())
                return true;
            else if (deadline == 0L)//执行到此步时,线程会阻塞
                LockSupport.park(this);
            else if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }

        //Signaller类的tryFire方法
        final CompletableFuture<?> tryFire(int ignore) {
            Thread w; // no need to atomically claim
            if ((w = thread) != null) {
                thread = null;
		//最重要的方法,对当前线程执行了一个unpark方法,此方法会在所有的任务线程执行完了之后,执行postComplete()方法时调用,
		//唤醒因为无法获取计算结果而阻塞的当前线程
                LockSupport.unpark(w);
            }
            return null;
        }

  

 

  • 大小: 150.1 KB
分享到:
评论

相关推荐

    103协议源码赏析

    103协议源码赏析

    Lua中文教程+源码赏析

    这个资源包包含了“Lua中文教程”和“Lua源码赏析”两部分,旨在帮助初学者在短短两小时内快速掌握Lua编程基础,并通过源码分析深入理解其内部机制。 “Lua中文教程”可能涵盖以下内容: 1. **基础语法**:Lua的...

    Spark-2.3.1源码解读

    Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 Spark的缓存,变量,shuffle数据等清理及...PIDController源码赏析及 back pressure 实现思路 Streaming Context重点摘要 checkpoint 必知必会

    互联网常用框架源码赏析,含 Spring 等多框架及中间件底层原理剖析.zip

    该资源内项目源码是个人的课程设计、毕业设计,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! ## 项目备注 1、该资源内项目代码都经过严格测试运行成功才上传的,请放心下载使用...

    《Lua 源码欣赏》

    在《Lua 源码欣赏》这篇文章中,作者云风详细地介绍了Lua语言的内部实现机制。Lua是一种小巧、高效的脚本语言,广泛应用于嵌入式系统和游戏开发中。Lua的源码对于理解和优化程序性能,以及设计新语言都有很大的帮助...

    skynet源码赏析

    skynet源代码详细解析,让你快速了解SKYNET的全貌。

    网狐6.6完整源码+内核源码+105款游戏源码(已解密).zip

    《网狐6.6完整源码与内核源码解析:105款游戏源码解密探索》 在IT行业中,源码是程序开发的核心,它揭示了软件的内部工作机制,是程序员进行二次开发、优化和调试的基础。"网狐6.6完整源码+内核源码+105款游戏源码...

    微信小程序源码下载 微信小程序源码下载 2000套微信小程序源码

    本资源包含2000套微信小程序的源码,对于开发者来说是一份宝贵的参考资料,可以用来学习、研究或者作为开发新项目的起点。 源码下载是开发者获取程序原始代码的方式,对于学习和理解编程逻辑至关重要。这些微信小...

    电商微信小程序源码+后台

    电商微信小程序源码+后台分享,亲测可用,有需要的朋友拿去!!! 电商微信小程序源码+后台分享,亲测可用,有需要的朋友拿去!!! 电商微信小程序源码+后台分享,亲测可用,有需要的朋友拿去!!! 电商微信小程序...

    饿了么源码 百度外卖源码 美团外卖源码 外卖系统源码

    订餐网,外卖网源码,带积分商城,商家系统,外卖网站建设! 系统特点: 周密策划、项目为先 "项目指导技术,技术服从项目",这是我们一贯秉承的原则,也是我们与其他系统开发商、网站建设公司的本质区别所在!我们...

    移动医疗APP源码 android (安卓版)妙手医生源码

    移动医疗APP源码是开发医疗健康应用的核心组成部分,它包含了应用程序的所有逻辑和界面设计。在Android平台上,这种源码通常是用Java或Kotlin语言编写的,并使用Android Studio作为集成开发环境(IDE)。在这个案例...

    ssh框架项目源码ssh框架项目源码ssh框架项目源码

    ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh...

    cocos creator完整麻将源码下载

    《cocos creator完整麻将源码解析与开发指南》 cocos Creator是一款强大的2D游戏开发引擎,被广泛应用于游戏开发,尤其是休闲娱乐类游戏,如麻将。本篇将深入探讨"麻将源码"这一主题,结合cocos Creator的特性,为...

    C#项目源码大集合系列一

    源码01 销售管理系统 源码02 彩票分析系统 源码03 餐饮管理系统 源码04 C#点名程序 源码05 象棋游戏 源码06 变色球游戏 源码07 多功能计算器 源码08 记事本 源码09 简易画图程序 源码10 成绩管理系统 源码11 BBS论坛...

    51套经典企业网站源码(一)

    0001-2科技发展有限公司升级版源码 0001科技发展有限公司修正版源码 0002机械配件制造销售公司修正版源码 0003家具地板公司修正版源码 0004-1机械有限公司修正版源码 0004机械有限公司修正版源码 0005机械产品公司...

    DIY个性T恤定制网站源码

    【DIY个性T恤定制网站源码】是一个用于创建在线个性化商品定制平台的软件系统,主要专注于T恤、杯子、台历和挂历等产品。这个源码允许用户通过简单的界面设计自己的产品,体现个人风格和创意。接下来,我们将深入...

    net-tools arp源码 ifconfig源码 route源码

    net-tools arp源码 ifconfig源码 route源码

    私服发布网站PHP源码

    源码完全由开发者手工编写,具备全后台动态更新前台的功能,这意味着管理员可以通过后台管理系统方便地更新和管理网站内容,无需懂得前端编程。 源码的亮点在于其灵活性和实用性,允许用户快速部署并开始运营,尤其...

    Linux系统下dhcp源码

    Linux系统下的DHCP(Dynamic Host Configuration Protocol)源码解析 DHCP是一种网络协议,用于自动分配IP地址、子网掩码、默认网关等网络配置信息给网络中的设备。在Linux环境中,DHCP服务器通常使用isc-dhcp-...

    多功能在线报价系统源码

    【标题】:“多功能在线报价系统源码” 在线报价系统是一种基于网络的应用程序,它使得企业或个人能够方便快捷地提供产品或服务的价格信息给潜在客户。这个“多功能在线报价系统源码”是专为此目的设计的,允许用户...

Global site tag (gtag.js) - Google Analytics