`
沉沦的快乐
  • 浏览: 56794 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

并发编程之FutureTask的实现解析

阅读更多

简介

        很多时候我们希望创建一个新线程去执行一个功能相对独立的任务,并在任务完成之后返回执行结果。如果实现Runnable接口,没有办法获得返回值;如果实现Callable接口,必须使用ExecutorService来执行,不能使用更简单灵活的new thread方法来实现。为了解决上面两个问题,于是有了FutureTask。FutureTask实现了Runnable, Future<V>,所以它既可以通过new thread来跑任务,也可以通过ExecutorService来管理任务,同时FutureTask还提供了超时返回的功能。另外还有一个非常有用的功能是它提供了一个可重载方法done(),done()会在任务执行完之后被调用,用户可以override该方法,来执行一些数据清理,句柄释放等操作。下面来看下FutureTask的具体实现。

 

FutureTask实现原理

API接口

接口 接口描述
boolean cancel(boolean mayInterruptIfRunning) 取消任务,参数表示是否取消正在执行的任务
boolean isCancelled() 任务在正常结束前是否被取消
boolean isDone(); 任务是否执行完
V get() 返回任务结果
V get(long timeout, TimeUnit unit) 带超时时间的任务返回方法

实现原理

跟concurrent包中大多数并发类的设计一样,FutureTask也是基于AQS设计了一个内部类Sync来实现FutureTask的功能。Sync的通过传入Callable<V> callable来构造一个对象,这个callable就是需要执行的任务,V是任务返回的类型。Sync通过AQS的state属性来管理任务状态,比如1是RUNNING,2是RAN,4是CANCELLED。下面分别来剖析FutureTask的API的实现。

 

   1.isCancelled()

public boolean isCancelled() {
        return sync.innerIsCancelled();
    }
   真正的实现是通过sync的innerIsCancelled()来实现的,可以看出AQS的state属性值是保存任务状态的。

 

   

 boolean innerIsCancelled() {
            return getState() == CANCELLED;
        }
 
2.isDone()
public boolean isDone() {
        return sync.innerIsDone();
    }
    真正的实现是通过sync的innerIsDone()实现的,判断state的状态是RAN或者CANCELLED,并且这个任务的执行线程是否为null。
boolean innerIsDone() {
            return ranOrCancelled(getState()) && runner == null;
        }

 private boolean ranOrCancelled(int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }
 
3.cancel(boolean mayInterruptIfRunning)
public boolean cancel(boolean mayInterruptIfRunning) {
        return sync.innerCancel(mayInterruptIfRunning);
    }
    取消任务的方法是Sync的innerCancel(boolean mayInterruptIfRunning)来实现的:
boolean innerCancel(boolean mayInterruptIfRunning) {
	    for (;;) {
		int s = getState();
		if (ranOrCancelled(s))
		    return false;
		if (compareAndSetState(s, CANCELLED))
		    break;
	    }
            if (mayInterruptIfRunning) {
                Thread r = runner;
                if (r != null)
                    r.interrupt();
            }
            releaseShared(0);
            done();
            return true;
        }
    这个方法通过轮询的方式 ,采用CAS(compareAndSet)的同步方式来把state状态设置为CANCELLED。如果设置参数需要中断正在执行的任务,则调用interrupt()来中断任务。最后通过tryReleaseShared释放执行线程。并且执行done()方法来执行一些清理操作。FutureTask的done()方法没有执行让任何操作,用户可以通过extends的方法来override该方法。tryReleaseShared的方法很简单,仅仅把执行线程runner设置为null。
 
4.public V get()
public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }
  获取任务的返回值,该方法委托Sync的innerGet()来实现。
V innerGet() throws InterruptedException, ExecutionException {
            acquireSharedInterruptibly(0);
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }
     acquireSharedInterruptibly方法是AQS里面非常重要的一个方法,他以轮询的方式来阻塞线程,直到获得中断或者满足退出轮询条件来终结果阻塞,运行后面的代码。acquireSharedInterruptibly的退出阻塞状态的条件是tryAcquireShared方法的返回值大于0,tryAcquireShared方法是需要用户来override的。在这里tryAcquireShared返回大于0的条件是任务是否完成或取消。
protected int tryAcquireShared(int ignore) {
            return innerIsDone()? 1 : -1;
        }
 如果acquireSharedInterruptibly捕获中断状态,则抛出中断异常,否则执行下面的代码,如果状态为CANCELLED,则抛出CancellationException;如果Sync的exception属性不为空,则抛出这个exception。如果到这里innerGet还没退出,说明任务是执行完的,则返回任务结果。
 
5.public V get(long timeout, TimeUnit unit)
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return sync.innerGet(unit.toNanos(timeout));
    }
 带超时时间的获取任务结果方法,委托sync的innerGet(long nanosTimeout)来执行:
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }
 这个方法与上面讲的不带超时时间的方法差不多,只是tryAcquireSharedNanos超时之后会抛出超时异常。
 
6.public void run()
public void run() {
        sync.innerRun();
    }
  启动任务运行方法,委托sync的innerRun()来实现。
void innerRun() {
            if (!compareAndSetState(0, RUNNING))
                return;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING) // recheck after setting thread
                    innerSet(callable.call());
                else
                    releaseShared(0); // cancel
            } catch (Throwable ex) {
                innerSetException(ex);
            }
        }
 首先通过CAS方法把state更新为RUNNING状态,如果更新失败,说明这个任务的状态不是初始状态(0),说明这个任务被取消了,或者运行完了,或者正在运行。直接退出方法。只有state能更新为RUNNING状态,则说明能够开始执行任务。状态更新RUNNING之后,任务真正执行之前,需要在检查下状态是否为RUNING。因为这个反复没有加锁,状态可能被更新为取消了。所以如果检查状态不在为RUNNING,则释放线程资源,否则把runner设置为当前线程,并通过innerSet(callable.call());执行用户的任务,然后设置任务状态。
void innerSet(V v) {
	    for (;;) {
		int s = getState();
		if (s == RAN)
		    return;
                if (s == CANCELLED) {
		    // aggressively release to set runner to null,
		    // in case we are racing with a cancel request
		    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
		if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    done();
		    return;
                }
            }
        }
 如果状态已经运行结束了 直接退出;如果状态设置为CANCELLED,则强制释放线程并试图中断正在执行的任务并退出。如果既没运行完也没取消,则通过CAS把状态设置为RAN,把结果设置为call()返回的结果,然后释放线程(runner属性设置为null),并执行done()操作。
 
 
分享到:
评论

相关推荐

    13-Java并发编程学习宝典.zip

    Java并发编程是软件开发中的重要领域,特别是在大型系统和高并发场景中不可或缺。"13-Java并发编程学习宝典.zip" 包含了一系列关于Java并发编程的学习资源,旨在帮助开发者掌握多线程编程的核心技术和最佳实践。以下...

    JAVA并发编程实践 中文 高清 带书签 完整版 Doug Lea .pdf

    根据提供的文件信息,“JAVA并发编程实践 中文 高清 带书签 完整版 Doug Lea .pdf”,我们可以推断出这份文档主要聚焦于Java并发编程的技术实践与理论探讨。下面将从多个角度来解析这个文档可能涵盖的关键知识点。 ...

    Java并发编程原理与实战

    提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解.mp4 同步容器与并发容器.mp4 并发容器CopyOnWriteArrayList原理与使用.mp4 并发容器...

    JAVA并发编程实践

    Java并发编程中提供了丰富的工具类,如Atomic包下的原子变量类、ConcurrentHashMap并发哈希表、CopyOnWriteArrayList/CopyOnWriteArraySet并发集合、Future/FutureTask异步任务执行框架等。本书对这些工具类的原理和...

    《Java并发编程实战》PDF版本下载.txt

    然而,为了满足您对于生成相关知识点的需求,我们将围绕《Java并发编程实战》这本书的内容来展开讨论,深入解析Java并发编程的核心概念、原理以及实际应用技巧。 ### Java并发编程基础 #### 1. 并发与并行的概念 -...

    Java 并发编程原理与实战视频

    java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个...

    龙果 java并发编程原理实战

    龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...

    龙果java并发编程完整视频

    第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future源码解读00:29:22分钟 | 第45节Fork/Join框架详解00:28:09分钟 | 第46节...

    Java并发编程实战

    ### Java并发编程实战 #### 知识点概览 1. **Java并发基础** - 线程生命周期与状态转换 - 创建线程的多种方式 - 线程安全问题及其解决策略 2. **Java并发工具类详解** - `Executor`框架与`ThreadPoolExecutor...

    Java 并发编程硬核资料.pdf

    Java并发编程主要涵盖了多线程编程、线程池的应用、锁的机制以及Java内存模型等方面的知识。在多线程编程中,需要了解线程的创建方法、线程状态的管理和控制、线程间通信的方法,以及线程安全和并发性能优化等。...

    并发编程库,&amp;&amp;,线程池

    在IT行业中,线程池是高并发编程领域中不可或缺的一部分,尤其在Java中,线程池的应用非常广泛。本文将详细解析线程池的概念、重要性以及如何在实际开发中合理利用。 首先,为什么要使用线程池?线程池的主要优势...

    java并发编程

    第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future源码解读00:29:22分钟 | 第45节Fork/Join框架详解00:28:09分钟 | 第46节...

    Java 多线程与并发(17-26)-JUC线程池- FutureTask详解.pdf

    `FutureTask`是Java并发编程中一个非常重要的工具,它不仅可以用于封装任务并异步执行,还能方便地获取任务执行结果以及管理任务的状态。理解`FutureTask`的工作原理及其使用方法,对于提高程序的并发性能和响应能力...

    揭密FutureTask.docx

    在Java并发编程中,FutureTask扮演着至关重要的角色,它是实现异步计算的关键组件。本文将深入探讨FutureTask的原理和用法,帮助开发者更好地理解和利用这个强大的工具。 一、Future接口与FutureTask概述 Future...

    Java多线程与并发系列22道高频面试题(附思维导图和答案解析)

    Java中实现多线程有四种方法:继承Thread类、实现Runnable接口、实现Callable接口通过FutureTask包装器来创建Thread线程、使用ExecutorService、Callable、Future实现有返回结果的多线程。 二、停止一个正在运行的...

    java多线程编程同步器Future和FutureTask解析及代码示例

    在Java并发编程中,通常我们会使用`ExecutorService`来提交任务,而`Future`和`FutureTask`就是与这些任务交互的关键。 首先,`Future`是一个接口,它提供了一种机制来检查异步计算是否完成,以及在计算完成后获取...

Global site tag (gtag.js) - Google Analytics