`

CompletableFuture源码赏析

 
阅读更多

文章原创,转载请注明出处:http://abc08010051.iteye.com/blog/2409693

后面会再修改一下,让文章读起来更好读,现在的版本还比较粗糙

 

 

CompletableFuture是java 1.8提供的一个新类,是对Future的增强,吸收了guava异步线程的特点,可以实现一系列的异步线程操作,很多常规的用法网上有很多博客,这里说说部分代码的实现:



  

这是CompletableFuture的基本结构

 

CompletableFuture基本属性方法

    volatile Object result;       // Either the result or boxed AltResult
    volatile Completion stack;    // Top of Treiber stack of dependent actions

result用来存放线程返回的结果

stack 行为上就是一个栈的功能,先进后出,用来存放要执行的动作,这个在单个异步线程返回时是没用的,多个线程等待的时候才排上用场

 

Completion基本属性方法

        volatile Completion next;      // Treiber stack link

        abstract CompletableFuture<?> tryFire(int mode);

 next链式结构,存放下一个

 tryFire 方法,主要返回一个依赖的Completion

 

名词解释:

dependent:依赖,多个线程操作时,如何等待每个线程都完成才返回,主要是依靠这个依赖,没处理完就会调用依赖的postComplete()方法向上传递

source:源,用户自定义的线程的CompletableFuture

 

 

1 单一使用

  

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} with
     * the value obtained by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param <U> the function's return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }

    static final class AsyncSupply<T> extends ForkJoinTask<Void>
            implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<T> dep; Supplier<T> fn;
        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
            this.dep = dep; this.fn = fn;
        }

        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) {}
        public final boolean exec() { run(); return true; }

        public void run() {
            //CompletableFuture句柄,把Supplier的返回值放到CompletableFuture的result属性中,当前线程的执行是在默认的线程池中执行,在外部可以获取
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }
    }

  /**
     * Pops and tries to trigger all reachable dependents.  Call only
     * when known to be done.
     */
    final void postComplete() {
        /*
         * On each step, variable f holds current dependents to pop
         * and run.  It is extended along only one path at a time,
         * pushing others to avoid unbounded recursion.
         */
        CompletableFuture<?> f = this; Completion h;
        //循环遍历CompletableFuture的stack属性,Completion是一个链式的操作,如果有下一个,触发下一个Completion的tryFire方法
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture<?> d; Completion t;
            if (f.casStack(h, t = h.next)) {
                if (t != null) {
                    if (f != this) {
                        pushStack(h);
                        continue;
                    }
                    h.next = null;    // detach
                }
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }

    

   等待获取结果

    

    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }


private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        //循环获取result属性,判断是否为空,不为空获取到结果,跳出while循环
        while ((r = result) == null) {
            if (spins < 0)
                //多个线程在允许,就给spins赋值256,然后循环递减,如果此时还没有返回值,则走下面的else分支
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if (q == null)
                //创建等待信号线程
                q = new Signaller(interruptible, 0L, 0L);
            else if (!queued)
                //替换stack属性,把替换是否成功的结果赋值给queued
                queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {//允许中断,并且q.interruptControl = 1,不会走此分支, 下面的循环出现出现线程中断会走此分支
                q.thread = null;
                cleanStack();
                return null;
            }
            else if (q.thread != null && result == null) {//如果结果没有返回,会进入当前分支
                try {
                    //循环判断q是否释放,等待一直到满足Signaller释放条件(主要判断是否超时),上面Signaller的构造方法中,deadline为0, 不会因为超时释放,只有线程中断的时候才会释放
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {//如果发生线程中断,把Signaller的interruptControl置为-1,等到下一个循环使用
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {//信号线程不为null, 如果Signaller的中断控制标记位小于0,则返回null或者线程中断
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        //传递给下一个Completion,没有则不执行
        postComplete();
        return r;
    }


    private static <T> T reportGet(Object r)
        throws InterruptedException, ExecutionException {
        //根据不同的情况做返回值的包装
        if (r == null) // by convention below, null means interrupted
            throw new InterruptedException();
        if (r instanceof AltResult) {
            Throwable x, cause;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if ((x instanceof CompletionException) &&
                (cause = x.getCause()) != null)
                x = cause;
            throw new ExecutionException(x);
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }
 

 

 2 等待多个线程执行完成再做返回

   

//demo , stageRunnable是一个实现Runnable类型的变量
CompletableFuture future = CompletableFuture.allOf(CompletableFuture.runAsync(stageRunnable),
            CompletableFuture.runAsync(stageRunnable), CompletableFuture.runAsync(stageRunnable));
        System.out.println(JSON.toJSONString(future));
        future.get();


    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }

    //此方法是一个递归方法, 二分法把两个任务执行一个等待,每次二分都会创建一个CompletableFuture的depency
    static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
                                           int lo, int hi) {
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (lo > hi) // empty
            d.result = NIL;
        else {
            CompletableFuture<?> a, b;
            int mid = (lo + hi) >>> 1;
            if ((a = (lo == mid ? cfs[lo] :
                      andTree(cfs, lo, mid))) == null ||
                (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                      andTree(cfs, mid+1, hi)))  == null)
                throw new NullPointerException();
            if (!d.biRelay(a, b)) {//a,b两个子任务没有全部完成,走此分支
                BiRelay<?,?> c = new BiRelay<>(d, a, b);//创建一个Completion,一个依赖,两个source
                a.bipush(b, c);//把c推送到a,b的stack属性当中去
                c.tryFire(SYNC);//BiRelay触发实际操作
            }
        }
        return d;
    }

    //根据方法名直译的意思:是否两个传播都已经完成;两个任务有任何一个未完成,则返回false, 只有全部完成的时候才会返回true
    boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
        Object r, s; Throwable x;
        if (a == null || (r = a.result) == null ||
            b == null || (s = b.result) == null)
            return false;
        if (result == null) {
            if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
                completeThrowable(x, r);
            else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
                completeThrowable(x, s);
            else
                completeNull();
        }
        return true;
    }
    
 
   等待多个线程结束是怎么等待的呢?
 
    //关于这个方法上面每个步骤都有注释,这里只写关键部分
    private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L);
            else if (!queued)
                //创建一个信号线程,并推送到Completable的stack属性中,等到线程执行完的时候会执行CompletableFuture这个依赖的Completion
                //即当前的Signaller类型的q,请看下面源码signal的tryFire方法实现
                queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {
                q.thread = null;
                cleanStack();
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    //调用Signaller的block和isReleasable方法,无法获取到结果的时候会被阻塞,看下面block具体的代码
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        postComplete();
        return r;
    }
      
       //Signaller的block方法
       public boolean block() {
            if (isReleasable())
                return true;
            else if (deadline == 0L)//执行到此步时,线程会阻塞
                LockSupport.park(this);
            else if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }

        //Signaller类的tryFire方法
        final CompletableFuture<?> tryFire(int ignore) {
            Thread w; // no need to atomically claim
            if ((w = thread) != null) {
                thread = null;
		//最重要的方法,对当前线程执行了一个unpark方法,此方法会在所有的任务线程执行完了之后,执行postComplete()方法时调用,
		//唤醒因为无法获取计算结果而阻塞的当前线程
                LockSupport.unpark(w);
            }
            return null;
        }

  

 

  • 大小: 150.1 KB
分享到:
评论

相关推荐

    103协议源码赏析

    103协议源码赏析

    Lua中文教程+源码赏析

    这个资源包包含了“Lua中文教程”和“Lua源码赏析”两部分,旨在帮助初学者在短短两小时内快速掌握Lua编程基础,并通过源码分析深入理解其内部机制。 “Lua中文教程”可能涵盖以下内容: 1. **基础语法**:Lua的...

    Spark-2.3.1源码解读

    Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 Spark的缓存,变量,shuffle数据等清理及...PIDController源码赏析及 back pressure 实现思路 Streaming Context重点摘要 checkpoint 必知必会

    《Lua 源码欣赏》

    在《Lua 源码欣赏》这篇文章中,作者云风详细地介绍了Lua语言的内部实现机制。Lua是一种小巧、高效的脚本语言,广泛应用于嵌入式系统和游戏开发中。Lua的源码对于理解和优化程序性能,以及设计新语言都有很大的帮助...

    微信小程序商城系统源码

    微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)微信小程序 商城 (源码)...

    SSCOM源码 DELPHI 源码

    SSCOM源码 DELPHI 源码 绝对源码!欢迎下载

    微信小程序源码下载 微信小程序源码下载 2000套微信小程序源码

    本资源包含2000套微信小程序的源码,对于开发者来说是一份宝贵的参考资料,可以用来学习、研究或者作为开发新项目的起点。 源码下载是开发者获取程序原始代码的方式,对于学习和理解编程逻辑至关重要。这些微信小...

    电商微信小程序源码+后台

    电商微信小程序源码+后台分享,亲测可用,有需要的朋友拿去!!! 电商微信小程序源码+后台分享,亲测可用,有需要的朋友拿去!!! 电商微信小程序源码+后台分享,亲测可用,有需要的朋友拿去!!! 电商微信小程序...

    饿了么源码 百度外卖源码 美团外卖源码 外卖系统源码

    订餐网,外卖网源码,带积分商城,商家系统,外卖网站建设! 系统特点: 周密策划、项目为先 "项目指导技术,技术服从项目",这是我们一贯秉承的原则,也是我们与其他系统开发商、网站建设公司的本质区别所在!我们...

    变速齿轮 易语言源码 变速齿轮源码 变速器源码

    易语言源码就是用这种语言编写的程序代码,通过阅读和理解这些源码,开发者可以学习到如何利用易语言来实现特定功能,比如变速齿轮。 在易语言中实现变速齿轮功能,主要涉及到以下几个关键知识点: 1. **系统时间...

    移动医疗APP源码 android (安卓版)妙手医生源码

    移动医疗APP源码是开发医疗健康应用的核心组成部分,它包含了应用程序的所有逻辑和界面设计。在Android平台上,这种源码通常是用Java或Kotlin语言编写的,并使用Android Studio作为集成开发环境(IDE)。在这个案例...

    ssh框架项目源码ssh框架项目源码ssh框架项目源码

    ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh框架项目源码ssh...

    cocos creator完整麻将源码下载

    《cocos creator完整麻将源码解析与开发指南》 cocos Creator是一款强大的2D游戏开发引擎,被广泛应用于游戏开发,尤其是休闲娱乐类游戏,如麻将。本篇将深入探讨"麻将源码"这一主题,结合cocos Creator的特性,为...

    C#项目源码大集合系列一

    源码01 销售管理系统 源码02 彩票分析系统 源码03 餐饮管理系统 源码04 C#点名程序 源码05 象棋游戏 源码06 变色球游戏 源码07 多功能计算器 源码08 记事本 源码09 简易画图程序 源码10 成绩管理系统 源码11 BBS论坛...

    51套经典企业网站源码(一)

    0001-2科技发展有限公司升级版源码 0001科技发展有限公司修正版源码 0002机械配件制造销售公司修正版源码 0003家具地板公司修正版源码 0004-1机械有限公司修正版源码 0004机械有限公司修正版源码 0005机械产品公司...

    捕鱼游戏源码 下载 最新完整版

    捕鱼游戏源码是一种基于计算机编程技术,用于开发模拟海洋捕鱼场景的电子游戏的代码集合。这类源码通常包含了游戏逻辑、图形渲染、音频处理、用户交互等多个方面的详细实现,为开发者提供了一个深入理解游戏开发过程...

    53客服系统源码 53kf源码

    在源码层面,53客服系统源码主要包含了以下几个关键知识点: 1. **多平台支持**:53客服系统通常支持网页、手机APP、微信等多种渠道的接入,源码中会有相应的接口实现,以确保用户可以通过不同的终端与客服进行交互...

    仿58同城赶集网源码

    分类源码,信息发布网站源码,信息源码,信息港网站源码,asp信息港源码,信息类网站源码,多种分类源码,信息网源码下载,asp信息网源码,信息发布系统源码,物流信息源码,房产信息网源码.net源码,公安信息网源码,家教信息...

    DIY个性T恤定制网站源码

    【DIY个性T恤定制网站源码】是一个用于创建在线个性化商品定制平台的软件系统,主要专注于T恤、杯子、台历和挂历等产品。这个源码允许用户通过简单的界面设计自己的产品,体现个人风格和创意。接下来,我们将深入...

    net-tools arp源码 ifconfig源码 route源码

    net-tools arp源码 ifconfig源码 route源码

Global site tag (gtag.js) - Google Analytics