`
m635674608
  • 浏览: 5031965 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

FutureTask 源码分析

    博客分类:
  • java
 
阅读更多

FutureTask是JDK中Future模式的标准实现,它同时实现了Runnable和Future两个接口,提供了可取消的异步计算,并且可以利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法。

public class FutureTask<V> implements RunnableFuture<V> {
    /** 所有的方法全部委托sync */
    private final Sync sync;

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }

    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }

    public boolean isCancelled() {
        return sync.innerIsCancelled();
    }

    public boolean isDone() {
        return sync.innerIsDone();
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return sync.innerCancel(mayInterruptIfRunning);
    }


    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return sync.innerGet(unit.toNanos(timeout));
    }

    protected void done() { }

    protected void set(V v) {
        sync.innerSet(v);
    }

    protected void setException(Throwable t) {
        sync.innerSetException(t);
    }

    public void run() {
        sync.innerRun();
    }

    protected boolean runAndReset() {
        return sync.innerRunAndReset();
    }

    private final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -7828117401763700385L;

        /** State value representing that task is ready to run */
        /** 代表起始状态 */
        private static final int READY     = 0;
        /** State value representing that task is running */
        /** 代表正在运行中状态 */
        private static final int RUNNING   = 1;
        /** State value representing that task ran */
        /** 代表运行完成的状态 */
        private static final int RAN       = 2;
        /** State value representing that task was cancelled */
        /** 代表被取消的状态 */
        private static final int CANCELLED = 4;

        /** The underlying callable */
        private final Callable<V> callable;
        /** The result to return from get() */
        private V result;
        /** The exception to throw from get() */
        private Throwable exception;

        /**
         * The thread running task. When nulled after set/cancel, this
         * indicates that the results are accessible.  Must be
         * volatile, to ensure visibility upon completion.
         */
        private volatile Thread runner;

        Sync(Callable<V> callable) {
            this.callable = callable;
        }

        /**
        *  判断是否完成或者是否取消
        *  传入0或者1 都返回0 说明任务没有完成 也没有取消
        */
        private boolean ranOrCancelled(int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }

        /**
         * AbstractQueuedSynchronizer的模板方法 
         * 返回1可以获取锁 返回-1说明获取锁失败
         * 调用innerIsDone 返回TRUE 说明任务已经执行完毕
         * 返回FALSE 说明任务没有执行完毕
         */
        protected int tryAcquireShared(int ignore) {
            return innerIsDone() ? 1 : -1;
        }

        /**
         * 释放锁 将执行当前任务的线程设置为null
         */
        protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;
        }

        //判断任务是否被取消
        boolean innerIsCancelled() {
            return getState() == CANCELLED;
        }

        //判断任务是否完成(取消也算完成)
        boolean innerIsDone() {
            return ranOrCancelled(getState()) && runner == null;
        }

        //获取结果
        V innerGet() throws InterruptedException, ExecutionException {
            //首先调用AbstractQueuedSynchronizer的方法,这个方法会调用子类方法tryAcquireShared 上面有讲
            //如果当前任务已经完成,那么当前线程可以向下运行,否则把当前线程加入队列阻塞.
            acquireSharedInterruptibly(0);
            //判断状态 如果取消了就抛CancellationException异常.
            if (getState() == CANCELLED)
                throw new CancellationException();
            //如果任务执行过程中出现异常,这里包装一下抛出ExecutionException.
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }

        //获取结果
        V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
            //调用AbstractQueuedSynchronizer里的方法
            // return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);
            // 首先tryAcquireShared调用它获取锁,也就是看任务完事没,如果任务完事了就返回TRUE,那么执行逻辑同上。
            // 如果获取不到锁,那么就阻塞当前线程给定的时间,如果时间到了再次任务还没完成则抛出异常。
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }


        void innerSet(V v) {
            for (;;) {
                int s = getState();
                if (s == RAN)
                    return;
                if (s == CANCELLED) {
                    // aggressively release to set runner to null,
                    // in case we are racing with a cancel request
                    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
                //正常完成 设置状态为RAN
                if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    done(); //通知子类
                    return;
                }
            }
        }

        void innerSetException(Throwable t) {
            for (;;) {
                int s = getState();
                if (s == RAN)
                    return;
                if (s == CANCELLED) {
                    // aggressively release to set runner to null,
                    // in case we are racing with a cancel request
                    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
                //设置异常
                if (compareAndSetState(s, RAN)) {
                    exception = t;
                    releaseShared(0);
                    done();//通知子类
                    return;
                }
            }
        }

        //取消任务
        boolean innerCancel(boolean mayInterruptIfRunning) {
            for (;;) {
                int s = getState();
                //如果任务已经结束,则返回FALSE
                if (ranOrCancelled(s))
                    return false;
                //设置任务的状态为CANCELLED
                if (compareAndSetState(s, CANCELLED))
                    break;
            }
            //如果参数mayInterruptIfRunning=TRUE,那么设置线程的终端状态
            if (mayInterruptIfRunning) {
                Thread r = runner;
                if (r != null)
                    r.interrupt();
            }
            //释放锁
            releaseShared(0);
            //调用子类方法,通知状态改变
            done();
            return true;
        }

        void innerRun() {
            //如果任务不是初始状态则直接结束
            if (!compareAndSetState(READY, RUNNING))
                return;

            runner = Thread.currentThread();
            if (getState() == RUNNING) { // recheck after setting thread
                V result;
                try {
                    result = callable.call();
                } catch (Throwable ex) {
                    //我们写的任务方法里如果出现异常则调用setException
                    setException(ex);
                    return;
                }
                //设置结果
                set(result);
            } else {
                //释放锁
                releaseShared(0); // cancel
            }
        }

        boolean innerRunAndReset() {
            if (!compareAndSetState(READY, RUNNING))
                return false;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING)
                    callable.call(); // don't set result
                runner = null;
                return compareAndSetState(RUNNING, READY);
            } catch (Throwable ex) {
                setException(ex);
                return false;
            }
        }
    }
}

http://m.blog.csdn.net/article/details?id=46293927
分享到:
评论

相关推荐

    futuretask源码分析(推荐)

    主要介绍了futuretask源码分析(推荐),小编觉得还是挺不错的,这里给大家分享下,供各位参考。

    性能极限漂移特技,只有头发少的人才知道.docx

    1. **FutureTask源码分析**:深入理解`FutureTask`的工作原理,包括状态管理、执行流程、取消机制等。 2. **手动实现FutureTask**:尝试自己动手实现一个简易版的`FutureTask`类,加深对其内部机制的理解。 3. **...

    Java并发包源码分析(JDK1.8)

    Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...

    并发编程、juc工具包源码分析笔记

    FutureTask&lt;Integer&gt; task3 = new FutureTask(() -&gt; { log.debug("hello"); return 100; }); new Thread(task3, "t3").start(); // 主线程阻塞,等待 task3 执行完毕并获取结果 Integer result = task3.get(); ...

    java8集合源码分析-Notes:笔记

    集合源码分析 [TOC] 0. 项目构建 0.1 版本控制 0.1.1 Git 0.2 项目管理 0.2.1 Maven 0.2.2 Gradle 1.:hot_beverage: Java 1.1 Java基础 1.1.1 算法与数据结构 字符串KMP算法 BitSet解决数据重复和是否存在等问题 ...

    Android AsyncTask源码分析

    - 结果通过`FutureTask`返回,`FutureTask`实现了`RunnableFuture`接口,能够管理和控制异步计算结果。 - 当`FutureTask`完成时,`done()`方法会被调用,进一步调用`postResultIfNotInvoked(get())`,将结果通过...

    每秒22W笔订单交易,T5大师直击京东平台性能调优

    视频讲解: 1,京东APP界面的接口如何正确调用; 2,从并发编程角度来提高系统性能;...4,从Futuretask类源码分析到手写; 5,京东平台如何提升Web项目吞吐量; 6,互联网职业生涯,你问我来答; 7,互动答疑。

    java源码学习比较java源码学习比较

    比如,对比分析多个并发模型的实现(如线程池、FutureTask和CompletableFuture),既能理解并发编程的核心概念,也能掌握实际应用技巧。 最后,参与开源项目或阅读其他开发者提交的代码也是一种提升方式。这不仅...

    java concurrent 精简源码

    这个“java concurrent 精简源码”资源很可能会包含上述概念的实际应用示例,通过学习和分析这些代码,你可以深入理解Java并发编程的精髓,并能更好地应用于实际项目中。在研究时,建议结合Java官方文档和相关的书籍...

    Android游戏开发之多线程的操作方式源码

    在Android游戏开发中,多线程的应用至关重要,它能让游戏运行更加流畅,避免因为主线程(UI线程)被阻塞而引发的ANR...通过分析和实践这些源码,开发者不仅能掌握多线程的使用,还能提升对Android游戏开发的整体理解。

    JAVA多线程设计模式 书和源码

    本书“JAVA多线程设计模式”深入探讨了如何在Java环境中有效地利用多线程,结合源码分析,为开发者提供了宝贵的实践经验。 一、线程基础 Java中的线程是程序执行的最小单元,由Java虚拟机(JVM)管理。通过`Thread`...

    Callable接口源码阅读1

    下面是Callable接口的详细分析: 1. **接口概述** Callable接口是Java中标准库的一部分,位于`java.util.concurrent`包下。它是一个泛型接口,允许我们在异步任务完成后获取一个类型化的结果。这与Runnable接口...

    《实战java高并发程序设计》源码 完整版 共10章 葛一鸣( 不是网友自己写的5章那种)

    源码中可能会使用FutureTask和Executor框架进行异步编程,实现任务的提交和结果获取。 6. **并发编程模式**:如生产者消费者模型、工作窃取、双端队列等并发编程模式在源码中得以体现,帮助我们理解如何设计高效的...

    JUC+课程源码+线程操作

    在`juc_atguigu`这个压缩包中,包含了上述组件的示例代码,通过实际运行和分析这些代码,你可以深入理解JUC的工作原理和使用场景。同时,这些示例也能帮助你学习如何在实际项目中有效地利用JUC来提高并发程序的性能...

    AsyncTask 源码解析

    本文将深入探讨 AsyncTask 的内部实现原理、工作流程以及关键代码分析。 首先,AsyncTask 有三个泛型参数,分别代表输入参数类型(Params)、进度更新参数类型(Progress)和结果参数类型(Result)。通过这些参数...

    thread源码java-JavaThreadCore:《Java多线程核心计数》源码笔记

    `JavaThreadCore`这个项目,基于《Java多线程核心计数》一书,提供了深入理解和实践Java线程的源码分析。通过这份源码笔记,我们可以深入探讨Java线程的内部机制,包括线程的创建、调度、同步和通信等方面的知识。 ...

    java高并发源码-java-concurrent:Java高并发,JUC,相关源码。1、马士兵高并发视频源码(听课时练习)

    在"java-concurrent-master"这个项目中,我们可以期待看到以上各种并发特性的实际应用和源码分析,这将帮助我们深入理解Java并发编程的原理和实践技巧。通过马士兵老师的视频源码,我们可以跟随他的讲解,一步步地...

    多线程操作实例源码

    本实例源码主要聚焦于多线程操作,旨在通过具体的代码示例来帮助开发者理解和掌握多线程的使用。 在Java等支持多线程的编程语言中,创建和管理线程主要有以下几种方式: 1. 继承Thread类:这是最基础的创建线程的...

Global site tag (gtag.js) - Google Analytics