`

Netty4.1源码 :DefaultPromise

 
阅读更多

 

 /**
 * 异步操作回调类,当某项操作异步执行时(由另外的线程执行),当前线程当即返回DefaultPromise
 * DefaultPromise.addListener  方法可以添加监听器,当操作完成时被触发
 * 或者调用DefaultPromise.sync()阻塞当前线程,等待操作完成。
 **/

 public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    

    private volatile Object result; //结果值
    private final EventExecutor executor;
    /**
     * 监听器对象,可能是一个或多个监听器
     */
    private Object listeners;
   
    

  /**
   * 设置操作成功并通知监听器,如设置失败,抛出异常
   *  
   **/
    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

   /**
   * 设置操作成功并通知监听器,如设置失败,则返回false
   *  
   **/
    public boolean trySuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();
            return true;
        }
        return false;
    }

    /**
	* 设置操作失败并通知监听器,如设置失败,抛出异常
	*  
	*/
    public Promise<V> setFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this, cause);
    }

    /**
	* 设置操作失败并通知监听器,如设置失败,返回false
	*/
    public boolean tryFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return true;
        }
        return false;
    }

 
    /**
	* 操作是否成功
	*/
    public boolean isSuccess() {
        Object result = this.result;
        return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
    }

    /**
	* 操作是否可以取消
	*/
    public boolean isCancellable() {
        return result == null;
    }

    /**
	* 查年操作的抛出的异常
	*/
    public Throwable cause() {
        Object result = this.result;
        return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
    }

    //添加或移除监听器
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {}
    public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {}
    public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {}
    public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {}

    /**
	* 阻塞当前线程,等待异步任务操作完成
	*
	**/
    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        checkDeadLock();

        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }

    /**
	* 阻塞当前线程,等待异步任务操作完成,接受中断请求
	*
	**/
    public Promise<V> awaitUninterruptibly() {
        if (isDone()) {
            return this;
        }

        checkDeadLock();

        boolean interrupted = false;
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Interrupted while waiting.
                    interrupted = true;
                } finally {
                    decWaiters();
                }
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        return this;
    }

    /**
	* 阻塞当前线程,等待异步任务操作完成,如果在等待时间花费完成之后,异步任务还未完成则不在等待(退出阻塞)
	*
	**/
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return await0(unit.toNanos(timeout), true);
    }

     /**
	* 阻塞当前线程,等待异步任务操作完成,如果在等待时间花费完成之后,异步任务还未完成则不在等待(退出阻塞)
	*
	**/
    public boolean await(long timeoutMillis) throws InterruptedException {
        return await0(MILLISECONDS.toNanos(timeoutMillis), true);
    }
 

    /**
	* 获取异步操作的结果值
	**/
    public V getNow() {
        Object result = this.result;
        if (result instanceof CauseHolder || result == SUCCESS) {
            return null;
        }
        return (V) result;
    }
 

    /**
	* 任务是否被取消
	**/
    public boolean isCancelled() {
        return isCancelled0(result);
    }

     /**
	* 异步任务是否执行完成
	**/
    public boolean isDone() {
        return isDone0(result);
    }
	
    /**
	* 阻塞直至异步任务完成,或抛出异步任务出错的异常
	*/
    public Promise<V> sync() throws InterruptedException {
        await();
        rethrowIfFailed();
        return this;
    }
	/**
	* 阻塞直至异步任务完成,或抛出异步任务出错的异常,支持中断
	*/
    public Promise<V> syncUninterruptibly() {
        awaitUninterruptibly();
        rethrowIfFailed();
        return this;
    }
 

    /**
     * Notify a listener that a future has completed.
     * <p>
     * This method has a fixed depth of {@link #MAX_LISTENER_STACK_DEPTH} that will limit recursion to prevent
     * {@link StackOverflowError} and will stop notifying listeners added after this threshold is exceeded.
     * @param eventExecutor the executor to use to notify the listener {@code listener}.
     * @param future the future that is complete.
     * @param listener the listener to notify.
     */
    protected static void notifyListener(
            EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
        
        notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener);
    }
	/**
	* 通知监听器
	**/
	
    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }

   //ignore some code
}

 

分享到:
评论

相关推荐

    Netty 4.1源码包

    Netty4.1的源码,欢迎大家下载。.............................................................................................................................................................................

    netty4.1源码

    这个“netty4.1源码”压缩包包含的是Netty框架4.1版本的源代码,对于深入理解Netty的工作原理、性能优化以及自定义功能扩展非常有帮助。 Netty的核心特性包括: 1. **异步非阻塞I/O**:Netty基于Java NIO(非阻塞I...

    netty-4.1 源码包

    这个"Netty-4.1 源码包"包含了Netty框架的源代码,允许开发者深入理解其内部工作原理,优化自定义实现,或者排查问题。 在Netty 4.1 版本中,主要包含以下关键知识点: 1. **NIO (Non-blocking I/O)**: Netty 使用...

    netty 4.1 中文api 帮助文档 + 用户指南

    在本文中,我们将深入探讨 Netty 4.1 的中文API帮助文档和用户指南,以及如何利用这些资源来提升你的网络编程技能。 首先,Netty 4.1 中文API帮助文档是理解 Netty 内部机制的关键工具。它包含了详细的类、接口、...

    netty 4.1中文.CHM

    这个“netty 4.1 中文.CHM”文件是一个压缩包,包含的是Netty 4.1版本的中文版帮助文档,对于开发者来说是一个非常宝贵的资源,特别是对于那些中文为母语的开发者,它提供了方便的理解和学习Netty的途径。...

    netty-common-4.1.65.Final-API文档-中英对照版.zip

    赠送jar包:netty-common-4.1.65.Final.jar; 赠送原API文档:netty-common-4.1.65.Final-javadoc.jar; 赠送源代码:netty-common-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.65.Final....

    netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》源码

    netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724845&idx=1&sn=8631c590ff4876ba0b7af64df16fc54b&scene=19#wechat_redirect

    netty 4.1 英文api 帮助文档 + 用户指南 chm

    这个压缩包包含的是Netty 4.1的英文API帮助文档和用户指南,对于理解和使用Netty框架非常有帮助。 首先,我们来看`netty 4.1.CHM`文件,这是一个CHM(Compiled Help Manual)格式的帮助文档,通常包含了详细的API...

    netty-all-4.1.68.Final-API文档-中文版.zip

    赠送jar包:netty-all-4.1.68.Final.jar; 赠送原API文档:netty-all-4.1.68.Final-javadoc.jar; 赠送源代码:netty-all-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-all-4.1.68.Final.pom; 包含...

    Netty-4.1.97.Final源码

    Netty-4.1.97.Final源码提供了对Netty内部机制的深度洞察,对于Java程序员尤其是希望提升网络编程能力或进行定制化开发的人来说,是一份极其宝贵的资料。 首先,让我们从整体上了解Netty的架构设计。Netty采用了...

    以netty4.1源码中的EchoServer为例对netty的源码进行分析.docx

    在本文中,我们将深入分析 Netty 4.1 源码中的 EchoServer 示例,以理解其核心组件和工作原理。 首先,我们关注 EchoServer 服务端的初始化,这涉及到两个关键组件:`bossGroup` 和 `workerGroup`。它们都是 `...

    netty-netty-4.1.32.final-remark.zip

    标题 "netty-netty-4.1.32.final-remark.zip" 提到了 Netty 的版本号 4.1.32.Final,这表明这是一个关于 Netty 4.1.32.Final 版本的资料包。"final" 表示这是该版本的最终发布,通常意味着经过了充分测试和稳定。...

    netty-netty-4.1.79.Final.tar.gz

    这个“netty-netty-4.1.79.Final.tar.gz”文件是一个包含Netty 4.1.79.Final版本的压缩包,通常用于Java开发环境。解压后,我们可以得到Netty的源代码、库文件和其他相关资源。 Netty的核心特性包括: 1. **异步...

    netty案例,netty4.1中级拓展篇十三《Netty基于SSL实现信息传输过程中双向加密验证》源码

    netty案例,netty4.1中级拓展篇十三《Netty基于SSL实现信息传输过程中双向加密验证》源码 ...

    netty案例,netty4.1中级拓展篇一《Netty与SpringBoot整合》源码

    netty案例,netty4.1中级拓展篇一《Netty与SpringBoot整合》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724796&idx=1&sn=ce5dc3c913d464b0e2e4e429a17bb01e&scene=19#wechat_redirect

    netty案例,netty4.1基础入门篇十一《netty udp通信方式案例Demo》源码

    netty案例,netty4.1基础入门篇十一《netty udp通信方式案例Demo》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724927&idx=1&sn=a16bc8e98d6a27816da0896adcc83778&scene=19#wechat_redirect

    netty案例,netty4.1中级拓展篇九《Netty集群部署实现跨服务端通信的落地方案》源码

    netty案例,netty4.1中级拓展篇九《Netty集群部署实现跨服务端通信的落地方案》源码 ...

    netty案例,netty4.1基础入门篇六《NettyServer群发消息》源码

    netty案例,netty4.1基础入门篇六《NettyServer群发消息》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724778&idx=1&sn=72e4b1ea5323475b16e99c6720c7069d&scene=19#wechat_redirect

    netty-common-4.1.68.Final-API文档-中文版.zip

    赠送jar包:netty-common-4.1.68.Final.jar; 赠送原API文档:netty-common-4.1.68.Final-javadoc.jar; 赠送源代码:netty-common-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.68.Final....

Global site tag (gtag.js) - Google Analytics