`
BrokenDreams
  • 浏览: 257080 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:101806
社区版块
存档分类
最新评论

Jdk1.6 JUC源码解析(16)-FutureTask

阅读更多

Jdk1.6 JUC源码解析(16)-FutureTask

作者:大飞

 

功能简介:
  • FutureTask是一种异步任务(或异步计算),举个栗子,主线程的逻辑中需要使用某个值,但这个值需要复杂的运算得来,那么主线程可以提前建立一个异步任务来计算这个值(在其他的线程中计算),然后去做其他事情,当需要这个值的时候再通过刚才建立的异步任务来获取这个值,有点并行的意思,这样可以缩短整个主线程逻辑的执行时间。
  • FutureTask也是基于AQS来构建的,使用共享模式,使用AQS的状态来表示异步任务的运行状态。
源码分析:
  • 先来看下FutureTask都实现了哪些接口。首先实现了RunnableFuture接口,先看下这个接口:
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

 

       RunnableFuture又扩展了Runnable和Future:

 

public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used 
     * to create a thread, starting the thread causes the object's 
     * <code>run</code> method to be called in that separately executing 
     * thread. 
     * <p>
     * The general contract of the method <code>run</code> is that it may 
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

 

public interface Future<V> {
    /**
     * 尝试取消任务的执行,如果任务已经完成或者已经被取消或者由于某种原因
     * 无法取消,方法返回false。如果任务取消成功,或者任务开始执行之前调用
     * 了取消方法,那么任务就永远不会执行了。mayInterruptIfRunning参数决定 
     * 了是否要中断执行任务的线程。
     */
    boolean cancel(boolean mayInterruptIfRunning);
    /**
     * 判断任务是否在完成之前被取消。
     */
    boolean isCancelled();
    /**
     * 判断任务是否完成。
     */
    boolean isDone();
    /**
     * 等待,直到获取任务的执行结果。如果任务还没执行完,这个方法会阻塞。
     */
    V get() throws InterruptedException, ExecutionException;
    /**
     * 等待,在给定的时间内获取任务的执行结果。
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
       Runnable接口经常写多线程程序的话一定非常熟悉了,这里不说了。看下Future接口,它提供了取消任务接口,并提供了查看任务状态的接口,最重要的是提供了有阻塞行为的获取任务执行结果的接口。
 
  • 接下来看下FutureTask的实现,由于其基于AQS实现,那先看一下内部的同步机制:
    private final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -7828117401763700385L;
        /** 表示任务正在执行 */
        private static final int RUNNING   = 1;
        /** 表示任务已经运行完毕 */
        private static final int RAN       = 2;
        /** 表示任务被取消 */
        private static final int CANCELLED = 4;
        /** 内部的callable */
        private final Callable<V> callable;
        /** 执行结果 */
        private V result;
        /** 执行过程中发生的异常 */
        private Throwable exception;
        /**
         * 执行当前任务的线程。在set/cancel之后置空,说明可以
         * 了。必须使用volatile来修饰,以确保任务完成后的可见性。 
         */
        private volatile Thread runner;
        Sync(Callable<V> callable) {
            this.callable = callable;
        }

 

       内部同步器接中使用了一个callable来保存要执行的任务,看下这个接口: 

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
       这个接口的行为和Runnable类似,不同的是有返回值,且能抛出异常,算是对Runnable的补充。
 

       继续看下同步器的innerRun方法,这个方法用于支持FutureTask的run方法: 

        void innerRun() {
            //尝试设置任务运行状态为正在执行。
            if (!compareAndSetState(0, RUNNING))
                return; //如果设置失败,直接返回。
            try {
                runner = Thread.currentThread(); //设置执行线程。
                if (getState() == RUNNING) //再次检测任务状态
                    innerSet(callable.call()); //执行任务,然后设置执行结果。
                else
                    releaseShared(0); //说明任务已取消。
            } catch (Throwable ex) {
                innerSetException(ex); //如果执行任务过程中发生异常,设置异常。
            }
        }

 

       看下设置执行结果的innerSet方法: 

        void innerSet(V v) {
	       for (;;) {
		      int s = getState(); //获取任务执行状态。
		      if (s == RAN)
		          return; //如果任务已经执行完毕,退出。
              if (s == CANCELLED) {
		          //这里释放AQS控制权并设置runner为null, 
		          //为了避免正在和一个试图中断线程的取消请求竞
                  releaseShared(0);
                  return;
              }
              //尝试将任务状态设置为执行完成。
		      if (compareAndSetState(s, RAN)) {
                  result = v; //设置执行结果。
                  releaseShared(0); //释放AQS控制权。
                  done(); //这里调用一下done方法,子类可覆盖这个方法,做一些定制处理。
		          return;
              }
            }
        }

 

       AQS分析过,releaseShared方法中会调用tryReleaseShared方法,看一下当前同步器中这个方法的实现: 

        protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;
        }

 

       innerRun方法中在执行抛异常后会调用innerSetException:

        void innerSetException(Throwable t) {
	        for (;;) {
		        int s = getState();
		        if (s == RAN)
		            return;
                if (s == CANCELLED) {
		            // aggressively release to set runner to null,
		            // in case we are racing with a cancel request
		            // that will try to interrupt runner
                    releaseShared(0);
                    return;
                 }
		         if (compareAndSetState(s, RAN)) {
                    exception = t;
                    result = null;
                    releaseShared(0);
                    done();
		            return;
                 }
	         }
        }
       过程和innerSet类似,只不过最后要设置异常,清空result。

       和innerRun类似还有innerRunAndReset方法,看下实现: 

        boolean innerRunAndReset() {
            if (!compareAndSetState(0, RUNNING))
                return false;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING)
                    callable.call(); // don't set result
                runner = null;
                return compareAndSetState(RUNNING, 0);
            } catch (Throwable ex) {
                innerSetException(ex);
                return false;
            }
        }
       和innerRun的区别是不设置执行结果,最后执行完毕后重置异步任务状态为0。
 

       再看下同步器的innerGet方法,这个方法用于支持FutureTask的get方法: 

        V innerGet() throws InterruptedException, ExecutionException {
            //获取共享锁,无法获取时阻塞等待。
            acquireSharedInterruptibly(0);
            if (getState() == CANCELLED)
                throw new CancellationException(); //如果任务状态为取消,那么抛出CancellationException
            if (exception != null)
                throw new ExecutionException(exception);//如果任务执行异常,抛出ExecutionException,并传递异常。
            return result; //成功执行完成,返回执行结果。
        }

 

       类似的有带超时的innerGet:

 

        V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }

 

       AQS分析过,acquireSharedInterruptibly和tryAcquireSharedNanos方法中会调用tryAcquireShared方法,看一下当前同步器中这个方法的实现:

        protected int tryAcquireShared(int ignore) {
            return innerIsDone()? 1 : -1;
        }
        boolean innerIsDone() {
            return ranOrCancelled(getState()) && runner == null;
        }
        private boolean ranOrCancelled(int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }
       可见innerGet中会首先判断任务是否完成,要依据任务(完成或取消)状态来判断。
 

       最后看下同步器的innerCancel方法,这个方法用于支持FutureTask的cancel方法:

        boolean innerCancel(boolean mayInterruptIfRunning) {
	        for (;;) {
		        int s = getState();
		        if (ranOrCancelled(s))
		            return false; //如果任务已经执行完毕或者取消。
		        if (compareAndSetState(s, CANCELLED))//否则尝试设置任务状态为取消。
		            break;
	        }
            if (mayInterruptIfRunning) {
                Thread r = runner;
                if (r != null)
                    r.interrupt(); //如果设置了mayInterruptIfRunning为true,需要中断线程,
            }
            releaseShared(0); //释放AQS的控制权。
            done(); //这里也会调用done,定制子类时需要注意下。
            return true;
        }

   

  • 有了内部同步机制,FutureTask的实现起来就很容易了,看下代码:
public class FutureTask<V> implements RunnableFuture<V> {
    /** 内部同步器 */
    private final Sync sync;

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }

    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }
    public boolean isCancelled() {
        return sync.innerIsCancelled();
    }
    public boolean isDone() {
        return sync.innerIsDone();
    }
    public boolean cancel(boolean mayInterruptIfRunning) {
        return sync.innerCancel(mayInterruptIfRunning);
    }

    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return sync.innerGet(unit.toNanos(timeout));
    }
    /**
     * 本类中是一个空实现,子类可以覆盖这个方法,做回调或一些记录工作。
     * 可以来实现里面通过任务状态来判断任务是否被取消。
     */
    protected void done() { }

    protected void set(V v) {
        sync.innerSet(v);
    }

    protected void setException(Throwable t) {
        sync.innerSetException(t);
    }

    public void run() {
        sync.innerRun();
    }

    protected boolean runAndReset() {
        return sync.innerRunAndReset();
    }
    ...

 

       实现都是基于上面分析过的方法,这里不啰嗦了。注意构造方法中有一个Runnable到callable的转换,使用了Executors中的方法,这个类后续会分析,这里简单看一下:

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable  task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
       就是简单的通过一个适配类在适配Runnable和Callable。
 
       小总结一下:
              1.当前线程建立异步任务后,异步任务处于初始状态(内部有一个数值表示状态,初始为0),一般交由其他线程执行任务(比如提交给线程池处理)。当前线程通过异步任务的get方法来获取执行结果时,如果异步任务此时还没执行完毕(内部状态既不是完成,也不是取消),那么当前线程会在get方法处阻塞。
              2.当其他线程(比如线程池中的工作线程)执行了异步任务,那么会将异步任务的状态改成"完成"(根据情况也可能是取消),同时将在get除等待的线程唤醒。
 

       FutureTask的代码解析完毕!

 

 

       参见:Jdk1.6 JUC源码解析(6)-locks-AbstractQueuedSynchronizer

 

  

 

分享到:
评论

相关推荐

    Java并发包源码分析(JDK1.8)

    Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...

    西川的学习总结笔记,涵盖了java、spring、java其他常用框架,以及大数据组件相关等.zip

    可以加我微信纪实西川笔记Java系列java进阶java 泛型java实例化的软件开发方式nio基础ArrayList源码分析LinkedList源代码分析HashSet和TreeSet源码分析HashMap源码分析(JDK1.8)juc进阶多主题基础Callable、Future和...

    2018尚硅谷Java培训视频链接_Java基础阶段

    - JDK安装与配置:介绍如何下载安装JDK(Java Development Kit),包括环境变量的设置等。 - HelloWorld程序:通过编写第一个Java程序来熟悉Java开发环境。 - 数据类型与运算符:学习Java中的基本数据类型如整型...

    jdkLearning:阅读java原始代码包含:集合,JUC,Executor体系

    这个"jdkLearning"项目专注于深入理解JDK的源代码,特别是集合、并发编程(JUC)和Executor服务这三个核心领域。通过阅读和分析这些源码,开发者可以更深入地了解Java平台的工作原理,提升编程技能,并优化应用程序...

    AI从头到脚详解如何创建部署Azure Web App的OpenAI项目源码

    【AI】从头到脚详解如何创建部署Azure Web App的OpenAI项目源码

    人脸识别_卷积神经网络_CNN_ORL数据库_身份验证_1741779511.zip

    人脸识别项目实战

    人工智能-人脸识别代码

    人工智能-人脸识别代码,采用cnn的架构识别代码

    汽车配件制造业企业信息化整体解决方案.pptx

    汽车配件制造业企业信息化整体解决方案

    短期风速预测模型,IDBO-BiTCN-BiGRU-Multihead-Attention IDBO是,网上复现 评价指标:R方、MAE、MAPE、RMSE 附带测试数据集运行(风速数据) 提示:在

    短期风速预测模型,IDBO-BiTCN-BiGRU-Multihead-Attention IDBO是,网上复现 评价指标:R方、MAE、MAPE、RMSE 附带测试数据集运行(风速数据) 提示:在MATLAB2024a上测试正常 ,短期风速预测模型; IDBO-BiTCN-BiGRU-Multihead-Attention; 评价指标: R方、MAE、MAPE、RMSE; 复现; 测试数据集; MATLAB 2024a,短期风速预测模型:IDBO-BiTCN-BiGRU-Attention集成模型

    手势识别_数据融合_运动融合帧_Pytorch实现_1741857761.zip

    手势识别项目实战

    智慧园区IBMS可视化管理系统建设方案PPT(61页).pptx

    在智慧园区建设的浪潮中,一个集高效、安全、便捷于一体的综合解决方案正逐步成为现代园区管理的标配。这一方案旨在解决传统园区面临的智能化水平低、信息孤岛、管理手段落后等痛点,通过信息化平台与智能硬件的深度融合,为园区带来前所未有的变革。 首先,智慧园区综合解决方案以提升园区整体智能化水平为核心,打破了信息孤岛现象。通过构建统一的智能运营中心(IOC),采用1+N模式,即一个智能运营中心集成多个应用系统,实现了园区内各系统的互联互通与数据共享。IOC运营中心如同园区的“智慧大脑”,利用大数据可视化技术,将园区安防、机电设备运行、车辆通行、人员流动、能源能耗等关键信息实时呈现在拼接巨屏上,管理者可直观掌握园区运行状态,实现科学决策。这种“万物互联”的能力不仅消除了系统间的壁垒,还大幅提升了管理效率,让园区管理更加精细化、智能化。 更令人兴奋的是,该方案融入了诸多前沿科技,让智慧园区充满了未来感。例如,利用AI视频分析技术,智慧园区实现了对人脸、车辆、行为的智能识别与追踪,不仅极大提升了安防水平,还能为园区提供精准的人流分析、车辆管理等增值服务。同时,无人机巡查、巡逻机器人等智能设备的加入,让园区安全无死角,管理更轻松。特别是巡逻机器人,不仅能进行360度地面全天候巡检,还能自主绕障、充电,甚至具备火灾预警、空气质量检测等环境感知能力,成为了园区管理的得力助手。此外,通过构建高精度数字孪生系统,将园区现实场景与数字世界完美融合,管理者可借助VR/AR技术进行远程巡检、设备维护等操作,仿佛置身于一个虚拟与现实交织的智慧世界。 最值得关注的是,智慧园区综合解决方案还带来了显著的经济与社会效益。通过优化园区管理流程,实现降本增效。例如,智能库存管理、及时响应采购需求等举措,大幅减少了库存积压与浪费;而设备自动化与远程监控则降低了维修与人力成本。同时,借助大数据分析技术,园区可精准把握产业趋势,优化招商策略,提高入驻企业满意度与营收水平。此外,智慧园区的低碳节能设计,通过能源分析与精细化管理,实现了能耗的显著降低,为园区可持续发展奠定了坚实基础。总之,这一综合解决方案不仅让园区管理变得更加智慧、高效,更为入驻企业与员工带来了更加舒适、便捷的工作与生活环境,是未来园区建设的必然趋势。

    相亲交友系统源码 V10.5支持婚恋相亲M红娘系统.zip

    相亲交友系统源码 V10.5支持婚恋相亲、媒婆返利、红娘系统、商城系统等等 这款交友系统功能太多了,适合婚恋相亲,还有媒婆婚庆等等支持 PC和 H5还有小程序,可封装红年、APP,里面带安装教程

    单片机也能玩双核之你想不到c技巧系列-嵌入式实战(资料+视频教程)

    本资源《单片机也能玩双核之你想不到的C技巧系列——嵌入式实战》涵盖 双核单片机开发、C语言高级技巧、嵌入式系统优化 等核心内容,结合 实战案例与视频教程,帮助开发者深入理解并掌握高效编程技巧。 适用人群: 适合 嵌入式开发工程师、单片机开发者、电子信息相关专业学生,以及希望提升 C语言编程能力 和 嵌入式项目经验 的技术人员。 能学到什么: 双核单片机开发思路,提高并行处理能力。 C语言高级技巧,提升代码优化与执行效率。 嵌入式系统调试方法,掌握实际项目中的调试策略。 实战案例解析,学习如何在实际工程中应用双核技术。 阅读建议: 建议 先学习基础知识,再结合 示例代码与视频教程 进行实操,重点关注 代码优化、调试技巧与双核应用模式,通过实战演练提高嵌入式开发能力。

    计算机视觉_OpenCV_人脸识别_成本节约检测方案_1741779495.zip

    人脸识别项目源码实战

    `机器学习_深度学习_Keras_教程用途`.zip

    人脸识别项目源码实战

    地铁网络_Dijkstra_最短路径_查询工具_1741862725.zip

    c语言学习

    红外光伏缺陷目标检测模型,YOLOv8模型 基于红外光伏缺陷目标检测数据集训练,做了必要的数据增强处理,以达到缺陷类别间的平衡 可检测大面积热斑,单一热斑,二极管短路和异常低温四类缺陷 测试集指标如

    红外光伏缺陷目标检测模型,YOLOv8模型 基于红外光伏缺陷目标检测数据集训练,做了必要的数据增强处理,以达到缺陷类别间的平衡 可检测大面积热斑,单一热斑,二极管短路和异常低温四类缺陷 测试集指标如图所示 ,核心关键词:红外光伏缺陷目标检测模型; YOLOv8模型; 数据增强处理; 缺陷类别平衡; 大面积热斑; 单一热斑; 二极管短路; 异常低温。,基于YOLOv8的红外光伏缺陷检测模型

    基于PLC的自动浇花控制系统 西门子1200PLC博途仿真,提供HMI画面,接线图,IO分配表,演示视频,简单讲解视频 博图15.1及以上版本均可使用 ,核心关键词: PLC自动浇花控制系统; 西

    基于PLC的自动浇花控制系统 西门子1200PLC博途仿真,提供HMI画面,接线图,IO分配表,演示视频,简单讲解视频 博图15.1及以上版本均可使用 ,核心关键词: PLC自动浇花控制系统; 西门子1200PLC博途仿真; HMI画面; 接线图; IO分配表; 演示视频; 简单讲解视频; 博图15.1及以上版本。,基于PLC的自动浇花系统:西门子1200PLC博途仿真实践教程

    智慧园区标准化综合解决方案PPT(60页).pptx

    在智慧园区建设的浪潮中,一个集高效、安全、便捷于一体的综合解决方案正逐步成为现代园区管理的标配。这一方案旨在解决传统园区面临的智能化水平低、信息孤岛、管理手段落后等痛点,通过信息化平台与智能硬件的深度融合,为园区带来前所未有的变革。 首先,智慧园区综合解决方案以提升园区整体智能化水平为核心,打破了信息孤岛现象。通过构建统一的智能运营中心(IOC),采用1+N模式,即一个智能运营中心集成多个应用系统,实现了园区内各系统的互联互通与数据共享。IOC运营中心如同园区的“智慧大脑”,利用大数据可视化技术,将园区安防、机电设备运行、车辆通行、人员流动、能源能耗等关键信息实时呈现在拼接巨屏上,管理者可直观掌握园区运行状态,实现科学决策。这种“万物互联”的能力不仅消除了系统间的壁垒,还大幅提升了管理效率,让园区管理更加精细化、智能化。 更令人兴奋的是,该方案融入了诸多前沿科技,让智慧园区充满了未来感。例如,利用AI视频分析技术,智慧园区实现了对人脸、车辆、行为的智能识别与追踪,不仅极大提升了安防水平,还能为园区提供精准的人流分析、车辆管理等增值服务。同时,无人机巡查、巡逻机器人等智能设备的加入,让园区安全无死角,管理更轻松。特别是巡逻机器人,不仅能进行360度地面全天候巡检,还能自主绕障、充电,甚至具备火灾预警、空气质量检测等环境感知能力,成为了园区管理的得力助手。此外,通过构建高精度数字孪生系统,将园区现实场景与数字世界完美融合,管理者可借助VR/AR技术进行远程巡检、设备维护等操作,仿佛置身于一个虚拟与现实交织的智慧世界。 最值得关注的是,智慧园区综合解决方案还带来了显著的经济与社会效益。通过优化园区管理流程,实现降本增效。例如,智能库存管理、及时响应采购需求等举措,大幅减少了库存积压与浪费;而设备自动化与远程监控则降低了维修与人力成本。同时,借助大数据分析技术,园区可精准把握产业趋势,优化招商策略,提高入驻企业满意度与营收水平。此外,智慧园区的低碳节能设计,通过能源分析与精细化管理,实现了能耗的显著降低,为园区可持续发展奠定了坚实基础。总之,这一综合解决方案不仅让园区管理变得更加智慧、高效,更为入驻企业与员工带来了更加舒适、便捷的工作与生活环境,是未来园区建设的必然趋势。

    大型集团用户画像系统化标准化数字化用户主数据管理项目规划方案.pptx

    大型集团用户画像系统化标准化数字化用户主数据管理项目规划方案

Global site tag (gtag.js) - Google Analytics