一:关于 Callable<V>的源码
- 浏览: 234519 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
cherami:
解法3有问题,在n很大的时候会导致baseNum溢出使得结果不 ...
阶乘算法之一N! 末尾有多少个零 -
yubenjie:
我怎么没看出多线程啊,就单线程再跑嘛
多线程编程之理财 -
fei229670104:
多线程 不错
多线程编程之理财 -
fei229670104:
liujiafei_2007 写道你好,问个问题,取钱时不用判 ...
多线程编程之存钱与取钱 -
liujiafei_2007:
你好,问个问题,取钱时不用判断取出的金额是否大于账户的余额吗? ...
多线程编程之存钱与取钱
package java.util.concurrent; public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
Callable<V>: 返回结果并且可能抛出异常的任务。实现者定义了一个不带任何参数的叫做 call的方法。 Callable<V>接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable不会返回结果,并且无法抛出经过检查的异常。 Executors类包含一些从其他普通形式转换成 Callable<V>类的实用方法。
在并发编程时,一般使用Runnable接口,然后扔给线程池完事,这种情况下不需要线程的结果。 所以run()的返回值是void类型。如下代码所示:
package future; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class FutureTest { public static class Task implements Runnable{ @Override public void run() { System.out.println("做任务----"+Thread.currentThread().getName()); } } public static void main(String[] args) { //创建线程池 ExecutorService es = Executors.newCachedThreadPool(); for(int i=0;i<100;i++){ es.submit(new Task()); } } }
如果是一个多线程协作程序,比如菲波拉切数列,1,1,2,3,5,8...使用多线程来计算。
但后者需要前者的结果,就需要用Callable<V>接口了。如下代码所示:
package future; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class FutureTest2 { public static class Task implements Callable<String>{ @Override public String call() throws Exception { System.out.println("做任务----"+Thread.currentThread().getName()); return "返回运算结果"; } } public static void main(String[] args) { //创建线程池 ExecutorService es = Executors.newCachedThreadPool(); for(int i=0;i<100;i++){ es.submit(new Task()); } } }
我们看到,当Task类实现Callable接口后,重写了call()方法,call()方法是有返回值的。但我们如何来接受call()方法的返回值呢?
package future; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class FutureTest3 { public static class Task implements Callable<String>{ @Override public String call() throws Exception { System.out.println("做任务----"+Thread.currentThread().getName()); return "返回运算结果"; } } public static void main(String[] args) throws InterruptedException, ExecutionException { List<Future<String>> result = new ArrayList<Future<String>>(); //创建线程池 ExecutorService es = Executors.newCachedThreadPool(); for(int i=0;i<100;i++){ result.add(es.submit(new Task())); } for(Future<String> f : result){ System.out.println(f.get()); } } }
从这里可以看出,这时候,Future<V>就出场了,使用Future<V>接口的get()方法可以获得call()方法返回的结果,当调用Future的get()方法时,当前线程就开始阻塞,直到call()方法结束返回结果。
Future模式可以这样来描述:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。就相当于下了一张订货单,一段时间后可以拿着提订单来提货,这期间可以干别的任何事情。其中Future接口就是订货单,真正处理订单的是Executor类,它根据Future接口的要求来生产产品。
二:关于 Future<V>的源码
package java.util.concurrent; /** * Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使 * 用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确 * 定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结 * 果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。 * * @see FutureTask * @see Executor * @since 1.5 * @author Doug Lea * @param<V> The result type returned by this Future's <tt>get</tt> method */ publicinterface Future<V> { /** * 试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel * 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数 * 确定是否应该以试图停止任务的方式来中断执行此任务的线程。 */ boolean cancel(boolean mayInterruptIfRunning); /** * 如果在任务正常完成前将其取消,则返回 true。 */ boolean isCancelled(); /** * 如果任务已完成,则返回 true。可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。 */ boolean isDone(); /** * 如有必要,等待计算完成,然后获取其结果。 */ V get() throws InterruptedException, ExecutionException; /** * 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。 */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
1) Future接口是一个泛型接口,严格的格式应该是Future<V>,其中V代表了Future执行的任务返回值的类型
2) Future接口提供方法来检测任务是否被执行完,等待任务执行完获得结果,也可以设置任务执行的超时时间。这个设置超时的方法就是实现Java程序执行超时的关键。
三:关于 Future实现
Future的实现类有FutureTask和SwingWork,其中SwingWork用于GUI编程模块,在并发包里经常用到的是FutureTask类, 这里主要介绍一下FutureTask类
类声明部分:
public class FutureTask<V> implements RunnableFuture<V> {}
FutureTask: 实现了RunnableFuture<V>接口,而RunnableFuture<V>又是extends(继承) Runnable和Future<V>两个接口,所以它既可以作为Runnable被线程执行,又可以作为Future<V>得到 Callable<V>的返回值,那么这个组合的使用有什么好处呢?假设有一个很耗时的返回值需要计算,并且这个返回值不是立刻需要的话,那么就可以使用这个组 合,用另一个线程去计算返回值,而当前线程在使用这个返回值之前可以做其它的操作,等到需要这个返回值时,再通过Future<V>得到,岂不美哉!
变量部分:
private volatile int state; private static final int NEW = 0; //新建 private static final int COMPLETING = 1; //执行中 private static final int NORMAL = 2; //正常 private static final int EXCEPTIONAL = 3; //异常 private static final int CANCELLED = 4; //取消 private static final int INTERRUPTING = 5; //中断中 private static final int INTERRUPTED = 6; //被中断
state:任务的状态,最初是 NEW,完成期间,状态也许暂时呈现为COMPLETING(当结果已经被设值)或者INTERRUPTING(only while interrupting the runner to satisfy a cancel(true)),
可能出现的状态转换情况:
* NEW -> COMPLETING -> NORMAL 正常完成的流程
* NEW -> COMPLETING -> EXCEPTIONAL 出现异常的流程
* NEW -> CANCELLED 被取消的流程
* NEW -> INTERRUPTING -> INTERRUPTED 被中断的流程
构造函数:
/** * 创建一个 FutureTask,一旦运行就执行给定的 Callable。 */ public FutureTask(Callable<V> callable) { if (callable == null) thrownew NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } /** * 创建一个 FutureTask,一旦运行就执行给定的 Runnable,并安排成功完成时 get 返回给定的结果 */ public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
从构造函数中可以看出,当构造一个FutureTask时,state会被置为NEW(NEW也就是所有状态变化路径的起始状态)
FutureTask生命周期的变化,主要取决于 run()方法先被调用还是cancel ()方法会被调用,这两个方法的执行顺序决定了FutureTask的生命周期的四种走向。
先来看run()方法:
publicvoid run() { //首先判断任务的状态,如果任务的状态值不为NEW,或 runner变量的值不为null,则返回(说明正在走或已经走了4种状态变化的一种) if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; //如果状态值是NEW,则开始执行任务 try { Callable<V> c = callable; if (c != null && state == NEW) { //如果任务不为空,且状态为NEW,开始执行 V result; //任务返回的结果 boolean ran; try { result = c.call(); //执行任务并返回结果 ran = true; //标记任务执行成功 } catch (Throwable ex) { //任务执行中发生异常 result = null; ran = false; setException(ex); //设置异常 } if (ran) //任务执行成功,设置结果 set(result); } } finally { runner = null; //runner置为null // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
1)任务执行成功,会调用set()方法设置结果
protected void set(V v) { //如过state是NEW,把state设置成COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //将任务设置成NORMAL over the task UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
从set()方法可以看出,把任务运行的结果赋值给了outcome变量,这个执行流程导致的状态变化就是 NEW->COMPLETING->NORMAL
2)任务执行中发生异常,会调用setException()方法
protecte dvoid setException(Throwable t) { //如过state是NEW,把state设置成COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; //将任务设置成EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
这个执行流程导致的状态变化就是 NEW->COMPLETING->EXCEPTIONAL
再来看cancel()方法:
//参数:mayInterruptIfRunning 是否中断running publicboolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) //状态不为NEW,返回 returnfalse; if (mayInterruptIfRunning) { //如果应该中断执行此任务的线程 //如过state是NEW,把state设置成INTERRUPTING if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) returnfalse; Thread t = runner; if (t != null) t.interrupt(); //将任务设置成INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } //如过state是NEW,把state设置成CANCELLED elseif (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) returnfalse; finishCompletion(); returntrue; }
如果mayInterruptIfRunning==true,则流程为:NEW->INTERRUPTING ->INTERRUPTED, 否则流程为:NEW->CANCELLED
到此,四个流程走完了!
参考资料:
JDK API文档
http://blog.csdn.net/yangyan19870319/article/details/6093481
http://www.oschina.net/question/54100_83333
http://www.2cto.com/kf/201411/351903.html
http://blog.csdn.net/ghsau/article/details/7451464
http://blog.csdn.net/liulipuo/article/details/39029643
发表评论
-
ExecutorService | Executor框架
2015-09-11 17:29 1719Excutor框架结构图: 上一篇讲了Execut ... -
Executor入门 | Executor框架
2015-09-07 10:07 1603讲到并发就不得不讲一下Executor框架,其框架主要类关系 ... -
LockSupport详解 | Java并发编程
2015-08-31 17:52 4382我们一再提线程、锁等概念,但锁是如果实现的呢?又 ... -
线程协作-Condition介绍
2015-07-22 15:55 1987上一篇文章里讲了java.util.concurr ... -
Lock详解 | Java并发编程
2015-07-21 10:40 3385在之前的多线程编程的文章中我们讲到了如何使用关 ... -
多线程编程之理财
2015-07-16 10:37 1599现实生活中,我们一边工作,一边消费,正常情况下 ... -
多线程编程之join()方法
2015-07-07 17:12 3498现实生活中,有些工作是需要团队中成员依次完成的,这就涉及到 ... -
多线程编程之存钱与取钱
2015-07-06 09:44 3922生活费问题是这样的:学生每月都需要生活费,家长一次预存一 ... -
多线程编程之卫生间
2015-07-01 11:33 1940如大家所知,火车上车厢的卫生间很小,每次只能容纳一个人,一 ...
相关推荐
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); <T> Future<T> submit(Runnable task); ... } ``` 在 AbstractExecutorService 类中,实现了 submit 方法。 ```...
ListenableFuture<Integer> future1 = service.submit(new Callable<Integer>() { public Integer call() throws InterruptedException { Thread.sleep(1000); System.out.println("call future 1."); return 1...
- **invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)** - **描述**:执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的`Future`列表。 - **...
- **invokeAll**(Collection<Callable<T>> tasks)**:执行所有的任务,返回一个包含每个任务结果的`Future`列表。 - **shutdown()**:关闭线程池,不再接受新的任务,但会等待已提交的任务执行完。 - **shutdown...
Executor框架是Java并发编程的核心组件,它在Java 5中被引入,极大地简化了多线程编程。这个框架是基于`java.util.concurrent`包中的接口和类构建的,旨在提供线程池服务、任务调度以及并发执行任务的能力。Executor...
FutureTask<String> cookTask = new FutureTask<>(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(3000); // 模拟做饭耗时3秒 return "5斤的龙虾"; } }); Long ...
Future<Double> future = executor.submit(new Callable<Double>() { public Double call() { return doSomeLongComputation(); } }); doSomethingElse(); try { Double result = future.get(1, TimeUnit....
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() { @Override public Integer call() { return 2; } }); executor.submit(futureTask); System.out.println(futureTask.get());...
- **3.2.1** 对于需要等待任务完成并获取执行结果的场景,如异步计算任务,应优先考虑使用`submit(Callable<T> task)`方法。 - **3.2.2** 对于只需要异步执行但不关心执行结果的任务,可以使用`submit(Runnable task...
- `submit(Callable<T> task)`: 提交一个Callable任务到线程池,返回Future对象,可以通过Future获取任务结果。 - `invokeAll(Collection<? extends Callable<T>> tasks)`: 执行所有给定的任务,等待所有任务完成...
ListenableFuture<String> future = new AsyncResult<>("Hello, Asynchronous!"); future.addCallback(new ListenableFutureCallback<String>() { @Override public void onSuccess(String result) { // 处理...
FutureTask<String> future = new FutureTask<>(new Callable<String>() { public String call() { // 实际任务代码 } }); executor.execute(future); try { result = future.get(5, TimeUnit.SECONDS); // 设置...
Future<Integer> future2 = executor.submit(new MyCallable(10)); System.out.println("Result from thread 1: " + future1.get()); System.out.println("Result from thread 2: " + future2.get()); ...
3. 工作队列(BlockingQueue<Runnable> workQueue):存储待执行任务的队列,限制同时执行的任务数量。 4. 空闲线程存活时间(long keepAliveTime):当线程池中的线程数量超过核心线程数时,空闲线程等待新任务的...
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); Thread producerThread = new Thread(new Producer(queue)); Thread consumerThread = new Thread(new Consumer(queue)); producerThread....
Future<String> future1 = executor.submit(new Task("Task 1")); Future<String> future2 = executor.submit(new Task("Task 2")); try { System.out.println(future1.get()); System.out.println(future2....
工作单元包括Runnable和Callable,而执行机制由Executor框架提供。 2. 框架结构 Executor框架采用的是两级调度模型。在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一...
Future<Integer> future = executor.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { return 1; } }); int result = future.get(); // 获取结果 ``` 2. **阻塞队列*...
Java并发框架中的Executor API是Java多线程编程的重要组成部分,它是Java从JDK 1.5版本开始引入的,旨在提供一种更高效、更可控的线程管理方式。Executor API的核心在于`java.util.concurrent.Executor`接口,它定义...
Future<Integer> future = executor.submit(() -> { System.out.println("Task is running..."); Thread.sleep(1000); return 42; }); System.out.println("Task result: " + future.get()); executor....