Java并发编程之异步Future机制的原理和实现
项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class AddTask implements Callable<Integer> { private int a,b; public AddTask(int a, int b) { this.a = a; this.b = b; } @Override public Integer call() throws Exception { Integer result = a + b; return result; } public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newSingleThreadExecutor(); //JDK目前为止返回的都是FutureTask的实例 Future<Integer> future = executor.submit(new AddTask(1, 2)); Integer result = future.get();// 只有当future的状态是已完成时(future.isDone() = true),get()方法才会返回 } }
虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future 的接口方法:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:
package future; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * The result of an asynchronous operation. * * @author lixiaohui * @param <V> 执行结果的类型参数 */ public interface IFuture<V> extends Future<V> { boolean isSuccess(); // 是否成功 V getNow(); //立即返回结果(不管Future是否处于完成状态) Throwable cause(); //若执行失败时的原因 boolean isCancellable(); //是否可以取消 IFuture<V> await() throws InterruptedException; //等待future的完成 boolean await(long timeoutMillis) throws InterruptedException; // 超时等待future的完成 boolean await(long timeout, TimeUnit timeunit) throws InterruptedException; IFuture<V> awaitUninterruptibly(); //等待future的完成,不响应中断 boolean awaitUninterruptibly(long timeoutMillis);//超时等待future的完成,不响应中断 boolean awaitUninterruptibly(long timeout, TimeUnit timeunit); IFuture<V> addListener(IFutureListener<V> l); //当future完成时,会通知这些加进来的监听器 IFuture<V> removeListener(IFutureListener<V> l); }
接下来就一起来实现这个IFuture,在这之前要说明下Object.wait(),Object.notifyAll()方法,因为整个Future实现的原理的核心就是这两个方法.看看JDK里面的解释:
public class Object { /** * Causes the current thread to wait until another thread invokes the * {@link java.lang.Object#notify()} method or the * {@link java.lang.Object#notifyAll()} method for this object. * In other words, this method behaves exactly as if it simply * performs the call {@code wait(0)}. * 调用该方法后,当前线程会释放对象监视器锁,并让出CPU使用权。直到别的线程调用notify()/notifyAll() */ public final void wait() throws InterruptedException { wait(0); } /** * Wakes up all threads that are waiting on this object's monitor. A * thread waits on an object's monitor by calling one of the * {@code wait} methods. * <p> * The awakened threads will not be able to proceed until the current * thread relinquishes the lock on this object. The awakened threads * will compete in the usual manner with any other threads that might * be actively competing to synchronize on this object; for example, * the awakened threads enjoy no reliable privilege or disadvantage in * being the next thread to lock this object. */ public final native void notifyAll(); }
知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await()等一系列的方法时,如果Future还未完成,那么就调用future.wait() 方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll()方法来唤醒之前因为调用过wait()方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):
package future; import java.util.Collection; import java.util.concurrent.CancellationException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * <pre> * 正常结束时, 若执行的结果不为null, 则result为执行结果; 若执行结果为null, 则result = {@link AbstractFuture#SUCCESS_SIGNAL} * 异常结束时, result为 {@link CauseHolder} 的实例;若是被取消而导致的异常结束, 则result为 {@link CancellationException} 的实例, 否则为其它异常的实例 * 以下情况会使异步操作由未完成状态转至已完成状态, 也就是在以下情况发生时调用notifyAll()方法: * <ul> * <li>异步操作被取消时(cancel方法)</li> * <li>异步操作正常结束时(setSuccess方法)</li> * <li>异步操作异常结束时(setFailure方法)</li> * </ul> * </pre> * * @author lixiaohui * * @param <V> * 异步执行结果的类型 */ public class AbstractFuture<V> implements IFuture<V> { protected volatile Object result; // 需要保证其可见性 /** * 监听器集 */ protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>(); /** * 当任务正常执行结果为null时, 即客户端调用{@link AbstractFuture#setSuccess(null)}时, * result引用该对象 */ private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal(); @Override public boolean cancel(boolean mayInterruptIfRunning) { if (isDone()) { // 已完成了不能取消 return false; } synchronized (this) { if (isDone()) { // double check return false; } result = new CauseHolder(new CancellationException()); notifyAll(); // isDone = true, 通知等待在该对象的wait()的线程 } notifyListeners(); // 通知监听器该异步操作已完成 return true; } @Override public boolean isCancellable() { return result == null; } @Override public boolean isCancelled() { return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; } @Override public boolean isDone() { return result != null; } @Override public V get() throws InterruptedException, ExecutionException { await(); // 等待执行结果 Throwable cause = cause(); if (cause == null) { // 没有发生异常,异步操作正常结束 return getNow(); } if (cause instanceof CancellationException) { // 异步操作被取消了 throw (CancellationException) cause; } throw new ExecutionException(cause); // 其他异常 } @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (await(timeout, unit)) {// 超时等待执行结果 Throwable cause = cause(); if (cause == null) {// 没有发生异常,异步操作正常结束 return getNow(); } if (cause instanceof CancellationException) {// 异步操作被取消了 throw (CancellationException) cause; } throw new ExecutionException(cause);// 其他异常 } // 时间到了异步操作还没有结束, 抛出超时异常 throw new TimeoutException(); } @Override public boolean isSuccess() { return result == null ? false : !(result instanceof CauseHolder); } @SuppressWarnings("unchecked") @Override public V getNow() { return (V) (result == SUCCESS_SIGNAL ? null : result); } @Override public Throwable cause() { if (result != null && result instanceof CauseHolder) { return ((CauseHolder) result).cause; } return null; } @Override public IFuture<V> addListener(IFutureListener<V> listener) { if (listener == null) { throw new NullPointerException("listener"); } if (isDone()) { // 若已完成直接通知该监听器 notifyListener(listener); return this; } synchronized (this) { if (!isDone()) { listeners.add(listener); return this; } } notifyListener(listener); return this; } @Override public IFuture<V> removeListener(IFutureListener<V> listener) { if (listener == null) { throw new NullPointerException("listener"); } if (!isDone()) { listeners.remove(listener); } return this; } @Override public IFuture<V> await() throws InterruptedException { return await0(true); } private IFuture<V> await0(boolean interruptable) throws InterruptedException { if (!isDone()) { // 若已完成就直接返回了 // 若允许终端且被中断了则抛出中断异常 if (interruptable && Thread.interrupted()) { throw new InterruptedException("thread " + Thread.currentThread().getName() + " has been interrupted."); } boolean interrupted = false; synchronized (this) { while (!isDone()) { try { wait(); // 释放锁进入waiting状态,等待其它线程调用本对象的notify()/notifyAll()方法 } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } } } if (interrupted) { // 为什么这里要设中断标志位?因为从wait方法返回后, 中断标志是被clear了的, // 这里重新设置以便让其它代码知道这里被中断了。 Thread.currentThread().interrupt(); } } return this; } @Override public boolean await(long timeoutMillis) throws InterruptedException { return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true); } @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.toNanos(timeout), true); } private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { if (isDone()) { return true; } if (timeoutNanos <= 0) { return isDone(); } if (interruptable && Thread.interrupted()) { throw new InterruptedException(toString()); } long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime(); long waitTime = timeoutNanos; boolean interrupted = false; try { synchronized (this) { if (isDone()) { return true; } if (waitTime <= 0) { return isDone(); } for (;;) { try { wait(waitTime / 1000000, (int) (waitTime % 1000000)); } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } if (isDone()) { return true; } else { waitTime = timeoutNanos - (System.nanoTime() - startTime); if (waitTime <= 0) { return isDone(); } } } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } } @Override public IFuture<V> awaitUninterruptibly() { try { return await0(false); } catch (InterruptedException e) { // 这里若抛异常了就无法处理了 throw new java.lang.InternalError(); } } @Override public boolean awaitUninterruptibly(long timeoutMillis) { try { return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false); } catch (InterruptedException e) { throw new java.lang.InternalError(); } } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { try { return await0(unit.toNanos(timeout), false); } catch (InterruptedException e) { throw new java.lang.InternalError(); } } protected IFuture<V> setFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this); } private boolean setFailure0(Throwable cause) { if (isDone()) { return false; } synchronized (this) { if (isDone()) { return false; } result = new CauseHolder(cause); notifyAll(); } return true; } protected IFuture<V> setSuccess(Object result) { if (setSuccess0(result)) { // 设置成功后通知监听器 notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(Object result) { if (isDone()) { return false; } synchronized (this) { if (isDone()) { return false; } if (result == null) { // 异步操作正常执行完毕的结果是null this.result = SUCCESS_SIGNAL; } else { this.result = result; } notifyAll(); } return true; } private void notifyListeners() { for (IFutureListener<V> l : listeners) { notifyListener(l); } } private void notifyListener(IFutureListener<V> l) { try { l.operationCompleted(this); } catch (Exception e) { e.printStackTrace(); } } private static class SuccessSignal { } private static final class CauseHolder { final Throwable cause; CauseHolder(Throwable cause) { this.cause = cause; } } }
那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:
package future.test; import future.IFuture; import future.IFutureListener; /** * 延时加法 * @author lixiaohui * */ public class DelayAdder { public static void main(String[] args) { new DelayAdder().add(3 * 1000, 1, 2).addListener(new IFutureListener<Integer>() { @Override public void operationCompleted(IFuture<Integer> future) throws Exception { System.out.println(future.getNow()); } }); } /** * 延迟加 * @param delay 延时时长 milliseconds * @param a 加数 * @param b 加数 * @return 异步结果 */ public DelayAdditionFuture add(long delay, int a, int b) { DelayAdditionFuture future = new DelayAdditionFuture(); new Thread(new DelayAdditionTask(delay, a, b, future)).start(); return future; } private class DelayAdditionTask implements Runnable { private long delay; private int a, b; private DelayAdditionFuture future; public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) { super(); this.delay = delay; this.a = a; this.b = b; this.future = future; } @Override public void run() { try { Thread.sleep(delay); Integer i = a + b; // TODO 这里设置future为完成状态(正常执行完毕) future.setSuccess(i); } catch (InterruptedException e) { // TODO 这里设置future为完成状态(异常执行完毕) future.setFailure(e.getCause()); } } } }
package future.test; import future.AbstractFuture; import future.IFuture; //只是把两个方法对外暴露 public class DelayAdditionFuture extends AbstractFuture<Integer> { @Override public IFuture<Integer> setSuccess(Object result) { return super.setSuccess(result); } @Override public IFuture<Integer> setFailure(Throwable cause) { return super.setFailure(cause); } }
可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。
相关推荐
本文将深入探讨Java多线程中的异步Future机制,包括其原理、实现方式以及实际应用。 首先,我们需要了解什么是Future。在Java中,`java.util.concurrent.Future`接口代表一个异步计算的结果。它提供了检查计算是否...
《Java并发编程实战》是Java并发编程领域的一本经典著作,它深入浅出地介绍了如何在Java平台上进行高效的多线程编程。这本书的源码提供了丰富的示例,可以帮助读者更好地理解书中的理论知识并将其应用到实际项目中。...
总的来说,这份“java并发编程内部分享PPT”涵盖了Java并发编程的多个重要方面,包括线程创建与管理、同步机制、并发容器、线程池、并发问题以及异步计算。通过深入学习和实践这些知识点,开发者可以更好地应对多...
《JAVA并发编程实践》这本书是Java开发者深入理解并发编程的重要参考资料。它涵盖了Java并发的核心概念、工具和最佳实践,旨在帮助读者在多线程环境下编写高效、安全的代码。 并发编程是现代软件开发中的关键技能,...
《Java并发编程实践》是一本深入探讨Java多线程编程的经典著作,由Brian Goetz、Tim Peierls、Joshua Bloch、Joseph Bowles和David Holmes等专家共同编写。这本书全面介绍了Java平台上的并发编程技术,是Java开发...
2. **同步机制**:Java并发编程的核心在于同步,以防止数据不一致性和资源竞争。`synchronized`关键字用于实现临界区的互斥访问,确保同一时刻只有一个线程执行特定代码块。此外,还有`wait()`, `notify()`, `...
通过深入学习《JAVA并发编程艺术》,开发者能更好地理解并发编程的原理,熟练运用Java提供的并发工具和API,解决实际开发中的多线程问题,提高软件的性能和稳定性。这是一本值得每一位Java开发者研读的书。
本资源包含三本权威的Java并发编程书籍:《Java并发编程实践》、《java并发编程的艺术》以及Brian Goetz的文字版《Java并发编程实践》。 首先,我们来看《Java并发编程实践》(Java Concurrency in Practice)这...
### Java并发编程实践 #### 一、并发编程基础 ##### 1.1 并发与并行的区别 ...通过上述知识点的学习,我们可以更好地理解和掌握Java并发编程的基本原理和技巧,为开发高效稳定的并发应用程序打下坚实的基础。
Java并发编程是Java开发者必须掌握的关键技能之一,它涉及到如何在多线程环境中高效、安全地执行程序。并发编程能够充分利用多核处理器的计算能力,提高应用程序的响应速度和整体性能。《Java编程并发实战》这本书是...
Java并发编程是软件开发中的重要领域,特别是在大型系统和高并发场景中不可或缺。"13-Java并发编程学习宝典.zip" 包含了一系列关于Java并发编程的学习资源,旨在帮助开发者掌握多线程编程的核心技术和最佳实践。以下...
Java并发编程是Java开发中的重要领域,特别是在多核处理器和分布式系统中,高效地利用并发可以极大地提升程序的性能和响应速度。阿里大牛梁飞编写的《Java并发编程常识》PPT,深入浅出地讲解了这个主题,对开发者来...
Java并发编程是Java开发中的重要领域,涉及到多线程、同步机制、线程池等多个核心概念,对于构建高效、稳定的应用至关重要。这份资料包含了关于Java并发编程的博客和网页,可以提供深入的理解和实践指导。 在Java...
综上所述,尽管Java并发编程中创建线程的方法看似多种多样,但从根本上来说,都离不开实现`Runnable`接口和继承`Thread`类这两种基本方式。掌握这些基础知识对于深入理解和应用Java并发编程至关重要。
根据提供的文件信息,“JAVA并发编程实践 中文 高清 带书签 完整版 Doug Lea .pdf”,我们可以推断出这份文档主要聚焦于Java并发编程的技术实践与理论探讨。下面将从多个角度来解析这个文档可能涵盖的关键知识点。 ...
6.3.3 示例:使用Future实现页面渲染器 6.3.4 在异构任务并行化中存在的局限 6.3.5 CompletionService:Executor与BlockingQueue 6.3.6 示例:使用CompletionService实现页面渲染器 6.3.7 为任务设置时限 6.3.8...
学习《JAVA并发编程实践》这本书,将帮助开发者掌握Java并发编程的核心概念、技术和最佳实践,从而编写出高效、安全、可扩展的并发程序。通过实际案例和经验分享,加深对并发编程的理解,提升解决并发问题的能力。
9. **Future与CompletableFuture**:Future接口代表异步计算的结果,CompletableFuture则提供了更强大的功能,可以构建复杂的异步流程并行执行,便于实现链式调用和回调。 10. **并发编程最佳实践**:书中总结了在...
通过《Java并发编程实践》这本书,读者可以深入了解Java并发编程的原理和最佳实践,提升编写高效、可靠的并发程序的能力。书中不仅讲解了理论知识,还提供了大量示例代码,有助于读者在实践中巩固所学。