- 浏览: 984555 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
netty 通道处理器上下文定义:http://donald-draper.iteye.com/blog/2389214
引言:
前面一篇文章我们主要看了一下通道处理器上下文接口的定义,先来回顾一下:
通道处理器上下文ChannelHandlerContext,使通道处理器可以与管道和管道中其他处理器进行交互。当IO事件发生时,处理可以将事件转发给所属管道的下一个通道处理器,同时可以动态修改处理器所属的管道。通过上下文可以获取关联通道,处理器,事件执行器,上下文名,所属管道等信息。同时可以通过AttributeKey存储上下文属性,用alloc方法获取通道上下文的字节buf分配器,用于分配buf。
今天我们来看上下文的抽象实现。
从上面可以看出抽象通道处理器上下文AbstractChannelHandlerContext,拥有一个前驱和后继上下文,用于在管道中传递IO事件;通道处理器总共有四个状态,分别为初始化,正在添加到管道,已添加管道和从管道移除状态;上下文同时关联一个管道;Inbound和Outbound两个用于判断上下文的类型,决定了上下文是处理器Inbound事件还是Outbound事件;一个事件执行器executor,当上下文执行器不在当前事务循环中时,用于执行IO事件操作;同时有一些延时任务,如上下文读任务,上下文刷新任务,读完成任务和通道可写状态改变任务。上下文构造,主要是初始化上下文name,所属管道,事件执行器,上下文类型。
下面我们来看Inbound事件处理,
先来看上下文处理通道的channelRegistered事件
上述方法有一下几点要看
1.
//判断通道处理器已添加到管道
2.
再来看异常处理
//StackTraceElement
再来看,触发通道处理器异常处理方法exceptionCaught
从上面可以看出,上下文处理通道fireChannelRegistered事件,
如果上下文事件执行器在当前事务循环,则直接在当前线程,执行触发上下文关联通道处理器的channelRegistered事件任务,否则,创建一个线程执行事件任务,并由上下文事务执行器运行;
触发上下文关联通道处理器的channelRegistered事件任务,首先判断上下文是否已经添加到管道,已添加,则触发上下文关联通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个Inbound上下文。如果Inbound事件处理过程中,异常发生,首先检查异常是不是通道处理器的exceptionCaught方法抛出,是,则直接log,否则触发上下文关联通道处理器的exceptionCaught事件。其他Inbound事件的处理过程与fireChannelRegistered方法思路相同,
只不过触发的是通道处理器的相应事件;
前面的文章已讲,可以参考
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
我们这里不再赘述。
再来看Outbound地址绑定事件的处理:
再来看一下寻找Outbound处理器:
最后来看一下异常处理
//PromiseNotificationUtil
从上面可以看出上下文处理关联通道处理器的地址绑定bind事件,首先从所属管道上下文链的尾部开始,寻找Outbound上下文,找到后,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发上下文关联通道处理器地址绑定事件,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;触发上下文关联通道处理器地址绑定事件,首先判断上下文关联通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。如果在Outbound事件处理器过程中,出现异常则直接委托给异步任务结果通知工具PromiseNotificationUtil,通知异步任务
失败,并log异常日志。
上下文,处理其他Outbound事件的思路,基本相同,
前面文章已将,可以参考
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
我们这里不再赘述。
再来看其他方法,很简单,不用说了:
再来通道处理器上下文的默认实现:
通道处理器上下文默认实现DefaultChannelHandlerContext内部关联一个通道处理器。
总结:
抽象通道处理器上下文AbstractChannelHandlerContext,拥有一个前驱和后继上下文,用于在管道中传递IO事件;通道处理器总共有四个状态,分别为初始化,正在添加到管道,已添加管道和从管道移除状态;上下文同时关联一个管道;Inbound和Outbound两个用于判断上下文的类型,决定了上下文是处理器Inbound事件还是Outbound事件;一个事件执行器executor,当上下文执行器不在当前事务循环中时,用于执行IO事件操作;同时有一些延时任务,如上下文读任务,上下文刷新任务,读完成任务和通道可写状态改变任务。上下文构造,主要是初始化上下文name,所属管道,事件执行器,上下文类型。上下文关联的通道通道处理器在具体的实现中定义,比如通道处理器上下文默认实现为DefaultChannelHandlerContext,内部关联一个通道处理器。
上下文处理通道fireChannelRegistered事件,如果上下文事件执行器在当前事务循环,则直接在当前线程,执行触发上下文关联通道处理器的channelRegistered事件任务,否则,创建一个线程执行事件任务,并由上下文事务执行器运行;触发上下文关联通道处理器的channelRegistered事件任务,首先判断上下文是否已经添加到管道,已添加,则触发
上下文关联通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个Inbound上下文。其他Inbound事件的处理过程与fireChannelRegistered方法思路相同,只不过触发的是通道处理器的相应事件;如果Inbound事件处理过程中,异常发生,首先检查异常是不是通道处理器的exceptionCaught方法抛出,是,则直接log,否则触发上下文关联通道处理器的exceptionCaught事件。
上下文处理关联通道处理器的地址绑定bind事件,首先从所属管道上下文链的尾部开始,
寻找Outbound上下文,找到后,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发上下文关联通道处理器地址绑定事件,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;触发上下文关联通道处理器地址绑定事件,首先判断上下文关联通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。如果在Outbound事件处理器过程中,出现异常则直接委托给异步任务结果通知工具PromiseNotificationUtil,通知异步任务失败,并log异常日志。
netty的通道处理器上下文和mina的会话有点像,都拥有描述通道Handler的Context;
不同的时mina中的会话与通道直接关联,而netty上下文与通道间是通过Channel管道关联起来,mina中的过滤链是依附于会话,而netty上下文依附于Channel管道,mina中的IO事件执行器为IoProcessor,netty中的IO事件的处理委托给事件循环
或上下文的子事件执行器。
附:
netty 通道处理器上下文定义:http://donald-draper.iteye.com/blog/2389214
引言:
前面一篇文章我们主要看了一下通道处理器上下文接口的定义,先来回顾一下:
通道处理器上下文ChannelHandlerContext,使通道处理器可以与管道和管道中其他处理器进行交互。当IO事件发生时,处理可以将事件转发给所属管道的下一个通道处理器,同时可以动态修改处理器所属的管道。通过上下文可以获取关联通道,处理器,事件执行器,上下文名,所属管道等信息。同时可以通过AttributeKey存储上下文属性,用alloc方法获取通道上下文的字节buf分配器,用于分配buf。
今天我们来看上下文的抽象实现。
import io.netty.buffer.ByteBufAllocator; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.netty.util.DefaultAttributeMap; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakHint; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.OrderedEventExecutor; import io.netty.util.internal.PromiseNotificationUtil; import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.StringUtil; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class); volatile AbstractChannelHandlerContext next;//上下文后继 volatile AbstractChannelHandlerContext prev;//上下文前驱 //上下文状态 private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState"); /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called. 通道处理器handlerAdded事件将要触发 */ private static final int ADD_PENDING = 1; /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called. 通道处理器handlerAdded事件已经触发 */ private static final int ADD_COMPLETE = 2; /** * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. 通道处理器上下文已经从所属管道移除 */ private static final int REMOVE_COMPLETE = 3; /** * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. 上下文初始化状态 */ private static final int INIT = 0; private final boolean inbound;//Inbound处理器上下文标志 private final boolean outbound;//Outbound处理器上下文标志 private final DefaultChannelPipeline pipeline;//上下文所属管道 private final String name;//上下文名,对应通道处理器添加时的name private final boolean ordered;//事件执行器是否为顺序执行器 // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. //如果没有子事件执行器可用,为空,否则为子事件执行器 final EventExecutor executor;//事件执行器 private ChannelFuture succeededFuture;//通道异步任务结果 // Lazily instantiated tasks used to trigger events to a handler with different executor. // There is no need to make this volatile as at worse it will just create a few more instances then needed. //延时创建任务用于在不同的执行器中,处理触发事件,这些任务需要为volatile,最糟糕的情况下,仅仅创建比实际需要,多一点的任务 private Runnable invokeChannelReadCompleteTask;//读完成任务线程 private Runnable invokeReadTask;//上下文读任务线程 private Runnable invokeChannelWritableStateChangedTask;//通道可写状态改变任务线程 private Runnable invokeFlushTask;//上下文刷新任务线程 private volatile int handlerState = INIT; AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { //检查上下为name是否为空 this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. //是否为Ordered事件执行器 ordered = executor == null || executor instanceof OrderedEventExecutor; } }
从上面可以看出抽象通道处理器上下文AbstractChannelHandlerContext,拥有一个前驱和后继上下文,用于在管道中传递IO事件;通道处理器总共有四个状态,分别为初始化,正在添加到管道,已添加管道和从管道移除状态;上下文同时关联一个管道;Inbound和Outbound两个用于判断上下文的类型,决定了上下文是处理器Inbound事件还是Outbound事件;一个事件执行器executor,当上下文执行器不在当前事务循环中时,用于执行IO事件操作;同时有一些延时任务,如上下文读任务,上下文刷新任务,读完成任务和通道可写状态改变任务。上下文构造,主要是初始化上下文name,所属管道,事件执行器,上下文类型。
下面我们来看Inbound事件处理,
先来看上下文处理通道的channelRegistered事件
@Override public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(findContextInbound()); return this; } static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor();//获取上下文事件执行器 //如果事件执行器在当前事务循环,则直接调用上下文invokeChannelRegistered方法 if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { //否则创建一个线程执行上下文invokeChannelRegistered方法,并有上下文事务执行器运行 executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } } //触发通道channelRegistered事件 private void invokeChannelRegistered() { //如果通道处理器已添加到管道 if (invokeHandler()) { try { //触发通道处理器的channelRegistered事件 ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { //通知异常 notifyHandlerException(t); } } else { //转发事件消息 fireChannelRegistered(); } }
上述方法有一下几点要看
1.
//判断通道处理器已添加到管道
/** * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called * yet. If not return {@code false} and if called or could not detect return {@code true}. *确保通道处理器的handlerAdded方法已触发。 * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event. * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}. 如果失败,则不会调用通道处理器的相关事件处理方法,而是转发事件。这种情况主要针对通道处理器已经添加到管道, 但通道处理器handlerAdded方法没有被调用的情况,即通道处理器关联的上下文已经添加管道上下文链,但并没有更新上下文状态 和触发通道处理器的handlerAdded方法。 */ private boolean invokeHandler() { // Store in local variable to reduce volatile reads. int handlerState = this.handlerState; return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); }
2.
//转发事件消息 Override public ChannelHandlerContext fireChannelRegistered() { //转发事件给上下文所属管道的下一个上下文 invokeChannelRegistered(findContextInbound()); return this; } //获取上下文所属管道的下一个Inbound上下文 private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
再来看异常处理
//通知异常 notifyHandlerException(t);
private void notifyHandlerException(Throwable cause) { //判断异常堆栈信息中是否存在exceptionCaught方法信息 if (inExceptionCaught(cause)) { //如果是通道处理器exceptionCaught方法抛出的异常,则直接log if (logger.isWarnEnabled()) { logger.warn( "An exception was thrown by a user handler " + "while handling an exceptionCaught event", cause); } return; } //否则,触发通道处理器异常处理方法exceptionCaught invokeExceptionCaught(cause); }
//判断异常堆栈信息中是否存在exceptionCaught方法信息 private static boolean inExceptionCaught(Throwable cause) { do { //获取异常堆栈frame信息 StackTraceElement[] trace = cause.getStackTrace(); if (trace != null) { for (StackTraceElement t : trace) { if (t == null) { break; } //是否是exceptionCaught方法抛出的异常,是则返回true if ("exceptionCaught".equals(t.getMethodName())) { return true; } } } cause = cause.getCause(); } while (cause != null); return false; }
//StackTraceElement
package java.lang; import java.util.Objects; /** * An element in a stack trace, as returned by {@link * Throwable#getStackTrace()}. Each element represents a single stack frame. * All stack frames except for the one at the top of the stack represent * a method invocation. The frame at the top of the stack represents the * execution point at which the stack trace was generated. Typically, * this is the point at which the throwable corresponding to the stack trace * was created. 用于描述异常堆栈的frame * * @since 1.4 * @author Josh Bloch */ public final class StackTraceElement implements java.io.Serializable { // Normally initialized by VM (public constructor added in 1.5) private String declaringClass;//异常类 private String methodName;//异常发生点方法 private String fileName;//异常发生的文件名 private int lineNumber;//异常发生的行号 ... }
再来看,触发通道处理器异常处理方法exceptionCaught
private void invokeExceptionCaught(final Throwable cause) { //如果通道处理器已添加到管道 if (invokeHandler()) { try { //触发通道处理器exceptionCaught事件 handler().exceptionCaught(this, cause); } catch (Throwable error) { if (logger.isDebugEnabled()) { logger.debug( "An exception {}" + "was thrown by a user handler's exceptionCaught() " + "method while handling the following exception:", ThrowableUtil.stackTraceToString(error), cause); } else if (logger.isWarnEnabled()) { logger.warn( "An exception '{}' [enable DEBUG level for full stacktrace] " + "was thrown by a user handler's exceptionCaught() " + "method while handling the following exception:", error, cause); } } } else { //否则转递IO异常事件给管道中的下一个Inbound处理器 fireExceptionCaught(cause); } } @Override public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { invokeExceptionCaught(next, cause); return this; }
从上面可以看出,上下文处理通道fireChannelRegistered事件,
如果上下文事件执行器在当前事务循环,则直接在当前线程,执行触发上下文关联通道处理器的channelRegistered事件任务,否则,创建一个线程执行事件任务,并由上下文事务执行器运行;
触发上下文关联通道处理器的channelRegistered事件任务,首先判断上下文是否已经添加到管道,已添加,则触发上下文关联通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个Inbound上下文。如果Inbound事件处理过程中,异常发生,首先检查异常是不是通道处理器的exceptionCaught方法抛出,是,则直接log,否则触发上下文关联通道处理器的exceptionCaught事件。其他Inbound事件的处理过程与fireChannelRegistered方法思路相同,
只不过触发的是通道处理器的相应事件;
前面的文章已讲,可以参考
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
我们这里不再赘述。
再来看Outbound地址绑定事件的处理:
@Override public ChannelFuture bind(SocketAddress localAddress) { return bind(localAddress, newPromise()); }
//创建通道任务DefaultChannelPromise /** * The default {@link ChannelPromise} implementation. It is recommended to use {@link Channel#newPromise()} to create * a new {@link ChannelPromise} rather than calling the constructor explicitly. */ public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint { private final Channel channel; private long checkpoint; ... /** * Creates a new instance. * * @param channel * the {@link Channel} associated with this future */ public DefaultChannelPromise(Channel channel, EventExecutor executor) { super(executor); this.channel = channel; } }
//绑定socket地址 @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } if (isNotValidPromise(promise, false)) { //非可写通道任务,直接返回 // cancelled return promise; } //从当前上下文开始(尾部),向前找到第一个Outbound上下文,处理地址绑定事件 final AbstractChannelHandlerContext next = findContextOutbound(); //获取上下为事件执行器 EventExecutor executor = next.executor(); if (executor.inEventLoop()) { //如果事件执行器线程在事件循环中,则直接委托给invokeBind next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; } //触发通道处理器地址绑定事件 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) {//如果通道处理器已经添加到管道中 try { //触发Outbound通道处理器的bind事件方法 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { //通知异常 notifyOutboundHandlerException(t, promise); } } else { //否则传递绑定事件给管道中的下一个Outbound上下文 bind(localAddress, promise); } } private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { try { executor.execute(runnable); } catch (Throwable cause) { try { //执行事件失败 promise.setFailure(cause); } finally { if (msg != null) { ReferenceCountUtil.release(msg); } } } }
再来看一下寻找Outbound处理器:
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
最后来看一下异常处理
//通知异常 notifyOutboundHandlerException(t, promise);
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) { // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return // false. //直接委托给异步任务结果通知工具PromiseNotificationUtil PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger); }
//PromiseNotificationUtil
package io.netty.util.internal; import io.netty.util.concurrent.Promise; import io.netty.util.internal.logging.InternalLogger; /** * Internal utilities to notify {@link Promise}s. 内部异步任务结果通知工具 */ public final class PromiseNotificationUtil { private PromiseNotificationUtil() { } /** * Try to cancel the {@link Promise} and log if {@code logger} is not {@code null} in case this fails. 通知异步任务取消 */ public static void tryCancel(Promise<?> p, InternalLogger logger) { if (!p.cancel(false) && logger != null) { Throwable err = p.cause(); if (err == null) { logger.warn("Failed to cancel promise because it has succeeded already: {}", p); } else { logger.warn( "Failed to cancel promise because it has failed already: {}, unnotified cause:", p, err); } } } /** * Try to mark the {@link Promise} as success and log if {@code logger} is not {@code null} in case this fails. 通知异步任务成功 */ public static <V> void trySuccess(Promise<? super V> p, V result, InternalLogger logger) { if (!p.trySuccess(result) && logger != null) { Throwable err = p.cause(); if (err == null) { logger.warn("Failed to mark a promise as success because it has succeeded already: {}", p); } else { logger.warn( "Failed to mark a promise as success because it has failed already: {}, unnotified cause:", p, err); } } } /** * Try to mark the {@link Promise} as failure and log if {@code logger} is not {@code null} in case this fails. 通知异步任务取消失败 */ public static void tryFailure(Promise<?> p, Throwable cause, InternalLogger logger) { if (!p.tryFailure(cause) && logger != null) { Throwable err = p.cause(); if (err == null) { logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause); } else { logger.warn( "Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}", p, ThrowableUtil.stackTraceToString(err), cause); } } } }
从上面可以看出上下文处理关联通道处理器的地址绑定bind事件,首先从所属管道上下文链的尾部开始,寻找Outbound上下文,找到后,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发上下文关联通道处理器地址绑定事件,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;触发上下文关联通道处理器地址绑定事件,首先判断上下文关联通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。如果在Outbound事件处理器过程中,出现异常则直接委托给异步任务结果通知工具PromiseNotificationUtil,通知异步任务
失败,并log异常日志。
上下文,处理其他Outbound事件的思路,基本相同,
前面文章已将,可以参考
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
我们这里不再赘述。
再来看其他方法,很简单,不用说了:
@Override public Channel channel() { return pipeline.channel(); } @Override public ChannelPipeline pipeline() { return pipeline; } @Override public ByteBufAllocator alloc() { return channel().config().getAllocator(); } @Override public EventExecutor executor() { if (executor == null) { return channel().eventLoop(); } else { return executor; } } @Override public String name() { return name; } @Override public boolean isRemoved() { return handlerState == REMOVE_COMPLETE; } @Override public <T> Attribute<T> attr(AttributeKey<T> key) { return channel().attr(key); } @Override public <T> boolean hasAttr(AttributeKey<T> key) { return channel().hasAttr(key); } //返回可读的资源泄漏信息,用于追踪资源泄漏情况 @Override public String toHintString() { return '\'' + name + "' will handle the message from this point."; } @Override public String toString() { return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')'; }
再来通道处理器上下文的默认实现:
//通道处理器上下文默认实现DefaultChannelHandlerContext: package io.netty.channel; import io.netty.util.concurrent.EventExecutor; final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { private final ChannelHandler handler;//关联通道处理器 DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; } @Override public ChannelHandler handler() { return handler; } private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; } }
通道处理器上下文默认实现DefaultChannelHandlerContext内部关联一个通道处理器。
总结:
抽象通道处理器上下文AbstractChannelHandlerContext,拥有一个前驱和后继上下文,用于在管道中传递IO事件;通道处理器总共有四个状态,分别为初始化,正在添加到管道,已添加管道和从管道移除状态;上下文同时关联一个管道;Inbound和Outbound两个用于判断上下文的类型,决定了上下文是处理器Inbound事件还是Outbound事件;一个事件执行器executor,当上下文执行器不在当前事务循环中时,用于执行IO事件操作;同时有一些延时任务,如上下文读任务,上下文刷新任务,读完成任务和通道可写状态改变任务。上下文构造,主要是初始化上下文name,所属管道,事件执行器,上下文类型。上下文关联的通道通道处理器在具体的实现中定义,比如通道处理器上下文默认实现为DefaultChannelHandlerContext,内部关联一个通道处理器。
上下文处理通道fireChannelRegistered事件,如果上下文事件执行器在当前事务循环,则直接在当前线程,执行触发上下文关联通道处理器的channelRegistered事件任务,否则,创建一个线程执行事件任务,并由上下文事务执行器运行;触发上下文关联通道处理器的channelRegistered事件任务,首先判断上下文是否已经添加到管道,已添加,则触发
上下文关联通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个Inbound上下文。其他Inbound事件的处理过程与fireChannelRegistered方法思路相同,只不过触发的是通道处理器的相应事件;如果Inbound事件处理过程中,异常发生,首先检查异常是不是通道处理器的exceptionCaught方法抛出,是,则直接log,否则触发上下文关联通道处理器的exceptionCaught事件。
上下文处理关联通道处理器的地址绑定bind事件,首先从所属管道上下文链的尾部开始,
寻找Outbound上下文,找到后,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发上下文关联通道处理器地址绑定事件,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;触发上下文关联通道处理器地址绑定事件,首先判断上下文关联通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。如果在Outbound事件处理器过程中,出现异常则直接委托给异步任务结果通知工具PromiseNotificationUtil,通知异步任务失败,并log异常日志。
netty的通道处理器上下文和mina的会话有点像,都拥有描述通道Handler的Context;
不同的时mina中的会话与通道直接关联,而netty上下文与通道间是通过Channel管道关联起来,mina中的过滤链是依附于会话,而netty上下文依附于Channel管道,mina中的IO事件执行器为IoProcessor,netty中的IO事件的处理委托给事件循环
或上下文的子事件执行器。
附:
/** * A hint object that provides human-readable message for easier resource leak tracking. */ public interface ResourceLeakHint { /** * Returns a human-readable message that potentially enables easier resource leak tracking. */ String toHintString(); }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1324netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2063netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2453netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1320netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1319netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1600netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1848netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1400netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2840netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2186netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2042netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1082netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1880netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1222netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1205netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 960netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1313netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2193netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1085netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1859netty 管道线定义-ChannelPipeline:htt ...
相关推荐
- **ChannelHandlerContext**: 提供了与通道相关的上下文信息,用于处理通道事件和调用通道处理器。 - **EventLoop**: 事件循环,负责处理通道上的事件。 - **Pipeline**: 事件处理管道,包含一系列处理器,每个...
它的核心组件包括Bootstrap(引导类)、ServerBootstrap(服务器引导类)、Channel(通道)、ChannelHandlerContext(通道上下文)和EventLoopGroup(事件循环组)等。 1. **Bootstrap与ServerBootstrap**: ...
3. **ChannelHandlerContext**:上下文对象,提供了与 Channel 相关的各种操作,如发送消息、注册事件处理器等。 4. **Pipeline**:通道处理链,每个 Channel 都有一个 Pipeline,其中包含多个处理器...
- 客户端端:使用 `SslContextBuilder.forClient()` 创建客户端上下文,可能需要加载信任的 CA 证书来验证服务器。 3. **通道处理器** - 在 Netty 中,我们需要将 SSL/TLS 配置集成到我们的 `ChannelPipeline`。...
这种模型避免了频繁的上下文切换,提高了系统性能。 深入源码分析,我们可以看到Netty如何优雅地处理了线程安全、内存池管理、心跳机制、解码编码、零拷贝等高级特性。例如,Netty通过内部的DirectBufferPool和...
4. **ChannelHandlerContext(通道上下文)**: 这是与特定Channel关联的上下文对象,通过它我们可以注册事件处理器、读取和写入数据,以及触发I/O操作。 5. **Pipeline(处理管道)**: Pipeline是一系列处理器的链...
它使用了单线程事件循环(Event Loop)处理多个连接,大大减少了线程上下文切换的开销。同时,Netty的线程池设计可以确保并发处理大量连接时的高效性。 标签"java"表明这是Java编程语言的一部分,"netty-all"意味着...
Netty会调用`channelConnected`方法并传入`ChannelHandlerContext`(通道处理器上下文)和`ChannelStateEvent`(通道状态事件),在该方法中编写处理逻辑。 同样的,客户端需要使用`ClientBootstrap`来启动客户端,...
1. **Bootstrap**: 用于启动服务器或客户端的配置类,可以设置各种参数,如EventLoopGroup(事件循环组)、ChannelHandler(通道处理器)等。 2. **EventLoopGroup**: 事件循环组,负责处理I/O事件,通常包括一个...
Netty的核心是其EventLoopGroup(事件循环组),它负责处理I/O事件,并将它们分发到相应的ChannelHandlerContext(通道上下文)中。Channel(通道)是连接到特定I/O资源的对象,而Pipeline(管道)则是一系列处理器...
通过NIO(非阻塞I/O)或EPOLL(Linux下的高性能I/O)实现,Netty能够有效地处理大量并发连接,减少线程间的上下文切换,提高系统资源利用率。此外,Netty还提供了零拷贝特性,减少了数据在内存中的复制,进一步提升...
7. **ChannelHandler(通道处理器)**: 用户可以通过实现ChannelHandler接口来添加自定义的业务逻辑。常见的处理器包括解码器、编码器、事件处理器等。 8. **Future和Promise**:Netty提供了Future和Promise的概念...
4. **线程模型优化**: 通过EventLoopGroup,Netty能够高效地分配线程资源,避免过多线程间的上下文切换。 三、Netty的使用 1. **创建ServerBootstrap**: 服务器启动器,用于配置服务器的参数,如事件循环组、绑定...
5. **线程模型**:Netty的EventLoop(事件循环)线程池设计,确保了每个连接都有一个固定的事件处理器,降低了线程上下文切换的成本。 6. **优化的API**:Netty的API设计简洁易用,易于理解和实现复杂的网络应用。 ...
4. **线程模型**:Netty使用了EventLoopGroup(事件循环组)来管理线程,它将工作线程与I/O线程分离,减少了线程间的上下文切换,提高了效率。 5. **API设计**:Netty的API设计简洁易用,使用ChannelFuture和...
当数据到达时,事件会被添加到事件循环的队列中,由事件循环进行处理,这样可以避免线程上下文切换的开销。同时,Netty的 ByteBuf 提供了零拷贝特性,减少了内存复制,提高了性能。 在实际项目中,Netty可以用来...
- **异步非阻塞**:Netty基于NIO(Non-blocking I/O),能处理大量并发连接,避免了线程上下文切换带来的性能开销。 - **零拷贝**:通过直接操作内存缓冲区,减少数据复制,提高系统效率。 - **灵活的编解码器**...
通过事件循环EventLoop和事件通道EventChannel,Netty能够同时处理多个连接,避免了线程上下文切换带来的开销。 2. **丰富的协议支持**:Netty提供了对多种常见网络协议的支持,如HTTP、HTTPS、FTP、SMTP、TLS/SSL...
Netty 的核心是其异步事件驱动模型,这种模型能够高效地处理并发连接,减少线程上下文切换带来的开销。 二、HTTP 协议基础 HTTP(超文本传输协议)是互联网上应用最广泛的一种网络协议,主要用于浏览器和服务器之间...
非阻塞I/O允许单个线程处理多个连接,减少了线程创建和上下文切换的开销。 2. **EventLoop(事件循环)**:Netty中的事件循环是其异步模型的基础。每个EventLoop负责处理一组通道(Channels),并在这些通道上有...