Jdk1.6 JUC源码解析(16)-FutureTask
作者:大飞
- FutureTask是一种异步任务(或异步计算),举个栗子,主线程的逻辑中需要使用某个值,但这个值需要复杂的运算得来,那么主线程可以提前建立一个异步任务来计算这个值(在其他的线程中计算),然后去做其他事情,当需要这个值的时候再通过刚才建立的异步任务来获取这个值,有点并行的意思,这样可以缩短整个主线程逻辑的执行时间。
- FutureTask也是基于AQS来构建的,使用共享模式,使用AQS的状态来表示异步任务的运行状态。
- 先来看下FutureTask都实现了哪些接口。首先实现了RunnableFuture接口,先看下这个接口:
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
RunnableFuture又扩展了Runnable和Future:
public interface Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run(); }
public interface Future<V> { /** * 尝试取消任务的执行,如果任务已经完成或者已经被取消或者由于某种原因 * 无法取消,方法返回false。如果任务取消成功,或者任务开始执行之前调用 * 了取消方法,那么任务就永远不会执行了。mayInterruptIfRunning参数决定 * 了是否要中断执行任务的线程。 */ boolean cancel(boolean mayInterruptIfRunning); /** * 判断任务是否在完成之前被取消。 */ boolean isCancelled(); /** * 判断任务是否完成。 */ boolean isDone(); /** * 等待,直到获取任务的执行结果。如果任务还没执行完,这个方法会阻塞。 */ V get() throws InterruptedException, ExecutionException; /** * 等待,在给定的时间内获取任务的执行结果。 */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
- 接下来看下FutureTask的实现,由于其基于AQS实现,那先看一下内部的同步机制:
private final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -7828117401763700385L; /** 表示任务正在执行 */ private static final int RUNNING = 1; /** 表示任务已经运行完毕 */ private static final int RAN = 2; /** 表示任务被取消 */ private static final int CANCELLED = 4; /** 内部的callable */ private final Callable<V> callable; /** 执行结果 */ private V result; /** 执行过程中发生的异常 */ private Throwable exception; /** * 执行当前任务的线程。在set/cancel之后置空,说明可以 * 了。必须使用volatile来修饰,以确保任务完成后的可见性。 */ private volatile Thread runner; Sync(Callable<V> callable) { this.callable = callable; }
内部同步器接中使用了一个callable来保存要执行的任务,看下这个接口:
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
继续看下同步器的innerRun方法,这个方法用于支持FutureTask的run方法:
void innerRun() { //尝试设置任务运行状态为正在执行。 if (!compareAndSetState(0, RUNNING)) return; //如果设置失败,直接返回。 try { runner = Thread.currentThread(); //设置执行线程。 if (getState() == RUNNING) //再次检测任务状态 innerSet(callable.call()); //执行任务,然后设置执行结果。 else releaseShared(0); //说明任务已取消。 } catch (Throwable ex) { innerSetException(ex); //如果执行任务过程中发生异常,设置异常。 } }
看下设置执行结果的innerSet方法:
void innerSet(V v) { for (;;) { int s = getState(); //获取任务执行状态。 if (s == RAN) return; //如果任务已经执行完毕,退出。 if (s == CANCELLED) { //这里释放AQS控制权并设置runner为null, //为了避免正在和一个试图中断线程的取消请求竞 releaseShared(0); return; } //尝试将任务状态设置为执行完成。 if (compareAndSetState(s, RAN)) { result = v; //设置执行结果。 releaseShared(0); //释放AQS控制权。 done(); //这里调用一下done方法,子类可覆盖这个方法,做一些定制处理。 return; } } }
AQS分析过,releaseShared方法中会调用tryReleaseShared方法,看一下当前同步器中这个方法的实现:
protected boolean tryReleaseShared(int ignore) { runner = null; return true; }
innerRun方法中在执行抛异常后会调用innerSetException:
void innerSetException(Throwable t) { for (;;) { int s = getState(); if (s == RAN) return; if (s == CANCELLED) { // aggressively release to set runner to null, // in case we are racing with a cancel request // that will try to interrupt runner releaseShared(0); return; } if (compareAndSetState(s, RAN)) { exception = t; result = null; releaseShared(0); done(); return; } } }
和innerRun类似还有innerRunAndReset方法,看下实现:
boolean innerRunAndReset() { if (!compareAndSetState(0, RUNNING)) return false; try { runner = Thread.currentThread(); if (getState() == RUNNING) callable.call(); // don't set result runner = null; return compareAndSetState(RUNNING, 0); } catch (Throwable ex) { innerSetException(ex); return false; } }
再看下同步器的innerGet方法,这个方法用于支持FutureTask的get方法:
V innerGet() throws InterruptedException, ExecutionException { //获取共享锁,无法获取时阻塞等待。 acquireSharedInterruptibly(0); if (getState() == CANCELLED) throw new CancellationException(); //如果任务状态为取消,那么抛出CancellationException if (exception != null) throw new ExecutionException(exception);//如果任务执行异常,抛出ExecutionException,并传递异常。 return result; //成功执行完成,返回执行结果。 }
类似的有带超时的innerGet:
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException(); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; }
AQS分析过,acquireSharedInterruptibly和tryAcquireSharedNanos方法中会调用tryAcquireShared方法,看一下当前同步器中这个方法的实现:
protected int tryAcquireShared(int ignore) { return innerIsDone()? 1 : -1; } boolean innerIsDone() { return ranOrCancelled(getState()) && runner == null; } private boolean ranOrCancelled(int state) { return (state & (RAN | CANCELLED)) != 0; }
最后看下同步器的innerCancel方法,这个方法用于支持FutureTask的cancel方法:
boolean innerCancel(boolean mayInterruptIfRunning) { for (;;) { int s = getState(); if (ranOrCancelled(s)) return false; //如果任务已经执行完毕或者取消。 if (compareAndSetState(s, CANCELLED))//否则尝试设置任务状态为取消。 break; } if (mayInterruptIfRunning) { Thread r = runner; if (r != null) r.interrupt(); //如果设置了mayInterruptIfRunning为true,需要中断线程, } releaseShared(0); //释放AQS的控制权。 done(); //这里也会调用done,定制子类时需要注意下。 return true; }
- 有了内部同步机制,FutureTask的实现起来就很容易了,看下代码:
public class FutureTask<V> implements RunnableFuture<V> { /** 内部同步器 */ private final Sync sync; public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); sync = new Sync(callable); } public FutureTask(Runnable runnable, V result) { sync = new Sync(Executors.callable(runnable, result)); } public boolean isCancelled() { return sync.innerIsCancelled(); } public boolean isDone() { return sync.innerIsDone(); } public boolean cancel(boolean mayInterruptIfRunning) { return sync.innerCancel(mayInterruptIfRunning); } public V get() throws InterruptedException, ExecutionException { return sync.innerGet(); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return sync.innerGet(unit.toNanos(timeout)); } /** * 本类中是一个空实现,子类可以覆盖这个方法,做回调或一些记录工作。 * 可以来实现里面通过任务状态来判断任务是否被取消。 */ protected void done() { } protected void set(V v) { sync.innerSet(v); } protected void setException(Throwable t) { sync.innerSetException(t); } public void run() { sync.innerRun(); } protected boolean runAndReset() { return sync.innerRunAndReset(); } ...
实现都是基于上面分析过的方法,这里不啰嗦了。注意构造方法中有一个Runnable到callable的转换,使用了Executors中的方法,这个类后续会分析,这里简单看一下:
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
FutureTask的代码解析完毕!
参见:Jdk1.6 JUC源码解析(6)-locks-AbstractQueuedSynchronizer
相关推荐
aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-...
2部分: jdk-1.6-windows-64-01 jdk-1.6-windows-64-02
1.okhttp3.8源码使用jdk1.6重新编译,已集成了okio,在javaweb项目中使用,未在安卓项目中使用 2.okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.jar
下载的压缩包文件"jdk-6u45-windows-x64(1.6 64).exe"是Windows 64位系统的安装程序。安装过程中,用户需要选择安装路径,并设置环境变量,包括`JAVA_HOME`指向JDK的安装目录,`PATH`添加JDK的bin目录,确保系统可以...
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
标题中的"jdk-jdk1.6.0.24-windows-i586.exe"是一个Java Development Kit(JDK)的安装程序,适用于Windows操作系统且为32位版本。JDK是Oracle公司提供的一个用于开发和运行Java应用程序的软件包。这个特定的版本,...
标题中的“jdk1.6集成jjwt的问题”指的是在Java Development Kit (JDK) 版本1.6的环境下,尝试整合JSON Web Token (JWT) 库jjwt时遇到的挑战。JWT是一种开放标准(RFC 7519),用于在各方之间安全地传输信息作为 ...
1. 解压缩"java-jdk1.6-jdk-6u45-windows-x64.zip"文件,这将释放出"jdk-6u45-windows-x64.exe"可执行文件。 2. 双击运行"jdk-6u45-windows-x64.exe",安装向导会引导你完成安装过程。通常,你需要选择安装路径,...
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
### JDK1.6安装及与JDK-1.5版本共存 #### 一、前言 随着软件开发环境的变化和技术的进步,不同的项目可能需要不同的Java版本来支持其运行。例如,在某些特定环境下,可能既需要使用JDK1.5(Java Development Kit ...
logback-cfca-jdk1.6-3.1.0.0.jar
- 这可能是ZXing库的完整源码包,专门针对JDK1.6编译,包含了所有必要的源文件和资源,供开发者进行更深度的定制和集成。 总之,ZXing库是一个强大的条形码和二维码工具,这个特别适配JDK1.6的版本为那些仍在使用...
这个压缩包文件"jdk-6u45-linux-x64.zip"包含的是JDK 1.6.0_45(也被称为6u45或1.6u45)的64位Linux版本。JDK 1.6是Java平台标准版的一个重要版本,它提供了许多功能和性能改进,是许多企业级应用的基础。 JDK 1.6u...
jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3
java环境搭建 jdk6(包含jre)64位 jdk-6u45-windows-x64
Java编程开发工具包,最新版本,很好用,经典
《深入解析JDK 1.6:构建高效Java开发环境》 JDK(Java Development Kit)是Oracle公司发布的用于开发Java应用程序的重要工具集,它为Java开发者提供了编译、调试和运行Java程序所需的环境。JDK 1.6,又称为Java SE...
Linux64位环境下的jdk6安装包:jdk-6u45-linux-x64.bin。 由于积分无法修改,现提供网盘下载地址: https://pan.baidu.com/s/1BE55ImTxZTQO6T22051P2g 提取码:5wvm