`
周凡杨
  • 浏览: 233259 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Callable<V>、Future<V>详解 | Executor框架

阅读更多
 

一:关于 Callable<V>的源码

package java.util.concurrent;

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable<V>: 返回结果并且可能抛出异常的任务。实现者定义了一个不带任何参数的叫做 call的方法。 Callable<V>接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable不会返回结果,并且无法抛出经过检查的异常。 Executors类包含一些从其他普通形式转换成 Callable<V>类的实用方法。

 

 

在并发编程时,一般使用Runnable接口,然后扔给线程池完事,这种情况下不需要线程的结果。 所以run()的返回值是void类型。如下代码所示:

 
package future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FutureTest {
	public static class Task implements Runnable{
		@Override
		public void run() {
			System.out.println("做任务----"+Thread.currentThread().getName());
		}
	}
	public static void main(String[] args) {
		//创建线程池
		ExecutorService es = Executors.newCachedThreadPool();
		for(int i=0;i<100;i++){
			es.submit(new Task());
		}
	}
}
 

 

如果是一个多线程协作程序,比如菲波拉切数列,112358...使用多线程来计算。
但后者需要前者的结果,就需要用Callable<V>接口了。如下代码所示:

 

package future;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FutureTest2 {
	public static class Task implements Callable<String>{
		@Override
		public String call() throws Exception {
			System.out.println("做任务----"+Thread.currentThread().getName());
			return "返回运算结果";
		}
	}
	public static void main(String[] args) {
		//创建线程池
		ExecutorService es = Executors.newCachedThreadPool();
		for(int i=0;i<100;i++){
			es.submit(new Task());
		}
	}
}

 

 

我们看到,当Task类实现Callable接口后,重写了call()方法,call()方法是有返回值的。但我们如何来接受call()方法的返回值呢?

 

package future;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureTest3 {
	public static class Task implements Callable<String>{
		@Override
		public String call() throws Exception {
			System.out.println("做任务----"+Thread.currentThread().getName());
			return "返回运算结果";
		}
	}
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		
		List<Future<String>> result = new ArrayList<Future<String>>();
		
		//创建线程池
		ExecutorService es = Executors.newCachedThreadPool();
		for(int i=0;i<100;i++){
			result.add(es.submit(new Task()));
		}
		for(Future<String> f : result){
			System.out.println(f.get());
		}
	}
}

 

从这里可以看出,这时候,Future<V>就出场了,使用Future<V>接口的get()方法可以获得call()方法返回的结果,当调用Futureget()方法时,当前线程就开始阻塞,直到call()方法结束返回结果。

 

Future模式可以这样来描述:我有一个任务,提交给了FutureFuture我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。就相当于下了一张订货单,一段时间后可以拿着提订单来提货,这期间可以干别的任何事情。其中Future接口就是订货单,真正处理订单的是Executor类,它根据Future接口的要求来生产产品。

 

 

二:关于 Future<V>的源码

 

 

package java.util.concurrent;

/**
* Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使
* 用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确     * 定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结     * 果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。
*
  * @see FutureTask
  * @see Executor
  * @since 1.5
  * @author Doug Lea
  * @param<V> The result type returned by this Future's <tt>get</tt> method
  */
publicinterface Future<V> {

    /**
     * 试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel * 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数     
     * 确定是否应该以试图停止任务的方式来中断执行此任务的线程。
     */
    boolean cancel(boolean mayInterruptIfRunning);
    /**
     * 如果在任务正常完成前将其取消,则返回 true。   
     */
    boolean isCancelled();

    /**
     * 如果任务已完成,则返回 true。可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。    
     */
    boolean isDone();

    /**
     * 如有必要,等待计算完成,然后获取其结果。 
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
     */
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

 

1)   Future接口是一个泛型接口,严格的格式应该是Future<V>,其中V代表了Future执行的任务返回值的类型

2)   Future接口提供方法来检测任务是否被执行完,等待任务执行完获得结果,也可以设置任务执行的超时时间。这个设置超时的方法就是实现Java程序执行超时的关键。

 

 

 

 

三:关于 Future实现

 

 
   



 

 

Future的实现类有FutureTaskSwingWork,其中SwingWork用于GUI编程模块,在并发包里经常用到的是FutureTask,  这里主要介绍一下FutureTask

 

 

类声明部分

 

public class FutureTask<V> implements RunnableFuture<V> {}

 

 

FutureTask: 实现了RunnableFuture<V>接口,而RunnableFuture<V>又是extends(继承) RunnableFuture<V>两个接口,所以它既可以作为Runnable被线程执行,又可以作为Future<V>得到 Callable<V>的返回值,那么这个组合的使用有什么好处呢?假设有一个很耗时的返回值需要计算,并且这个返回值不是立刻需要的话,那么就可以使用这个组 合,用另一个线程去计算返回值,而当前线程在使用这个返回值之前可以做其它的操作,等到需要这个返回值时,再通过Future<V>得到,岂不美哉!

 

变量部分


private volatile int state;
private static final int NEW          = 0;               //新建  
private static final int COMPLETING   = 1;              //执行中 
private static final int NORMAL       = 2;               //正常
private static final int EXCEPTIONAL  = 3;              //异常       
private static final int CANCELLED    = 4;              //取消
private static final int INTERRUPTING = 5;             //中断中
private static final int INTERRUPTED  = 6;             //被中断
 

state:任务的状态,最初是 NEW,完成期间,状态也许暂时呈现为COMPLETING(当结果已经被设值)或者INTERRUPTING(only while interrupting the runner to satisfy a cancel(true)),

 

可能出现的状态转换情况:

* NEW -> COMPLETING -> NORMAL                    正常完成的流程

* NEW -> COMPLETING -> EXCEPTIONAL              出现异常的流程

* NEW -> CANCELLED                                 被取消的流程

* NEW -> INTERRUPTING -> INTERRUPTED            被中断的流程

构造函数:

   /**
     *  创建一个 FutureTask,一旦运行就执行给定的 Callable。    
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            thrownew NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     *  创建一个 FutureTask,一旦运行就执行给定的 Runnable,并安排成功完成时 get 返回给定的结果
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
 

从构造函数中可以看出,当构造一个FutureTask时,state会被置为NEWNEW也就是所有状态变化路径的起始状态

 

FutureTask生命周期的变化,主要取决于 run()方法先被调用还是cancel ()方法会被调用,这两个方法的执行顺序决定了FutureTask的生命周期的四种走向。

 

先来看run()方法

 

publicvoid run() {
     //首先判断任务的状态,如果任务的状态值不为NEW,或 runner变量的值不为null,则返回(说明正在走或已经走了4种状态变化的一种)
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
//如果状态值是NEW,则开始执行任务
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) { //如果任务不为空,且状态为NEW,开始执行
                V result;                         //任务返回的结果
                boolean ran;
                try {
                    result = c.call();          //执行任务并返回结果
                    ran = true;                  //标记任务执行成功
                } catch (Throwable ex) {  //任务执行中发生异常
                    result = null;
                    ran = false;
                    setException(ex);           //设置异常
                }
                if (ran)                          //任务执行成功,设置结果
                    set(result);
            }
        } finally {
            runner = null;                       //runner置为null
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
 

 

1)任务执行成功,会调用set()方法设置结果

protected void set(V v) {
//如过state是NEW,把state设置成COMPLETING 
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
          outcome = v;
        //将任务设置成NORMAL   over the task  
          UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
          finishCompletion();
     }
}
 

 

set()方法可以看出,把任务运行的结果赋值给了outcome变量,这个执行流程导致的状态变化就是  NEW->COMPLETING->NORMAL   

 

2)任务执行中发生异常,会调用setException()方法

protecte dvoid setException(Throwable t) {
//如过state是NEW,把state设置成COMPLETING    
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
          outcome = t;
        //将任务设置成EXCEPTIONAL  
          UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
          finishCompletion();
     }
}
 

 

这个执行流程导致的状态变化就是  NEW->COMPLETING->EXCEPTIONAL

 

再来看cancel()方法:

 

//参数:mayInterruptIfRunning   是否中断running
publicboolean cancel(boolean mayInterruptIfRunning) { 
     if (state != NEW)      //状态不为NEW,返回
          returnfalse;
     if (mayInterruptIfRunning) { //如果应该中断执行此任务的线程
          //如过state是NEW,把state设置成INTERRUPTING
          if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
               returnfalse;
          Thread t = runner;
          if (t != null)
                t.interrupt();
         //将任务设置成INTERRUPTED
          UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
     }
     //如过state是NEW,把state设置成CANCELLED
     elseif (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
            returnfalse;
     finishCompletion();
     returntrue;
}

 

如果mayInterruptIfRunning==true,则流程为:NEW->INTERRUPTING ->INTERRUPTED, 否则流程为:NEW->CANCELLED

 

到此,四个流程走完了!

 

 

参考资料:

   JDK API文档

   http://blog.csdn.net/yangyan19870319/article/details/6093481

http://www.oschina.net/question/54100_83333

http://www.2cto.com/kf/201411/351903.html

http://blog.csdn.net/ghsau/article/details/7451464

http://blog.csdn.net/liulipuo/article/details/39029643

 

  • 大小: 6.1 KB
0
0
分享到:
评论

相关推荐

    简单谈谈ThreadPoolExecutor线程池之submit方法

    &lt;T&gt; Future&lt;T&gt; submit(Callable&lt;T&gt; task); &lt;T&gt; Future&lt;T&gt; submit(Runnable task, T result); &lt;T&gt; Future&lt;T&gt; submit(Runnable task); ... } ``` 在 AbstractExecutorService 类中,实现了 submit 方法。 ```...

    Guava-并行编程Futures详解.pdf

    ListenableFuture&lt;Integer&gt; future1 = service.submit(new Callable&lt;Integer&gt;() { public Integer call() throws InterruptedException { Thread.sleep(1000); System.out.println("call future 1."); return 1...

    AbrastractExecutorService

    - **invokeAll(Collection&lt;Callable&lt;T&gt;&gt; tasks, long timeout, TimeUnit unit)** - **描述**:执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的`Future`列表。 - **...

    Executor,Executors,ExecutorService比较.docx

    - **invokeAll**(Collection&lt;Callable&lt;T&gt;&gt; tasks)**:执行所有的任务,返回一个包含每个任务结果的`Future`列表。 - **shutdown()**:关闭线程池,不再接受新的任务,但会等待已提交的任务执行完。 - **shutdown...

    Executor框架使用详解

    Executor框架是Java并发编程的核心组件,它在Java 5中被引入,极大地简化了多线程编程。这个框架是基于`java.util.concurrent`包中的接口和类构建的,旨在提供线程池服务、任务调度以及并发执行任务的能力。Executor...

    31 凭票取餐—Future模式详解.pdf

    FutureTask&lt;String&gt; cookTask = new FutureTask&lt;&gt;(new Callable&lt;String&gt;() { @Override public String call() throws Exception { Thread.sleep(3000); // 模拟做饭耗时3秒 return "5斤的龙虾"; } }); Long ...

    Java组合式异步编程方法详解.pdf

    Future&lt;Double&gt; future = executor.submit(new Callable&lt;Double&gt;() { public Double call() { return doSomeLongComputation(); } }); doSomethingElse(); try { Double result = future.get(1, TimeUnit....

    java面试精华14

    FutureTask&lt;Integer&gt; futureTask = new FutureTask&lt;&gt;(new Callable&lt;Integer&gt;() { @Override public Integer call() { return 2; } }); executor.submit(futureTask); System.out.println(futureTask.get());...

    线程池的submit和execute的区别.md

    - **3.2.1** 对于需要等待任务完成并获取执行结果的场景,如异步计算任务,应优先考虑使用`submit(Callable&lt;T&gt; task)`方法。 - **3.2.2** 对于只需要异步执行但不关心执行结果的任务,可以使用`submit(Runnable task...

    JavaThreaddemo_DEMO_tidecme_线程池Java_源码.zip

    - `submit(Callable&lt;T&gt; task)`: 提交一个Callable任务到线程池,返回Future对象,可以通过Future获取任务结果。 - `invokeAll(Collection&lt;? extends Callable&lt;T&gt;&gt; tasks)`: 执行所有给定的任务,等待所有任务完成...

    Spring Boot 异步框架的使用详解

    ListenableFuture&lt;String&gt; future = new AsyncResult&lt;&gt;("Hello, Asynchronous!"); future.addCallback(new ListenableFutureCallback&lt;String&gt;() { @Override public void onSuccess(String result) { // 处理...

    java Future 接口使用方法详解

    FutureTask&lt;String&gt; future = new FutureTask&lt;&gt;(new Callable&lt;String&gt;() { public String call() { // 实际任务代码 } }); executor.execute(future); try { result = future.get(5, TimeUnit.SECONDS); // 设置...

    Java多线程的4种实现方式(源代码)

    Future&lt;Integer&gt; future2 = executor.submit(new MyCallable(10)); System.out.println("Result from thread 1: " + future1.get()); System.out.println("Result from thread 2: " + future2.get()); ...

    Android Executor线程池

    3. 工作队列(BlockingQueue&lt;Runnable&gt; workQueue):存储待执行任务的队列,限制同时执行的任务数量。 4. 空闲线程存活时间(long keepAliveTime):当线程池中的线程数量超过核心线程数时,空闲线程等待新任务的...

    Java 高并发七:并发设计模型详解

    BlockingQueue&lt;Integer&gt; queue = new ArrayBlockingQueue&lt;&gt;(10); Thread producerThread = new Thread(new Producer(queue)); Thread consumerThread = new Thread(new Consumer(queue)); producerThread....

    详解三种java实现多线程的方式

    Future&lt;String&gt; future1 = executor.submit(new Task("Task 1")); Future&lt;String&gt; future2 = executor.submit(new Task("Task 2")); try { System.out.println(future1.get()); System.out.println(future2....

    顶层接口Executors详解

    工作单元包括Runnable和Callable,而执行机制由Executor框架提供。 2. 框架结构 Executor框架采用的是两级调度模型。在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一...

    Java并发程序设计 并发

    Future&lt;Integer&gt; future = executor.submit(new Callable&lt;Integer&gt;() { @Override public Integer call() throws Exception { return 1; } }); int result = future.get(); // 获取结果 ``` 2. **阻塞队列*...

    Java并发框架:Executor API详解

    Java并发框架中的Executor API是Java多线程编程的重要组成部分,它是Java从JDK 1.5版本开始引入的,旨在提供一种更高效、更可控的线程管理方式。Executor API的核心在于`java.util.concurrent.Executor`接口,它定义...

    java的concurrent用法详解

    Future&lt;Integer&gt; future = executor.submit(() -&gt; { System.out.println("Task is running..."); Thread.sleep(1000); return 42; }); System.out.println("Task result: " + future.get()); executor....

Global site tag (gtag.js) - Google Analytics