前言
Future直接翻译成中文“将来”,是JUC并发包中定义的一个接口,用于控制线程池中任务:表示一个任务的生命周期,并提供方法来判断任务是否已经完成或者取消,并且还提供阻塞的获取任务执行结果的方法。
Future只能配合线程池使用,作为主线程向线程池提交任务后的返回值,但需要注意的是这时主线程并不会阻塞,只有当需要线程执行结果时调用Future的get方法时,才可能会阻塞。这就是Future强大的之处,下面看下Future的简单用法:
public class FutureTest { public static void main(String[] args) { ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool(); Future<String> taskret = executorService.submit(new CallTask()); //主线程可以继续执行自己的业务逻辑 try { //在需要返回值时,调用get方法获取(可能阻塞),如果该任务已经执行完成 主线程不会阻塞。 String ob= taskret.get(); System.out.println(ob); } catch (Exception e) { e.printStackTrace(); } } } class CallTask implements Callable<String>{ @Override public String call() throws Exception { return "call任务执行"; } }
本示例中通过submit方法向线程池提交任务是非阻塞的,所以主线程在提交任务后,可以执行执行其他业务逻辑,在需要任务返回值的地方,调用Future的get方法获取返回值。此时就分为两种情况,如果任务还没有执行完成,此时主线程会阻塞,直到任务执行完成并获取返回值;如果任务已经执行完成,直接获取到返回值。这种巧妙的做法,是如何实现的呢?
在前面有一次分享中对线程池ThreadPoolExecutor进行了讲解(点击这里http://moon-walker.iteye.com/blog/2406788),其提交任务的使用的是execute方法,并且参数是实现Runnable的对象;而如果要使用Future,需要使用submit方法提交任务,并且提交的任务是实现了Callable的对象(也可以是Runnable的,后面细讲)。这又是为何?
submit方法不是在ThreadPoolExecutor中实现的的,而是在其父类AbstractExecutorService中实现的。其最终也是调用的ThreadPoolExecutor的execute方法提交任务,而execute方法的参数是Runnable型,这中间就有个步骤是把Callable转化成Runnable。这是如何实现的呢?
要弄清楚这三个问题,就要搞清楚线程池、Future、Callable它们的实现原理。
Callable接口
这个接口很简单,只定义了一个方法call,并且带有一个返回值。与Runnable接口类似,只是Runnable的run方法没有返回值。Runnable类型的对象可以直接作为Threand的参数,在线程中运行;而Callable类型的对象却不行,只能通过AbstractExecutorService的submit方法进行一次转换。
AbstractExecutorService的submit方法
文章开头的示例中调用submit方法提交任务,本质上执行的是AbstractExecutorService的submit。
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task);//包装成Runnable类型对象(FutureTask) execute(ftask);//调用ThreadPoolExecutor的execute方法提交任务 return ftask;//返回这个对象 }
这个方法核心就是Callable对象包装成Runnable类型的任务对象,也可以就调用ThreadPoolExecutor的execute方法提交任务。这个对象的类型是RunnableFuture类型,最终返回的也是这个包装后的对象,返回类型是Future。说明RunnableFuture即使Runnable类型,又是Future类型,看下RunnableFuture的定义,可以看到RunnableFuture接口,同时继承了Runnable和Future。
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
由于RunnableFuture是接口,那被包装后的对象具体的实现类型是什么呢?再看下newTaskFor方法的实现就一目了然了:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
可以看到最终被包装的对象类型是FutureTask,它实现了RunnableFuture接口。也就是说Future<String> taskret = executorService.submit(new CallTask());这段代码中返回的Future对象真实类型是FutureTask。在Future中定义的对任务生命周期的相关操作的所有方法,都在FutureTask中实现,比如:cancel(取消任务)、isCancelled(是否取消)、isDone(是否完成)、get(获取返回值,以及延时获取返回值)。
另外submit还有两个重载方法,参数是Runnable的对象,最终会通过一个适配器RunnableAdapter,把Runnable转换成一个Callable,只是返回值是指定的返回值(有一个重载方法返回是null)。这两个方法不是核心流程就不贴代码了,理解起来比较简单。
FutureTask实现原理
FutureTask的构造方法
在讲解这些实现方法前,先来看下是如果把Callable对象包装成FutureTask对象的。这步操作是在FutureTask的构造方法中完成的:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; //使用组合的方式把callable作为自己的成员变量 this.state = NEW; }
这里使用了组合的方式把callable对象封装到FutureTask的成员变量中,至此Callable、FutureTask、Future、Runnable、RunnableFuture的关系就梳理清楚了,类图关系如下:
FutureTask的run方法
把FutureTask对象提交给ThreadPoolExecutor线程池后,任务在执行是会调用FutureTask的run方法(详见之前对ThreadPoolExecutor runWorker方法的讲解,点击这里)。
FutureTask的run方法最终会调用Callable的的call方法,这个方法带有返回值,此时就可以获得这个返回值,具体实现如下:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); //调用Callable的call方法获的返回值 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result);//如果 任务执行成功,设置返回值。 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
该方法实现比较简单,核心就是result = c.call();获取到任务的返回值,如果任务执行成功就通过set(result)方法为成员变量outcome设置,主线程中通过FutureTask的get方法获取的就是这个值。同时执行run方法会根据执行的结果,修改任务的状态,FutureTask中定义任务的状态类型如下,完成的类型有4种: normal(正常结束)、exceptional(异常结束)、cancelled(取消)、interrupted(中断):
FutureTask的get方法
在主线程中调用Future的get方法本质上是调用的FutureTask的get方法 以获取返回值。大致思想就是如果任务状态已经完成(上述讲的4种状态),则返回执行结果;如果还未完成,则主线程阻塞(调用get方法的线程)。get方法具体实现如下:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) //小于等于1表示还没有完成,结合上面的状态字段理解 s = awaitDone(false, 0L);//轮询、阻塞等待任务执行 return report(s);//根据不同的状态返回不同的值 }
如果状态字段state小于等于1,即不是上述讲的4种完成状态,就调用awaitDone方法阻塞,直到被唤醒,再次判断状态是否完成,最后返回的是完成状态。然后report(s)方法会根据状态获取不同的返回值,返回个主线程,整个get方法执行结束。
这里重点看下awaitDone方法:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) {//如果已经完成就返回 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode();//创建等待节点 else if (!queued)//如果不在等待队列就加入等待队列 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) {//如果是延时获取,就行延时阻塞 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else//否则直接阻塞 LockSupport.park(this); } }
结合给出的注释应该很好理解,但需要注意两点:
1、线程被阻塞了,什么时候被唤醒?这个是在上述run方法执行完成后,会调用finishCompletion()方法,在这个方法中会唤醒awaitDone继续执行。
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t);//唤醒awaitDone阻塞 } //从队列中移除当前线程节点 WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
2、这里的WaitNode是等待节点,每个节点中包含一个调用get方法被阻塞的“主线程”。这些节点会组成一个单向链表,从队列中移除的工作在awaitDone和finishCompletion中都会进行。在jdk1.6时是通过AQS实现的,jdk 1.7以后做了简化,这里贴出的是jdk1.8的代码,感兴趣的朋友可以切到jdk1.6看下。
如果说set方法和get方法是相对的话,那finishCompletion和awaitDone就是相对的,理解了这两个方法,基本就理解了FutureTask。
还有一个延时get方法,实现流程与get方法基本相对,只是在awaitDone方法中会使用延时阻塞。
FutureTask的cancel取消任务方法
如果任务还可以开始,可以通过Future的cancel方法取消任务执行,本质上是通过修改FutureTask的状态为CANCELLED取消状态。但线程池还是会执行FutureTask的run方法,只是,在run方法的开始就会判断当前的状态如果不是NEW(初始状态),run方法直接结束:
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && //cas修改状态为CANCELLED UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; //省略其他内容 } public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; //剩余其他内容 }
另外isCancelled(是否取消)、isDone(是否完成)的实现就很简单了,直接判断比较当前状态值就行了:
public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; //注意这里只要任务开始就表示完成了。 }
需要注意的是isDone方法:FutureTask的实现是只要开始就算完成了,也就是说只要开始了任务就不能取消了,只能等到任务执行完成。而任务真正的任务完成,状态必须是上述讲的4中完成状态之一。至此FutureTask实现原理讲解完毕。
总结
Future直接结合线程池一起使用,它定义了一些操作任务的方法(取消任务、获取任务执行结果等),它的最终实现类为FutureTask。通过这个类的相关实现,可以把一个Callable封装到FutureTask,FutureTask同时实现了Runnable和Future接口,可以把FutureTask提交给线程池执行的同时,也可以通过这个对象获取任务的执行情况。
相关推荐
Java线程池是Java并发编程中的重要组成部分,它在多线程和高并发场景下扮演着关键角色。本文将深入探讨Java线程池的源码分析,并对比不同类型的线程池,以帮助开发者更好地理解和利用这一强大的工具。 首先,我们要...
Java线程池(ThreadPool)是Java并发编程中的一个重要概念,它可以帮助我们有效地管理和控制并发执行的任务,从而提高系统的效率和稳定性。线程池通过复用已存在的线程,避免了频繁创建和销毁线程带来的开销,同时也...
Java线程池是一种高效管理线程资源的工具,它通过维护一组可重用的线程来减少创建和销毁线程的开销。在Java中,`java.util.concurrent`包提供了`ExecutorService`接口和它的实现类,如`ThreadPoolExecutor`,来支持...
在Java开发中,处理大批量数据时,合理利用线程池可以显著提高程序的执行效率和资源利用率。本文将深入探讨如何在Java中使用线程池来查询大量数据,以及这样做的好处和实现方法。 首先,理解线程池的概念至关重要。...
NULL 博文链接:https://waitingkkk-163-com.iteye.com/blog/2232286
"详细分析JAVA线程池" Java线程池是Java编程语言中的一种机制,用于管理和重用线程,以提高程序的性能和效率。下面是Java线程池的相关知识点: 1. 什么是线程池? 线程池是指在程序启动时创建的一组空闲线程,程序...
Java线程池是Java并发编程中的重要组成部分,它在Java 5及后续版本中引入,大大简化了线程管理和资源调度。线程池通过`Executor`接口和`ExecutorService`接口提供了一套强大的机制,允许开发者高效地创建、管理和...
Java线程池是一种高效管理线程资源的工具,它的出现是为了优化系统性能,尤其是在需要频繁创建和销毁线程的情况下。由于操作系统创建新线程需要进行内存分配、上下文切换等操作,这涉及到一定的开销。因此,使用...
Java线程池是一种高效管理线程的工具,它允许开发者预先配置一组线程,以便在处理并发任务时能更好地控制系统的资源。线程池的概念源于服务器应用程序中对大量短小任务处理的需求,避免频繁创建和销毁线程带来的性能...
Java线程池是Java并发编程中的重要组成部分,它在多线程编程中扮演着至关重要的角色,有效地管理和调度线程资源,提高了程序的性能和稳定性。本资源包含了一个经典的Java线程池实现,适用于大型项目,能帮助开发者...
Java线程池是一种高效管理线程的工具,它允许开发者预先定义好一组线程,然后根据需要分配任务,而不是每次需要执行任务时都创建新的线程。这种设计模式可以显著提高系统的性能,减少线程创建和销毁的开销,同时通过...
Java线程池实现批量下载文件 Java线程池实现批量下载文件是指使用Java语言和线程池机制来实现批量下载文件的功能。这种方法可以大幅提高下载速度和效率,特别是当需要下载大量文件时。下面将对Java线程池实现批量...
Java线程池是一种高效管理并发任务的工具,它通过复用线程来减少创建和销毁线程的开销,从而提高系统性能。线程池的核心组成部分包括工作队列(Work Queue)、工作线程(Worker Threads)和任务调度器(Executor)。...
我们可以通过`submit()`方法将任务(如处理用户消息)提交到线程池,然后通过`Future`对象获取任务执行的结果。 为了实现多用户聊天功能,还需要设计消息的序列化和反序列化机制。这可能涉及到JSON或XML等数据格式...
JAVA线程池是一种高效管理线程的技术,它允许开发者预设线程数量,避免无限制地创建和销毁线程带来的开销。线程池通过控制并发级别,优化系统资源使用,提升系统的稳定性和响应速度。 1. **线程池的优势** - **...
Java中的`Future`和`FutureTask`是并发编程中重要的工具,它们允许程序异步执行任务并获取结果。`Future`接口提供了对异步计算结果的访问和控制,而`FutureTask`是`Future`的一个具体实现,它还同时实现了`Runnable`...
- **提交任务**:使用`executorService.submit(Runnable task)` 将任务提交给线程池,返回`Future`对象,可以用于获取任务执行结果。 - **关闭线程池**:在所有任务执行完毕后,调用 `executorService.shutdown()`...
在深入理解高并发编程,尤其是Java线程池核心技术时,我们首先要明白线程与多线程的概念。线程是操作系统中的基本调度单元,它比进程更小,且基本不拥有系统资源,主要由程序计数器、寄存器和栈等组成。在同一个进程...
Java线程池是一种高效管理并发任务的机制,它通过复用已存在的线程来减少线程创建和销毁的开销,从而提高系统的整体性能。本文将深入解析Java线程池的工作原理,并给出创建简单实例的步骤。 线程池的核心在于`java....
Java线程池是一种高效管理并发任务执行的机制,它通过维护一组可重用的线程,减少了创建和销毁线程的开销。本压缩包包含的`ThreadPool.java`、`PooledThread.java`和`ThreadTask.java`是实现线程池功能的核心组件。 ...