- 浏览: 987772 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
package java.util.concurrent; import java.util.concurrent.locks.*; /** * A cancellable asynchronous computation. This class provides a base * implementation of {@link Future}, with methods to start and cancel * a computation, query to see if the computation is complete, and * retrieve the result of the computation. The result can only be * retrieved when the computation has completed; the <tt>get</tt> * method will block if the computation has not yet completed. Once * the computation has completed, the computation cannot be restarted * or cancelled. * FutureTask是一个可取消的异步计算任务,并提供了基于Future接口实现的开始和取消 计算任务,查看计算任务状态和在计算任务结束后获取结果的方法。计算任务的结果,只有 在计算任务完成时,才能取得,如果计算任务还没完成,将会阻塞。只要计算任务完成, 计算任务就不能被取消或重新启动。 * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or * {@link java.lang.Runnable} object. Because <tt>FutureTask</tt> * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be * submitted to an {@link Executor} for execution. * FutureTask可用于包装Callable和Runnable对象。由于FutureTask实现了Runnable接口, 所有可以被调到执行器,执行。 * <p>In addition to serving as a standalone class, this class provides * <tt>protected</tt> functionality that may be useful when creating * customized task classes. * 当我们创建任务线程类时为单独的类(独立任务),FutureTask的protect功能方法也许有用。 * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this FutureTask's <tt>get</tt> method */ public class FutureTask<V> implements RunnableFuture<V> { /** Synchronization control for FutureTask 用于控制FutureTask的同步器*/ private final Sync sync; /** * Creates a <tt>FutureTask</tt> that will, upon running, execute the * given <tt>Callable</tt>. * 创建一个FutureTask,在执行时,将会执行参数Callable * @param callable the callable task * @throws NullPointerException if callable is null */ public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); sync = new Sync(callable); } /** * Creates a <tt>FutureTask</tt> that will, upon running, execute the * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the * given result on successful completion. * 直接通过Executors执行任务,并将结果保存到result中 * @param runnable the runnable task * @param result the result to return on successful completion. If * you don't need a particular result, consider using * constructions of the form: * {@code Future<?> f = new FutureTask<Void>(runnable, null)} * @throws NullPointerException if runnable is null */ public FutureTask(Runnable runnable, V result) { sync = new Sync(Executors.callable(runnable, result)); } }
FutureTask内部关联着一个同步器Sync,主要用于控制cancel,get等操作。
我们来看一下内部同步器Sync:
/** * Synchronization control for FutureTask. Note that this must be * a non-static inner class in order to invoke the protected * <tt>done</tt> method. For clarity, all inner class support * methods are same as outer, prefixed with "inner". * 控制FutureTask的同步器,由于需要调用protected的done方法,所以类必须定义为 非静态内部类。为了清晰起见,所有内部类支持的方法,与外部类FutureTask一样,只不过 添加了inner最为前缀。 * Uses AQS sync state to represent run status */ private final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -7828117401763700385L; /** State value representing that task is ready to run 准备就绪*/ private static final int READY = 0; /** State value representing that task is running 正在运行*/ private static final int RUNNING = 1; /** State value representing that task ran 运行完*/ private static final int RAN = 2; /** State value representing that task was cancelled 取消*/ private static final int CANCELLED = 4; /** The underlying callable 执行线程callable*/ private final Callable<V> callable; /** The result to return from get() 结果*/ private V result; /** The exception to throw from get() get方法抛出的异常*/ private Throwable exception; /** * The thread running task. When nulled after set/cancel, this * indicates that the results are accessible. Must be * volatile, to ensure visibility upon completion. 线程用于执行任务。在set/cancel操作后为null,预示着任务结果可用, 变量必须volatile,以保证在任务执行完时,结果的可见性。 */ private volatile Thread runner; //构造任务同步器 Sync(Callable<V> callable) { this.callable = callable; } //返回任务执行的状态,是运行完还是取消 private boolean ranOrCancelled(int state) { return (state & (RAN | CANCELLED)) != 0; } //是否执行结束,如果任务运行完或取消,且运行任务线程为null,即任务结束 boolean innerIsDone() { return ranOrCancelled(getState()) && runner == null; } /** * Implements AQS base acquire to succeed if ran or cancelled 任务运行完或取消,则尝试获取共享锁成功。 */ protected int tryAcquireShared(int ignore) { return innerIsDone() ? 1 : -1; } /** * Implements AQS base release to always signal after setting * final done status by nulling runner thread. 在通过设置运行任务线程为null,设置任务线程状态为结束时,释放共享锁 */ protected boolean tryReleaseShared(int ignore) { runner = null; return true; } //判断任务状态是否为取消 boolean innerIsCancelled() { return getState() == CANCELLED; } //获取任务执行结果 V innerGet() throws InterruptedException, ExecutionException { //这个我们在AQS篇章中有讲,这里不再说 //如果任务线程执行结束,如果状态为取消,则抛出CancellationException acquireSharedInterruptibly(0); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); //否则任务线程运行完,返回结果 return result; } //超时获取任务执行结果,这个与get方法不同是,超时等待任务线程执行结束 V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException(); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; } }
Sync主要用于控制FutureTask的运行状态,状态一共有4中,准备就绪READY,
正在运行RUNNING,运行完RAN,取消CANCELLED;任务线程结束可能有两个原因
,运行完RAN或取消CANCELLED。Sync内部有一个线程runner用于执行任务,当任务线程执行结束时,runner为null。
回到FutureTask
public boolean isCancelled() { return sync.innerIsCancelled(); } public boolean isDone() { return sync.innerIsDone(); } /** * @throws CancellationException {@inheritDoc} */ public V get() throws InterruptedException, ExecutionException { return sync.innerGet(); } /** * @throws CancellationException {@inheritDoc} */ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return sync.innerGet(unit.toNanos(timeout)); }
从上面可以看出FutureTask的isCancelled,isDone,get和超时get方法是直接委托给
内部同步器Sync的相应方法。
再看其他方法先看取消
//FutureTask
public boolean cancel(boolean mayInterruptIfRunning) { //委托给内部同步器 return sync.innerCancel(mayInterruptIfRunning); }
//Sync
boolean innerCancel(boolean mayInterruptIfRunning) { //自旋设置任务线程运行状态为CANCELLED for (;;) { int s = getState(); if (ranOrCancelled(s)) //如果任务已经执行结束,则返回false,不可取消 return false; //AQS设置任务线程运行状态为CANCELLED if (compareAndSetState(s, CANCELLED)) break; } //如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程 if (mayInterruptIfRunning) { Thread r = runner; if (r != null) r.interrupt(); } //释放锁 releaseShared(0); //做一些任务执行结束工作 done(); return true; }
//FutureTask
/** * Protected method invoked when this task transitions to state * <tt>isDone</tt> (whether normally or via cancellation). The * default implementation does nothing. Subclasses may override * this method to invoke completion callbacks or perform * bookkeeping. Note that you can query status inside the * implementation of this method to determine whether this task * has been cancelled. 无论任务线程取消还是正常运行结束,只要线程的isDone状态为true,则调用 此方法。默认实现不做任务事情,留给子类扩展。子类可以重写此方法,用于 在执行完成时,调用回调接口或者执行记录工作。同时可以在 此方法中确认 任务线程是否被取消。 */ protected void done() { }
从上来看取消操作,首先自旋设置任务线程运行状态为CANCELLED,
如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程,
释放锁,做一些任务执行结束工作(默认为空)。
再来看run
// The following (duplicated) doc comment can be removed once // // 6270645: Javadoc comments should be inherited from most derived // superinterface or superclass // is fixed. /** * Sets this Future to the result of its computation * unless it has been cancelled. */ public void run() { //委托给内部同步器 sync.innerRun(); }
//Sync
void innerRun() { //如果任务线程处理就绪状态,则设置为运行状态,否则返回 if (!compareAndSetState(READY, RUNNING)) return; runner = Thread.currentThread(); if (getState() == RUNNING) { // recheck after setting thread V result; try { //执行callable result = callable.call(); } catch (Throwable ex) { //设置执行异常 setException(ex); return; } //设置结果 set(result); } else { releaseShared(0); // cancel } }
分别来看设置执行异常和设置结果
//设置结果
set(result);
//FutureTask
/** * Sets the result of this Future to the given value unless * this future has already been set or has been cancelled. * This method is invoked internally by the <tt>run</tt> method * upon successful completion of the computation. 如果结果已经被设值或任务线程被取消,则设置失败。此方在run方法成功 完成任务时,调用。 * @param v the value */ protected void set(V v) { sync.innerSet(v); }
//Sync
void innerSet(V v) { for (;;) { int s = getState(); //如果任务运行完,则返回 if (s == RAN) return; if (s == CANCELLED) { // aggressively release to set runner to null, // in case we are racing with a cancel request // that will try to interrupt runner releaseShared(0); return; } //设置任务运行状态为RAN,并设值 if (compareAndSetState(s, RAN)) { result = v; releaseShared(0); done(); return; } } }
//设置执行异常
setException(ex);
//FutureTask
/** * Causes this future to report an <tt>ExecutionException</tt> * with the given throwable as its cause, unless this Future has * already been set or has been cancelled. * This method is invoked internally by the <tt>run</tt> method * upon failure of the computation. * @param t the cause of failure 设置执行异常 */ protected void setException(Throwable t) { sync.innerSetException(t); }
//Sync 这一步与innerSet相似,不在说
void innerSetException(Throwable t) { for (;;) { int s = getState(); if (s == RAN) return; if (s == CANCELLED) { // aggressively release to set runner to null, // in case we are racing with a cancel request // that will try to interrupt runner releaseShared(0); return; } if (compareAndSetState(s, RAN)) { exception = t; releaseShared(0); done(); return; } } }
再看runAndReset
/** * Executes the computation without setting its result, and then * resets this Future to initial state, failing to do so if the * computation encounters an exception or is cancelled. This is * designed for use with tasks that intrinsically execute more * than once. 此方的功能如果任务线程正在运行,并且没有设置结果,可以重新设置任务线程为 就绪状态,如任务线程运行异常或取消,则重置失败。这个用于任务需要多次执行的场景。 * @return true if successfully run and reset */ protected boolean runAndReset() { //委托给内部同步器 return sync.innerRunAndReset(); }
//Sync
boolean innerRunAndReset() { //如果任务线程处于从READY切换到RUNNING失败,则返回false,即任务线程不处于就绪状态 if (!compareAndSetState(READY, RUNNING)) return false; try { runner = Thread.currentThread(); if (getState() == RUNNING) //如果任务线程正在运行,调用callable callable.call(); // don't set result runner = null;//重置任务线程为null return compareAndSetState(RUNNING, READY);//重置任务线程从RUNNING到READY } catch (Throwable ex) { setException(ex); return false; } }
总结:
FutureTask内部关联一个同步器Sync,Sync主要用于控制FutureTask的运行状态,状态一共有4中,准备就绪READY,正在运行RUNNING,运行完RAN,取消CANCELLED;任务线程结束可能有两个原因,运行完RAN或取消CANCELLED。Sync内部有一个线程runner用于执行任务,当任务线程执行结束时,runner为null。取消操作,首先自旋设置任务线程运行状态为CANCELLED,如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程,释放锁,做一些任务执行结束工作(默认为空)。FutureTask的相关操作主要通过Sync来完成。
/** * A {@link Future} that is {@link Runnable}. Successful execution of * the <tt>run</tt> method causes completion of the <tt>Future</tt> * and allows access to its results. * @see FutureTask * @see Executor * @since 1.6 * @author Doug Lea * @param <V> The result type returned by this Future's <tt>get</tt> method */ public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
发表评论
-
Executors解析
2017-04-07 14:38 1264ThreadPoolExecutor解析一(核心线程池数量、线 ... -
ScheduledThreadPoolExecutor解析三(关闭线程池)
2017-04-06 20:52 4466ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析二(任务调度)
2017-04-06 12:56 2131ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析一(调度任务,任务队列)
2017-04-04 22:59 4996Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析四(线程池关闭)
2017-04-03 23:02 9118Executor接口的定义:http: ... -
ThreadPoolExecutor解析三(线程池执行提交任务)
2017-04-03 12:06 6089Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)
2017-04-01 17:12 3047Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等)
2017-03-31 22:01 20530Executor接口的定义:http://donald-dra ... -
ScheduledExecutorService接口定义
2017-03-29 12:53 1519Executor接口的定义:http://donald-dra ... -
AbstractExecutorService解析
2017-03-29 08:27 1090Executor接口的定义:http: ... -
ExecutorCompletionService解析
2017-03-28 14:27 1603Executor接口的定义:http://donald-dra ... -
CompletionService接口定义
2017-03-28 12:39 1076Executor接口的定义:http://donald-dra ... -
Future接口定义
2017-03-26 09:40 1203/* * Written by Doug Lea with ... -
ExecutorService接口定义
2017-03-25 22:14 1170Executor接口的定义:http://donald-dra ... -
Executor接口的定义
2017-03-24 23:24 1687package java.util.concurrent; ... -
简单测试线程池拒绝执行任务策略
2017-03-24 22:37 2037线程池多余任务的拒绝执行策略有四中,分别是直接丢弃任务Disc ... -
JAVA集合类简单综述
2017-03-23 22:51 930Queue接口定义:http://donald-draper. ... -
DelayQueue解析
2017-03-23 11:00 1742Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析下-TransferQueue
2017-03-22 22:20 2147Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析上-TransferStack
2017-03-21 22:08 3078Queue接口定义:http://donald-draper. ...
相关推荐
Java多线程编程中,`Future` 和 `FutureTask` 是两种重要的同步工具,它们用于管理异步计算的结果。在Java并发编程中,通常我们会使用`ExecutorService`来提交任务,而`Future`和`FutureTask`就是与这些任务交互的...
FutureTask原始码解析 一,FutureTask是什么? FutureTask是可取消的异步的计算任务,它可以通过线程池和线程对象执行,一般来说是FutureTask用于耗时的计算。 二,FutureTask继承图 三,未来任务源码 FutureTask的...
Java FutureTask类使用案例解析 Java FutureTask类是一种异步计算的工具,用于执行长时间的任务并获取结果。它实现了Runnable和Future接口,既可以作为一个Runnable对象提交给Executor执行,也可以作为一个Future...
4. 核心方法解析 - `run()`:这是FutureTask实际执行的任务,调用了Callable的`call()`方法,同时更新状态和结果。 - `get()`和`get(long timeout, TimeUnit unit)`:这两个方法会阻塞当前线程,直到任务完成。如果...
#### 四、FutureTask源码解析 ##### 4.1 Callable接口 `Callable`是一个泛型接口,其泛型`V`指定了`call`方法返回的类型。与`Runnable`接口不同,`Callable`不仅能够执行任务,还能返回计算结果,并且可以抛出异常...
示例代码解析 下面是对文章中给出的示例代码的详细分析: ```java package threadConcurrent.test; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util....
11. **并发编程**:深入理解并发容器如BlockingQueue、FutureTask,以及并发工具类CountDownLatch、CyclicBarrier、Phaser等的使用。 12. **网络编程**:理解TCP/IP协议,了解Socket编程,以及HTTP协议的工作原理,...
美团动态线程池实践思路开源项目(DynamicTp)线程池源码解析及通知告警篇 本文详细介绍了美团动态线程池实践思路开源项目(DynamicTp)的通知告警模块,该模块提供了多种通知告警功能,每一个通知项都可以独立配置...
提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解.mp4 同步容器与并发容器.mp4 并发容器CopyOnWriteArrayList原理与使用.mp4 并发容器...
ExecutorService负责调度和执行任务,FutureTask则提供了异步计算的结果。AsyncTask通过内部的SerialExecutor保证了同一时间只有一个任务在执行,实现了任务的串行化。 此外,Android的AsyncTask有版本差异,从...
3. `FutureTask` 类:实现了 Future 接口,用于保存任务的结果并控制任务的执行状态。 4. `InternalHandler` 类:内部 Handler 实现,用于在 UI 线程中分发消息。 通过深入理解这些细节,开发者可以更好地掌握如何...
本篇将基于2023年的最新趋势,解析Java面试中的常见问题及答案。 1. **Java基础** - **面向对象**:理解类、对象、封装、继承、多态的概念。 - **异常处理**:熟悉try-catch-finally语句块,了解Checked与...
### 上海某大厂Java面试真题与解析 #### 并发编程三大核心:原子性、可见性与有序性 **原子性**是指一个或多个操作作为一个不可分割的整体执行,这意味着这些操作要么全部成功,要么全部失败,在执行过程中不允许...
Java中实现多线程有四种方法:继承Thread类、实现Runnable接口、实现Callable接口通过FutureTask包装器来创建Thread线程、使用ExecutorService、Callable、Future实现有返回结果的多线程。 二、停止一个正在运行的...
### Java多线程并发实战知识点解析 #### 一、引言 在计算机科学领域,**多线程**和**并发**技术是现代软件开发中不可或缺的一部分。随着处理器核心数量的增加,利用多线程和并发可以显著提高应用程序的性能和响应...
FutureTask<String> futureTask = new FutureTask(new Callable() { @Override public String call() throws Exception { // 异步操作 return "hihi"; } }); Scheduler.Worker worker = Schedulers.io()....
- 使用`Future`和`FutureTask`获取任务结果,便于管理和同步。 - 避免长时间阻塞的操作,以免阻塞工作线程,影响线程池效率。 在实际开发中,Java还提供了一些预定义的线程池,如`Executors.newFixedThreadPool(int...
1.简介 android.os.AsyncTask,一个执行异步操作的类,我们可以使用它来处理后台任务,并且在UI线程中... 例如{@link Executor},{@ link ThreadPoolExecutor}和{@link FutureTask}。 2.基本使用 2.1 关键API andro
Java 多线程 Callable 接口实现代码示例 Java 多线程编程中,创建线程有多种方式,其中一种便...感兴趣的朋友可以继续参阅本站的其他文章,如《浅谈 Java 面向接口编程》、《Java 编程接口回调一般用法代码解析》等。
6. **Future模式** - "31 凭票取餐—Future模式详解-慕课专栏.html":讲解了`Future`接口和`FutureTask`类,它们用于获取异步任务的结果。 7. **Master-Slave模式** - "36 为多线程们安排一位经理—Master-Slave...