- 浏览: 984553 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
引言:
上一篇看了Channel管道线的父类接口Inboudn/Outbound通道Invoker定义,先来回顾一下:
每个通道Channel拥有自己的管道Pipeline,当通道创建时,管道自动创建,默认为DefaultChannelPipeline。Inbound通道Invoker ChannelInboundInvoker主要是触发管道线ChannelPipeline上的下一个Inbound通道处理器ChannelInboundHandler的相关方法。ChannelInboundInvoker有点Mina过滤器的意味。Outbound通道Invoker ChannelOutboundInvoker主要是触发触发管道线ChannelPipeline上的下一个Outbound通道处理器ChannelOnboundHandler的相关方法,同时增加了一下通道任务创建方法,
ChannelOutboundInvoker也有点Mina过滤器的意味,只不过不像ChannelInboundInvoker的方法命名那么相似。
在Outbound通道Invoker的方法定义中,我们看到有很多类型的返回异步任务,
比如:ChannelFuture,ChannelPromise,ChannelProgressivePromise,我们来看一下这些异步任务:
注意异步任务和异步结果的含义要结合上下文来看。
//ChannelFuture
再看ChannelFuture之前,先看Future接口定义
从Netty的异步任务Future定义来看,继承于JUC的Future,可以异步获取IO操作的结果信息,比如操作是否成功完成,如果失败,可以获取失败的原因,是否取消,同时可以使用cancel方法取消IO操作,添加异步任务监听器,、监听IO操作是否完成,同时可以移除任务监听器,除这些之外我们可以异步、同步等待或超时等待IO操作结果。
来看一下结果监听器:
从上面来异步任务监听器,主要监听一个IO操作是否完成,在异步任务有返回值时,通知监听器。
下面我们来看通道异步任务:
从上面可以看出,因为ChannelFuture继承于空异步结果,即没有返回值,所以添加移除监听器,同步异步等待方法为空体。netty所有的IO操作都是异步的,当一个IO操作开始时,不管操作是否完成,一个新的异步操作结果将会被创建。如果因为IO操作没有完成,同时既没有成功,失败,也没有取消,新创建的那么,异步结果并没有完成初始化。如果IO操作完成,不论操作结果成功,失败或取消,异步结果将会标记为完成,同时携带更多的精确信息,比如失败的原因。需要注意的时,失败或取消也属于完成状态。强烈建议使用添加监听器的方式等待IO操作结果,而不await方法,因为监听器模式时非阻塞的,有更好的性能和资源利用率。
再来看一通道结果监听器:
从上面来看,通道结果监听器内部有3个监听器,分别为在操作完成时,关闭通道任务关联的通道的监听器CLOSE;当IO操作失败时,关闭通道任务关联的通道的监听器CLOSE_ON_FAILURE;转发通道任务异常到Channel管道的监听器FIRE_EXCEPTION_ON_FAILURE。
来看一下可写的通道结果ChannelPromise
从Promise任务定义可以看出,继承了任务Future,但多了以便标记成功、失败和不可取消的方法。
再来看一下
从上可以看出,ChannelPromise与ChannelFuture的不同在于ChannelPromise可以标记任务结果。
总结:
netty的异步结果Future继承于JUC的Future,可以异步获取IO操作的结果信息,比如IO操作是否成功完成,如果失败,可以获取失败的原因,是否取消,同时可以使用cancel方法取消IO操作,添加异步结果监听器,、监听IO操作是否完成,并可以移除结果监听器,除这些之外我们还可以异步、同步等待或超时等待IO操作结果。
异步结果监听器GenericFutureListener,主要监听一个IO操作是否完成,在异步结果有返回值时,通知监听器。
ChannelFuture继承于空异步结果,即没有返回值,所以添加移除监听器,同步异步等待方法为空体。netty所有的IO操作都是异步的,当一个IO操作开始时,不管操作是否完成,一个新的异步操作结果将会被创建。如果因为IO操作没有完成,同时既没有成功,失败,也没有取消,新创建的那么,异步结果并没有完成初始化。如果IO操作完成,不论操作结果成功,失败或取消,异步结果将会标记为完成,同时携带更多的精确信息,比如失败的原因。需要注意的时,失败或取消也属于完成状态。强烈建议使用添加监听器的方式等待IO操作结果,而不await方法,因为监听器模式时非阻塞的,有更好的性能和资源利用率。
通道结果监听器ChannelFutureListener内部有3个监听器,分别为在操作完成时,关闭通道任务关联的通道的监听器CLOSE;当IO操作失败时,关闭通道任务关联的通道的监听器CLOSE_ON_FAILURE;转发通道任务异常到Channel管道的监听器FIRE_EXCEPTION_ON_FAILURE。
Promise任务继承了任务Future,但多了以便标记成功、失败和不可取消的方法。
ChannelPromise与ChannelFuture的不同在于ChannelPromise可以标记任务结果。
ChannelProgressivePromise与ProgressivePromise,ChannelProgressiveFuture的关系与ChannelPromise与Promise,ChannelFuture的关系类似,只不过ChannelPromise表示异步操作任务,ChannelProgressivePromise表示异步任务的进度,同时Promise类型异步任务都是可写的。
附:
ChannelProgressivePromise接口,简单看一下:
//ChannelProgressivePromise
//ProgressivePromise
//GenericProgressiveFutureListener
//ProgressiveFuture
//ChannelProgressiveFuture
引言:
上一篇看了Channel管道线的父类接口Inboudn/Outbound通道Invoker定义,先来回顾一下:
每个通道Channel拥有自己的管道Pipeline,当通道创建时,管道自动创建,默认为DefaultChannelPipeline。Inbound通道Invoker ChannelInboundInvoker主要是触发管道线ChannelPipeline上的下一个Inbound通道处理器ChannelInboundHandler的相关方法。ChannelInboundInvoker有点Mina过滤器的意味。Outbound通道Invoker ChannelOutboundInvoker主要是触发触发管道线ChannelPipeline上的下一个Outbound通道处理器ChannelOnboundHandler的相关方法,同时增加了一下通道任务创建方法,
ChannelOutboundInvoker也有点Mina过滤器的意味,只不过不像ChannelInboundInvoker的方法命名那么相似。
在Outbound通道Invoker的方法定义中,我们看到有很多类型的返回异步任务,
比如:ChannelFuture,ChannelPromise,ChannelProgressivePromise,我们来看一下这些异步任务:
注意异步任务和异步结果的含义要结合上下文来看。
//ChannelFuture
public interface ChannelFuture extends Future<Void>
再看ChannelFuture之前,先看Future接口定义
import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; /** * The result of an asynchronous operation. 一个异步操作接口,从定义来看继承与JUC的Future */ @SuppressWarnings("ClassNameSameAsAncestorName") public interface Future<V> extends java.util.concurrent.Future<V> { /** * Returns {@code true} if and only if the I/O operation was completed * successfully. 如果一个IO操作是否成功完成,返回ture */ boolean isSuccess(); /** * returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}. 如果一个操作通过cancel方法取消,则返回true */ boolean isCancellable(); /** * Returns the cause of the failed I/O operation if the I/O operation has * failed. *如果IO操作失败,则返回失败原因 * @return the cause of the failure. * {@code null} if succeeded or this future is not * completed yet. */ Throwable cause(); /** * Adds the specified listener to this future. The * specified listener is notified when this future is * {@linkplain #isDone() done}. If this future is already * completed, the specified listener is notified immediately. 添加任务监听器。当操作完成时,通知监听器。如果操作已经完成,则将立刻通知任务监听器。 */ Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); /** * Adds the specified listeners to this future. The * specified listeners are notified when this future is * {@linkplain #isDone() done}. If this future is already * completed, the specified listeners are notified immediately. 与上述方法类似,可以一次添加多个监听器 */ Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); /** * Removes the first occurrence of the specified listener from this future. * The specified listener is no longer notified when this * future is {@linkplain #isDone() done}. If the specified * listener is not associated with this future, this method * does nothing and returns silently. 从异步任务移除监听器。当操作完成时,不在通知监听器。如果监听器与当前异步任务没有关联, 则此方什么都不做 */ Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); /** * Removes the first occurrence for each of the listeners from this future. * The specified listeners are no longer notified when this * future is {@linkplain #isDone() done}. If the specified * listeners are not associated with this future, this method * does nothing and returns silently. 与上述方法类似,可以一次移动多个监听器 */ Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); /** * Waits for this future until it is done, and rethrows the cause of the failure if this future * failed. 等待异步任务,直到操作完成,如果操作失败,则重新抛出失败的原因。 */ Future<V> sync() throws InterruptedException; /** * Waits for this future until it is done, and rethrows the cause of the failure if this future * failed. 以不可中断方式,等待异步任务,直到操作完成,如果操作失败,则重新抛出失败的原因。 */ Future<V> syncUninterruptibly(); /** * Waits for this future to be completed. *等待操作完成 * @throws InterruptedException * if the current thread was interrupted */ Future<V> await() throws InterruptedException; /** * Waits for this future to be completed without * interruption. This method catches an {@link InterruptedException} and * discards it silently. 以不可中断方法等待操作结果,如果遇到中断异常,则直接丢弃 */ Future<V> awaitUninterruptibly(); /** * Waits for this future to be completed within the * specified time limit. *超时等待操作完成 * @return {@code true} if and only if the future was completed within * the specified time limit * * @throws InterruptedException * if the current thread was interrupted */ boolean await(long timeout, TimeUnit unit) throws InterruptedException; /** * Waits for this future to be completed within the * specified time limit. *超时等待,单位毫秒 * @return {@code true} if and only if the future was completed within * the specified time limit * * @throws InterruptedException * if the current thread was interrupted */ boolean await(long timeoutMillis) throws InterruptedException; /** * Waits for this future to be completed within the * specified time limit without interruption. This method catches an * {@link InterruptedException} and discards it silently. *超时不可中断等待操作结果 * @return {@code true} if and only if the future was completed within * the specified time limit */ boolean awaitUninterruptibly(long timeout, TimeUnit unit); /** * Waits for this future to be completed within the * specified time limit without interruption. This method catches an * {@link InterruptedException} and discards it silently. *超时不可中断等待操作结果,单位毫秒 * @return {@code true} if and only if the future was completed within * the specified time limit */ boolean awaitUninterruptibly(long timeoutMillis); /** * Return the result without blocking. If the future is not done yet this will return {@code null}. * * As it is possible that a {@code null} value is used to mark the future as successful you also need to check * if the future is really done with {@link #isDone()} and not relay on the returned {@code null} value. */ V getNow(); /** * {@inheritDoc} * * If the cancellation was successful it will fail the future with an {@link CancellationException}. 如果取消成功,返回一个取消异常的失败结果。 */ @Override boolean cancel(boolean mayInterruptIfRunning); }
从Netty的异步任务Future定义来看,继承于JUC的Future,可以异步获取IO操作的结果信息,比如操作是否成功完成,如果失败,可以获取失败的原因,是否取消,同时可以使用cancel方法取消IO操作,添加异步任务监听器,、监听IO操作是否完成,同时可以移除任务监听器,除这些之外我们可以异步、同步等待或超时等待IO操作结果。
来看一下结果监听器:
import java.util.EventListener; /** * Listens to the result of a {@link Future}. The result of the asynchronous operation is notified once this listener * is added by calling {@link Future#addListener(GenericFutureListener)}. 监听IO操作异步结果。只要监听器被添加的异步任务中,异步操作完成,将会通知监听器。 */ public interface GenericFutureListener<F extends Future<?>> extends EventListener { /** * Invoked when the operation associated with the {@link Future} has been completed. 当异步任务关联的IO操作完成时,触发operationComplete方法 * * @param future the source {@link Future} which called this callback */ void operationComplete(F future) throws Exception; }
从上面来异步任务监听器,主要监听一个IO操作是否完成,在异步任务有返回值时,通知监听器。
下面我们来看通道异步任务:
import io.netty.bootstrap.Bootstrap; import io.netty.util.concurrent.BlockingOperationException; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import java.util.concurrent.TimeUnit; /** * The result of an asynchronous {@link Channel} I/O operation. ChannelFuture为一个通道的异步IO操作结果 * <p> * All I/O operations in Netty are asynchronous. It means any I/O calls will * return immediately with no guarantee that the requested I/O operation has * been completed at the end of the call. Instead, you will be returned with * a {@link ChannelFuture} instance which gives you the information about the * result or status of the I/O operation. Netty所有的IO操作都是异步的。意味着所有IO操作在不能保证在调用结束后,IO请求操作完成 情况下,立刻返回。然而,你可以返回一个异步结果实例,可以等待异步IO操作的结果或IO状态。 * <p> * A {@link ChannelFuture} is either [i]uncompleted[/i] or [i]completed[/i]. * When an I/O operation begins, a new future object is created. The new future * is uncompleted initially - it is neither succeeded, failed, nor cancelled * because the I/O operation is not finished yet. If the I/O operation is * finished either successfully, with failure, or by cancellation, the future is * marked as completed with more specific information, such as the cause of the * failure. Please note that even failure and cancellation belong to the * completed state. 当一个IO操作开始时,不管操作是否完成,一个新的异步操作结果将会被创建。 如果因为IO操作没有完成,同时既没有成功,失败,也没有取消,新创建的 异步结果并没有完成初始化。如果IO操作完成,不论操作结果成功,失败或取消, 异步结果将会标记为完成,同时携带更多的精确信息,比如失败的原因。需要注意的时, 失败或取消也属于完成状态。 * <pre> * +---------------------------+ * | Completed successfully | * +---------------------------+ * +----> isDone() = true | * +--------------------------+ | | isSuccess() = true | * | Uncompleted | | +===========================+ * +--------------------------+ | | Completed with failure | * | isDone() = false | | +---------------------------+ * | isSuccess() = false |----+----> isDone() = true | * | isCancelled() = false | | | cause() = non-null | * | cause() = null | | +===========================+ * +--------------------------+ | | Completed by cancellation | * | +---------------------------+ * +----> isDone() = true | * | isCancelled() = true | * +---------------------------+ * </pre> * * Various methods are provided to let you check if the I/O operation has been * completed, wait for the completion, and retrieve the result of the I/O * operation. It also allows you to add {@link ChannelFutureListener}s so you * can get notified when the I/O operation is completed. 异步结果提供不同的方法,用于检查IO操作是否完成,等待操作完成,获取IO操作结果。 同时运行添加通道结果监听器,以便可以在IO操作完成时获取通知。 * * <h3>Prefer {@link #addListener(GenericFutureListener)} to {@link #await()}</h3> * * It is recommended to prefer {@link #addListener(GenericFutureListener)} to * {@link #await()} wherever possible to get notified when an I/O operation is * done and to do any follow-up tasks. 强烈建议使用添加监听器的方式,而不是等待方式,等待IO操作完成,同时可以做一下一些任务。 * <p> * {@link #addListener(GenericFutureListener)} is non-blocking. It simply adds * the specified {@link ChannelFutureListener} to the {@link ChannelFuture}, and * I/O thread will notify the listeners when the I/O operation associated with * the future is done. {@link ChannelFutureListener} yields the best * performance and resource utilization because it does not block at all, but * it could be tricky to implement a sequential logic if you are not used to * event-driven programming. 添加监听器是非阻塞的。仅仅简单地添加一个通道结果监听器到异步监听结果,当IO操作关联的 异步任务完成时,IO线程将会通知监听器。通道结果监听器因为是非阻塞的,所以有更好的性能和资源利用率,如果你不使用事件驱动编程,可以实现一个时间顺序的逻辑。 * <p> * By contrast, {@link #await()} is a blocking operation. Once called, the * caller thread blocks until the operation is done. It is easier to implement * a sequential logic with {@link #await()}, but the caller thread blocks * unnecessarily until the I/O operation is done and there's relatively * expensive cost of inter-thread notification. Moreover, there's a chance of * dead lock in a particular circumstance, which is described below. 相比之下,await方式是一个阻塞操作。一旦调用,调用线程将会阻塞到IO操作完成。 使用await可以很容易实现一个时序的逻辑,但是调用线程不需要阻塞到IO操作完成,这种 方式相对于内部线程通知,代价比较大。更进一步说,在特殊的循环下,有可能出现死锁情况, 具体描述如下: * * <h3>Do not call {@link #await()} inside {@link ChannelHandler}</h3> * <p>不要在通道处理器中调用await方法 * The event handler methods in {@link ChannelHandler} are usually called by * an I/O thread. If {@link #await()} is called by an event handler * method, which is called by the I/O thread, the I/O operation it is waiting * for might never complete because {@link #await()} can block the I/O * operation it is waiting for, which is a dead lock. 在通道处理器内,通常有IO线程调用事件处理方法。如果await方法被IO线程调用事件处理方法调用,IO操作将会等待,同时可能因为await方法阻塞IO操作正在等待的条件,可能导致死锁,进而Io操作不能完成。 * <pre> * // BAD - NEVER DO THIS ,坚决不要用await方式 * {@code @Override} * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) { * {@link ChannelFuture} future = ctx.channel().close(); * future.awaitUninterruptibly(); * // Perform post-closure operation * // ... * } * * // GOOD * {@code @Override} 建议方式,添加通道结果监听器 * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) { * {@link ChannelFuture} future = ctx.channel().close(); * future.addListener(new {@link ChannelFutureListener}() { * public void operationComplete({@link ChannelFuture} future) { * // Perform post-closure operation * // ... * } * }); * } * </pre> * <p> * In spite of the disadvantages mentioned above, there are certainly the cases * where it is more convenient to call {@link #await()}. In such a case, please * make sure you do not call {@link #await()} in an I/O thread. Otherwise, * {@link BlockingOperationException} will be raised to prevent a dead lock. 尽管await方法有诸多缺点,但在其他一些场景中,释放await方法,非常便利。在这些场景中, 要确保不在IO线程中,调用await方法。否则阻塞操作异常将会抛出,以阻止死锁的产生。 * * <h3>Do not confuse I/O timeout and await timeout</h3> *不要混淆IO超时和超时等待。 * The timeout value you specify with {@link #await(long)}, * {@link #await(long, TimeUnit)}, {@link #awaitUninterruptibly(long)}, or * {@link #awaitUninterruptibly(long, TimeUnit)} are not related with I/O * timeout at all. If an I/O operation times out, the future will be marked as * 'completed with failure,' as depicted in the diagram above. For example, * connect timeout should be configured via a transport-specific option: 在await*(*)方法中的超时时间与IO超时一点关系也没有。如果一个IO操作超时,异步结果 将被标记为失败并完成,如果上图中的描述。比如,连接超时应该通过transport配置。 * <pre> * // BAD - NEVER DO THIS 坚决不要使用这种方式 * {@link Bootstrap} b = ...; * {@link ChannelFuture} f = b.connect(...); * f.awaitUninterruptibly(10, TimeUnit.SECONDS); * if (f.isCancelled()) { * // Connection attempt cancelled by user * } else if (!f.isSuccess()) { * // You might get a NullPointerException here because the future * // might not be completed yet. * f.cause().printStackTrace(); * } else { * // Connection established successfully * } * * // GOOD 建议方式 * {@link Bootstrap} b = ...; * // Configure the connect timeout option. * <b>b.option({@link ChannelOption}.CONNECT_TIMEOUT_MILLIS, 10000);</b> * {@link ChannelFuture} f = b.connect(...); * f.awaitUninterruptibly(); * * // Now we are sure the future is completed. * assert f.isDone(); * * if (f.isCancelled()) { * // Connection attempt cancelled by user * } else if (!f.isSuccess()) { * f.cause().printStackTrace(); * } else { * // Connection established successfully * } * </pre> */ public interface ChannelFuture extends Future<Void> { /** * Returns a channel where the I/O operation associated with this * future takes place. 返回异步结果关联IO操作所在的通道 */ Channel channel(); //添加移除监听器,同步异步等待方法为空体,因为ChannelFuture继承与空异步结果,即没有返回值 @Override ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture sync() throws InterruptedException; @Override ChannelFuture syncUninterruptibly(); @Override ChannelFuture await() throws InterruptedException; @Override ChannelFuture awaitUninterruptibly(); /** * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the * following methods: 如果通道异步结果为void,返回ture,并不允许调用下面方法 * [list] * [*]{@link #addListener(GenericFutureListener)} * [*]{@link #addListeners(GenericFutureListener[])} * [*]{@link #await()} * [*]{@link #await(long, TimeUnit)} ()} * [*]{@link #await(long)} ()} * [*]{@link #awaitUninterruptibly()} * [*]{@link #sync()} * [*]{@link #syncUninterruptibly()} * [/list] */ boolean isVoid(); }
从上面可以看出,因为ChannelFuture继承于空异步结果,即没有返回值,所以添加移除监听器,同步异步等待方法为空体。netty所有的IO操作都是异步的,当一个IO操作开始时,不管操作是否完成,一个新的异步操作结果将会被创建。如果因为IO操作没有完成,同时既没有成功,失败,也没有取消,新创建的那么,异步结果并没有完成初始化。如果IO操作完成,不论操作结果成功,失败或取消,异步结果将会标记为完成,同时携带更多的精确信息,比如失败的原因。需要注意的时,失败或取消也属于完成状态。强烈建议使用添加监听器的方式等待IO操作结果,而不await方法,因为监听器模式时非阻塞的,有更好的性能和资源利用率。
再来看一通道结果监听器:
package io.netty.channel; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; /** * Listens to the result of a {@link ChannelFuture}. The result of the * asynchronous {@link Channel} I/O operation is notified once this listener * is added by calling {@link ChannelFuture#addListener(GenericFutureListener)}. *通道结果监听器ChannelFutureListener,监听通道任务的结果。一旦监听器被添加到通道任务中, 当通道的异步IO操作完成时,将会通知监听器。 * <h3>Return the control to the caller quickly</h3> *快速地将控制权交给调用者 * {@link #operationComplete(Future)} is directly called by an I/O * thread. Therefore, performing a time consuming task or a blocking operation * in the handler method can cause an unexpected pause during I/O. If you need * to perform a blocking operation on I/O completion, try to execute the * operation in a different thread using a thread pool. #operationComplete直接通过IO线程调用。因此在IO操作过程中,执行一个耗时的任务或者阻塞操作在处理方法中,可能引起一个不期望的异常抛出。如果你需要执行一个阻塞操作在IO操作完成时,尝试在一个线程池中的不同线程执行操作。 */ public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> { /** * A {@link ChannelFutureListener} that closes the {@link Channel} which is * associated with the specified {@link ChannelFuture}. 在操作完成时,关闭通道任务关联的通道 */ ChannelFutureListener CLOSE = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { future.channel().close(); } }; /** * A {@link ChannelFutureListener} that closes the {@link Channel} when the * operation ended up with a failure or cancellation rather than a success. 当IO操作失败时,关闭通道任务关联的通道 */ ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (!future.isSuccess()) { future.channel().close(); } } }; /** * A {@link ChannelFutureListener} that forwards the {@link Throwable} of the {@link ChannelFuture} into the * {@link ChannelPipeline}. This mimics the old behavior of Netty 3. 转发通道任务异常到Channel管道。默认Netty3的行为。 */ ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (!future.isSuccess()) { future.channel().pipeline().fireExceptionCaught(future.cause()); } } }; // Just a type alias }
从上面来看,通道结果监听器内部有3个监听器,分别为在操作完成时,关闭通道任务关联的通道的监听器CLOSE;当IO操作失败时,关闭通道任务关联的通道的监听器CLOSE_ON_FAILURE;转发通道任务异常到Channel管道的监听器FIRE_EXCEPTION_ON_FAILURE。
来看一下可写的通道结果ChannelPromise
/** * Special {@link ChannelFuture} which is writable. */ public interface ChannelPromise extends ChannelFuture, Promise<Void> { 在往下看之前,来看Promise接口定义: /** * Special {@link Future} which is writable. 可写的Future */ public interface Promise<V> extends Future<V> { /** * Marks this future as a success and notifies all * listeners. 标记任务成功,通知所有监听器 * * If it is success or failed already it will throw an {@link IllegalStateException}. 如果任务已经成功完成或失败,则抛出非法状态异常 */ Promise<V> setSuccess(V result); /** * Marks this future as a success and notifies all * listeners. *标记任务成功,通知所有监听器 * @return {@code true} if and only if successfully marked this future as * a success. Otherwise {@code false} because this future is * already marked as either a success or a failure. 成功标记Future为成功完成,则返回true,如果任务已经标记成功完成或失败,则返回false */ boolean trySuccess(V result); /** * Marks this future as a failure and notifies all * listeners. *标记任务失败,通知所有监听器 * If it is success or failed already it will throw an {@link IllegalStateException}. 如果任务已经成功完成或失败,则抛出非法状态异常 */ Promise<V> setFailure(Throwable cause); /** * Marks this future as a failure and notifies all * listeners. *标记任务失败,通知所有监听器 * @return {@code true} if and only if successfully marked this future as * a failure. Otherwise {@code false} because this future is * already marked as either a success or a failure. 成功标记Future为失败完成,则返回true,如果任务已经标记成功完成或失败,则返回false */ boolean tryFailure(Throwable cause); /** * Make this future impossible to cancel. *标记任务不可能取消 * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done * without being cancelled. {@code false} if this future has been cancelled already. 如果成功标记不可取消,或在没有取消的情况下已经标记,则返回true,如果任务已经取消,返回false */ boolean setUncancellable(); //下面方法与Future相同 @Override Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> await() throws InterruptedException; @Override Promise<V> awaitUninterruptibly(); @Override Promise<V> sync() throws InterruptedException; @Override Promise<V> syncUninterruptibly(); }
从Promise任务定义可以看出,继承了任务Future,但多了以便标记成功、失败和不可取消的方法。
再来看一下
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; /** * Special {@link ChannelFuture} which is writable. */ public interface ChannelPromise extends ChannelFuture, Promise<Void> { @Override Channel channel(); @Override ChannelPromise setSuccess(Void result); ChannelPromise setSuccess(); boolean trySuccess(); @Override ChannelPromise setFailure(Throwable cause); @Override ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise sync() throws InterruptedException; @Override ChannelPromise syncUninterruptibly(); @Override ChannelPromise await() throws InterruptedException; @Override ChannelPromise awaitUninterruptibly(); /** * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself. 如果isVoid返回true,而不它自己,则返回一个新的ChannelPromise */ ChannelPromise unvoid(); }
从上可以看出,ChannelPromise与ChannelFuture的不同在于ChannelPromise可以标记任务结果。
总结:
netty的异步结果Future继承于JUC的Future,可以异步获取IO操作的结果信息,比如IO操作是否成功完成,如果失败,可以获取失败的原因,是否取消,同时可以使用cancel方法取消IO操作,添加异步结果监听器,、监听IO操作是否完成,并可以移除结果监听器,除这些之外我们还可以异步、同步等待或超时等待IO操作结果。
异步结果监听器GenericFutureListener,主要监听一个IO操作是否完成,在异步结果有返回值时,通知监听器。
ChannelFuture继承于空异步结果,即没有返回值,所以添加移除监听器,同步异步等待方法为空体。netty所有的IO操作都是异步的,当一个IO操作开始时,不管操作是否完成,一个新的异步操作结果将会被创建。如果因为IO操作没有完成,同时既没有成功,失败,也没有取消,新创建的那么,异步结果并没有完成初始化。如果IO操作完成,不论操作结果成功,失败或取消,异步结果将会标记为完成,同时携带更多的精确信息,比如失败的原因。需要注意的时,失败或取消也属于完成状态。强烈建议使用添加监听器的方式等待IO操作结果,而不await方法,因为监听器模式时非阻塞的,有更好的性能和资源利用率。
通道结果监听器ChannelFutureListener内部有3个监听器,分别为在操作完成时,关闭通道任务关联的通道的监听器CLOSE;当IO操作失败时,关闭通道任务关联的通道的监听器CLOSE_ON_FAILURE;转发通道任务异常到Channel管道的监听器FIRE_EXCEPTION_ON_FAILURE。
Promise任务继承了任务Future,但多了以便标记成功、失败和不可取消的方法。
ChannelPromise与ChannelFuture的不同在于ChannelPromise可以标记任务结果。
ChannelProgressivePromise与ProgressivePromise,ChannelProgressiveFuture的关系与ChannelPromise与Promise,ChannelFuture的关系类似,只不过ChannelPromise表示异步操作任务,ChannelProgressivePromise表示异步任务的进度,同时Promise类型异步任务都是可写的。
附:
ChannelProgressivePromise接口,简单看一下:
//ChannelProgressivePromise
package io.netty.channel; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.ProgressivePromise; /** * Special {@link ChannelPromise} which will be notified once the associated bytes is transferring. 当关联的字节数据正在传输时,ChannelProgressivePromise将会被通知 */ public interface ChannelProgressivePromise extends ProgressivePromise<Void>, ChannelProgressiveFuture, ChannelPromise { @Override ChannelProgressivePromise addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelProgressivePromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelProgressivePromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelProgressivePromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelProgressivePromise sync() throws InterruptedException; @Override ChannelProgressivePromise syncUninterruptibly(); @Override ChannelProgressivePromise await() throws InterruptedException; @Override ChannelProgressivePromise awaitUninterruptibly(); @Override ChannelProgressivePromise setSuccess(Void result); @Override ChannelProgressivePromise setSuccess(); @Override ChannelProgressivePromise setFailure(Throwable cause); @Override ChannelProgressivePromise setProgress(long progress, long total); @Override ChannelProgressivePromise unvoid(); }
//ProgressivePromise
/** * Special {@link ProgressiveFuture} which is writable. 可写的过程任务。 */ public interface ProgressivePromise<V> extends Promise<V>, ProgressiveFuture<V> { /** * Sets the current progress of the operation and notifies the listeners that implement * {@link GenericProgressiveFutureListener}. 设置当前操作进程,并通知监听器GenericProgressiveFutureListener */ ProgressivePromise<V> setProgress(long progress, long total); /** * Tries to set the current progress of the operation and notifies the listeners that implement * {@link GenericProgressiveFutureListener}. If the operation is already complete or the progress is out of range, * this method does nothing but returning {@code false}. 设置当前操作进程,并通知监听器GenericProgressiveFutureListener。如果此操作已经完成,进度已经超过范围, 此方法不会做任何事情,仅仅返回false。 */ boolean tryProgress(long progress, long total); @Override ProgressivePromise<V> setSuccess(V result); @Override ProgressivePromise<V> setFailure(Throwable cause); @Override ProgressivePromise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); @Override ProgressivePromise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override ProgressivePromise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); @Override ProgressivePromise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override ProgressivePromise<V> await() throws InterruptedException; @Override ProgressivePromise<V> awaitUninterruptibly(); @Override ProgressivePromise<V> sync() throws InterruptedException; @Override ProgressivePromise<V> syncUninterruptibly(); }
//GenericProgressiveFutureListener
package io.netty.util.concurrent; public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> { /** * Invoked when the operation has progressed. *操作已经达到所设的进度 * @param progress the progress of the operation so far (cumulative),操作当前进度 * @param total the number that signifies the end of the operation when {@code progress} reaches at it. * {@code -1} if the end of operation is unknown. total,当操作完成时达到的进度,如果操作的结束点不确定,则为-1 */ void operationProgressed(F future, long progress, long total) throws Exception; }
//ProgressiveFuture
** * A {@link Future} which is used to indicate the progress of an operation. 表示一个操作的进度 */ public interface ProgressiveFuture<V> extends Future<V> { @Override ProgressiveFuture<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); @Override ProgressiveFuture<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override ProgressiveFuture<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); @Override ProgressiveFuture<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override ProgressiveFuture<V> sync() throws InterruptedException; @Override ProgressiveFuture<V> syncUninterruptibly(); @Override ProgressiveFuture<V> await() throws InterruptedException; @Override ProgressiveFuture<V> awaitUninterruptibly(); }
//ChannelProgressiveFuture
/** * An special {@link ChannelFuture} which is used to indicate the {@link FileRegion} transfer progress 表示一个文件region的传输进度 */ public interface ChannelProgressiveFuture extends ChannelFuture, ProgressiveFuture<Void> { @Override ChannelProgressiveFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelProgressiveFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelProgressiveFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelProgressiveFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelProgressiveFuture sync() throws InterruptedException; @Override ChannelProgressiveFuture syncUninterruptibly(); @Override ChannelProgressiveFuture await() throws InterruptedException; @Override ChannelProgressiveFuture awaitUninterruptibly(); }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1324netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2063netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2453netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1320netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1319netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1600netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1848netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1400netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2840netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2186netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2042netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1082netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1880netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1222netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1205netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 960netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1313netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2193netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1085netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1859netty 管道线定义-ChannelPipeline:htt ...
相关推荐
其ChannelFuture和Promise机制,使得异步操作的链式调用变得简单,同时提供了灵活的错误处理策略。 5. **零拷贝**:Netty实现了操作系统层面的零拷贝,通过直接缓冲区和FileChannel的transferTo方法,减少了数据在...
Netty并非是Java标准库中的部分,而是由LinkedIn开源的一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。 集成Netty到Spring Boot项目中,可以利用Netty的非阻塞I/O模型和高度可...
7. **ChannelFuture和Promise**: ChannelFuture表示某个I/O操作的异步结果,而Promise是其子类,用于在事件完成时设置结果或监听事件。这为异步编程提供了便利。 8. **协议支持**: Netty支持多种网络协议,包括HTTP...
5. **EventLoop**:事件循环是Netty的异步基础,它负责调度任务并处理事件。每个Worker线程对应一个EventLoop。 6. **ChannelFuture**和**ChannelPromise**:用于异步操作的接口,ChannelFuture表示某个I/O操作的...
通过“Netty-API-文档中文版”,你可以详细了解到如何创建ServerBootstrap、Bootstrap、ChannelFuture、ChannelInboundHandlerAdapter等核心组件,以及如何配置和使用各种编解码器。文档还会介绍如何进行连接管理、...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个“netty-netty-4.1.66.Final.tar.gz”压缩包包含了Netty的4.1.66.Final稳定版本,这是一个广泛使用的Java...
在使用 Netty API 4.1 的过程中,开发者可以利用 ChannelFuture 和 Promise 来异步地处理网络操作,它们代表了未来的操作结果,可以注册监听器来获取操作完成的通知。同时,Netty 提供了丰富的工具类,如 ByteBuf,...
例如,它的 ChannelFuture 对象允许异步操作,并且可以链式调用,使得代码更清晰。 4. **可伸缩性**:Netty 可以轻松地在单台机器或分布式系统上扩展。其事件模型允许高效地处理大量并发连接。 5. **内存管理**:...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个"netty-demo-boot.zip"文件显然包含了使用 Netty 构建的一个演示应用,它很可能是基于 Spring Boot 的,...
10. **ChannelFuture和Promise**:这些概念是Netty异步编程的基础,它们允许在操作完成时注册回调,简化了异步编程的复杂性。 在Eclipse中使用Netty源码,你可以深入了解其内部实现,学习如何构建网络服务,理解...
8. **强大的工具集**:Netty 提供了诸如 ChannelFuture、Promise、ChannelInboundHandlerAdapter 等工具类,使得异步编程变得更加简单。 9. **跨平台兼容**:Netty 是跨平台的,可以在不同的操作系统上运行,并且与...
在本节"Netty源码教程-3"中,我们将深入探讨Netty这一高性能、异步事件驱动的网络应用程序框架。Netty广泛应用于各种分布式系统、服务器和客户端应用,尤其在处理高并发、低延迟的网络通信场景下,其优势尤为突出。...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨 Netty 在文件传输中的应用,以及如何使用 Netty 实现文件的上传和下载功能。 1. *...
5. `ChannelFuture`:代表了Channel的异步操作结果,我们可以通过监听`ChannelFuture`来获取操作完成的通知。 6. 多线程优化:Netty通过避免不必要的线程上下文切换和使用非阻塞I/O,提高了系统的并发性能。例如,...
- 通过 ChannelFuture 和 ChannelPromise,Netty 支持异步操作的链式调用和回调机制。 5. **Android 应用中的注意事项** - Android 的内存限制可能需要优化 Netty 配置,比如减少缓冲区大小、限制并发连接数等。 ...
在现代的网络编程中,Netty作为一个高性能、异步事件驱动的网络应用框架,被广泛应用于构建高并发、低延迟的服务器。而SpringBoot作为Spring框架的轻量级启动器,能够简化微服务的开发流程。本篇将详细讲解如何利用...
《Netty权威指南》是一本深入探讨Netty框架的详细教程,旨在帮助读者全面理解并掌握这个高性能、异步事件驱动的网络应用框架。Netty是Java领域内广泛使用的网络编程框架,尤其在开发高并发、低延迟的网络服务时,它...
- 高性能:利用Java NIO实现异步非阻塞I/O操作; - 易用性:提供丰富的API便于开发者快速构建复杂的网络应用; - 可靠性:经过大量测试及社区验证,稳定性较高。 #### 三、手写RPC框架设计与实现 本节将详细...
Netty是一个高性能、异步事件驱动的网络应用框架,它被广泛应用于快速开发高性能协议服务器和客户端。Netty通过高度优化的设计和实现提供了低延迟和高吞吐量的特性,尤其适合于实时通信场景。 #### 二、服务启动...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨如何使用 Netty 实现 WebSocket 协议,这是一个基于 TCP 的低延迟、全双工通信协议...