`
Donald_Draper
  • 浏览: 984553 次
社区版块
存档分类
最新评论

netty 异步任务-ChannelFuture

阅读更多
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
引言:
上一篇看了Channel管道线的父类接口Inboudn/Outbound通道Invoker定义,先来回顾一下:
每个通道Channel拥有自己的管道Pipeline,当通道创建时,管道自动创建,默认为DefaultChannelPipeline。Inbound通道Invoker ChannelInboundInvoker主要是触发管道线ChannelPipeline上的下一个Inbound通道处理器ChannelInboundHandler的相关方法。ChannelInboundInvoker有点Mina过滤器的意味。Outbound通道Invoker ChannelOutboundInvoker主要是触发触发管道线ChannelPipeline上的下一个Outbound通道处理器ChannelOnboundHandler的相关方法,同时增加了一下通道任务创建方法,
ChannelOutboundInvoker也有点Mina过滤器的意味,只不过不像ChannelInboundInvoker的方法命名那么相似。

在Outbound通道Invoker的方法定义中,我们看到有很多类型的返回异步任务,
比如:ChannelFuture,ChannelPromise,ChannelProgressivePromise,我们来看一下这些异步任务:
注意异步任务和异步结果的含义要结合上下文来看。
//ChannelFuture
public interface ChannelFuture extends Future<Void> 

再看ChannelFuture之前,先看Future接口定义

import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
/**
 * The result of an asynchronous operation.
 一个异步操作接口,从定义来看继承与JUC的Future
 */
@SuppressWarnings("ClassNameSameAsAncestorName")
public interface Future<V> extends java.util.concurrent.Future<V> {

    /**
     * Returns {@code true} if and only if the I/O operation was completed
     * successfully.
     如果一个IO操作是否成功完成,返回ture
     */
    boolean isSuccess();

    /**
     * returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}.
     如果一个操作通过cancel方法取消,则返回true
     */
    boolean isCancellable();

    /**
     * Returns the cause of the failed I/O operation if the I/O operation has
     * failed.
     *如果IO操作失败,则返回失败原因
     * @return the cause of the failure.
     *         {@code null} if succeeded or this future is not
     *         completed yet.
     */
    Throwable cause();

    /**
     * Adds the specified listener to this future.  The
     * specified listener is notified when this future is
     * {@linkplain #isDone() done}.  If this future is already
     * completed, the specified listener is notified immediately.
     添加任务监听器。当操作完成时,通知监听器。如果操作已经完成,则将立刻通知任务监听器。
     */
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    /**
     * Adds the specified listeners to this future.  The
     * specified listeners are notified when this future is
     * {@linkplain #isDone() done}.  If this future is already
     * completed, the specified listeners are notified immediately.
     与上述方法类似,可以一次添加多个监听器
     */
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * Removes the first occurrence of the specified listener from this future.
     * The specified listener is no longer notified when this
     * future is {@linkplain #isDone() done}.  If the specified
     * listener is not associated with this future, this method
     * does nothing and returns silently.
     从异步任务移除监听器。当操作完成时,不在通知监听器。如果监听器与当前异步任务没有关联,
     则此方什么都不做
     */
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    /**
     * Removes the first occurrence for each of the listeners from this future.
     * The specified listeners are no longer notified when this
     * future is {@linkplain #isDone() done}.  If the specified
     * listeners are not associated with this future, this method
     * does nothing and returns silently.
     与上述方法类似,可以一次移动多个监听器
     */
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future
     * failed.
     等待异步任务,直到操作完成,如果操作失败,则重新抛出失败的原因。
     */
    Future<V> sync() throws InterruptedException;

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future
     * failed.
     以不可中断方式,等待异步任务,直到操作完成,如果操作失败,则重新抛出失败的原因。
     */
    Future<V> syncUninterruptibly();

    /**
     * Waits for this future to be completed.
     *等待操作完成
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    Future<V> await() throws InterruptedException;

    /**
     * Waits for this future to be completed without
     * interruption.  This method catches an {@link InterruptedException} and
     * discards it silently.
     以不可中断方法等待操作结果,如果遇到中断异常,则直接丢弃
     */
    Future<V> awaitUninterruptibly();

    /**
     * Waits for this future to be completed within the
     * specified time limit.
     *超时等待操作完成
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Waits for this future to be completed within the
     * specified time limit.
     *超时等待,单位毫秒
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    boolean await(long timeoutMillis) throws InterruptedException;

    /**
     * Waits for this future to be completed within the
     * specified time limit without interruption.  This method catches an
     * {@link InterruptedException} and discards it silently.
     *超时不可中断等待操作结果
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     */
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    /**
     * Waits for this future to be completed within the
     * specified time limit without interruption.  This method catches an
     * {@link InterruptedException} and discards it silently.
     *超时不可中断等待操作结果,单位毫秒
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     */
    boolean awaitUninterruptibly(long timeoutMillis);

    /**
     * Return the result without blocking. If the future is not done yet this will return {@code null}.
     *
     * As it is possible that a {@code null} value is used to mark the future as successful you also need to check
     * if the future is really done with {@link #isDone()} and not relay on the returned {@code null} value.
     */
    V getNow();

    /**
     * {@inheritDoc}
     *
     * If the cancellation was successful it will fail the future with an {@link CancellationException}.
     如果取消成功,返回一个取消异常的失败结果。
     */
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

从Netty的异步任务Future定义来看,继承于JUC的Future,可以异步获取IO操作的结果信息,比如操作是否成功完成,如果失败,可以获取失败的原因,是否取消,同时可以使用cancel方法取消IO操作,添加异步任务监听器,、监听IO操作是否完成,同时可以移除任务监听器,除这些之外我们可以异步、同步等待或超时等待IO操作结果。

来看一下结果监听器:
import java.util.EventListener;

/**
 * Listens to the result of a {@link Future}.  The result of the asynchronous operation is notified once this listener
 * is added by calling {@link Future#addListener(GenericFutureListener)}.
 监听IO操作异步结果。只要监听器被添加的异步任务中,异步操作完成,将会通知监听器。
 */
public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     当异步任务关联的IO操作完成时,触发operationComplete方法
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}

从上面来异步任务监听器,主要监听一个IO操作是否完成,在异步任务有返回值时,通知监听器。

下面我们来看通道异步任务:
import io.netty.bootstrap.Bootstrap;
import io.netty.util.concurrent.BlockingOperationException;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.util.concurrent.TimeUnit;


/**
 * The result of an asynchronous {@link Channel} I/O operation.
 ChannelFuture为一个通道的异步IO操作结果
 * <p>
 * All I/O operations in Netty are asynchronous.  It means any I/O calls will
 * return immediately with no guarantee that the requested I/O operation has
 * been completed at the end of the call.  Instead, you will be returned with
 * a {@link ChannelFuture} instance which gives you the information about the
 * result or status of the I/O operation.
 Netty所有的IO操作都是异步的。意味着所有IO操作在不能保证在调用结束后,IO请求操作完成
 情况下,立刻返回。然而,你可以返回一个异步结果实例,可以等待异步IO操作的结果或IO状态。
 * <p>
 * A {@link ChannelFuture} is either [i]uncompleted[/i] or [i]completed[/i].
 * When an I/O operation begins, a new future object is created.  The new future
 * is uncompleted initially - it is neither succeeded, failed, nor cancelled
 * because the I/O operation is not finished yet.  If the I/O operation is
 * finished either successfully, with failure, or by cancellation, the future is
 * marked as completed with more specific information, such as the cause of the
 * failure.  Please note that even failure and cancellation belong to the
 * completed state.
 当一个IO操作开始时,不管操作是否完成,一个新的异步操作结果将会被创建。
 如果因为IO操作没有完成,同时既没有成功,失败,也没有取消,新创建的
 异步结果并没有完成初始化。如果IO操作完成,不论操作结果成功,失败或取消,
 异步结果将会标记为完成,同时携带更多的精确信息,比如失败的原因。需要注意的时,
 失败或取消也属于完成状态。
 * <pre>
 *                                      +---------------------------+
 *                                      | Completed successfully    |
 *                                      +---------------------------+
 *                                 +---->      isDone() = true      |
 * +--------------------------+    |    |   isSuccess() = true      |
 * |        Uncompleted       |    |    +===========================+
 * +--------------------------+    |    | Completed with failure    |
 * |      isDone() = false    |    |    +---------------------------+
 * |   isSuccess() = false    |----+---->      isDone() = true      |
 * | isCancelled() = false    |    |    |       cause() = non-null  |
 * |       cause() = null     |    |    +===========================+
 * +--------------------------+    |    | Completed by cancellation |
 *                                 |    +---------------------------+
 *                                 +---->      isDone() = true      |
 *                                      | isCancelled() = true      |
 *                                      +---------------------------+
 * </pre>
 *
 * Various methods are provided to let you check if the I/O operation has been
 * completed, wait for the completion, and retrieve the result of the I/O
 * operation. It also allows you to add {@link ChannelFutureListener}s so you
 * can get notified when the I/O operation is completed.
 异步结果提供不同的方法,用于检查IO操作是否完成,等待操作完成,获取IO操作结果。
 同时运行添加通道结果监听器,以便可以在IO操作完成时获取通知。
 *
 * <h3>Prefer {@link #addListener(GenericFutureListener)} to {@link #await()}</h3>
 *
 * It is recommended to prefer {@link #addListener(GenericFutureListener)} to
 * {@link #await()} wherever possible to get notified when an I/O operation is
 * done and to do any follow-up tasks.
 强烈建议使用添加监听器的方式,而不是等待方式,等待IO操作完成,同时可以做一下一些任务。
 * <p>
 * {@link #addListener(GenericFutureListener)} is non-blocking.  It simply adds
 * the specified {@link ChannelFutureListener} to the {@link ChannelFuture}, and
 * I/O thread will notify the listeners when the I/O operation associated with
 * the future is done.  {@link ChannelFutureListener} yields the best
 * performance and resource utilization because it does not block at all, but
 * it could be tricky to implement a sequential logic if you are not used to
 * event-driven programming.
 添加监听器是非阻塞的。仅仅简单地添加一个通道结果监听器到异步监听结果,当IO操作关联的 异步任务完成时,IO线程将会通知监听器。通道结果监听器因为是非阻塞的,所以有更好的性能和资源利用率,如果你不使用事件驱动编程,可以实现一个时间顺序的逻辑。
 * <p>
 * By contrast, {@link #await()} is a blocking operation.  Once called, the
 * caller thread blocks until the operation is done.  It is easier to implement
 * a sequential logic with {@link #await()}, but the caller thread blocks
 * unnecessarily until the I/O operation is done and there's relatively
 * expensive cost of inter-thread notification.  Moreover, there's a chance of
 * dead lock in a particular circumstance, which is described below.
 相比之下,await方式是一个阻塞操作。一旦调用,调用线程将会阻塞到IO操作完成。
 使用await可以很容易实现一个时序的逻辑,但是调用线程不需要阻塞到IO操作完成,这种
 方式相对于内部线程通知,代价比较大。更进一步说,在特殊的循环下,有可能出现死锁情况,
 具体描述如下:
 *
 * <h3>Do not call {@link #await()} inside {@link ChannelHandler}</h3>
 * <p>不要在通道处理器中调用await方法
 * The event handler methods in {@link ChannelHandler} are usually called by
 * an I/O thread.  If {@link #await()} is called by an event handler
 * method, which is called by the I/O thread, the I/O operation it is waiting
 * for might never complete because {@link #await()} can block the I/O
 * operation it is waiting for, which is a dead lock.
 在通道处理器内,通常有IO线程调用事件处理方法。如果await方法被IO线程调用事件处理方法调用,IO操作将会等待,同时可能因为await方法阻塞IO操作正在等待的条件,可能导致死锁,进而Io操作不能完成。
 * <pre>
 * // BAD - NEVER DO THIS ,坚决不要用await方式
 * {@code @Override}
 * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
 *     {@link ChannelFuture} future = ctx.channel().close();
 *     future.awaitUninterruptibly();
 *     // Perform post-closure operation
 *     // ...
 * }
 *
 * // GOOD
 * {@code @Override} 建议方式,添加通道结果监听器
 * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
 *     {@link ChannelFuture} future = ctx.channel().close();
 *     future.addListener(new {@link ChannelFutureListener}() {
 *         public void operationComplete({@link ChannelFuture} future) {
 *             // Perform post-closure operation
 *             // ...
 *         }
 *     });
 * }
 * </pre>
 * <p>
 * In spite of the disadvantages mentioned above, there are certainly the cases
 * where it is more convenient to call {@link #await()}. In such a case, please
 * make sure you do not call {@link #await()} in an I/O thread.  Otherwise,
 * {@link BlockingOperationException} will be raised to prevent a dead lock.
 尽管await方法有诸多缺点,但在其他一些场景中,释放await方法,非常便利。在这些场景中,
 要确保不在IO线程中,调用await方法。否则阻塞操作异常将会抛出,以阻止死锁的产生。
 *

 * <h3>Do not confuse I/O timeout and await timeout</h3>
 *不要混淆IO超时和超时等待。
 * The timeout value you specify with {@link #await(long)},
 * {@link #await(long, TimeUnit)}, {@link #awaitUninterruptibly(long)}, or
 * {@link #awaitUninterruptibly(long, TimeUnit)} are not related with I/O
 * timeout at all.  If an I/O operation times out, the future will be marked as
 * 'completed with failure,' as depicted in the diagram above.  For example,
 * connect timeout should be configured via a transport-specific option:
 在await*(*)方法中的超时时间与IO超时一点关系也没有。如果一个IO操作超时,异步结果
 将被标记为失败并完成,如果上图中的描述。比如,连接超时应该通过transport配置。
 * <pre>
 * // BAD - NEVER DO THIS 坚决不要使用这种方式
 * {@link Bootstrap} b = ...;
 * {@link ChannelFuture} f = b.connect(...);
 * f.awaitUninterruptibly(10, TimeUnit.SECONDS);
 * if (f.isCancelled()) {
 *     // Connection attempt cancelled by user
 * } else if (!f.isSuccess()) {
 *     // You might get a NullPointerException here because the future
 *     // might not be completed yet.
 *     f.cause().printStackTrace();
 * } else {
 *     // Connection established successfully
 * }
 *
 * // GOOD 建议方式
 * {@link Bootstrap} b = ...;
 * // Configure the connect timeout option.
 * <b>b.option({@link ChannelOption}.CONNECT_TIMEOUT_MILLIS, 10000);</b>
 * {@link ChannelFuture} f = b.connect(...);
 * f.awaitUninterruptibly();
 *
 * // Now we are sure the future is completed.
 * assert f.isDone();
 *
 * if (f.isCancelled()) {
 *     // Connection attempt cancelled by user
 * } else if (!f.isSuccess()) {
 *     f.cause().printStackTrace();
 * } else {
 *     // Connection established successfully
 * }
 * </pre>
 */
public interface ChannelFuture extends Future<Void> {

    /**
     * Returns a channel where the I/O operation associated with this
     * future takes place.
     返回异步结果关联IO操作所在的通道
     */
    Channel channel();
    //添加移除监听器,同步异步等待方法为空体,因为ChannelFuture继承与空异步结果,即没有返回值
    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;

    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;

    @Override
    ChannelFuture awaitUninterruptibly();

    /**
     * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
     * following methods:
     如果通道异步结果为void,返回ture,并不允许调用下面方法
     * [list]
     *     [*]{@link #addListener(GenericFutureListener)}

     *     [*]{@link #addListeners(GenericFutureListener[])}

     *     [*]{@link #await()}

     *     [*]{@link #await(long, TimeUnit)} ()}

     *     [*]{@link #await(long)} ()}

     *     [*]{@link #awaitUninterruptibly()}

     *     [*]{@link #sync()}

     *     [*]{@link #syncUninterruptibly()}

     * [/list]
     */
    boolean isVoid();
}

从上面可以看出,因为ChannelFuture继承于空异步结果,即没有返回值,所以添加移除监听器,同步异步等待方法为空体。netty所有的IO操作都是异步的,当一个IO操作开始时,不管操作是否完成,一个新的异步操作结果将会被创建。如果因为IO操作没有完成,同时既没有成功,失败,也没有取消,新创建的那么,异步结果并没有完成初始化。如果IO操作完成,不论操作结果成功,失败或取消,异步结果将会标记为完成,同时携带更多的精确信息,比如失败的原因。需要注意的时,失败或取消也属于完成状态。强烈建议使用添加监听器的方式等待IO操作结果,而不await方法,因为监听器模式时非阻塞的,有更好的性能和资源利用率。

再来看一通道结果监听器:
package io.netty.channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;


/**
 * Listens to the result of a {@link ChannelFuture}.  The result of the
 * asynchronous {@link Channel} I/O operation is notified once this listener
 * is added by calling {@link ChannelFuture#addListener(GenericFutureListener)}.
 *通道结果监听器ChannelFutureListener,监听通道任务的结果。一旦监听器被添加到通道任务中,
 当通道的异步IO操作完成时,将会通知监听器。
 * <h3>Return the control to the caller quickly</h3>
 *快速地将控制权交给调用者
 * {@link #operationComplete(Future)} is directly called by an I/O
 * thread.  Therefore, performing a time consuming task or a blocking operation
 * in the handler method can cause an unexpected pause during I/O.  If you need
 * to perform a blocking operation on I/O completion, try to execute the
 * operation in a different thread using a thread pool.
#operationComplete直接通过IO线程调用。因此在IO操作过程中,执行一个耗时的任务或者阻塞操作在处理方法中,可能引起一个不期望的异常抛出。如果你需要执行一个阻塞操作在IO操作完成时,尝试在一个线程池中的不同线程执行操作。
 */
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {

    /**
     * A {@link ChannelFutureListener} that closes the {@link Channel} which is
     * associated with the specified {@link ChannelFuture}.
     在操作完成时,关闭通道任务关联的通道
     */
    ChannelFutureListener CLOSE = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };

    /**
     * A {@link ChannelFutureListener} that closes the {@link Channel} when the
     * operation ended up with a failure or cancellation rather than a success.
     当IO操作失败时,关闭通道任务关联的通道
     */
    ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().close();
            }
        }
    };

    /**
     * A {@link ChannelFutureListener} that forwards the {@link Throwable} of the {@link ChannelFuture} into the
     * {@link ChannelPipeline}. This mimics the old behavior of Netty 3.
     转发通道任务异常到Channel管道。默认Netty3的行为。
     */
    ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().pipeline().fireExceptionCaught(future.cause());
            }
        }
    };

    // Just a type alias
}

从上面来看,通道结果监听器内部有3个监听器,分别为在操作完成时,关闭通道任务关联的通道的监听器CLOSE;当IO操作失败时,关闭通道任务关联的通道的监听器CLOSE_ON_FAILURE;转发通道任务异常到Channel管道的监听器FIRE_EXCEPTION_ON_FAILURE。

来看一下可写的通道结果ChannelPromise
/**
 * Special {@link ChannelFuture} which is writable.
 */
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
在往下看之前,来看Promise接口定义:
/**
 * Special {@link Future} which is writable.
 可写的Future
 */
public interface Promise<V> extends Future<V> {

    /**
     * Marks this future as a success and notifies all
     * listeners.
     标记任务成功,通知所有监听器
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     如果任务已经成功完成或失败,则抛出非法状态异常
     */
    Promise<V> setSuccess(V result);

    /**
     * Marks this future as a success and notifies all
     * listeners.
     *标记任务成功,通知所有监听器
     * @return {@code true} if and only if successfully marked this future as
     *         a success. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
     成功标记Future为成功完成,则返回true,如果任务已经标记成功完成或失败,则返回false
     */
    boolean trySuccess(V result);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *标记任务失败,通知所有监听器
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     如果任务已经成功完成或失败,则抛出非法状态异常
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *标记任务失败,通知所有监听器
     * @return {@code true} if and only if successfully marked this future as
     *         a failure. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
      成功标记Future为失败完成,则返回true,如果任务已经标记成功完成或失败,则返回false
     */
    boolean tryFailure(Throwable cause);

    /**
     * Make this future impossible to cancel.
     *标记任务不可能取消
     * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
     *         without being cancelled.  {@code false} if this future has been cancelled already.
     如果成功标记不可取消,或在没有取消的情况下已经标记,则返回true,如果任务已经取消,返回false
     */
    boolean setUncancellable();
    //下面方法与Future相同

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}

从Promise任务定义可以看出,继承了任务Future,但多了以便标记成功、失败和不可取消的方法。
再来看一下
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;

/**
 * Special {@link ChannelFuture} which is writable.
 */
public interface ChannelPromise extends ChannelFuture, Promise<Void> {

    @Override
    Channel channel();

    @Override
    ChannelPromise setSuccess(Void result);

    ChannelPromise setSuccess();

    boolean trySuccess();

    @Override
    ChannelPromise setFailure(Throwable cause);

    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise sync() throws InterruptedException;

    @Override
    ChannelPromise syncUninterruptibly();

    @Override
    ChannelPromise await() throws InterruptedException;

    @Override
    ChannelPromise awaitUninterruptibly();

    /**
     * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
     如果isVoid返回true,而不它自己,则返回一个新的ChannelPromise
     */
    ChannelPromise unvoid();
}

从上可以看出,ChannelPromise与ChannelFuture的不同在于ChannelPromise可以标记任务结果。


总结:

      netty的异步结果Future继承于JUC的Future,可以异步获取IO操作的结果信息,比如IO操作是否成功完成,如果失败,可以获取失败的原因,是否取消,同时可以使用cancel方法取消IO操作,添加异步结果监听器,、监听IO操作是否完成,并可以移除结果监听器,除这些之外我们还可以异步、同步等待或超时等待IO操作结果。
      异步结果监听器GenericFutureListener,主要监听一个IO操作是否完成,在异步结果有返回值时,通知监听器。
      ChannelFuture继承于空异步结果,即没有返回值,所以添加移除监听器,同步异步等待方法为空体。netty所有的IO操作都是异步的,当一个IO操作开始时,不管操作是否完成,一个新的异步操作结果将会被创建。如果因为IO操作没有完成,同时既没有成功,失败,也没有取消,新创建的那么,异步结果并没有完成初始化。如果IO操作完成,不论操作结果成功,失败或取消,异步结果将会标记为完成,同时携带更多的精确信息,比如失败的原因。需要注意的时,失败或取消也属于完成状态。强烈建议使用添加监听器的方式等待IO操作结果,而不await方法,因为监听器模式时非阻塞的,有更好的性能和资源利用率。
      通道结果监听器ChannelFutureListener内部有3个监听器,分别为在操作完成时,关闭通道任务关联的通道的监听器CLOSE;当IO操作失败时,关闭通道任务关联的通道的监听器CLOSE_ON_FAILURE;转发通道任务异常到Channel管道的监听器FIRE_EXCEPTION_ON_FAILURE。
       Promise任务继承了任务Future,但多了以便标记成功、失败和不可取消的方法。
ChannelPromise与ChannelFuture的不同在于ChannelPromise可以标记任务结果。
ChannelProgressivePromise与ProgressivePromise,ChannelProgressiveFuture的关系与ChannelPromise与Promise,ChannelFuture的关系类似,只不过ChannelPromise表示异步操作任务,ChannelProgressivePromise表示异步任务的进度,同时Promise类型异步任务都是可写的。

附:
ChannelProgressivePromise接口,简单看一下:
//ChannelProgressivePromise
package io.netty.channel;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ProgressivePromise;

/**
 * Special {@link ChannelPromise} which will be notified once the associated bytes is transferring.
 当关联的字节数据正在传输时,ChannelProgressivePromise将会被通知
 */
public interface ChannelProgressivePromise extends ProgressivePromise<Void>, ChannelProgressiveFuture, ChannelPromise {

    @Override
    ChannelProgressivePromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelProgressivePromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelProgressivePromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelProgressivePromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelProgressivePromise sync() throws InterruptedException;

    @Override
    ChannelProgressivePromise syncUninterruptibly();

    @Override
    ChannelProgressivePromise await() throws InterruptedException;

    @Override
    ChannelProgressivePromise awaitUninterruptibly();

    @Override
    ChannelProgressivePromise setSuccess(Void result);

    @Override
    ChannelProgressivePromise setSuccess();

    @Override
    ChannelProgressivePromise setFailure(Throwable cause);

    @Override
    ChannelProgressivePromise setProgress(long progress, long total);

    @Override
    ChannelProgressivePromise unvoid();
}


//ProgressivePromise
/**
 * Special {@link ProgressiveFuture} which is writable.
 可写的过程任务。
 */
public interface ProgressivePromise<V> extends Promise<V>, ProgressiveFuture<V> {

    /**
     * Sets the current progress of the operation and notifies the listeners that implement
     * {@link GenericProgressiveFutureListener}.
     设置当前操作进程,并通知监听器GenericProgressiveFutureListener
     */
    ProgressivePromise<V> setProgress(long progress, long total);

    /**
     * Tries to set the current progress of the operation and notifies the listeners that implement
     * {@link GenericProgressiveFutureListener}.  If the operation is already complete or the progress is out of range,
     * this method does nothing but returning {@code false}.
     设置当前操作进程,并通知监听器GenericProgressiveFutureListener。如果此操作已经完成,进度已经超过范围,
     此方法不会做任何事情,仅仅返回false。
     */
    boolean tryProgress(long progress, long total);

    @Override
    ProgressivePromise<V> setSuccess(V result);

    @Override
    ProgressivePromise<V> setFailure(Throwable cause);

    @Override
    ProgressivePromise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    ProgressivePromise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    ProgressivePromise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    ProgressivePromise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    ProgressivePromise<V> await() throws InterruptedException;

    @Override
    ProgressivePromise<V> awaitUninterruptibly();

    @Override
    ProgressivePromise<V> sync() throws InterruptedException;

    @Override
    ProgressivePromise<V> syncUninterruptibly();
}

//GenericProgressiveFutureListener
package io.netty.util.concurrent;

public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
    /**
     * Invoked when the operation has progressed.
     *操作已经达到所设的进度
     * @param progress the progress of the operation so far (cumulative),操作当前进度
     * @param total the number that signifies the end of the operation when {@code progress} reaches at it.
     *              {@code -1} if the end of operation is unknown.
     total,当操作完成时达到的进度,如果操作的结束点不确定,则为-1
     */
    void operationProgressed(F future, long progress, long total) throws Exception;
}


//ProgressiveFuture
**
 * A {@link Future} which is used to indicate the progress of an operation.
 表示一个操作的进度
 */
public interface ProgressiveFuture<V> extends Future<V> {

    @Override
    ProgressiveFuture<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    ProgressiveFuture<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    ProgressiveFuture<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    ProgressiveFuture<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    ProgressiveFuture<V> sync() throws InterruptedException;

    @Override
    ProgressiveFuture<V> syncUninterruptibly();

    @Override
    ProgressiveFuture<V> await() throws InterruptedException;

    @Override
    ProgressiveFuture<V> awaitUninterruptibly();
}


//ChannelProgressiveFuture

/**
 * An special {@link ChannelFuture} which is used to indicate the {@link FileRegion} transfer progress
 表示一个文件region的传输进度
 */
public interface ChannelProgressiveFuture extends ChannelFuture, ProgressiveFuture<Void> {
    @Override
    ChannelProgressiveFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelProgressiveFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelProgressiveFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelProgressiveFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelProgressiveFuture sync() throws InterruptedException;

    @Override
    ChannelProgressiveFuture syncUninterruptibly();

    @Override
    ChannelProgressiveFuture await() throws InterruptedException;

    @Override
    ChannelProgressiveFuture awaitUninterruptibly();
}
0
0
分享到:
评论

相关推荐

    netty-netty-4.1.79.Final.tar.gz

    其ChannelFuture和Promise机制,使得异步操作的链式调用变得简单,同时提供了灵活的错误处理策略。 5. **零拷贝**:Netty实现了操作系统层面的零拷贝,通过直接缓冲区和FileChannel的transferTo方法,减少了数据在...

    spring boot 集成原生netty(非netty-io-socket)

    Netty并非是Java标准库中的部分,而是由LinkedIn开源的一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。 集成Netty到Spring Boot项目中,可以利用Netty的非阻塞I/O模型和高度可...

    netty-all-5.0.0.Alpha2

    7. **ChannelFuture和Promise**: ChannelFuture表示某个I/O操作的异步结果,而Promise是其子类,用于在事件完成时设置结果或监听事件。这为异步编程提供了便利。 8. **协议支持**: Netty支持多种网络协议,包括HTTP...

    Netty实战-netty-in-action-2.zip

    5. **EventLoop**:事件循环是Netty的异步基础,它负责调度任务并处理事件。每个Worker线程对应一个EventLoop。 6. **ChannelFuture**和**ChannelPromise**:用于异步操作的接口,ChannelFuture表示某个I/O操作的...

    Netty-API-文档中文版

    通过“Netty-API-文档中文版”,你可以详细了解到如何创建ServerBootstrap、Bootstrap、ChannelFuture、ChannelInboundHandlerAdapter等核心组件,以及如何配置和使用各种编解码器。文档还会介绍如何进行连接管理、...

    netty-netty-4.1.66.Final.tar.gz

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个“netty-netty-4.1.66.Final.tar.gz”压缩包包含了Netty的4.1.66.Final稳定版本,这是一个广泛使用的Java...

    netty-api-4.1 中文版

    在使用 Netty API 4.1 的过程中,开发者可以利用 ChannelFuture 和 Promise 来异步地处理网络操作,它们代表了未来的操作结果,可以注册监听器来获取操作完成的通知。同时,Netty 提供了丰富的工具类,如 ByteBuf,...

    netty-api-4.1中文.rar

    例如,它的 ChannelFuture 对象允许异步操作,并且可以链式调用,使得代码更清晰。 4. **可伸缩性**:Netty 可以轻松地在单台机器或分布式系统上扩展。其事件模型允许高效地处理大量并发连接。 5. **内存管理**:...

    netty-demo-boot.zip

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个"netty-demo-boot.zip"文件显然包含了使用 Netty 构建的一个演示应用,它很可能是基于 Spring Boot 的,...

    Netty权威指南源码-含jar包,导入eclipse直接使用

    10. **ChannelFuture和Promise**:这些概念是Netty异步编程的基础,它们允许在操作完成时注册回调,简化了异步编程的复杂性。 在Eclipse中使用Netty源码,你可以深入了解其内部实现,学习如何构建网络服务,理解...

    netty面试专题-答案-一起学习

    8. **强大的工具集**:Netty 提供了诸如 ChannelFuture、Promise、ChannelInboundHandlerAdapter 等工具类,使得异步编程变得更加简单。 9. **跨平台兼容**:Netty 是跨平台的,可以在不同的操作系统上运行,并且与...

    Netty源码教程-3

    在本节"Netty源码教程-3"中,我们将深入探讨Netty这一高性能、异步事件驱动的网络应用程序框架。Netty广泛应用于各种分布式系统、服务器和客户端应用,尤其在处理高并发、低延迟的网络通信场景下,其优势尤为突出。...

    netty-file-master.rar

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨 Netty 在文件传输中的应用,以及如何使用 Netty 实现文件的上传和下载功能。 1. *...

    Netty通讯框架-多线程-源代码

    5. `ChannelFuture`:代表了Channel的异步操作结果,我们可以通过监听`ChannelFuture`来获取操作完成的通知。 6. 多线程优化:Netty通过避免不必要的线程上下文切换和使用非阻塞I/O,提高了系统的并发性能。例如,...

    NettyDemo-master.zip

    - 通过 ChannelFuture 和 ChannelPromise,Netty 支持异步操作的链式调用和回调机制。 5. **Android 应用中的注意事项** - Android 的内存限制可能需要优化 Netty 配置,比如减少缓冲区大小、限制并发连接数等。 ...

    boot-example-netty-udp-2.0.5

    在现代的网络编程中,Netty作为一个高性能、异步事件驱动的网络应用框架,被广泛应用于构建高并发、低延迟的服务器。而SpringBoot作为Spring框架的轻量级启动器,能够简化微服务的开发流程。本篇将详细讲解如何利用...

    Netty权威指南-目录-书签-高清-完整版

    《Netty权威指南》是一本深入探讨Netty框架的详细教程,旨在帮助读者全面理解并掌握这个高性能、异步事件驱动的网络应用框架。Netty是Java领域内广泛使用的网络编程框架,尤其在开发高并发、低延迟的网络服务时,它...

    Netty4.1实战-手写RPC框架.pdf

    - 高性能:利用Java NIO实现异步非阻塞I/O操作; - 易用性:提供丰富的API便于开发者快速构建复杂的网络应用; - 可靠性:经过大量测试及社区验证,稳定性较高。 #### 三、手写RPC框架设计与实现 本节将详细...

    Netty源码解析-服务启动过程.pdf

    Netty是一个高性能、异步事件驱动的网络应用框架,它被广泛应用于快速开发高性能协议服务器和客户端。Netty通过高度优化的设计和实现提供了低延迟和高吞吐量的特性,尤其适合于实时通信场景。 #### 二、服务启动...

    netty-websocket-demo:一个简单的netty实现的websocket

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨如何使用 Netty 实现 WebSocket 协议,这是一个基于 TCP 的低延迟、全双工通信协议...

Global site tag (gtag.js) - Google Analytics