`
uule
  • 浏览: 6387501 次
  • 性别: Icon_minigender_1
  • 来自: 一片神奇的土地
社区版块
存档分类
最新评论

Future接口和FutureTask类【FutureTask实现了Runnable和Future接口】

 
阅读更多

API:

 

public interface Future<V> {

    /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when <tt>cancel</tt> is called,
     * this task should never run.  If the task has already started,
     * then the <tt>mayInterruptIfRunning</tt> parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     */   
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * Returns <tt>true</tt> if this task was cancelled before it completed
     * normally.
     *
     * @return <tt>true</tt> if this task was cancelled before it completed
     */
    boolean isCancelled();

    /**
     * Returns <tt>true</tt> if this task completed.
     *
     * Completion may be due to normal termination, an exception, or
     * cancellation -- in all of these cases, this method will return
     * <tt>true</tt>.
     *
     * @return <tt>true</tt> if this task completed
     */
    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

 

public interface Executor {    
    void execute(Runnable command);  
} 

public interface ExecutorService extends Executor {  
  
    <T> Future<T> submit(Callable<T> task);       
    <T> Future<T> submit(Runnable task, T result);    
    Future<?> submit(Runnable task);        
    ...     
}  

public class FutureTask<V>  extends Object  
			implements Future<V>, Runnable {
      FutureTask(Callable<V> callable)   
             //创建一个 FutureTask,一旦运行就执行给定的 Callable。    
      FutureTask(Runnable runnable, V result)   
             //创建一个 FutureTask,一旦运行就执行给定的 Runnable,并安排成功完成时 get 返回给定的结果 。  
}
/*参数:  
runnable - 可运行的任务。  
result - 成功完成时要返回的结果。  
如果不需要特定的结果,则考虑使用下列形式的构造:Future<?> f = new FutureTask<Object>(runnable, null)  */

  

单独使用Runnable时

        无法获得返回值

 

单独使用Callable时

        无法在新线程中(new Thread(Runnable r))使用,只能使用ExecutorService

        Thread类只支持Runnable

 

FutureTask

         实现了RunnableFuture,所以兼顾两者优点

         既可以使用ExecutorService,也可以使用Thread

 

Callable pAccount = new PrivateAccount();  
	FutureTask futureTask = new FutureTask(pAccount);  
	// 使用futureTask创建一个线程  
	Thread thread = new Thread(futureTask);  
	thread.start();  

  

=================================================================

public interface Future<V> Future 表示异步计算的结果。
Future有个get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。 

  

Future 主要定义了5个方法: 

1)boolean cancel(boolean mayInterruptIfRunning):试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true。 


2)boolean isCancelled():如果在任务正常完成前将其取消,则返回 true。 
3)boolean isDone():如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。 
4)V get()throws InterruptedException,ExecutionException:如有必要,等待计算完成,然后获取其结果。 
5)V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException:如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。

6)finishCompletion()该方法在任务完成(包括异常完成、取消)后调用。删除所有正在get获取等待的节点且唤醒节点的线程。和调用done方法和置空callable.

 

CompletionService接口能拿到按完成顺序拿到一组线程池中所有线程,依靠的就是FutureTask中的finishCompletion()方法。

/**
    该方法在任务完成(包括异常完成、取消)后调用。删除所有正在get获取等待的节点且唤醒节点的线程。和调用done方法和置空callable.
    **/
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

 

 

public class FutureTask<V>  extends Object
	implements Future<V>, Runnable
Future是一个接口, FutureTask类是Future 的一个实现类,并实现了Runnable,因此FutureTask可以传递到线程对象Thread中新建一个线程执行。所以可通过Excutor(线程池) 来执行,也可传递给Thread对象执行。如果在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。 

FutureTask是为了弥补Thread的不足而设计的,它可以让程序员准确地知道线程什么时候执行完成并获得到线程执行完成后返回的结果(如果有需要)。

 

FutureTask是一种可以取消的异步的计算任务。它的计算是通过Callable实现的,它等价于可以携带结果的Runnable,并且有三个状态:等待、运行和完成。完成包括所有计算以任意的方式结束,包括正常结束、取消和异常。

Executor框架利用FutureTask来完成异步任务,并可以用来进行任何潜在的耗时的计算。一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。

  FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果

 

 

 

JDK:

此类提供了对 Future 的基本实现。仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。

 

可使用 FutureTask 包装 Callable 或 Runnable 对象。因为 FutureTask 实现了 Runnable,所以可将 FutureTask 提交给 Executor 执行。

 

 

//构造方法摘要
FutureTask(Callable<V> callable) 
          //创建一个 FutureTask,一旦运行就执行给定的 Callable。

FutureTask(Runnable runnable, V result) 
          //创建一个 FutureTask,一旦运行就执行给定的 Runnable,并安排成功完成时 get 返回给定的结果 。
//参数:
runnable - 可运行的任务。
result - 成功完成时要返回的结果。
如果不需要特定的结果,则考虑使用下列形式的构造:Future<?> f = new FutureTask<Object>(runnable, null)

 

Example1:

下面的例子模拟一个会计算账的过程,主线程已经获得其他帐户的总额了,为了不让主线程等待 PrivateAccount类的计算结果的返回而启用新的线程去处理, 并使用 FutureTask对象来监控,这样,主线程还可以继续做其他事情, 最后需要计算总额的时候再尝试去获得privateAccount 的信息。 

 

package test;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 *
 * @author Administrator
 *
 */
@SuppressWarnings("all")
public class FutureTaskDemo {
	public static void main(String[] args) {
		// 初始化一个Callable对象和FutureTask对象
		Callable pAccount = new PrivateAccount();
		FutureTask futureTask = new FutureTask(pAccount);
		// 使用futureTask创建一个线程
		Thread pAccountThread = new Thread(futureTask);
		System.out.println("futureTask线程现在开始启动,启动时间为:" + System.nanoTime());
		pAccountThread.start();
		System.out.println("主线程开始执行其他任务");
		// 从其他账户获取总金额
		int totalMoney = new Random().nextInt(100000);
		System.out.println("现在你在其他账户中的总金额为" + totalMoney);
		System.out.println("等待私有账户总金额统计完毕...");
		// 测试后台的计算线程是否完成,如果未完成则等待
		while (!futureTask.isDone()) {
			try {
				Thread.sleep(500);
				System.out.println("私有账户计算未完成继续等待...");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println("futureTask线程计算完毕,此时时间为" + System.nanoTime());
		Integer privateAccountMoney = null;
		try {
			privateAccountMoney = (Integer) futureTask.get();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		System.out.println("您现在的总金额为:" + totalMoney + privateAccountMoney.intValue());
	}
}

@SuppressWarnings("all")
class PrivateAccount implements Callable {
	Integer totalMoney;

	@Override
	public Object call() throws Exception {
		Thread.sleep(5000);
		totalMoney = new Integer(new Random().nextInt(10000));
		System.out.println("您当前有" + totalMoney + "在您的私有账户中");
		return totalMoney;
	}

}

 运行结果   


 

          来源:
http://zheng12tian.iteye.com/blog/991484
         
Example2:
public class FutureTaskSample {
    
    static FutureTask<String> future = new FutureTask(new Callable<String>(){
        public String call(){
            return getPageContent();
        }
    });
    
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        //Start a thread to let this thread to do the time exhausting thing
        new Thread(future).start();

        //Main thread can do own required thing first
        doOwnThing();

        //At the needed time, main thread can get the result
        System.out.println(future.get());
    }
    
    public static String doOwnThing(){
        return "Do Own Thing";
    }
    public static String getPageContent(){
        return "Callable method...";
    }
}
 结果为:Callable method...
不科学啊,为毛??!
 改为这样,结果就正常:
public class FutureTaskSample {  
      
    public static void main(String[] args) throws InterruptedException, ExecutionException{  
        //Start a thread to let this thread to do the time exhausting thing  
    	Callable call = new MyCallable();
    	FutureTask future = new FutureTask(call);
        new Thread(future).start();  
  
        //Main thread can do own required thing first  
        //doOwnThing();  
        System.out.println("Do Own Thing");  
        
        //At the needed time, main thread can get the result  
        System.out.println(future.get());  
    }  
    
}  


	class MyCallable implements Callable<String>{

		@Override
		public String call() throws Exception {
			 return getPageContent();  
		}
		
		public String getPageContent(){  
	        return "Callable method...";  
	    } 
	}
 结果:
Do Own Thing
Callable method...

 

 Example3:
import java.util.concurrent.Callable;

public class Changgong implements Callable<Integer>{

    private int hours=12;
    private int amount;
    
    @Override
    public Integer call() throws Exception {
        while(hours>0){
            System.out.println("I'm working......");
            amount ++;
            hours--;
            Thread.sleep(1000);
        }
        return amount;
    }
}
 
public class Dizhu {
        
    public static void main(String args[]){
        Changgong worker = new Changgong();
        FutureTask<Integer> jiangong = new FutureTask<Integer>(worker);
        new Thread(jiangong).start();
        while(!jiangong.isDone()){
            try {
                System.out.println("看长工做完了没...");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        int amount;
        try {
            amount = jiangong.get();
            System.out.println("工作做完了,上交了"+amount);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
 
结果:
看工人做完了没...
I'm working......
看工人做完了没...
I'm working......
看工人做完了没...
I'm working......
看工人做完了没...
I'm working......
看工人做完了没...
I'm working......
看工人做完了没...
I'm working......
看工人做完了没...
I'm working......
看工人做完了没...
I'm working......
看工人做完了没...
I'm working......
看工人做完了没...
I'm working......
看工人做完了没...
I'm working......
看工人做完了没...
I'm working......
看工人做完了没...
工作做完了,上交了12
 
/**
 * Factory and utility methods for {@link Executor}, {@link
 * ExecutorService}, {@link ScheduledExecutorService}, {@link
 * ThreadFactory}, and {@link Callable} classes defined in this
 * package. This class supports the following kinds of methods:
 *
 * <ul>
 *   <li> Methods that create and return an {@link ExecutorService}
 *        set up with commonly useful configuration settings.
 *   <li> Methods that create and return a {@link ScheduledExecutorService}
 *        set up with commonly useful configuration settings.
 *   <li> Methods that create and return a "wrapped" ExecutorService, that
 *        disables reconfiguration by making implementation-specific methods
 *        inaccessible.
 *   <li> Methods that create and return a {@link ThreadFactory}
 *        that sets newly created threads to a known state.
 *   <li> Methods that create and return a {@link Callable}
 *        out of other closure-like forms, so they can be used
 *        in execution methods requiring <tt>Callable</tt>.
 * </ul>
 *
 * @since 1.5
 * @author Doug Lea
 */
public class Executors {

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * <tt>nThreads</tt> threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if <tt>nThreads &lt;= 0</tt>
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue, using the provided
     * ThreadFactory to create new threads when needed.  At any point,
     * at most <tt>nThreads</tt> threads will be active processing
     * tasks.  If additional tasks are submitted when all threads are
     * active, they will wait in the queue until a thread is
     * available.  If any thread terminates due to a failure during
     * execution prior to shutdown, a new one will take its place if
     * needed to execute subsequent tasks.  The threads in the pool will
     * exist until it is explicitly {@link ExecutorService#shutdown
     * shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     * @throws IllegalArgumentException if <tt>nThreads &lt;= 0</tt>
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * <tt>newFixedThreadPool(1)</tt> the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue, and uses the provided ThreadFactory to
     * create a new thread when needed. Unlike the otherwise
     * equivalent <tt>newFixedThreadPool(1, threadFactory)</tt> the
     * returned executor is guaranteed not to be reconfigurable to use
     * additional threads.
     *
     * @param threadFactory the factory to use when creating new
     * threads
     *
     * @return the newly created single-threaded Executor
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to <tt>execute</tt> will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.
     * (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * <tt>newScheduledThreadPool(1)</tt> the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     * @return the newly created scheduled executor
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.  (Note
     * however that if this single thread terminates due to a failure
     * during execution prior to shutdown, a new one will take its
     * place if needed to execute subsequent tasks.)  Tasks are
     * guaranteed to execute sequentially, and no more than one task
     * will be active at any given time. Unlike the otherwise
     * equivalent <tt>newScheduledThreadPool(1, threadFactory)</tt>
     * the returned executor is guaranteed not to be reconfigurable to
     * use additional threads.
     * @param threadFactory the factory to use when creating new
     * threads
     * @return a newly created scheduled executor
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle.
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle.
     * @param threadFactory the factory to use when the executor
     * creates a new thread.
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    ........

    /** Cannot instantiate. */
    private Executors() {}
}
 ================================================================================
FutureTask实现原理-源码

    下面我们介绍一下FutureTask内部的一些实现机制。下文从以下几点叙述:

  1. 类继承结构
  2. 核心成员变量
  3. 内部状态转换
  4. 核心方法解析

1 类继承结构

      首先我们看一下FutureTask的继承结构:

   

 

      FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable和Future,也就是说FutureTask既是Runnable,也是Future。

2 核心成员变量

    FutureTask内部定义了以下变量,以及它们的含义如下

  • volatile int state:表示对象状态,volatile关键字保证了内存可见性。futureTask中定义了7种状态,代表了7种不同的执行状态
1
2
3
4
5
6
7
private static final int NEW          = 0//任务新建和执行中
private static final int COMPLETING   = 1//任务将要执行完毕
private static final int NORMAL       = 2//任务正常执行结束
private static final int EXCEPTIONAL  = 3//任务异常
private static final int CANCELLED    = 4//任务取消
private static final int INTERRUPTING = 5//任务线程即将被中断
private static final int INTERRUPTED  = 6//任务线程已中断
  • Callable<V> callable:被提交的任务
  • Object outcome:任务执行结果或者任务异常
  • volatile Thread runner:执行任务的线程
  • volatile WaitNode waiters:等待节点,关联等待线程
  • long stateOffset:state字段的内存偏移量
  • long runnerOffset:runner字段的内存偏移量
  • long waitersOffset:waiters字段的内存偏移量

    后三个字段是配合Unsafe类做CAS操作使用的。

3 内部状态转换

    FutureTask中使用state表示任务状态,state值变更的由CAS操作保证原子性。

    FutureTask对象初始化时,在构造器中把state置为为NEW,之后状态的变更依据具体执行情况来定。

   例如任务执行正常结束前,state会被设置成COMPLETING,代表任务即将完成,接下来很快就会被设置为NARMAL或者EXCEPTIONAL,这取决于调用Runnable中的call()方法是否抛出了异常。有异常则后者,反之前者。

  任务提交后、任务结束前取消任务,那么有可能变为CANCELLED或者INTERRUPTED。在调用cancel方法时,如果传入false表示不中断线程,state会被置为CANCELLED,反之state先被变为INTERRUPTING,后变为INTERRUPTED。

     总结下,FutureTask的状态流转过程,可以出现以下四种情况:

        1. 任务正常执行并返回。 NEW -> COMPLETING -> NORMAL

    2. 执行中出现异常。NEW -> COMPLETING -> EXCEPTIONAL

        3. 任务执行过程中被取消,并且不响应中断。NEW -> CANCELLED

    4. 任务执行过程中被取消,并且响应中断。 NEW -> INTERRUPTING -> INTERRUPTED  

4 核心方法解析

  接下来我们一起扒一扒FutureTask的源码。我们先看一下任务线程是怎么执行的。当任务被提交到线程池后,会执行futureTask的run()方法。

1 public void run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void run() {<br>        // 校验任务状态
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;<br>       // double check
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {<br>            //执行业务代码
                    result = c.call();
                    ran = true;
                catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        finally {<br>        // 重置runner
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

  翻译一下,这个方法经历了以下几步

  1. 校验当前任务状态是否为NEW以及runner是否已赋值。这一步是防止任务被取消。
  2. double-check任务状态state
  3. 执行业务逻辑,也就是c.call()方法被执行
  4. 如果业务逻辑异常,则调用setException方法将异常对象赋给outcome,并且更新state值
  5. 如果业务正常,则调用set方法将执行结果赋给outcome,并且更新state值

 我们继续往下看,setException(Throwable t)和set(V v) 具体是怎么做的

1
2
3
4
5
6
7
8
9
10
protected void set(V v) {
        // state状态 NEW->COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            // COMPLETING -> NORMAL 到达稳定状态
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            // 一些结束工作
            finishCompletion();
        }
    }
1
2
3
4
5
6
7
8
9
10
protected void setException(Throwable t) {
    // state状态 NEW->COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        // COMPLETING -> EXCEPTIONAL 到达稳定状态
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
        // 一些结束工作
        finishCompletion();
    }
} 

    code中的注释已经写的很清楚,故不翻译了。状态变更的原子性由unsafe对象提供的CAS操作保证。FutureTask的outcome变量存储执行结果或者异常对象,会由主线程返回。

2  get()和get(long timeout, TimeUnit unit)

    任务由线程池提供的线程执行,那么这时候主线程则会阻塞,直到任务线程唤醒它们。我们通过get(long timeout, TimeUnit unit)方法看看是怎么做的  

1
2
3
4
5
6
7
8
9
10
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

   get的源码很简洁,首先校验参数,然后根据state状态判断是否超时,如果超时则异常,不超时则调用report(s)去获取最终结果。

    当 s<= COMPLETING时,表明任务仍然在执行且没有被取消。如果它为true,那么走到awaitDone方法。

    awaitDone是futureTask实现阻塞的关键方法,我们重点关注一下它的实现原理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
 * 等待任务执行完毕,如果任务取消或者超时则停止
 * @param timed 为true表示设置超时时间
 * @param nanos 超时时间
 * @return 任务完成时的状态
 * @throws InterruptedException
 */
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    // 任务截止时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 自旋
    for (;;) {
        if (Thread.interrupted()) {
            //线程中断则移除等待线程,并抛出异常
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            // 任务可能已经完成或者被取消了
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            // 可能任务线程被阻塞了,主线程让出CPU
            Thread.yield();
        else if (q == null)
            // 等待线程节点为空,则初始化新节点并关联当前线程
            q = new WaitNode();
        else if (!queued)
            // 等待线程入队列,成功则queued=true
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                    q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //已经超时的话,移除等待节点
                removeWaiter(q);
                return state;
            }
            // 未超时,将当前线程挂起指定时间
            LockSupport.parkNanos(this, nanos);
        }
        else
            // timed=false时会走到这里,挂起当前线程
            LockSupport.park(this);
    }
}

注释里也很清楚的写明了每一步的作用,我们以设置超时时间为例,总结一下过程

  1. 计算deadline,也就是到某个时间点后如果还没有返回结果,那么就超时了。
  2. 进入自旋,也就是死循环。
  3. 首先判断是否响应线程中断。对于线程中断的响应往往会放在线程进入阻塞之前,这里也印证了这一点。
  4. 判断state值,如果>COMPLETING表明任务已经取消或者已经执行完毕,就可以直接返回了。
  5. 如果任务还在执行,则为当前线程初始化一个等待节点WaitNode,入等待队列。这里和AQS的等待队列类似,只不过Node只关联线程,而没有状态。AQS里面的等待节点是有状态的。
  6. 计算nanos,判断是否已经超时。如果已经超时,则移除所有等待节点,直接返回state。超时的话,state的值仍然还是COMPLETING。
  7. 如果还未超时,就通过LockSupprot类提供的方法在指定时间内挂起当前线程,等待任务线程唤醒或者超时唤醒。

当线程被挂起之后,如果任务线程执行完毕,就会唤醒等待线程哦。这一步就是在finishCompletion里面做的,前面已经提到这个方法。我们再看看这个方法具体做了哪些事吧~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * 移除并唤醒所有等待线程,执行done,置空callable
 * nulls out callable.
 */
private void finishCompletion() {
    //遍历等待节点
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    //唤醒等待线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                // unlink to help gc
                q.next = null;
                q = next;
            }
            break;
        }
    }
    //模板方法,可以被覆盖
    done();
    //清空callable
    callable = null;
}

由代码和注释可以看出来,这个方法的作用主要在于唤醒等待线程。由前文可知,当任务正常结束或者异常时,都会调用finishCompletion去唤醒等待线程。这个时候,等待线程就可以醒来,开开心心的获得结果啦。  

最后我们看一下任务取消  

3 public boolean cancel(boolean mayInterruptIfRunning)

  注意,取消操作不一定会起作用,这里我们先贴个demo

复制代码
 1 public class FutureDemo {
 2     public static void main(String[] args) {
 3         ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
 4         // 预创建线程
 5         executorService.prestartCoreThread();
 6 
 7         Future future = executorService.submit(new Callable<Object>() {
 8             @Override
 9             public Object call() {
10                 System.out.println("start to run callable");
11                 Long start = System.currentTimeMillis();
12                 while (true) {
13                     Long current = System.currentTimeMillis();
14                     if ((current - start) > 1000) {
15                         System.out.println("当前任务执行已经超过1s");
16                         return 1;
17                     }
18                 }
19             }
20         });
21 
22         System.out.println(future.cancel(false));
23 
24         try {
25             Thread.currentThread().sleep(3000);
26             executorService.shutdown();
27         } catch (Exception e) {
28             //NO OP
29         }
30     }
31 }
复制代码

我们多次测试后发现,出现了2种打印结果,如图

                结果1

                结果2    

第一种是任务压根没取消,第二种则是任务压根没提交成功。 

    方法签名注释告诉我们,取消操作是可能会失败的,如果当前任务已经结束或者已经取消,则当前取消操作会失败。如果任务尚未开始,那么任务不会被执行。这就解释了出现上图结果2的情况。我们还是从源码去分析cancel()究竟做了哪些事。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean cancel(boolean mayInterruptIfRunning) {
        if (state != NEW)
            return false;
        if (mayInterruptIfRunning) {
            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
                return false;
            Thread t = runner;
            if (t != null)
                t.interrupt();
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
        }
        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
            return false;
        finishCompletion();
        return true;
    }

  执行逻辑如下

  1. state不为NEW时,任务即将进入终态,直接返回false表明取消操作失败。
  2. state状态为NEW,任务可能已经开始执行,也可能还未开始。
  3. mayInterruptIfRunning表明是否中断线程。若是,则尝试将state设置为INTERRUPTING,并且中断线程,之后将state设置为终态INTERRUPTED。
  4. 如果mayInterruptIfRunning=false,则不中断线程,把state设置为CANCELLED
  5. 移除等待线程并唤醒。
  6. 返回true

    可见,cancel()方法改变了futureTask的状态位,如果传入的是false并且业务逻辑已经开始执行,当前任务是不会被终止的,而是会继续执行,直到异常或者执行完毕。如果传入的是true,会调用当前线程的interrupt()方法,把中断标志位设为true。

    事实上,除非线程自己停止自己的任务,或者退出JVM,是没有其他方法完全终止一个线程的任务的。mayInterruptIfRunning=true,通过希望当前线程可以响应中断的方式来结束任务。当任务被取消后,会被封装为CancellationException抛出。

总结

  总结一下,futureTask中的任务状态由变量state表示,任务状态都基于state判断。而futureTask的阻塞则是通过自旋+挂起线程实现。理解FutureTask的内部实现机制,我们使用Future时才能更加得心应手。文中掺杂着笔者的个人理解,如果有不正之处,还望读者多多指正

  • 大小: 62.5 KB
  • 大小: 25.7 KB
分享到:
评论
9 楼 Aaron_scut 2016-08-23  
LZ,类FutureTaskSample中14行,doOwnThing()返回的结果没有打印,当然不显示“Do Own Thing”
8 楼 passerby_whu 2015-06-19  
example的doOwnThing没有System.out.println。当然看不到。
7 楼 passerby_whu 2015-06-19  
clean1981 写道
那ExecutorService配合FUTURETASK和这个有什么不同呢?好像都是异步吧?

ExecutorService主要作用是维护一个线程池。
6 楼 hujinhu 2014-11-18  
  非常好。。
5 楼 veally 2014-08-21  
基本上讲明白了, Callable 和 FutureTask 基本上明白 了
4 楼 clean1981 2013-08-28  
那ExecutorService配合FUTURETASK和这个有什么不同呢?好像都是异步吧?
3 楼 krave 2013-07-28  
Future还能够通过 传入Callable实例的submit方法获得
ExecutorService的submit方法留作此用。
2 楼 dingran 2013-05-09  
很棒,说的很明白,真不错,虽然eye不让这么回帖,可我还是要这样说,呵呵
1 楼 sundongyadh 2012-12-17  
非常好!通俗易懂。 

相关推荐

    Matlab环境下决策分类树的构建、优化与应用

    内容概要:本文详细介绍了如何利用Matlab构建、优化和应用决策分类树。首先,讲解了数据准备阶段,将数据与程序分离,确保灵活性。接着,通过具体实例展示了如何使用Matlab内置函数如fitctree快速构建决策树模型,并通过可视化工具直观呈现决策树结构。针对可能出现的过拟合问题,提出了基于成本复杂度的剪枝方法,以提高模型的泛化能力。此外,还分享了一些实用技巧,如处理连续特征、保存模型、并行计算等,帮助用户更好地理解和应用决策树。 适合人群:具有一定编程基础的数据分析师、机器学习爱好者及科研工作者。 使用场景及目标:适用于需要进行数据分类任务的场景,特别是当需要解释性强的模型时。主要目标是教会读者如何在Matlab环境中高效地构建和优化决策分类树,从而应用于实际项目中。 其他说明:文中不仅提供了完整的代码示例,还强调了代码模块化的重要性,便于后续维护和扩展。同时,对于初学者来说,建议从简单的鸢尾花数据集开始练习,逐步掌握决策树的各项技能。

    《营销调研》第7章-探索性调研数据采集.pptx

    《营销调研》第7章-探索性调研数据采集.pptx

    Assignment1_search_final(1).ipynb

    Assignment1_search_final(1).ipynb

    美团外卖优惠券小程序 美团优惠券微信小程序 自带流量主模式 带教程.zip

    美团优惠券小程序带举牌小人带菜谱+流量主模式,挺多外卖小程序的,但是都没有搭建教程 搭建: 1、下载源码,去微信公众平台注册自己的账号 2、解压到桌面 3、打开微信开发者工具添加小程序-把解压的源码添加进去-appid改成自己小程序的 4、在pages/index/index.js文件搜流量主广告改成自己的广告ID 5、到微信公众平台登陆自己的小程序-开发管理-开发设置-服务器域名修改成

    《计算机录入技术》第十八章-常用外文输入法.pptx

    《计算机录入技术》第十八章-常用外文输入法.pptx

    基于Andorid的跨屏拖动应用设计.zip

    基于Andorid的跨屏拖动应用设计实现源码,主要针对计算机相关专业的正在做毕设的学生和需要项目实战练习的学习者,也可作为课程设计、期末大作业。

    《网站建设与维护》项目4-在线购物商城用户管理功能.pptx

    《网站建设与维护》项目4-在线购物商城用户管理功能.pptx

    区块链_房屋转租系统_去中心化存储_数据防篡改_智能合约_S_1744435730.zip

    区块链_房屋转租系统_去中心化存储_数据防篡改_智能合约_S_1744435730

    《计算机应用基础实训指导》实训五-Word-2010的文字编辑操作.pptx

    《计算机应用基础实训指导》实训五-Word-2010的文字编辑操作.pptx

    《移动通信(第4版)》第5章-组网技术.ppt

    《移动通信(第4版)》第5章-组网技术.ppt

    ABB机器人基础.pdf

    ABB机器人基础.pdf

    《综合布线施工技术》第9章-综合布线实训指导.ppt

    《综合布线施工技术》第9章-综合布线实训指导.ppt

    最新修复版万能镜像系统源码-最终版站群利器持续更新升级

    很不错的一套站群系统源码,后台配置采集节点,输入目标站地址即可全自动智能转换自动全站采集!支持 https、支持 POST 获取、支持搜索、支持 cookie、支持代理、支持破解防盗链、支持破解防采集 全自动分析,内外链接自动转换、图片地址、css、js,自动分析 CSS 内的图片使得页面风格不丢失: 广告标签,方便在规则里直接替换广告代码 支持自定义标签,标签可自定义内容、自由截取、内容正则截取。可以放在模板里,也可以在规则里替换 支持自定义模板,可使用标签 diy 个性模板,真正做到内容上移花接木 调试模式,可观察采集性能,便于发现和解决各种错误 多条采集规则一键切换,支持导入导出 内置强大替换和过滤功能,标签过滤、站内外过滤、字符串替换、等等 IP 屏蔽功能,屏蔽想要屏蔽 IP 地址让它无法访问 ****高级功能*****· url 过滤功能,可过滤屏蔽不采集指定链接· 伪原创,近义词替换有利于 seo· 伪静态,url 伪静态化,有利于 seo· 自动缓存自动更新,可设置缓存时间达到自动更新,css 缓存· 支持演示有阿三源码简繁体互转· 代理 IP、伪造 IP、随机 IP、伪造 user-agent、伪造 referer 来路、自定义 cookie,以便应对防采集措施· url 地址加密转换,个性化 url,让你的 url 地址与众不同· 关键词内链功能· 还有更多功能等你发现…… 程序使用非常简单,仅需在后台输入一个域名即可建站,不限子域名,站群利器,无授权,无绑定限制,使用后台功能可对页面进行自定义修改,在程序后台开启生 成功能,只要访问页面就会生成一个本地文件。当用户再次访问的时候就直接访问网站本地的页面,所以目标站点无法访问了也没关系,我们的站点依然可以访问, 支持伪静态、伪原创、生成静态文件、自定义替换、广告管理、友情链接管理、自动下载 CSS 内的图。

    《Approaching(Almost)any machine learning problem》中文版第11章

    【自然语言处理】文本分类方法综述:从基础模型到深度学习的情感分析系统设计

    基于Andorid的下拉浏览应用设计.zip

    基于Andorid的下拉浏览应用设计实现源码,主要针对计算机相关专业的正在做毕设的学生和需要项目实战练习的学习者,也可作为课程设计、期末大作业。

    P2插电式混合动力系统Simulink模型:基于逻辑门限值控制策略的混动汽车仿真

    内容概要:本文详细介绍了一个原创的P2插电式混合动力系统Simulink模型,该模型基于逻辑门限值控制策略,涵盖了多个关键模块如工况输入、驾驶员模型、发动机模型、电机模型、制动能量回收模型、转矩分配模型、运行模式切换模型、档位切换模型以及纵向动力学模型。模型支持多种标准工况(WLTC、UDDS、EUDC、NEDC)和自定义工况,并展示了丰富的仿真结果,包括发动机和电机转矩变化、工作模式切换、档位变化、电池SOC变化、燃油消耗量、速度跟随和最大爬坡度等。此外,文章还深入探讨了逻辑门限值控制策略的具体实现及其效果,提供了详细的代码示例和技术细节。 适合人群:汽车工程专业学生、研究人员、混动汽车开发者及爱好者。 使用场景及目标:①用于教学和科研,帮助理解和掌握P2混动系统的原理和控制策略;②作为开发工具,辅助设计和优化混动汽车控制系统;③提供仿真平台,评估不同工况下的混动系统性能。 其他说明:文中不仅介绍了模型的整体架构和各模块的功能,还分享了许多实用的调试技巧和优化方法,使读者能够更好地理解和应用该模型。

    电力系统分布式调度中ADMM算法的MATLAB实现及其应用

    内容概要:本文详细介绍了基于ADMM(交替方向乘子法)算法在电力系统分布式调度中的应用,特别是并行(Jacobi)和串行(Gauss-Seidel)两种不同更新模式的实现。文中通过MATLAB代码展示了这两种模式的具体实现方法,并比较了它们的优劣。并行模式适用于多核计算环境,能够充分利用硬件资源,尽管迭代次数较多,但总体计算时间较短;串行模式则由于“接力式”更新机制,通常收敛更快,但在计算资源有限的情况下可能会形成瓶颈。此外,文章还讨论了惩罚系数rho的自适应调整策略以及在电-气耦合系统优化中的应用实例。 适合人群:从事电力系统优化、分布式计算研究的专业人士,尤其是有一定MATLAB编程基础的研究人员和技术人员。 使用场景及目标:①理解和实现ADMM算法在电力系统分布式调度中的应用;②评估并行和串行模式在不同应用场景下的性能表现;③掌握惩罚系数rho的自适应调整技巧,提高算法收敛速度和稳定性。 其他说明:文章提供了详细的MATLAB代码示例,帮助读者更好地理解和实践ADMM算法。同时,强调了在实际工程应用中需要注意的关键技术和优化策略。

    这篇文章详细探讨了交错并联Buck变换器的设计、仿真及其实现,涵盖了从理论分析到实际应用的多个方面(含详细代码及解释)

    内容概要:本文深入研究了交错并联Buck变换器的工作原理、性能优势及其具体实现。文章首先介绍了交错并联Buck变换器相较于传统Buck变换器的优势,包括减小输出电流和电压纹波、降低开关管和二极管的电流应力、减小输出滤波电容容量等。接着,文章详细展示了如何通过MATLAB/Simulink建立该变换器的仿真模型,包括参数设置、电路元件添加、PWM信号生成及连接、电压电流测量模块的添加等。此外,还探讨了PID控制器的设计与实现,通过理论分析和仿真验证了其有效性。最后,文章通过多个仿真实验验证了交错并联Buck变换器在纹波性能、器件应力等方面的优势,并分析了不同控制策略的效果,如P、PI、PID控制等。 适合人群:具备一定电力电子基础,对DC-DC变换器特别是交错并联Buck变换器感兴趣的工程师和技术人员。 使用场景及目标:①理解交错并联Buck变换器的工作原理及其相对于传统Buck变换器的优势;②掌握使用MATLAB/Simulink搭建交错并联Buck变换器仿真模型的方法;③学习PID控制器的设计与实现,了解其在电源系统中的应用;④通过仿真实验验证交错并联Buck变换器的性能,评估不同控制策略的效果。 其他说明:本文不仅提供了详细的理论分析,还给出了大量可运行的MATLAB代码,帮助读者更好地理解和实践交错并联Buck变换器的设计与实现。同时,通过对不同控制策略的对比分析,为实际工程应用提供了有价值的参考。

    《综合布线施工技术》第8章-综合布线工程案例.ppt

    《综合布线施工技术》第8章-综合布线工程案例.ppt

    基于STM32F103C8T6的K型热电偶温度控制仪设计与实现

    内容概要:本文详细介绍了基于STM32F103C8T6的K型热电偶温度控制仪的设计与实现。硬件部分涵盖了热电偶采集电路、OLED显示模块、蜂鸣器电路、风扇控制电路以及EEPROM存储模块。软件部分则涉及ADC配置、OLED刷新、PID控温算法、EEPROM参数存储、风扇PWM控制等多个方面的具体实现。文中不仅提供了详细的代码示例,还分享了许多调试经验和注意事项,如冷端补偿、DMA传输优化、I2C时钟配置、PWM频率选择等。 适合人群:具有一定嵌入式系统开发经验的工程师和技术爱好者。 使用场景及目标:适用于需要进行温度监测与控制的应用场景,如工业自动化、实验室设备等。目标是帮助读者掌握STM32F103C8T6在温度控制领域的应用技巧,提升硬件设计和软件编程能力。 其他说明:本文提供的工程文件包含Altium Designer的原理图PCB文件,便于二次开发。此外,文中还提到了一些扩展功能,如加入Modbus通信协议,供有兴趣的读者进一步探索。

Global site tag (gtag.js) - Google Analytics