- 浏览: 981499 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Netty 通道处理器ChannelHandler和适配器定义ChannelHandlerAdapter:
http://donald-draper.iteye.com/blog/2386891
引言:
前面一篇文章我们看了通道处理器及适配器的定义,先来回顾一下:
通道处理器ChannelHandler,主要有两个事件方法分别为handlerAdded和handlerRemoved,handlerAdded在通道处理器添加到实际上下文后调用,通道处理器准备处理IO事件;handlerRemoved在通道处理器从实际上下文中移除后调用,通道处理器不再处理IO事件。
一个通道处理器关联一个通道处理器上下文ChannelHandlerContext。通道处理器通过一个上下文对象,与它所属的通道管道线交互。通道上下文对象,通道处理器上行或下行传递的事件,动态修改管道,或通过AttributeKey存储特殊的信息。通道处理器内部定义了一个共享注解Sharable,默认访问类型为Protected;添加共享注解的通道处理器,说明通道处理器中的变量可以共享,可以创建一个通道处理器实例,多次添加到通道管道线ChannlePipeline;对于没有共享注解的通道器,在每次添加到管道线上时,都要重新创建一个通道处理器实例。通道处理器只定义了简单的通道处理器添加到通道处理器上下文或从上下文移除的事件操作,没有具体定义读操作(上行UpStream,输入流Inbound,字节流到消息对象ByteToMessage),写操作(下行DownStream,输出流Outbound,消息到字节流MessageToByte)。这操作分别定义在,输入流处理器ChannelInboundHandler,输出流处理器ChannelOutboundHandler,并提供了处理的相应适配器,输入流处理器适配器ChannelInboundHandlerAdapter,输出流通道适配器ChannelOutboundHandlerAdapter,多路复用适配器ChannelDuplexHandler。
通道处理器适配器ChannelHandlerAdapter的设计模式为适配器,这个适配器模式中的 handlerAdded和handlerRemoved事件默认处理器,不做任何事情,这个与MINA中的适配器模式相同。处理IO操作异常,则调用ChannelHandlerContext#fireExceptionCaught方法,触发异常事件,并转发给通道管道线的下一个通道处理器。
看通道处理器适配器的判断通道处理器是否共享注解,首先获取线程的本地变量,从线程本地变量获取线程本地共享注解通道处理器探测结果缓存,如果缓存中存在通道处理器clazz,则返回缓存结果,否则将探测结果添加到缓存中。
今天来看一下Inbound处理器
从通道Inbound处理器来看,主要是处理从peer发送过来的字节流;
通道处理器上下文关联的通道注册到事件循环EventLoop时,触发channelRegistered方法;
通道处理器上下文关联的通道激活时,触发channelActive方法;
通道从peer读取消息时,触发channelRead方法;
当上一消息通过#channelRead方法,并被当先读操作消费时,触发channelReadComplete方法,
如果通道配置项#AUTO_READ为关闭状态,没有进一步尝试从当前通道读取inbound数据时,
直到ChannelHandlerContext#read调用,触发;
当用户事件发生时,触发userEventTriggered方法;
异常抛出时,触发exceptionCaught方法;
当通道可写状态改变时,触发channelWritabilityChanged方法;
通道处理器上下文关联的通道注册到事件循环EventLoop,但处于非激活状态,
达到生命周期的末端时,触发channelInactive方法;
通道处理器上下文关联的通道从事件循环EventLoop移除时,触发channelUnregistered方法。
再来看通道Inbound处理器适配器ChannelInboundHandlerAdapter
从上面来看Inbound通道handler适配器ChannelInboundHandlerAdapter,提供的Inbound通道处理器的所有方法的实现,但实现仅仅是,转发操作给Channel管道线的下一个通道处理器,子类必须重写方法。需要注意的是,在#channelRead方法自动返回后,消息并没有释放。如果你寻找ChannelInboundHandler的实现,可以自动释放接受的到消息可以使用SimpleChannelInboundHandler。
在前面的Netty实例文章中,消息解码器一般要继承ByteToMessageDecoder,我们来看一下ByteToMessageDecoder继承树
实际上消息解码继承与ChannelInboundHandlerAdapter,可以简单理解为一个Inbound通道处理器。
消息编码器一般为继承MessageToByteEncoder,我们从MessageToByteEncoder来看Outbound通道处理器;
实际上消息编码器为Outbound通道处理器,下面我们来看一下Outbound处理器的定义。
从上面来看,Outbound通道处理器ChannelOutboundHandler主要处理outbound IO操作。
当绑定操作发生时,调用bind方法;
当连接操作发生时,调用connect方法;
read方法拦截通道处理器上下文读操作;
当写操发生时,调用write方法,写操作通过Channel管道线写消息,
当通道调用#flush方法时,消息将会被刷新,发送出去;
当一个刷新操作发生时,调用flush方法,刷新操作将会刷新所有先前已经写,待发送的消息。
再来看Outbound通道Handler适配器:
Outbound通道Handler适配器ChannelOutboundHandlerAdapter为Outbound通道处理器的基本实现,这个实现仅仅通过通道处理器上下文转发方法的调用。
子类必须重写Outbound通道Handler适配器的相关方法。
总结:
通道Inbound处理器,主要是处理从peer发送过来的字节流;通道处理器上下文关联的通道注册到事件循环EventLoop时,触发channelRegistered方法;通道处理器上下文关联的通道激活时,触发channelActive方法;通道从peer读取消息时,触发channelRead方法;当上一消息通过#channelRead方法,并被当先读操作消费时,触发channelReadComplete方法,如果通道配置项
#AUTO_READ为关闭状态,没有进一步尝试从当前通道读取inbound数据时,直到ChannelHandlerContext#read调用,触发;当用户事件发生时,触发userEventTriggered方法;异常抛出时,触发exceptionCaught方法;当通道可写状态改变时,触发channelWritabilityChanged方法;通道处理器上下文关联的通道注册到事件循环EventLoop,但处于非激活状态,达到生命周期的末端时,触发channelInactive方法;通道处理器上下文关联的通道从事件循环EventLoop移除时,触发channelUnregistered方法。
Inbound通道handler适配器ChannelInboundHandlerAdapter,提供的Inbound通道处理器的所有方法的实现,但实现仅仅是,转发操作给Channel管道线的下一个通道处理器,子类必须重写方法。需要注意的是,在#channelRead方法自动返回后,消息并没有释放。如果你寻找ChannelInboundHandler的实现,可以自动释放接受的到消息可以使用SimpleChannelInboundHandler。
Outbound通道处理器ChannelOutboundHandler主要处理outbound IO操作。当绑定操作发生时,调用bind方法;当连接操作发生时,调用connect方法;read方法拦截通道处理器上下文读操作;当写操发生时,调用write方法,写操作通过Channel管道线写消息,当通道调用#flush方法时,消息将会被刷新,发送出去;当一个刷新操作发生时,调用flush方法,刷新操作将会刷新所有先前已经写,待发送的消息。
Outbound通道Handler适配器ChannelOutboundHandlerAdapter为Outbound通道处理器的基本实现,这个实现仅仅通过通道处理器上下文转发方法的调用。
子类必须重写Outbound通道Handler适配器的相关方法。
在Mina中,通道读写全部在一个通道Handler,Mina提供的通道Handler适配器,我们在使用通道处理器时继承它,实现我们需要关注的读写事件。而Netty使用InBound和OutBound将通道的读写分离,同时提供了InBound和OutBound通道Handler的适配器。
附:
我们这里简单看一ChannelPromise继承树,有机会我们在后面在详讲
事件循环组EventExecutorGroup:
http://donald-draper.iteye.com/blog/2386891
引言:
前面一篇文章我们看了通道处理器及适配器的定义,先来回顾一下:
通道处理器ChannelHandler,主要有两个事件方法分别为handlerAdded和handlerRemoved,handlerAdded在通道处理器添加到实际上下文后调用,通道处理器准备处理IO事件;handlerRemoved在通道处理器从实际上下文中移除后调用,通道处理器不再处理IO事件。
一个通道处理器关联一个通道处理器上下文ChannelHandlerContext。通道处理器通过一个上下文对象,与它所属的通道管道线交互。通道上下文对象,通道处理器上行或下行传递的事件,动态修改管道,或通过AttributeKey存储特殊的信息。通道处理器内部定义了一个共享注解Sharable,默认访问类型为Protected;添加共享注解的通道处理器,说明通道处理器中的变量可以共享,可以创建一个通道处理器实例,多次添加到通道管道线ChannlePipeline;对于没有共享注解的通道器,在每次添加到管道线上时,都要重新创建一个通道处理器实例。通道处理器只定义了简单的通道处理器添加到通道处理器上下文或从上下文移除的事件操作,没有具体定义读操作(上行UpStream,输入流Inbound,字节流到消息对象ByteToMessage),写操作(下行DownStream,输出流Outbound,消息到字节流MessageToByte)。这操作分别定义在,输入流处理器ChannelInboundHandler,输出流处理器ChannelOutboundHandler,并提供了处理的相应适配器,输入流处理器适配器ChannelInboundHandlerAdapter,输出流通道适配器ChannelOutboundHandlerAdapter,多路复用适配器ChannelDuplexHandler。
通道处理器适配器ChannelHandlerAdapter的设计模式为适配器,这个适配器模式中的 handlerAdded和handlerRemoved事件默认处理器,不做任何事情,这个与MINA中的适配器模式相同。处理IO操作异常,则调用ChannelHandlerContext#fireExceptionCaught方法,触发异常事件,并转发给通道管道线的下一个通道处理器。
看通道处理器适配器的判断通道处理器是否共享注解,首先获取线程的本地变量,从线程本地变量获取线程本地共享注解通道处理器探测结果缓存,如果缓存中存在通道处理器clazz,则返回缓存结果,否则将探测结果添加到缓存中。
今天来看一下Inbound处理器
/** * {@link ChannelHandler} which adds callbacks for state changes. This allows the user * to hook in to state changes easily. ChannelInboundHandler在通道处理器状态改变时,回调。允许用户hook处理器的状态改变 */ public interface ChannelInboundHandler extends ChannelHandler { /** * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop} 通道处理器上下文关联的通道注册到事件循环EventLoop时,触发 */ void channelRegistered(ChannelHandlerContext ctx) throws Exception; /** * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop} 通道处理器上下文关联的通道从事件循环EventLoop移除时,触发 */ void channelUnregistered(ChannelHandlerContext ctx) throws Exception; /** * The {@link Channel} of the {@link ChannelHandlerContext} is now active 通道处理器上下文关联的通道激活时,触发 */ void channelActive(ChannelHandlerContext ctx) throws Exception; /** * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its * end of lifetime. 通道处理器上下文关联的通道注册到事件循环EventLoop,但处于非激活状态,达到生命周期的末端时,触发 */ void channelInactive(ChannelHandlerContext ctx) throws Exception; /** * Invoked when the current {@link Channel} has read a message from the peer. 通道从peer读取消息时,触发 */ void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception; /** * Invoked when the last message read by the current read operation has been consumed by * {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further * attempt to read an inbound data from the current {@link Channel} will be made until * {@link ChannelHandlerContext#read()} is called. 当上一消息通过#channelRead方法,并被当先读操作消费时,触发。如果通道配置项#AUTO_READ为关闭状态,没有进一步 尝试从当前通道读取inbound数据时,直到ChannelHandlerContext#read调用,触发。 */ void channelReadComplete(ChannelHandlerContext ctx) throws Exception; /** * Gets called if an user event was triggered. 当用户事件发生时,触发 */ void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; /** * Gets called once the writable state of a {@link Channel} changed. You can check the state with * {@link Channel#isWritable()}. 当通道可写状态改变时,触发 */ void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception; /** * Gets called if a {@link Throwable} was thrown. 异常抛出时,触发 */ @Override @SuppressWarnings("deprecation") void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; }
从通道Inbound处理器来看,主要是处理从peer发送过来的字节流;
通道处理器上下文关联的通道注册到事件循环EventLoop时,触发channelRegistered方法;
通道处理器上下文关联的通道激活时,触发channelActive方法;
通道从peer读取消息时,触发channelRead方法;
当上一消息通过#channelRead方法,并被当先读操作消费时,触发channelReadComplete方法,
如果通道配置项#AUTO_READ为关闭状态,没有进一步尝试从当前通道读取inbound数据时,
直到ChannelHandlerContext#read调用,触发;
当用户事件发生时,触发userEventTriggered方法;
异常抛出时,触发exceptionCaught方法;
当通道可写状态改变时,触发channelWritabilityChanged方法;
通道处理器上下文关联的通道注册到事件循环EventLoop,但处于非激活状态,
达到生命周期的末端时,触发channelInactive方法;
通道处理器上下文关联的通道从事件循环EventLoop移除时,触发channelUnregistered方法。
再来看通道Inbound处理器适配器ChannelInboundHandlerAdapter
package io.netty.channel; /** * Abstract base class for {@link ChannelInboundHandler} implementations which provide * implementations of all of their methods. * 通道Inbound处理器抽象实现,提供了所有方法的实现。 * * This implementation just forward the operation to the next {@link ChannelHandler} in the * {@link ChannelPipeline}. Sub-classes may override a method implementation to change this. * Inbound通道handler适配器的实现,仅仅转发操作给Channel管道线的下一个通道处理器。子类必须重写方法。 * * Be aware that messages are not released after the {@link #channelRead(ChannelHandlerContext, Object)} * method returns automatically. If you are looking for a {@link ChannelInboundHandler} implementation that * releases the received messages automatically, please see {@link SimpleChannelInboundHandler}. 需要注意的是,在#channelRead方法自动返回后,消息并没有释放。如果你寻找ChannelInboundHandler的实现,可以自动 释放接受的到消息可以使用SimpleChannelInboundHandler。 * */ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { /** * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } /** * Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } /** * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } /** * Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } /** * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } /** * Calls {@link ChannelHandlerContext#fireChannelReadComplete()} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } /** * Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } /** * Calls {@link ChannelHandlerContext#fireChannelWritabilityChanged()} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } /** * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward * to the next {@link ChannelHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } }
从上面来看Inbound通道handler适配器ChannelInboundHandlerAdapter,提供的Inbound通道处理器的所有方法的实现,但实现仅仅是,转发操作给Channel管道线的下一个通道处理器,子类必须重写方法。需要注意的是,在#channelRead方法自动返回后,消息并没有释放。如果你寻找ChannelInboundHandler的实现,可以自动释放接受的到消息可以使用SimpleChannelInboundHandler。
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter
在前面的Netty实例文章中,消息解码器一般要继承ByteToMessageDecoder,我们来看一下ByteToMessageDecoder继承树
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
实际上消息解码继承与ChannelInboundHandlerAdapter,可以简单理解为一个Inbound通道处理器。
消息编码器一般为继承MessageToByteEncoder,我们从MessageToByteEncoder来看Outbound通道处理器;
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter
实际上消息编码器为Outbound通道处理器,下面我们来看一下Outbound处理器的定义。
package io.netty.channel; import java.net.SocketAddress; /** * {@link ChannelHandler} which will get notified for IO-outbound-operations. Outbound通道处理器处理outbound IO操作。 */ public interface ChannelOutboundHandler extends ChannelHandler { /** * Called once a bind operation is made. *当绑定操作发生时,调用 * @param ctx the {@link ChannelHandlerContext} for which the bind operation is made 通道处理器上下文 * @param localAddress the {@link SocketAddress} to which it should bound 绑定的本地socket地址 * @param promise the {@link ChannelPromise} to notify once the operation completes 通知一个操作是否完成 * @throws Exception thrown if an error occurs */ void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception; /** * Called once a connect operation is made. *当连接操作完成时,调用 * @param ctx the {@link ChannelHandlerContext} for which the connect operation is made 通道处理器上下文 * @param remoteAddress the {@link SocketAddress} to which it should connect 远端socket地址 * @param localAddress the {@link SocketAddress} which is used as source on connect 本地Socket地址 * @param promise the {@link ChannelPromise} to notify once the operation completes 通知一个操作是否完成 * @throws Exception thrown if an error occurs */ void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception; /** * Called once a disconnect operation is made. *当断开连接时,调用 * @param ctx the {@link ChannelHandlerContext} for which the disconnect operation is made * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */ void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; /** * Called once a close operation is made. *当关闭操作发生时,调用 * @param ctx the {@link ChannelHandlerContext} for which the close operation is made * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */ void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; /** * Called once a deregister operation is made from the current registered {@link EventLoop}. 当通道处理器,从当前注册的事件循环EventLoop,反注册时,调用 * * @param ctx the {@link ChannelHandlerContext} for which the close operation is made * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */ void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; /** * Intercepts {@link ChannelHandlerContext#read()}. 拦截通道处理器上下文读操作 */ void read(ChannelHandlerContext ctx) throws Exception; /** * Called once a write operation is made. The write operation will write the messages through the * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once * {@link Channel#flush()} is called *当写操发生时,调用。写操作通过Channel管道线写消息。当通道调用#flush方法时,消息将会被刷新,发送出去。 * @param ctx the {@link ChannelHandlerContext} for which the write operation is made * @param msg the message to write 写消息 * @param promise the {@link ChannelPromise} to notify once the operation completes * @throws Exception thrown if an error occurs */ void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception; /** * Called once a flush operation is made. The flush operation will try to flush out all previous written messages * that are pending. *当一个刷新操作发生,调用。刷新操作将会刷新所有先前已经写,待发送的消息。 * @param ctx the {@link ChannelHandlerContext} for which the flush operation is made * @throws Exception thrown if an error occurs */ void flush(ChannelHandlerContext ctx) throws Exception; }
从上面来看,Outbound通道处理器ChannelOutboundHandler主要处理outbound IO操作。
当绑定操作发生时,调用bind方法;
当连接操作发生时,调用connect方法;
read方法拦截通道处理器上下文读操作;
当写操发生时,调用write方法,写操作通过Channel管道线写消息,
当通道调用#flush方法时,消息将会被刷新,发送出去;
当一个刷新操作发生时,调用flush方法,刷新操作将会刷新所有先前已经写,待发送的消息。
再来看Outbound通道Handler适配器:
package io.netty.channel; import java.net.SocketAddress; /** * Skeleton implementation of a {@link ChannelOutboundHandler}. This implementation just forwards each method call via * the {@link ChannelHandlerContext}. ChannelOutboundHandlerAdapter为Outbound通道处理器的基本实现,这个实现仅仅通过通道处理器上下文转发方法的调用 */ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler { /** * Calls {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)} to forward * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. *bind方法仅仅转发操作给Channel管道线下一个Outbound处理,子类必须重写此方法。 * Sub-classes may override this method to change behavior. */ @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.bind(localAddress, promise); } //其他的方法,处理行为与Bind相同 /** * Calls {@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)} to forward * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.connect(remoteAddress, localAddress, promise); } /** * Calls {@link ChannelHandlerContext#disconnect(ChannelPromise)} to forward * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.disconnect(promise); } /** * Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.close(promise); } /** * Calls {@link ChannelHandlerContext#deregister(ChannelPromise)} to forward * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); } /** * Calls {@link ChannelHandlerContext#read()} to forward * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } /** * Calls {@link ChannelHandlerContext#write(Object, ChannelPromise)} to forward * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); } /** * Calls {@link ChannelHandlerContext#flush()} to forward * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Override public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
Outbound通道Handler适配器ChannelOutboundHandlerAdapter为Outbound通道处理器的基本实现,这个实现仅仅通过通道处理器上下文转发方法的调用。
子类必须重写Outbound通道Handler适配器的相关方法。
总结:
通道Inbound处理器,主要是处理从peer发送过来的字节流;通道处理器上下文关联的通道注册到事件循环EventLoop时,触发channelRegistered方法;通道处理器上下文关联的通道激活时,触发channelActive方法;通道从peer读取消息时,触发channelRead方法;当上一消息通过#channelRead方法,并被当先读操作消费时,触发channelReadComplete方法,如果通道配置项
#AUTO_READ为关闭状态,没有进一步尝试从当前通道读取inbound数据时,直到ChannelHandlerContext#read调用,触发;当用户事件发生时,触发userEventTriggered方法;异常抛出时,触发exceptionCaught方法;当通道可写状态改变时,触发channelWritabilityChanged方法;通道处理器上下文关联的通道注册到事件循环EventLoop,但处于非激活状态,达到生命周期的末端时,触发channelInactive方法;通道处理器上下文关联的通道从事件循环EventLoop移除时,触发channelUnregistered方法。
Inbound通道handler适配器ChannelInboundHandlerAdapter,提供的Inbound通道处理器的所有方法的实现,但实现仅仅是,转发操作给Channel管道线的下一个通道处理器,子类必须重写方法。需要注意的是,在#channelRead方法自动返回后,消息并没有释放。如果你寻找ChannelInboundHandler的实现,可以自动释放接受的到消息可以使用SimpleChannelInboundHandler。
Outbound通道处理器ChannelOutboundHandler主要处理outbound IO操作。当绑定操作发生时,调用bind方法;当连接操作发生时,调用connect方法;read方法拦截通道处理器上下文读操作;当写操发生时,调用write方法,写操作通过Channel管道线写消息,当通道调用#flush方法时,消息将会被刷新,发送出去;当一个刷新操作发生时,调用flush方法,刷新操作将会刷新所有先前已经写,待发送的消息。
Outbound通道Handler适配器ChannelOutboundHandlerAdapter为Outbound通道处理器的基本实现,这个实现仅仅通过通道处理器上下文转发方法的调用。
子类必须重写Outbound通道Handler适配器的相关方法。
在Mina中,通道读写全部在一个通道Handler,Mina提供的通道Handler适配器,我们在使用通道处理器时继承它,实现我们需要关注的读写事件。而Netty使用InBound和OutBound将通道的读写分离,同时提供了InBound和OutBound通道Handler的适配器。
附:
我们这里简单看一ChannelPromise继承树,有机会我们在后面在详讲
/** * Special {@link ChannelFuture} which is writable. */ public interface ChannelPromise extends ChannelFuture, Promise<Void> {
public interface ChannelFuture extends Future<Void> {
/** * Special {@link Future} which is writable. */ public interface Promise<V> extends Future<V> {
事件循环组EventExecutorGroup:
/** * Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get * processed for later selection during the event loop. EventExecutorGroup允许注册通道到EventLoop,以便在事件循环选择过程中处理通道事件 * */ public interface EventLoopGroup extends EventExecutorGroup {
/** * The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use * via its {@link #next()} method. Besides this, it is also responsible for handling their * life-cycle and allows shutting them down in a global fashion. EventExecutorGroup负责事件的执行 * */ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1321netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2057netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2447netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1317netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1310netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1596netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1844netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1398netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2834netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2179netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2037netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1079netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1877netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1219netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1202netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 958netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1310netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2190netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1077netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1857netty 管道线定义-ChannelPipeline:htt ...
相关推荐
Netty的处理器主要分为两种类型:入站处理器(Inbound Handler)和出站处理器(Outbound Handler)。入站处理器通常处理读取数据、连接事件等事件,出站处理器则处理写入数据、建立连接等请求。 Netty的架构包括了...
6. **Handlers**:ChannelHandler是业务逻辑的载体,分为Inbound和Outbound两种类型,分别处理输入和输出事件。例如,解码器、编码器、异常处理器等。 7. **Future和Promise**:Netty提供了Future和Promise来处理...
- ChannelHandler是Netty处理I/O事件的核心组件,包括Inbound和Outbound两种类型,分别处理入站事件(如连接、接收数据)和出站事件(如发送数据、关闭连接)。 - 通常,我们会创建自定义的处理器类,继承`...
有Inbound和Outbound两种,分别处理入站和出站事件。 5. **EventLoop与EventLoopGroup**: - EventLoop是线程的实现,负责处理I/O事件并调度任务。EventLoopGroup是一组EventLoop的集合,用于分配和管理线程资源。...
4. **Pipeline(处理器链)**:Netty的处理器链设计允许在连接上定义一组处理任务,每个任务都是一个ChannelInboundHandler或ChannelOutboundHandler。数据在Pipeline中按顺序传递,实现了解耦和职责分离。 5. **...
每个处理器都有进站(Inbound)和出站(Outbound)两个处理阶段,用于处理接收到的数据和发送数据的操作。 进站操作通常涉及接收连接、读取数据、解码数据等。例如,`ByteToMessageDecoder` 就是一个常见的进站...
《Netty实战》是针对Java网络编程框架Netty的一本深度实践书籍,它涵盖了Netty的核心概念、设计模式以及在实际应用中的最佳实践。Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能...
有Inbound 和 Outbound 两种类型,分别处理入站和出站事件。 6. **EventLoop** 和 **EventLoopGroup**:EventLoop 是执行事件处理的线程,EventLoopGroup 是 EventLoop 的集合,用于分配事件循环并管理线程。 7. *...
1. **Inbound 和 Outbound**:ChannelHandler 分为两种类型,Inbound Handler 处理接收的数据,Outbound Handler 处理发送的数据。这使得处理流程更加清晰和可扩展。 2. **ChannelHandlerContext**:在 ...
5. **Pipeline**: 事件处理器链,每个连接都有自己的Pipeline,其中包含多个处理器(ChannelHandler),用于处理入站(Inbound)和出站(Outbound)事件。 6. **ByteBuf**: Netty的字节缓冲区,比Java的ByteBuffer...
Handler是Netty处理业务逻辑的关键,分为Inbound和Outbound两种。Inbound Handler处理入站事件,如连接建立、数据读取;Outbound Handler处理出站事件,如数据写入、连接关闭。用户可以根据需求自定义Handler,实现...
- **Handler**:处理I/O事件或消息的组件,分为Inbound和Outbound两种,分别处理入站和出站操作。 - **EventLoop**:执行Handler的事件处理方法,Netty使用线程池实现,确保每个Channel分配一个EventLoop。 - **...
- **Handler**:处理逻辑的组件,分为Inbound Handler(处理入站事件,如接收数据)和Outbound Handler(处理出站事件,如发送数据)。 3. **Netty的工作流程** - **BossGroup** 和 **WorkerGroup**:BossGroup...
- ChannelHandler是处理I/O事件的接口,包括入站事件(Inbound Events)和出站事件(Outbound Events),如数据接收、连接建立等。 4. **ChannelPipeline**: - ChannelPipeline是事件处理链,负责将接收到的事件...
- **管道(Pipeline)**:是一个由多个处理器组成的链表结构,用于处理入站(Inbound)和出站(Outbound)的数据流。每个处理器都可以执行特定的逻辑处理,比如编解码、错误处理等。 #### 3. SEDA事件处理模式 SEDA...
- 上游和下游事件的处理方式改变,`Upstream` 改为 `Inbound`,`Downstream` 改为 `Outbound`。这使得事件处理更加清晰,简化了处理逻辑。 - `ChannelHandler` 接口的继承结构发生变化,新增了 `...
每个ChannelHandler负责处理特定类型的事件,如入站(inbound)事件(如数据接收)和出站(outbound)事件(如数据发送)。 7. **ChannelHandlerContext**:这是ChannelHandler的上下文,它定义了ChannelHandler...
- Netty入站出站:介绍了Netty中的入站(inbound)和出站(outbound)处理机制。 4. MyBatis: - MyBatis面:可能是关于MyBatis框架的面试题库。 - Mybatis工作原理:解释MyBatis的SQL映射和持久层操作的原理。 ...
客户端 esl-client是项目的基于Java的事件套接字库。 该项目是未维护的原始项目的分支,位于 状态:进行中... ...import org.freeswitch.esl.client.outbound.Context ; import org.freeswitch.e
- **过滤器名称变化**:将原来的`Pre-routing Filters`改名为`Inbound Filters`,`Routing Filter`改为`Endpoint Filter`,`Post-routing Filters`则变为`Outbound Filters`。 - **支持HTTP2**:为了提高性能和效率...