`

java线程池之Future

阅读更多

前言

 

Future直接翻译成中文将来,是JUC并发包中定义的一个接口,用于控制线程池中任务:表示一个任务的生命周期,并提供方法来判断任务是否已经完成或者取消,并且还提供阻塞的获取任务执行结果的方法。

 

Future只能配合线程池使用,作为主线程向线程池提交任务后的返回值,但需要注意的是这时主线程并不会阻塞,只有当需要线程执行结果时调用Futureget方法时,才可能会阻塞。这就是Future强大的之处,下面看下Future的简单用法:

public class FutureTest {
    public static void main(String[] args) {
        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        Future<String> taskret = executorService.submit(new CallTask());
        //主线程可以继续执行自己的业务逻辑
        try {
            //在需要返回值时,调用get方法获取(可能阻塞),如果该任务已经执行完成 主线程不会阻塞。
            String ob= taskret.get();
            System.out.println(ob);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 
class CallTask implements Callable<String>{
    @Override
    public String call() throws Exception {
 
        return "call任务执行";
    }
}

 

本示例中通过submit方法向线程池提交任务是非阻塞的,所以主线程在提交任务后,可以执行执行其他业务逻辑,在需要任务返回值的地方,调用Futureget方法获取返回值。此时就分为两种情况,如果任务还没有执行完成,此时主线程会阻塞,直到任务执行完成并获取返回值;如果任务已经执行完成,直接获取到返回值。这种巧妙的做法,是如何实现的呢?

 

在前面有一次分享中对线程池ThreadPoolExecutor进行了讲解(点击这里http://moon-walker.iteye.com/blog/2406788),其提交任务的使用的是execute方法,并且参数是实现Runnable的对象;而如果要使用Future,需要使用submit方法提交任务,并且提交的任务是实现了Callable的对象(也可以是Runnable的,后面细讲)。这又是为何?

 

submit方法不是在ThreadPoolExecutor中实现的的,而是在其父类AbstractExecutorService中实现的。其最终也是调用的ThreadPoolExecutorexecute方法提交任务,而execute方法的参数是Runnable型,这中间就有个步骤是把Callable转化成Runnable。这是如何实现的呢?

 

要弄清楚这三个问题,就要搞清楚线程池、FutureCallable它们的实现原理。

 

Callable接口

这个接口很简单,只定义了一个方法call,并且带有一个返回值。与Runnable接口类似,只是Runnablerun方法没有返回值。Runnable类型的对象可以直接作为Threand的参数,在线程中运行;而Callable类型的对象却不行,只能通过AbstractExecutorServicesubmit方法进行一次转换。

 

AbstractExecutorServicesubmit方法

文章开头的示例中调用submit方法提交任务,本质上执行的是AbstractExecutorServicesubmit

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);//包装成Runnable类型对象(FutureTask)
        execute(ftask);//调用ThreadPoolExecutor的execute方法提交任务
        return ftask;//返回这个对象
    }

 

这个方法核心就是Callable对象包装成Runnable类型的任务对象,也可以就调用ThreadPoolExecutorexecute方法提交任务。这个对象的类型是RunnableFuture类型,最终返回的也是这个包装后的对象,返回类型是Future。说明RunnableFuture即使Runnable类型,又是Future类型,看下RunnableFuture的定义,可以看到RunnableFuture接口,同时继承了RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

由于RunnableFuture是接口,那被包装后的对象具体的实现类型是什么呢?再看下newTaskFor方法的实现就一目了然了:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
}

 

可以看到最终被包装的对象类型是FutureTask,它实现了RunnableFuture接口。也就是说Future<String> taskret = executorService.submit(new CallTask());这段代码中返回的Future对象真实类型是FutureTask。在Future中定义的对任务生命周期的相关操作的所有方法,都在FutureTask中实现,比如:cancel(取消任务)、isCancelled(是否取消)isDone(是否完成)get(获取返回值,以及延时获取返回值)

 

另外submit还有两个重载方法,参数是Runnable的对象,最终会通过一个适配器RunnableAdapter,把Runnable转换成一个Callable,只是返回值是指定的返回值(有一个重载方法返回是null)。这两个方法不是核心流程就不贴代码了,理解起来比较简单。

 

FutureTask实现原理

 

FutureTask的构造方法

在讲解这些实现方法前,先来看下是如果把Callable对象包装成FutureTask对象的。这步操作是在FutureTask的构造方法中完成的:

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;   //使用组合的方式把callable作为自己的成员变量
        this.state = NEW;     
}
 

 

这里使用了组合的方式把callable对象封装到FutureTask的成员变量中,至此CallableFutureTaskFutureRunnableRunnableFuture的关系就梳理清楚了,类图关系如下:



 

 

FutureTaskrun方法

FutureTask对象提交给ThreadPoolExecutor线程池后,任务在执行是会调用FutureTaskrun方法(详见之前对ThreadPoolExecutor runWorker方法的讲解,点击这里)

 

FutureTaskrun方法最终会调用Callable的的call方法,这个方法带有返回值,此时就可以获得这个返回值,具体实现如下:

public void run() {
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call(); //调用Callable的call方法获的返回值
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);//如果 任务执行成功,设置返回值。
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
}
 

 

该方法实现比较简单,核心就是result = c.call();获取到任务的返回值,如果任务执行成功就通过set(result)方法为成员变量outcome设置,主线程中通过FutureTaskget方法获取的就是这个值。同时执行run方法会根据执行的结果,修改任务的状态,FutureTask中定义任务的状态类型如下,完成的类型有4种: normal(正常结束)、exceptional(异常结束)、cancelled(取消)、interrupted(中断):



 

FutureTaskget方法

在主线程中调用Futureget方法本质上是调用的FutureTaskget方法 以获取返回值。大致思想就是如果任务状态已经完成(上述讲的4种状态),则返回执行结果;如果还未完成,则主线程阻塞(调用get方法的线程)。get方法具体实现如下:

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) //小于等于1表示还没有完成,结合上面的状态字段理解
            s = awaitDone(false, 0L);//轮询、阻塞等待任务执行
        return report(s);//根据不同的状态返回不同的值
}
 

 

如果状态字段state小于等于1,即不是上述讲的4种完成状态,就调用awaitDone方法阻塞,直到被唤醒,再次判断状态是否完成,最后返回的是完成状态。然后report(s)方法会根据状态获取不同的返回值,返回个主线程,整个get方法执行结束。

 

这里重点看下awaitDone方法:

private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
 
            int s = state;
            if (s > COMPLETING) {//如果已经完成就返回
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();//创建等待节点
            else if (!queued)//如果不在等待队列就加入等待队列
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            else if (timed) {//如果是延时获取,就行延时阻塞
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else//否则直接阻塞
                LockSupport.park(this);
        }
    }
 

 

结合给出的注释应该很好理解,但需要注意两点:

1、线程被阻塞了,什么时候被唤醒?这个是在上述run方法执行完成后,会调用finishCompletion()方法,在这个方法中会唤醒awaitDone继续执行。

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//唤醒awaitDone阻塞
                    }
                    //从队列中移除当前线程节点
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
 
        done();
 
        callable = null;        // to reduce footprint
}

2、这里的WaitNode是等待节点,每个节点中包含一个调用get方法被阻塞的主线程。这些节点会组成一个单向链表,从队列中移除的工作在awaitDonefinishCompletion中都会进行。在jdk1.6时是通过AQS实现的,jdk 1.7以后做了简化,这里贴出的是jdk1.8的代码,感兴趣的朋友可以切到jdk1.6看下。

 

如果说set方法和get方法是相对的话,那finishCompletionawaitDone就是相对的,理解了这两个方法,基本就理解了FutureTask

 

还有一个延时get方法,实现流程与get方法基本相对,只是在awaitDone方法中会使用延时阻塞。

 

FutureTaskcancel取消任务方法

如果任务还可以开始,可以通过Futurecancel方法取消任务执行,本质上是通过修改FutureTask的状态为CANCELLED取消状态。但线程池还是会执行FutureTaskrun方法,只是,在run方法的开始就会判断当前的状态如果不是NEW(初始状态),run方法直接结束:

public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&  //cas修改状态为CANCELLED
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        //省略其他内容
}
 
public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
//剩余其他内容
}

 

另外isCancelled(是否取消)isDone(是否完成)的实现就很简单了,直接判断比较当前状态值就行了:

public boolean isCancelled() {
        return state >= CANCELLED;
    }
 
    public boolean isDone() {
        return state != NEW; //注意这里只要任务开始就表示完成了。
    }

 

需要注意的是isDone方法FutureTask的实现是只要开始就算完成了,也就是说只要开始了任务就不能取消了,只能等到任务执行完成。而任务真正的任务完成,状态必须是上述讲的4中完成状态之一。至此FutureTask实现原理讲解完毕。

 

总结

 

 

Future直接结合线程池一起使用,它定义了一些操作任务的方法(取消任务、获取任务执行结果等),它的最终实现类为FutureTask。通过这个类的相关实现,可以把一个Callable封装到FutureTaskFutureTask同时实现了RunnableFuture接口,可以把FutureTask提交给线程池执行的同时,也可以通过这个对象获取任务的执行情况。

 

 

 

  • 大小: 26.2 KB
  • 大小: 32.9 KB
1
0
分享到:
评论

相关推荐

    java线程池的源码分析.zip

    Java线程池是Java并发编程中的重要组成部分,它在多线程和高并发场景下扮演着关键角色。本文将深入探讨Java线程池的源码分析,并对比不同类型的线程池,以帮助开发者更好地理解和利用这一强大的工具。 首先,我们要...

    java线程池threadpool简单使用源码

    Java线程池(ThreadPool)是Java并发编程中的一个重要概念,它可以帮助我们有效地管理和控制并发执行的任务,从而提高系统的效率和稳定性。线程池通过复用已存在的线程,避免了频繁创建和销毁线程带来的开销,同时也...

    java线程池实例

    Java线程池是一种高效管理线程资源的工具,它通过维护一组可重用的线程来减少创建和销毁线程的开销。在Java中,`java.util.concurrent`包提供了`ExecutorService`接口和它的实现类,如`ThreadPoolExecutor`,来支持...

    JAVA使用线程池查询大批量数据

    在Java开发中,处理大批量数据时,合理利用线程池可以显著提高程序的执行效率和资源利用率。本文将深入探讨如何在Java中使用线程池来查询大量数据,以及这样做的好处和实现方法。 首先,理解线程池的概念至关重要。...

    java 线程池与通过Future终止线程实例

    NULL 博文链接:https://waitingkkk-163-com.iteye.com/blog/2232286

    详细分析JAVA 线程池

    "详细分析JAVA线程池" Java线程池是Java编程语言中的一种机制,用于管理和重用线程,以提高程序的性能和效率。下面是Java线程池的相关知识点: 1. 什么是线程池? 线程池是指在程序启动时创建的一组空闲线程,程序...

    java 线程池常用方法

    Java线程池是Java并发编程中的重要组成部分,它在Java 5及后续版本中引入,大大简化了线程管理和资源调度。线程池通过`Executor`接口和`ExecutorService`接口提供了一套强大的机制,允许开发者高效地创建、管理和...

    Java 线程池_动力节点Java学院整理

    Java线程池是一种高效管理线程资源的工具,它的出现是为了优化系统性能,尤其是在需要频繁创建和销毁线程的情况下。由于操作系统创建新线程需要进行内存分配、上下文切换等操作,这涉及到一定的开销。因此,使用...

    JAVA集中常用的线程池比较.pdf

    Java线程池是一种高效管理线程的工具,它允许开发者预先配置一组线程,以便在处理并发任务时能更好地控制系统的资源。线程池的概念源于服务器应用程序中对大量短小任务处理的需求,避免频繁创建和销毁线程带来的性能...

    JAVA经典线程池源码

    Java线程池是Java并发编程中的重要组成部分,它在多线程编程中扮演着至关重要的角色,有效地管理和调度线程资源,提高了程序的性能和稳定性。本资源包含了一个经典的Java线程池实现,适用于大型项目,能帮助开发者...

    java 线程池 学习代码

    Java线程池是一种高效管理线程的工具,它允许开发者预先定义好一组线程,然后根据需要分配任务,而不是每次需要执行任务时都创建新的线程。这种设计模式可以显著提高系统的性能,减少线程创建和销毁的开销,同时通过...

    java线程池实现批量下载文件

    Java线程池实现批量下载文件 Java线程池实现批量下载文件是指使用Java语言和线程池机制来实现批量下载文件的功能。这种方法可以大幅提高下载速度和效率,特别是当需要下载大量文件时。下面将对Java线程池实现批量...

    Java线程池学习资料-全

    Java线程池是一种高效管理并发任务的工具,它通过复用线程来减少创建和销毁线程的开销,从而提高系统性能。线程池的核心组成部分包括工作队列(Work Queue)、工作线程(Worker Threads)和任务调度器(Executor)。...

    java聊天室,利用线程池实现多用户聊天室

    我们可以通过`submit()`方法将任务(如处理用户消息)提交到线程池,然后通过`Future`对象获取任务执行的结果。 为了实现多用户聊天功能,还需要设计消息的序列化和反序列化机制。这可能涉及到JSON或XML等数据格式...

    JAVA线程池的分析和使用

    JAVA线程池是一种高效管理线程的技术,它允许开发者预设线程数量,避免无限制地创建和销毁线程带来的开销。线程池通过控制并发级别,优化系统资源使用,提升系统的稳定性和响应速度。 1. **线程池的优势** - **...

    Java中Future、FutureTask原理以及与线程池的搭配使用

    Java中的`Future`和`FutureTask`是并发编程中重要的工具,它们允许程序异步执行任务并获取结果。`Future`接口提供了对异步计算结果的访问和控制,而`FutureTask`是`Future`的一个具体实现,它还同时实现了`Runnable`...

    Java中多线程的使用线程池.docx

    - **提交任务**:使用`executorService.submit(Runnable task)` 将任务提交给线程池,返回`Future`对象,可以用于获取任务执行结果。 - **关闭线程池**:在所有任务执行完毕后,调用 `executorService.shutdown()`...

    深入理解高并发编程-Java线程池核心技术

    在深入理解高并发编程,尤其是Java线程池核心技术时,我们首先要明白线程与多线程的概念。线程是操作系统中的基本调度单元,它比进程更小,且基本不拥有系统资源,主要由程序计数器、寄存器和栈等组成。在同一个进程...

    Java 线程池详解及创建简单实例

    Java线程池是一种高效管理并发任务的机制,它通过复用已存在的线程来减少线程创建和销毁的开销,从而提高系统的整体性能。本文将深入解析Java线程池的工作原理,并给出创建简单实例的步骤。 线程池的核心在于`java....

    Java线程池完整代码,已经用于实际的项目中,性能稳定.zip

    Java线程池是一种高效管理并发任务执行的机制,它通过维护一组可重用的线程,减少了创建和销毁线程的开销。本压缩包包含的`ThreadPool.java`、`PooledThread.java`和`ThreadTask.java`是实现线程池功能的核心组件。 ...

Global site tag (gtag.js) - Google Analytics