`

实验九:java Future 接口

 
阅读更多

下面讲一下java 的future 接口

 

我们都知道java 实现多线程的机制, 1.5之前调用runnable 接口。

 

1.5之后我们可以同过别的方式实现多线程。

 

如果我们想执行另外一个线程 比如说调用class A 的B()方法,

 

最好的方式就是调用A.B();

 

为了给A.B() 提供扩展性, 最好的方式就是A是接口, 所有实现了A接口的类都会实现B接口,这就是RUnnale接口的由来。同理callable 接口也是一样, 只不过多了返回值。

 

谁来接收这个返回值呢, Future 接口。

 

FutureTask 是Future接口的实现, 是对callable和Runnable 接口的包装。

 

Future 接口的所有实现其实是操作callable和runnable的细节

 

package Future;

 

import java.util.concurrent.Callable;

 

public class FutureCase implements Callable{

 

/**

* @param args

*/

public static void main(String[] args) {

System.out.println("test Future");

 

}

 

@Override

public Object call() throws Exception {

// TODO Auto-generated method stub

System.out.println("test Future");

Thread.sleep(10000);

System.out.println("test Future 10000");

Thread.sleep(10000);

return "call";

}

 

}

 

package Future;

 

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

 

 

public class TestFuture {

 

/**

* @param args

*/

public static void main(String[] args) {

 

ExecutorService exec = Executors.newCachedThreadPool();    

Future<String> result = exec.submit(new FutureCase());

System.out.println("i need to do my work at first");

try {

String res =  result.get();

System.out.println("i need to do my work at second");

System.out.println(res);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (ExecutionException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

 

}

 

}

 

 

 

 

 

FutureTask 实现。

package java.util.concurrent;

 

import java.util.concurrent.locks.LockSupport;

import sun.misc.Unsafe;

 

public class FutureTask<V>

  implements RunnableFuture<V>

{

  private volatile int state;

  private static final int NEW = 0;

  private static final int COMPLETING = 1;

  private static final int NORMAL = 2;

  private static final int EXCEPTIONAL = 3;

  private static final int CANCELLED = 4;

  private static final int INTERRUPTING = 5;

  private static final int INTERRUPTED = 6;

  private Callable<V> callable;

  private Object outcome;

  private volatile Thread runner;

  private volatile WaitNode waiters;

  private static final Unsafe UNSAFE;

  private static final long stateOffset;

  private static final long runnerOffset;

  private static final long waitersOffset;

 

  private V report(int paramInt)

    throws ExecutionException

  {

    Object localObject = this.outcome;

    if (paramInt == 2)

      return localObject;

    if (paramInt >= 4)

      throw new CancellationException();

    throw new ExecutionException((Throwable)localObject);

  }

 

  public FutureTask(Callable<V> paramCallable)

  {

    if (paramCallable == null)

      throw new NullPointerException();

    this.callable = paramCallable;

    this.state = 0;

  }

 

  public FutureTask(Runnable paramRunnable, V paramV)

  {

    this.callable = Executors.callable(paramRunnable, paramV);

    this.state = 0;

  }

 

  public boolean isCancelled()

  {

    return this.state >= 4;

  }

 

  public boolean isDone()

  {

    return this.state != 0;

  }

 

  public boolean cancel(boolean paramBoolean)

  {

    if (this.state != 0)

      return false;

    if (paramBoolean)

    {

      if (!UNSAFE.compareAndSwapInt(this, stateOffset, 0, 5))

        return false;

      Thread localThread = this.runner;

      if (localThread != null)

        localThread.interrupt();

      UNSAFE.putOrderedInt(this, stateOffset, 6);

    }

    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, 0, 4))

    {

      return false;

    }

    finishCompletion();

    return true;

  }

 

  public V get()

    throws InterruptedException, ExecutionException

  {

    int i = this.state;

    if (i <= 1)

      i = awaitDone(false, 0L);

    return report(i);

  }

 

  public V get(long paramLong, TimeUnit paramTimeUnit)

    throws InterruptedException, ExecutionException, TimeoutException

  {

    if (paramTimeUnit == null)

      throw new NullPointerException();

    int i = this.state;

    if ((i <= 1) && ((i = awaitDone(true, paramTimeUnit.toNanos(paramLong))) <= 1))

      throw new TimeoutException();

    return report(i);

  }

 

  protected void done()

  {

  }

 

  protected void set(V paramV)

  {

    if (UNSAFE.compareAndSwapInt(this, stateOffset, 0, 1))

    {

      this.outcome = paramV;

      UNSAFE.putOrderedInt(this, stateOffset, 2);

      finishCompletion();

    }

  }

 

  protected void setException(Throwable paramThrowable)

  {

    if (UNSAFE.compareAndSwapInt(this, stateOffset, 0, 1))

    {

      this.outcome = paramThrowable;

      UNSAFE.putOrderedInt(this, stateOffset, 3);

      finishCompletion();

    }

  }

 

  public void run()

  {

    if ((this.state != 0) || (!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())))

      return;

    try

    {

      Callable localCallable = this.callable;

      if ((localCallable != null) && (this.state == 0))

      {

        Object localObject1;

        int j;

        try

        {

          localObject1 = localCallable.call();

          j = 1;

        }

        catch (Throwable localThrowable)

        {

          localObject1 = null;

          j = 0;

          setException(localThrowable);

        }

        if (j != 0)

          set(localObject1);

      }

      this.runner = null;

      int i = this.state;

      if (i >= 5)

        handlePossibleCancellationInterrupt(i);

    }

    finally

    {

      this.runner = null;

      int k = this.state;

      if (k >= 5)

        handlePossibleCancellationInterrupt(k);

    }

  }

 

  protected boolean runAndReset()

  {

    if ((this.state != 0) || (!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())))

      return false;

    int i = 0;

    int j = this.state;

    try

    {

      Callable localCallable = this.callable;

      if ((localCallable != null) && (j == 0))

        try

        {

          localCallable.call();

          i = 1;

        }

        catch (Throwable localThrowable)

        {

          setException(localThrowable);

        }

      this.runner = null;

      j = this.state;

      if (j >= 5)

        handlePossibleCancellationInterrupt(j);

    }

    finally

    {

      this.runner = null;

      j = this.state;

      if (j >= 5)

        handlePossibleCancellationInterrupt(j);

    }

    return (i != 0) && (j == 0);

  }

 

  private void handlePossibleCancellationInterrupt(int paramInt)

  {

    if (paramInt == 5)

      while (this.state == 5)

        Thread.yield();

  }

 

  private void finishCompletion()

  {

    Object localObject;

    while ((localObject = this.waiters) != null)

      if (UNSAFE.compareAndSwapObject(this, waitersOffset, localObject, null))

        while (true)

        {

          Thread localThread = ((WaitNode)localObject).thread;

          if (localThread != null)

          {

            ((WaitNode)localObject).thread = null;

            LockSupport.unpark(localThread);

          }

          WaitNode localWaitNode = ((WaitNode)localObject).next;

          if (localWaitNode == null)

            break;

          ((WaitNode)localObject).next = null;

          localObject = localWaitNode;

        }

    done();

    this.callable = null;

  }

 

  private int awaitDone(boolean paramBoolean, long paramLong)

    throws InterruptedException

  {

    long l = paramBoolean ? System.nanoTime() + paramLong : 0L;

    WaitNode localWaitNode = null;

    boolean bool = false;

    while (true)

    {

      if (Thread.interrupted())

      {

        removeWaiter(localWaitNode);

        throw new InterruptedException();

      }

      int i = this.state;

      if (i > 1)

      {

        if (localWaitNode != null)

          localWaitNode.thread = null;

        return i;

      }

      if (i == 1)

      {

        Thread.yield();

      }

      else if (localWaitNode == null)

      {

        localWaitNode = new WaitNode();

      }

      else if (!bool)

      {

        bool = UNSAFE.compareAndSwapObject(this, waitersOffset, localWaitNode.next = this.waiters, localWaitNode);

      }

      else if (paramBoolean)

      {

        paramLong = l - System.nanoTime();

        if (paramLong <= 0L)

        {

          removeWaiter(localWaitNode);

          return this.state;

        }

        LockSupport.parkNanos(this, paramLong);

      }

      else

      {

        LockSupport.park(this);

      }

    }

  }

 

  private void removeWaiter(WaitNode paramWaitNode)

  {

    if (paramWaitNode != null)

    {

      paramWaitNode.thread = null;

      Object localObject1 = null;

      WaitNode localWaitNode;

      for (Object localObject2 = this.waiters; ; localObject2 = localWaitNode)

      {

        if (localObject2 == null)

          return;

        localWaitNode = ((WaitNode)localObject2).next;

        if (((WaitNode)localObject2).thread != null)

        {

          localObject1 = localObject2;

        }

        else

        {

          if (localObject1 != null)

          {

            localObject1.next = localWaitNode;

            if (localObject1.thread != null)

              continue;

            break;

          }

          if (!UNSAFE.compareAndSwapObject(this, waitersOffset, localObject2, localWaitNode))

            break;

        }

      }

    }

  }

 

  static

  {

    try

    {

      UNSAFE = Unsafe.getUnsafe();

      FutureTask localFutureTask = FutureTask.class;

      stateOffset = UNSAFE.objectFieldOffset(localFutureTask.getDeclaredField("state"));

      runnerOffset = UNSAFE.objectFieldOffset(localFutureTask.getDeclaredField("runner"));

      waitersOffset = UNSAFE.objectFieldOffset(localFutureTask.getDeclaredField("waiters"));

    }

    catch (Exception localException)

    {

      throw new Error(localException);

    }

  }

 

  static final class WaitNode

  {

    volatile Thread thread = Thread.currentThread();

    volatile WaitNode next;

  }

}

 

/* Location:           C:\Program Files\Java\jre7\lib\rt.jar

 * Qualified Name:     java.util.concurrent.FutureTask

 * JD-Core Version:    0.6.2

 

 */

 

 

 

分享到:
评论

相关推荐

    JAVA实验指导书

    此实验进一步深入到并发编程,介绍线程池、Future、Callable接口以及如何处理线程间的通信和协作,提高多线程程序的效率和可控性。 实验八:字符流和字符流输入/输出编程 这一部分关注I/O流,包括字符流的读写操作...

    java实验(包含文件、多线程等)

    同时,Java还提供了`ExecutorService`和`Future`接口,以及`ThreadPoolExecutor`等类来管理线程池,这在处理大量并发任务时非常有用。理解线程同步的概念,如`synchronized`关键字、`wait()`、`notify()`和`...

    LabJava:Java编程实验室

    在实验室中,你可以学习如何同步线程、使用Thread类和Runnable接口,以及了解ExecutorService和Future的概念。 6. **I/O流**:Java的I/O流系统允许程序读取和写入各种数据源,包括文件、网络连接等。学会使用...

    java实验.rar

    - 线程池:ExecutorService和Future接口,提高系统性能。 7. **网络编程**: - Socket编程:实现客户端和服务器端的通信。 - URL和URLConnection类:访问网络资源。 8. **设计模式**: - 单例模式、工厂模式、...

    JavaPractice:Java Lab任务存储库

    JavaPractice存储库是Java编程学习者的一个宝贵资源,由作者金孝俊创建,旨在提供一系列的Java实验室任务,帮助开发者深化对Java语言的理解并提升实际编程能力。这个存储库主要聚焦于Java的基础与进阶应用,涵盖了从...

    java高级编程.zip

    此外,ExecutorService和Future接口提供了一种更灵活的线程池管理方式。 2. **网络编程**:Java提供了Socket和ServerSocket类进行TCP/IP通信,以及DatagramSocket和DatagramPacket类进行UDP通信。NIO(非阻塞I/O)...

    【Java毕业设计】毕业设计——基于Java的漏洞扫描系统.zip

    7. **多线程与并发**:为了提高扫描效率,系统可能会并发执行多个扫描任务,这就需要理解和运用Java的并发库,如ExecutorService和Future。 8. **数据库操作**:收集到的漏洞信息可能被存储在数据库中,因此需要...

    JAVA多线程实验,字符移动、小球碰撞

    在Java中,我们可以通过继承Thread类或实现Runnable接口来创建线程。 对于“字符移动”实验,我们可以创建一个线程来处理字符的移动逻辑。例如,定义一个Character类,包含位置信息和移动方法。然后,通过创建...

    gradle_java_experiments:各种gradle java实验的容器项目

    5. **并发编程**:实验可能涉及到Java的并发工具,如ExecutorService、Future、Semaphore等。 6. **Spring框架**:如果项目包含Spring相关内容,可能涵盖了依赖注入、AOP、数据访问等Spring核心概念的实践。 7. **...

    java面向对象和多线程实验

    Java提供了丰富的线程API,包括`Thread`类和`Runnable`接口,以及高级的并发工具如`ExecutorService`和`Future`。 线程同步是多线程编程中的关键问题,防止数据竞争和死锁的发生。Java提供了多种同步机制,如`...

    WHUT-java多线程实验-第三周-文件上传和下载.zip

    5. **资源管理**:Java提供`ExecutorService`和`Future`接口,帮助开发者更好地管理和控制线程池,防止过多线程导致系统资源耗尽。 在实现多线程文件上传和下载时,通常会用到以下Java API: - **Thread**:基础的...

    java并发编程艺术源码-ArtConcurrentBook:Java并发编程的艺术书本

    5. **并发工具类**:`java.util.concurrent`包还包含了许多并发工具类,如`CountDownLatch`、`CyclicBarrier`、`Future`、`CompletableFuture`等,它们可以帮助开发者实现复杂的并发控制和协作。 6. **并发模式**:...

    v2ch11.rar

    章节可能涵盖Thread类的使用,同步控制(synchronized关键字、wait()和notify()方法),以及ExecutorService和Future接口。 9. 文件和目录操作:Java提供了File类和nio包来处理文件和目录,包括创建、删除、移动和...

    Java多线程编程总结

    - `Callable` 接口允许线程执行后返回结果,`Future` 接口用于获取这些结果。 #### 十四、Java线程:新特征-锁(上)&(下) - `java.util.concurrent.locks` 包提供了更高级别的锁实现,如 `ReentrantLock`、`...

    设计一个动态时钟程序,指针

    在本实验中,我们将设计一个动态时钟程序,利用Java的多线程特性来模拟时钟的指针移动。这个程序的核心在于理解和运用Java线程的相关知识点,包括线程的创建、调度、同步以及生命周期管理。以下是关于这些知识点的...

    WHUT-java多线程实验-第二周-异常处理.zip

    4. **异常传播**:在一个线程中捕获的异常可能会传播到其他线程,特别是在使用Future和Callable接口时。理解如何正确处理这种跨线程的异常传播非常重要。 5. **finally块**:无论是否发生异常,finally块中的代码...

    Java实验五线程.pdf

    2. 实现`Runnable`接口:创建一个类实现`Runnable`接口,实现`run()`方法。然后将`Runnable`对象作为参数传递给`Thread`的构造函数,创建`Thread`实例并启动。 并发执行的多线程间可能存在以下关系: - 同步:线程...

    简单的java线程demo

    除了这两种方式,Java还提供了Callable和Future接口,它们允许线程返回一个值,并处理异常。此外,Executor框架提供了一种更高级的方式来管理和控制线程,比如ThreadPoolExecutor可以创建线程池,有效地管理和重用...

    java-labs:继续学习Java:video_game:

    在“Java实验室”中,我们专注于深入理解和应用Java编程语言,特别是通过"video_game"这个主题,我们可以探索如何使用Java来开发游戏。这个压缩包文件包含一个名为"java-labs-main"的主目录,可能包含了源代码、示例...

    JAVA网络通信系统的研究与开发(论文+源代码+开题报告).zip

    Java的Thread类和Runnable接口使得实现并发编程变得简单,而ExecutorService和Future接口则提供了更高级的线程管理和任务执行机制。 3. **NIO(非阻塞I/O)**:Java的非阻塞I/O模型,即New I/O API,允许服务器在...

Global site tag (gtag.js) - Google Analytics