`
Donald_Draper
  • 浏览: 979797 次
社区版块
存档分类
最新评论

FutureTask解析

    博客分类:
  • JUC
阅读更多
package java.util.concurrent;
import java.util.concurrent.locks.*;

/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the <tt>get</tt>
 * method will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled.
 *
 FutureTask是一个可取消的异步计算任务,并提供了基于Future接口实现的开始和取消
计算任务,查看计算任务状态和在计算任务结束后获取结果的方法。计算任务的结果,只有
在计算任务完成时,才能取得,如果计算任务还没完成,将会阻塞。只要计算任务完成,
计算任务就不能被取消或重新启动。
 * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
 * {@link java.lang.Runnable} object.  Because <tt>FutureTask</tt>
 * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
 * submitted to an {@link Executor} for execution.
 *
FutureTask可用于包装Callable和Runnable对象。由于FutureTask实现了Runnable接口,
所有可以被调到执行器,执行。
 * <p>In addition to serving as a standalone class, this class provides
 * <tt>protected</tt> functionality that may be useful when creating
 * customized task classes.
 *
 当我们创建任务线程类时为单独的类(独立任务),FutureTask的protect功能方法也许有用。

 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
 */
public class FutureTask<V> implements RunnableFuture<V> {
    /** Synchronization control for FutureTask 用于控制FutureTask的同步器*/
    private final Sync sync;
        /**
     * Creates a <tt>FutureTask</tt> that will, upon running, execute the
     * given <tt>Callable</tt>.
     *
     创建一个FutureTask,在执行时,将会执行参数Callable
     * @param  callable the callable task
     * @throws NullPointerException if callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }
    /**
     * Creates a <tt>FutureTask</tt> that will, upon running, execute the
     * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
     * given result on successful completion.
     *
     直接通过Executors执行任务,并将结果保存到result中
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }
}

FutureTask内部关联着一个同步器Sync,主要用于控制cancel,get等操作。
我们来看一下内部同步器Sync:
  /**
     * Synchronization control for FutureTask. Note that this must be
     * a non-static inner class in order to invoke the protected
     * <tt>done</tt> method. For clarity, all inner class support
     * methods are same as outer, prefixed with "inner".
     *
     控制FutureTask的同步器,由于需要调用protected的done方法,所以类必须定义为
     非静态内部类。为了清晰起见,所有内部类支持的方法,与外部类FutureTask一样,只不过
     添加了inner最为前缀。
     * Uses AQS sync state to represent run status
     */
    private final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -7828117401763700385L;

        /** State value representing that task is ready to run  准备就绪*/
        private static final int READY     = 0;
        /** State value representing that task is running 正在运行*/
        private static final int RUNNING   = 1;
        /** State value representing that task ran 运行完*/
        private static final int RAN       = 2;
        /** State value representing that task was cancelled 取消*/
        private static final int CANCELLED = 4;

        /** The underlying callable 执行线程callable*/
        private final Callable<V> callable;
        /** The result to return from get() 结果*/
        private V result;
        /** The exception to throw from get() get方法抛出的异常*/
        private Throwable exception;

        /**
         * The thread running task. When nulled after set/cancel, this
         * indicates that the results are accessible.  Must be
         * volatile, to ensure visibility upon completion.
	 线程用于执行任务。在set/cancel操作后为null,预示着任务结果可用,
	 变量必须volatile,以保证在任务执行完时,结果的可见性。
         */
        private volatile Thread runner;
        //构造任务同步器
        Sync(Callable<V> callable) {
            this.callable = callable;
        }
	//返回任务执行的状态,是运行完还是取消
	 private boolean ranOrCancelled(int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }
	//是否执行结束,如果任务运行完或取消,且运行任务线程为null,即任务结束
	 boolean innerIsDone() {
            return ranOrCancelled(getState()) && runner == null;
        }
	 /**
         * Implements AQS base acquire to succeed if ran or cancelled
	 任务运行完或取消,则尝试获取共享锁成功。
         */
        protected int tryAcquireShared(int ignore) {
            return innerIsDone() ? 1 : -1;
        }
        /**
         * Implements AQS base release to always signal after setting
         * final done status by nulling runner thread.
	 在通过设置运行任务线程为null,设置任务线程状态为结束时,释放共享锁
         */
        protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;
        }
	//判断任务状态是否为取消
	 boolean innerIsCancelled() {
            return getState() == CANCELLED;
        }
	//获取任务执行结果
        V innerGet() throws InterruptedException, ExecutionException {
	     //这个我们在AQS篇章中有讲,这里不再说
	    //如果任务线程执行结束,如果状态为取消,则抛出CancellationException
            acquireSharedInterruptibly(0);
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
	    //否则任务线程运行完,返回结果
            return result;
        }
	//超时获取任务执行结果,这个与get方法不同是,超时等待任务线程执行结束
	 V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }
}

Sync主要用于控制FutureTask的运行状态,状态一共有4中,准备就绪READY,
正在运行RUNNING,运行完RAN,取消CANCELLED;任务线程结束可能有两个原因
,运行完RAN或取消CANCELLED。Sync内部有一个线程runner用于执行任务,当任务线程执行结束时,runner为null。
回到FutureTask
 public boolean isCancelled() {
        return sync.innerIsCancelled();
    }
    public boolean isDone() {
        return sync.innerIsDone();
    }
    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }
    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return sync.innerGet(unit.toNanos(timeout));
    }

从上面可以看出FutureTask的isCancelled,isDone,get和超时get方法是直接委托给
内部同步器Sync的相应方法。
再看其他方法先看取消
//FutureTask
   public boolean cancel(boolean mayInterruptIfRunning) {
        //委托给内部同步器
        return sync.innerCancel(mayInterruptIfRunning);
    }

//Sync
 boolean innerCancel(boolean mayInterruptIfRunning) {
            //自旋设置任务线程运行状态为CANCELLED
            for (;;) {
                int s = getState();
                if (ranOrCancelled(s))
		    //如果任务已经执行结束,则返回false,不可取消
                    return false;
		    //AQS设置任务线程运行状态为CANCELLED
                if (compareAndSetState(s, CANCELLED))
                    break;
            }
	    //如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程
            if (mayInterruptIfRunning) {
                Thread r = runner;
                if (r != null)
                    r.interrupt();
            }
	    //释放锁
            releaseShared(0);
            //做一些任务执行结束工作
            done();
            return true;
        }


//FutureTask

  /**
     * Protected method invoked when this task transitions to state
     * <tt>isDone</tt> (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     无论任务线程取消还是正常运行结束,只要线程的isDone状态为true,则调用
     此方法。默认实现不做任务事情,留给子类扩展。子类可以重写此方法,用于
     在执行完成时,调用回调接口或者执行记录工作。同时可以在 此方法中确认
     任务线程是否被取消。
     */
    protected void done() { }

从上来看取消操作,首先自旋设置任务线程运行状态为CANCELLED,
如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程,
释放锁,做一些任务执行结束工作(默认为空)。

再来看run
    // The following (duplicated) doc comment can be removed once
    //
    // 6270645: Javadoc comments should be inherited from most derived
    //          superinterface or superclass
    // is fixed.
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    public void run() {
        //委托给内部同步器
        sync.innerRun();
    }

//Sync
 void innerRun() {
           //如果任务线程处理就绪状态,则设置为运行状态,否则返回
            if (!compareAndSetState(READY, RUNNING))
                return;
            runner = Thread.currentThread();
            if (getState() == RUNNING) { // recheck after setting thread
                V result;
                try {
		    //执行callable
                    result = callable.call();
                } catch (Throwable ex) {
		    //设置执行异常
                    setException(ex);
                    return;
                }
		//设置结果
                set(result);
            } else {
                releaseShared(0); // cancel
            }
        }

分别来看设置执行异常和设置结果

//设置结果
set(result);
//FutureTask
    /**
     * Sets the result of this Future to the given value unless
     * this future has already been set or has been cancelled.
     * This method is invoked internally by the <tt>run</tt> method
     * upon successful completion of the computation.
     如果结果已经被设值或任务线程被取消,则设置失败。此方在run方法成功
     完成任务时,调用。
     * @param v the value
     */
    protected void set(V v) {
        sync.innerSet(v);
    }

//Sync
void innerSet(V v) {
            for (;;) {
                int s = getState();
		//如果任务运行完,则返回
                if (s == RAN)
                    return;
                if (s == CANCELLED) {
                    // aggressively release to set runner to null,
                    // in case we are racing with a cancel request
                    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
		//设置任务运行状态为RAN,并设值
                if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    done();
                    return;
                }
            }
        }


//设置执行异常
setException(ex);
//FutureTask

    /**
     * Causes this future to report an <tt>ExecutionException</tt>
     * with the given throwable as its cause, unless this Future has
     * already been set or has been cancelled.
     * This method is invoked internally by the <tt>run</tt> method
     * upon failure of the computation.
     * @param t the cause of failure
     设置执行异常
     */
    protected void setException(Throwable t) {
        sync.innerSetException(t);
    }

//Sync 这一步与innerSet相似,不在说
void innerSetException(Throwable t) {
            for (;;) {
                int s = getState();
                if (s == RAN)
                    return;
                if (s == CANCELLED) {
                    // aggressively release to set runner to null,
                    // in case we are racing with a cancel request
                    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
                if (compareAndSetState(s, RAN)) {
                    exception = t;
                    releaseShared(0);
                    done();
                    return;
                }
            }
        }

再看runAndReset
 
 /**
     * Executes the computation without setting its result, and then
     * resets this Future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     此方的功能如果任务线程正在运行,并且没有设置结果,可以重新设置任务线程为
     就绪状态,如任务线程运行异常或取消,则重置失败。这个用于任务需要多次执行的场景。
     * @return true if successfully run and reset
     */
    protected boolean runAndReset() {
	//委托给内部同步器
        return sync.innerRunAndReset();
    }

//Sync
 
 boolean innerRunAndReset() {
            //如果任务线程处于从READY切换到RUNNING失败,则返回false,即任务线程不处于就绪状态
            if (!compareAndSetState(READY, RUNNING))
                return false;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING)
		    //如果任务线程正在运行,调用callable
                    callable.call(); // don't set result
                runner = null;//重置任务线程为null
                return compareAndSetState(RUNNING, READY);//重置任务线程从RUNNING到READY
            } catch (Throwable ex) {
                setException(ex);
                return false;
            }
        }


总结:
FutureTask内部关联一个同步器Sync,Sync主要用于控制FutureTask的运行状态,状态一共有4中,准备就绪READY,正在运行RUNNING,运行完RAN,取消CANCELLED;任务线程结束可能有两个原因,运行完RAN或取消CANCELLED。Sync内部有一个线程runner用于执行任务,当任务线程执行结束时,runner为null。取消操作,首先自旋设置任务线程运行状态为CANCELLED,如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程,释放锁,做一些任务执行结束工作(默认为空)。FutureTask的相关操作主要通过Sync来完成。
/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the <tt>run</tt> method causes completion of the <tt>Future</tt>
 * and allows access to its results.
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's <tt>get</tt> method
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
0
0
分享到:
评论
1 楼 hero001 2018-07-09  
     方式                

相关推荐

    java多线程编程同步器Future和FutureTask解析及代码示例

    Java多线程编程中,`Future` 和 `FutureTask` 是两种重要的同步工具,它们用于管理异步计算的结果。在Java并发编程中,通常我们会使用`ExecutorService`来提交任务,而`Future`和`FutureTask`就是与这些任务交互的...

    FutureTask:FutureTask原始解析与重组-源码解析

    FutureTask原始码解析 一,FutureTask是什么? FutureTask是可取消的异步的计算任务,它可以通过线程池和线程对象执行,一般来说是FutureTask用于耗时的计算。 二,FutureTask继承图 三,未来任务源码 FutureTask的...

    Java FutureTask类使用案例解析

    Java FutureTask类使用案例解析 Java FutureTask类是一种异步计算的工具,用于执行长时间的任务并获取结果。它实现了Runnable和Future接口,既可以作为一个Runnable对象提交给Executor执行,也可以作为一个Future...

    揭密FutureTask.docx

    4. 核心方法解析 - `run()`:这是FutureTask实际执行的任务,调用了Callable的`call()`方法,同时更新状态和结果。 - `get()`和`get(long timeout, TimeUnit unit)`:这两个方法会阻塞当前线程,直到任务完成。如果...

    Java 多线程与并发(17-26)-JUC线程池- FutureTask详解.pdf

    #### 四、FutureTask源码解析 ##### 4.1 Callable接口 `Callable`是一个泛型接口,其泛型`V`指定了`call`方法返回的类型。与`Runnable`接口不同,`Callable`不仅能够执行任务,还能返回计算结果,并且可以抛出异常...

    Java分布式应用学习笔记05多线程下的并发同步器

    示例代码解析 下面是对文章中给出的示例代码的详细分析: ```java package threadConcurrent.test; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util....

    Java中高级核心知识全面解析

    11. **并发编程**:深入理解并发容器如BlockingQueue、FutureTask,以及并发工具类CountDownLatch、CyclicBarrier、Phaser等的使用。 12. **网络编程**:理解TCP/IP协议,了解Socket编程,以及HTTP协议的工作原理,...

    美团动态线程池实践思路开源项目(DynamicTp),线程池源码解析及通知告警篇.doc

    美团动态线程池实践思路开源项目(DynamicTp)线程池源码解析及通知告警篇 本文详细介绍了美团动态线程池实践思路开源项目(DynamicTp)的通知告警模块,该模块提供了多种通知告警功能,每一个通知项都可以独立配置...

    Java并发编程原理与实战

    提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解.mp4 同步容器与并发容器.mp4 并发容器CopyOnWriteArrayList原理与使用.mp4 并发容器...

    AsyncTask使用及源码解析

    ExecutorService负责调度和执行任务,FutureTask则提供了异步计算的结果。AsyncTask通过内部的SerialExecutor保证了同一时间只有一个任务在执行,实现了任务的串行化。 此外,Android的AsyncTask有版本差异,从...

    AsyncTask 源码解析

    3. `FutureTask` 类:实现了 Future 接口,用于保存任务的结果并控制任务的执行状态。 4. `InternalHandler` 类:内部 Handler 实现,用于在 UI 线程中分发消息。 通过深入理解这些细节,开发者可以更好地掌握如何...

    Java常见2023年最新面试题,附答案解析

    本篇将基于2023年的最新趋势,解析Java面试中的常见问题及答案。 1. **Java基础** - **面向对象**:理解类、对象、封装、继承、多态的概念。 - **异常处理**:熟悉try-catch-finally语句块,了解Checked与...

    上海某大厂java面试真题与解析

    ### 上海某大厂Java面试真题与解析 #### 并发编程三大核心:原子性、可见性与有序性 **原子性**是指一个或多个操作作为一个不可分割的整体执行,这意味着这些操作要么全部成功,要么全部失败,在执行过程中不允许...

    Java多线程与并发系列22道高频面试题(附思维导图和答案解析)

    Java中实现多线程有四种方法:继承Thread类、实现Runnable接口、实现Callable接口通过FutureTask包装器来创建Thread线程、使用ExecutorService、Callable、Future实现有返回结果的多线程。 二、停止一个正在运行的...

    Java多线程并发实战

    ### Java多线程并发实战知识点解析 #### 一、引言 在计算机科学领域,**多线程**和**并发**技术是现代软件开发中不可或缺的一部分。随着处理器核心数量的增加,利用多线程和并发可以显著提高应用程序的性能和响应...

    rxjava常用操作符

    FutureTask&lt;String&gt; futureTask = new FutureTask(new Callable() { @Override public String call() throws Exception { // 异步操作 return "hihi"; } }); Scheduler.Worker worker = Schedulers.io()....

    java线程池实例

    - 使用`Future`和`FutureTask`获取任务结果,便于管理和同步。 - 避免长时间阻塞的操作,以免阻塞工作线程,影响线程池效率。 在实际开发中,Java还提供了一些预定义的线程池,如`Executors.newFixedThreadPool(int...

    Android AsyncTask 完美解析 看不懂源码你就输了

    1.简介 android.os.AsyncTask,一个执行异步操作的类,我们可以使用它来处理后台任务,并且在UI线程中... 例如{@link Executor},{@ link ThreadPoolExecutor}和{@link FutureTask}。 2.基本使用 2.1 关键API andro

    Java多线程Callable接口实现代码示例

    Java 多线程 Callable 接口实现代码示例 Java 多线程编程中,创建线程有多种方式,其中一种便...感兴趣的朋友可以继续参阅本站的其他文章,如《浅谈 Java 面向接口编程》、《Java 编程接口回调一般用法代码解析》等。

    13-Java并发编程学习宝典.zip

    6. **Future模式** - "31 凭票取餐—Future模式详解-慕课专栏.html":讲解了`Future`接口和`FutureTask`类,它们用于获取异步任务的结果。 7. **Master-Slave模式** - "36 为多线程们安排一位经理—Master-Slave...

Global site tag (gtag.js) - Google Analytics