- 浏览: 981061 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Netty 通道处理器ChannelHandler和适配器定义ChannelHandlerAdapter:http://donald-draper.iteye.com/blog/2386891
Netty Inbound/Outbound通道处理器定义:http://donald-draper.iteye.com/blog/2387019
netty 简单Inbound通道处理器(SimpleChannelInboundHandler):http://donald-draper.iteye.com/blog/2387772
netty 消息编码器-MessageToByteEncoder:http://donald-draper.iteye.com/blog/2387832
引言:
前一篇文章看了一下消息编码器MessageToByteEncoder,我们来回顾一下:
消息编码器MessageToByteEncoder实际上为一个Outbound通道处理器,内部有一个类型参数处理器TypeParameterMatcher,用于判断消息是否可以被当前编码器处理,不能则传给Channel管道线上的下一个通道处理器;一个preferDirect参数,用于决定,当将消息编码为字节序列时,应该存储在direct类型还是heap类型的字节buffer中。消息编码器主要方法为write方法,write方法首先,判断消息是否可以被当前编码器处理,如果消息可以被编码器处理,根据通道处理器上下文和preferDirect,分配一个字节buf,委托encode方法,编码消息对象到字节buf,encode方法待子类实现;释放消息对应引用参数,如果当前buffer可读,则通道处理器上下文写buffer,否则释放buffer,写空buf,最后释放buf。消息编码器MessageToByteEncoder实际上为一个Outbound通道处理器,这个与Mina中的消息编码器是有区别的,Mina中的消息编码器要和解码器组装成编解码工厂过滤器添加到过滤链上,且编解码工厂过滤器,在过滤链上是由先后顺序的,通道Mina中编码器和通道Handler是两个概念。而Netty中编码器实际为Outbound通道处理器,主要是通过类型参数匹配器TypeParameterMatcher,来判断消息是否可以被编码器处理。
今天来看一下消息解码器:
来看一下复合buf的定义
从上面可以看出:
MERGE_CUMULATOR累计器buf,累计buf的过程为,首先判断当前累计buf空间不够存储,需要整合的in buf,或当前buf的引用数大于1,或累计buf只可读,三个条件中,有一个满足,则扩展累计buf容量,然后写in buf字节序列到累计器,释放in buf。COMPOSITE_CUMULATOR累计器是将需要整合的buf放在,内部的Component集合中,每个Component描述一个buf信息。
为了理解后面的读操作过程,再来看一下解码器的相关属性:
从上面来看,解码器有一个累计buf cumulation用于存放接收的数据;一个累计器Cumulator,默认为MERGE_CUMULATOR,累计接收的字节数据到cumulatio;first表示是否第一次累计字节到累计buf;decodeWasNull表示当前解码器是否成功解码消息,后续调ByteBuf#discardSomeReadBytes方法;singleDecode表示解码器是否只能解码一次;numReads表示当前已经读取的字节数;
discardAfterReads表示在读取多少个字节后,调用ByteBuf#discardSomeReadBytes方法,释放buf空间;解码器有三种状态,初始化状态STATE_INIT,还没有解码,正在解码状态STATE_CALLING_CHILD_DECODE,解码器正在从通道处理器上下文移除状态STATE_HANDLER_REMOVED_PENDING。
再来看构造
再来看读操作
读取方法我们有一下几点待看:
1.
2.
第二点有以下几点要看:
2.a
//ChannelHandlerContext
//ChannelInboundInvoker
2.b
再来看下面一句:
在来看这一句:
//ChannelHandlerContext
//ChannelInboundInvoker
3.
p
//ByteBuf
4.
//CodecOutputList
5.
来看解码消息集合回收操作
//CodecOutputList
小节一下读操作:
读取操作首先判断消息是否为字节buf,是则,创建解码消息List集合out,
如果第一次累计字节buf,则累计buf为,消息buf,否则累计器,累计消息buf数据,
然后调用解码器#callDecode解码累计buf中的数据,并将解码后的消息添加到out集合中,并
遍历解码消息集合,转发消息到Channle管道线上的下一个通道处理器。如果消息类型不是字节buf,直接通知Channle管道线上的下一个通道处理器消息消息。在解码的过程中,如果解码器从通道处理器上下文移除,则处理移除事件。移除解码器,首先判断 解码器状态,如果解码器处于正在解码状态,则解码器状态置为正在移除,并返回,否则判断累计buf是否为空,如果为空,则置空,否则通知通道处理上下文,所属的Channle管道线上的下一通道Handler消费数据。
委托给handlerRemoved0方法完成实际的handler移除工作。
再来看channelReadComplete事件操作
//ChannelHandlerContext
//ChannelOutboundInvoker
//ChannelInboundInvoker
从上面可以看出,解码器的channelReadComplete方法,主要是执行通道处理器上下文read操作,
请求从通道读取数据到第一个Inbound字节buf中,如果读取到数据,触发ChannelInboundHandler#channelRead操作;并触发一个ChannelInboundHandler#channelReadComplete时间以便,处理能够决定是否继续读取数据,如果当前解码完成,则通知,管道中的下一个通道处理器的read方法处理数据,如果一个读操作正在放生,则此方法不做什么事情。并触发管道中下一个通道处理器的fireChannelReadComplete事件。
再来看通道处于非激活状态channelInactive,的相关操作:
从上面来看channelInactive事件操作,主要是关闭通道输入流,在关闭之前,如果累计buf不为空,调用callDecode方法,解码字节数,为消息对象,并放入解码消息集合out中,管道中的下一个通道处理器,消费解码消息集合中的消息;最后调用decodeRemovalReentryProtection做最后的解码工作和通道移除工作。
再来看触发用户事件
总结:
消息解码器ByteToMessageDecoder,内部有两个buf累计器,分别为MERGE_CUMULATOR累计器buf,累计buf的过程为,首先判断当前累计buf空间不够存储,需要整合的in buf,或当前buf的引用数大于1,或累计buf只可读,三个条件中,有一个满足,则扩展累计buf容量,然后写in buf字节序列到累计器,释放in buf;COMPOSITE_CUMULATOR累计器是将需要整合的buf放在,内部的Component集合中,每个Component描述一个buf信息。
解码器有一个累计buf cumulation用于存放接收的数据;一个累计器Cumulator,默认为
MERGE_CUMULATOR,累计接收的字节数据到cumulatio;first表示是否第一次累计字节到累计buf;decodeWasNull表示当前解码器是否成功解码消息,后续调用ByteBuf#discardSomeReadBytes方法;singleDecode表示解码器是否只能解码一次;numReads表示当前已经读取的字节数;discardAfterReads表示在读取多少个字节后,调用ByteBuf#discardSomeReadBytes方法,释放buf空间;解码器有三种状态,初始化状态STATE_INIT,还没有解码,正在解码状态STATE_CALLING_CHILD_DECODE,解码器正在从通道处理器上下文移除状态STATE_HANDLER_REMOVED_PENDING。
需要注意的是,解码器不可共享。
读取操作首先判断消息是否为字节buf,是则,创建解码消息List集合out,如果第一次累计字节buf,则累计buf为,消息buf,否则累计器,累计消息buf数据,然后调用解码器
#callDecode解码累计buf中的数据,并将解码后的消息添加到out集合中,并遍历解码消息集合,转发消息到Channle管道线上的下一个通道处理器。如果消息类型不是字节buf,直接通知Channle管道线上的下一个通道处理器消息消息。在解码的过程中,如果解码器从通道处理器上下文移除,则处理移除事件。移除解码器,首先判断 解码器状态,如果解码器处于正在解码状态,则解码器状态置为正在移除,并返回,否则判断累计buf是否为空,如果为空,则置空,否则通知通道处理上下文,所属的Channle管道线上的下一通道Handler消费数据。委托给handlerRemoved0方法完成实际的handler移除工作。
解码器的channelReadComplete方法,主要是执行通道处理器上下文read操作,请求从通道读取数据到第一个Inbound字节buf中,如果读取到数据,触发ChannelInboundHandler#channelRead操作;并触发一个ChannelInboundHandler#channelReadComplete时间以便,处理能够决定是否继续读取数据,如果当前解码完成,则通知,管道中的下一个通道处理器的read方法处理数据,如果一个读操作正在放生,则此方法不做什么事情。并触发管道中下一个通道处理器的fireChannelReadComplete事件。
channelInactive事件操作,主要是关闭通道输入流,在关闭之前,如果累计buf不为空,调用callDecode方法,解码字节数,为消息对象,并放入解码消息集合out中,管道中的下一个通道处理器,消费解码消息集合中的消息;最后调用decodeRemovalReentryProtection做最后的解码工作和通道移除工作。
消息解码器ByteToMessageDecoder实际上为Inbound通道处理器,这个与Mina中的消息解码器是有区别的,Mina中的消息解码器要和编码器组装成编解码工厂过滤器添加到过滤链上,且编解码工厂过滤器,在过滤链上是有先后顺序的,通道Mina中解码器和通道Handler是两个概念。
附:
下面的看看就行,有兴趣的可以研究一下:
//DelimiterBasedFrameDecoder
//FixedLengthFrameDecoder
//LineBasedFrameDecoder,可以理解为分隔符帧解码器的特殊形式
//LengthFieldBasedFrameDecoder
Netty Inbound/Outbound通道处理器定义:http://donald-draper.iteye.com/blog/2387019
netty 简单Inbound通道处理器(SimpleChannelInboundHandler):http://donald-draper.iteye.com/blog/2387772
netty 消息编码器-MessageToByteEncoder:http://donald-draper.iteye.com/blog/2387832
引言:
前一篇文章看了一下消息编码器MessageToByteEncoder,我们来回顾一下:
消息编码器MessageToByteEncoder实际上为一个Outbound通道处理器,内部有一个类型参数处理器TypeParameterMatcher,用于判断消息是否可以被当前编码器处理,不能则传给Channel管道线上的下一个通道处理器;一个preferDirect参数,用于决定,当将消息编码为字节序列时,应该存储在direct类型还是heap类型的字节buffer中。消息编码器主要方法为write方法,write方法首先,判断消息是否可以被当前编码器处理,如果消息可以被编码器处理,根据通道处理器上下文和preferDirect,分配一个字节buf,委托encode方法,编码消息对象到字节buf,encode方法待子类实现;释放消息对应引用参数,如果当前buffer可读,则通道处理器上下文写buffer,否则释放buffer,写空buf,最后释放buf。消息编码器MessageToByteEncoder实际上为一个Outbound通道处理器,这个与Mina中的消息编码器是有区别的,Mina中的消息编码器要和解码器组装成编解码工厂过滤器添加到过滤链上,且编解码工厂过滤器,在过滤链上是由先后顺序的,通道Mina中编码器和通道Handler是两个概念。而Netty中编码器实际为Outbound通道处理器,主要是通过类型参数匹配器TypeParameterMatcher,来判断消息是否可以被编码器处理。
今天来看一下消息解码器:
package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.util.internal.StringUtil; import java.util.List; /** * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an * other Message type. *j解码器ByteToMessageDecoder,从字节buf中,解码字节序列为消息对象。 * For example here is an implementation which reads all readable bytes from * the input {@link ByteBuf} and create a new {@link ByteBuf}. *下面是一个实例,读取输入字节buf数据,到一个新的buf中 * <pre> * public class SquareDecoder extends {@link ByteToMessageDecoder} { * {@code @Override} * public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List<Object> out) * throws {@link Exception} { * out.add(in.readBytes(in.readableBytes())); * } * } * </pre> * * <h3>Frame detection</h3> * <p>帧探测 * Generally frame detection should be handled earlier in the pipeline by adding a * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder}, * or {@link LineBasedFrameDecoder}. 一般情况下,通过添加*FrameDecoder帧解码器,在管道中尽早地处理数据。 DelimiterBasedFrameDecoder:以某个分割符为一帧 FixedLengthFrameDecoder:以固定长度字节为一帧 LineBasedFrameDecoder:以换行符为一帧 LengthFieldBasedFrameDecoder:这种针对,协议有不同的部分组成,不如头部,数据部分,头部中包含数据长度信息的情况 * <p> * If a custom frame decoder is required, then one needs to be careful when implementing * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes * for a complete frame, return without modifying the reader index to allow more bytes to arrive. 如果需要一个一般的帧解码器,当实现解码器时,必须要小心。可以通过ByteBuf#readableBytes确保一个完成 的帧有足够的数据。如果没有足够的数据,不要修改buf的read索引,直接返回。 * <p> * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}. * One [b]MUST[/b] use the reader index when using methods like {@link ByteBuf#getInt(int)}. * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead. 在不修改buf的读位置的情况,可以通过ByteBuf#getInt方法,检查数据是否足够一帧。当使用过ByteBuf#getInt方法时, 必须使用buf的读位置。比如调用in.getInt(0),假设帧从buf的头开始,虽然大多数情况下不是这样,使用in.getInt(in.readerIndex()) 来代替。 * <h3>Pitfalls</h3> * <p>陷阱 * Be aware that sub-classes of {@link ByteToMessageDecoder} [b]MUST NOT[/b] * annotated with {@link @Sharable}. 需要注意的是,解码器不能被Sharable注解。 * <p> * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)} * to avoid leaking memory. 如果返回或添加到解码消息集合out的buf,没有释放,ByteBuf#readBytes方法可能引起内存泄漏。 我们可以使用ByteBuf#readSlice(int),避免这个问题。 */ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { /** * Cumulate {@link ByteBuf}s.累计字节buf */ public interface Cumulator { /** * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes. * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed. 累计给定的字节buf,返回存储累计字节序列的字节buf。具体的实现应该负责正确地处理给定字节buf的声明周期,如果 一个buf被完全消费完,应该调用 ByteBuf#release释放buf空间. */ ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in); } /** * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies. MERGE_CUMULATOR累计器,使用内存拷贝,整合所有buf到一个字节buf。 */ public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { final ByteBuf buffer; if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1 || cumulation.isReadOnly()) { //如果当前累计buf空间不够存储,需要整合的in buf,或当前buf的引用数大于1,或累计buf只可读 // Expand cumulation (by replace it) when either there is not more room in the buffer // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or // duplicate().retain() or if its read-only. // // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 //扩展累计buf空间 buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } //写in buf字节序列到累计器 buffer.writeBytes(in); in.release();//释放in buf return buffer; } }; //扩展累计buf空间 static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { ByteBuf oldCumulation = cumulation; //重新分配足够容量的buf cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); //将原始累计buf中的数据,copy到新的累计buf中 cumulation.writeBytes(oldCumulation); oldCumulation.release();//释放原始累计buf空间 return cumulation; } /** * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible. * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}. 累计字节buf到一个复合的字节buf中,并尽可能的不适用内存拷贝。注意复合buf,需要依赖于具体的使用场景,使用一个更为 复杂的索引实现,并且解码实现,有可能比#MERGE_CUMULATOR buf的效率要低。 */ public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer; if (cumulation.refCnt() > 1) { //当累计buf引用数大于1时,在用户使用use slice().retain() or duplicate().retain()方法时,发生 // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user // use slice().retain() or duplicate().retain(). // // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 //扩展累计buf buffer = expandCumulation(alloc, cumulation, in.readableBytes()); //将in buf数据写到累计buf中 buffer.writeBytes(in); in.release();//释放in buf } else { CompositeByteBuf composite; //如果累计buf为Composite buf if (cumulation instanceof CompositeByteBuf) { //直接转换 composite = (CompositeByteBuf) cumulation; } else { //否则分配一个复合buf composite = alloc.compositeBuffer(Integer.MAX_VALUE); composite.addComponent(true, cumulation); } //写in buf数据到复合buf composite.addComponent(true, in); buffer = composite; } return buffer; } }; }
来看一下复合buf的定义
/** * A virtual buffer which shows multiple buffers as a single merged buffer. It is recommended to use * {@link ByteBufAllocator#compositeBuffer()} or {@link Unpooled#wrappedBuffer(ByteBuf...)} instead of calling the * constructor explicitly. 虚拟buf CompositeByteBuf将多个buf作为一个整合的buf。强烈建议使用ByteBufAllocator#compositeBuffer 或Unpooled#wrappedBuffer方法创建复合buf,而不是通过构造函数 */ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> { private static final ByteBuffer EMPTY_NIO_BUFFER = Unpooled.EMPTY_BUFFER.nioBuffer(); private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator(); private final ByteBufAllocator alloc; private final boolean direct; private final List<Component> components;//存放buf数据,每个Component为一个buf private final int maxNumComponents; private boolean freed; //这个不具体展开了,当前只做了解 ... private static final class Component { final ByteBuf buf; final int length; int offset; int endOffset; Component(ByteBuf buf) { this.buf = buf; length = buf.readableBytes(); } void freeIfNecessary() { buf.release(); // We should not get a NPE here. If so, it must be a bug. } } }
从上面可以看出:
MERGE_CUMULATOR累计器buf,累计buf的过程为,首先判断当前累计buf空间不够存储,需要整合的in buf,或当前buf的引用数大于1,或累计buf只可读,三个条件中,有一个满足,则扩展累计buf容量,然后写in buf字节序列到累计器,释放in buf。COMPOSITE_CUMULATOR累计器是将需要整合的buf放在,内部的Component集合中,每个Component描述一个buf信息。
为了理解后面的读操作过程,再来看一下解码器的相关属性:
private static final byte STATE_INIT = 0;//解码器初话状态,还没有解码 private static final byte STATE_CALLING_CHILD_DECODE = 1;//正在解码 private static final byte STATE_HANDLER_REMOVED_PENDING = 2;//解码器正在从通道处理器上下文移除 ByteBuf cumulation; private Cumulator cumulator = MERGE_CUMULATOR;//累计器 private boolean singleDecode;//是否只能解码一次 private boolean decodeWasNull;//当前解码器是否成功解码消息,后续调用ByteBuf#discardSomeReadBytes方法 private boolean first;//是否第一次累计字节buf /** * A bitmask where the bits are defined as * [list] * [*]{@link #STATE_INIT} * [*]{@link #STATE_CALLING_CHILD_DECODE} * [*]{@link #STATE_HANDLER_REMOVED_PENDING} * [/list] */ private byte decodeState = STATE_INIT;//解码器状态 private int discardAfterReads = 16;//在读取多少个字节后,调用ByteBuf#discardSomeReadBytes方法,释放buf空间 private int numReads;//当前读取的字节数 /** * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)} * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up. *设置为true,在通道每次读操作时,只能有一个消息对象被解码。。 如果你需要升级协议,或确保没有任何数据混合,此参数非常有用。 * Default is {@code false} as this has performance impacts. 由于性能的影响,默认为false */ public void setSingleDecode(boolean singleDecode) { this.singleDecode = singleDecode; } /** * If {@code true} then only one message is decoded on each * {@link #channelRead(ChannelHandlerContext, Object)} call. *设置为true,在通道每次读操作时,只能有一个消息对象被解码。 * Default is {@code false} as this has performance impacts. 由于性能的影响,默认为false */ public boolean isSingleDecode() { return singleDecode; } /** * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s. 设置累积buf */ public void setCumulator(Cumulator cumulator) { if (cumulator == null) { throw new NullPointerException("cumulator"); } this.cumulator = cumulator; } /** * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory. * The default is {@code 16}. 在读取discardAfterReads个字节数后,调用ByteBuf#discardSomeReadBytes方法,以便释放空间 */ public void setDiscardAfterReads(int discardAfterReads) { if (discardAfterReads <= 0) { throw new IllegalArgumentException("discardAfterReads must be > 0"); } this.discardAfterReads = discardAfterReads; } /** * Returns the actual number of readable bytes in the internal cumulative * buffer of this decoder. You usually do not need to rely on this value * to write a decoder. Use it only when you must use it at your own risk. * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. 返回当前解码器内部累计buf的实际可读字节数。一般情况乡下不需要依赖于这个值。 当需要自己负责内部数据时,可以使用此方法。实际为内部buf的readableBytes方法的快捷访问路径 */ protected int actualReadableBytes() { return internalBuffer().readableBytes(); } /** * Returns the internal cumulative buffer of this decoder. You usually * do not need to access the internal buffer directly to write a decoder. * Use it only when you must use it at your own risk. 返回当前解码器的内部累计buf。通常情况下,不需要直接访问内部buf。使用最好在 自己可以控制风险的情况下。 */ protected ByteBuf internalBuffer() { if (cumulation != null) { return cumulation; } else { return Unpooled.EMPTY_BUFFER; } }
从上面来看,解码器有一个累计buf cumulation用于存放接收的数据;一个累计器Cumulator,默认为MERGE_CUMULATOR,累计接收的字节数据到cumulatio;first表示是否第一次累计字节到累计buf;decodeWasNull表示当前解码器是否成功解码消息,后续调ByteBuf#discardSomeReadBytes方法;singleDecode表示解码器是否只能解码一次;numReads表示当前已经读取的字节数;
discardAfterReads表示在读取多少个字节后,调用ByteBuf#discardSomeReadBytes方法,释放buf空间;解码器有三种状态,初始化状态STATE_INIT,还没有解码,正在解码状态STATE_CALLING_CHILD_DECODE,解码器正在从通道处理器上下文移除状态STATE_HANDLER_REMOVED_PENDING。
再来看构造
protected ByteToMessageDecoder() { //确保解码器不可共享,即没有被Sharable注解 ensureNotSharable(); }
再来看读操作
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { //创建解码消息List集合 CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { //如果第一次累计字节buf,则累计buf为,消息buf cumulation = data; } else { //否则累计器,累计消息buf数据 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //解码累计buf callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { //如果累计buf不为空,且不可读,设置读取字节数为0,释放累计buf,并置空 numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 //如果读取的字节数量大于discardAfterReads,则设置读取字节数为0 numReads = 0; //丢弃一些字节 discardSomeReadBytes(); } //获取解码消息集合size int size = out.size(); decodeWasNull = !out.insertSinceRecycled();//设置解码消息成功标志位 //遍历解码消息集合,转发消息到Channle管道线上的下一个通道处理器 fireChannelRead(ctx, out, size); out.recycle();//回收解码消息集合 } } else { //如果消息对象非字节buf,则通知Channle处理器上下文所在的Channel管道线上的下一个 //通道处理器消费消息 ctx.fireChannelRead(msg); } }
读取方法我们有一下几点待看:
1.
//创建解码消息List集合 CodecOutputList out = CodecOutputList.newInstance(); final class CodecOutputList extends AbstractList<Object> implements RandomAccess { private static final Recycler<CodecOutputList> RECYCLER = new Recycler<CodecOutputList>() { @Override protected CodecOutputList newObject(Handle<CodecOutputList> handle) { return new CodecOutputList(handle); } }; static CodecOutputList newInstance() { return RECYCLER.get(); } private final Recycler.Handle<CodecOutputList> handle;//结合回收器 private int size; // Size of 16 should be good enough for 99 % of all users. private Object[] array = new Object[16];//存放集合元素数据 private boolean insertSinceRecycled;//回收后是否插入元素 } 从上来看CodecOutputList实际为一个List集合
2.
//解码累计buf callDecode(ctx, cumulation, out);
/** * Called once data should be decoded from the given {@link ByteBuf}. This method will call * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place. * * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to * @param in the {@link ByteBuf} from which to read data * @param out the {@link List} to which decoded messages should be added */ protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) {//如果字节buf中有数据 int outSize = out.size();//获取当前解码消息集合size if (outSize > 0) { //如果解码消息集合size大于0,即解码成功的消息,没有消费完,通知Channel管道线的 //下一个通道处理器消费消息 fireChannelRead(ctx, out, outSize); //清空解码消息集合 out.clear(); // Check if this handler was removed before continuing with decoding. // If it was removed, it is not safe to continue to operate on the buffer. //确保继续解码消息之前,通道处理器上下文没有移除,如果移除,继续操作buf是不安全的,则跳出解码循环 // See: // - https://github.com/netty/netty/issues/4635 if (ctx.isRemoved()) { break; } outSize = 0; } //获取buf中字节数量 int oldInputLength = in.readableBytes(); //解码字节buf中的数据,为消息对象,并放入out中,此过程如果解码器从通道处理器上下文移除,则处理移除事件 decodeRemovalReentryProtection(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. //确保继续解码消息之前,通道处理器上下文没有移除,如果移除,继续操作buf是不安全的,则跳出解码循环 // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } if (outSize == out.size()) { //如果解码消息集合size没变,且buf的字节数据没有被消费,则跳出解码循环 if (oldInputLength == in.readableBytes()) { break; } else { //如果解码消息集合size没变,且buf的字节数据没有被消费,则跳出当前解码循环 continue; } } if (oldInputLength == in.readableBytes()) { //buf的字节数据没有被消费,则抛出解码异常 throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { //如果只能解码消息一次,则跳出解码循环 break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } }
第二点有以下几点要看:
2.a
//如果解码消息集合size大于0,即解码成功的消息,没有消费完,通知Channel管道线的 //下一个通道处理器消费消息 fireChannelRead(ctx, out, outSize);
/** * Get {@code numElements} out of the {@link List} and forward these through the pipeline. */ static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) { if (msgs instanceof CodecOutputList) { //如果集合为解码消息集合,直接委托给fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements)方法 fireChannelRead(ctx, (CodecOutputList) msgs, numElements); } else { //否则遍历解码消息集合,转发消息到Channle管道线上的下一个通道处理器 for (int i = 0; i < numElements; i++) { ctx.fireChannelRead(msgs.get(i)); } } } /** * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline. 遍历解码消息集合,转发消息到Channle管道线上的下一个通道处理器 */ static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) { for (int i = 0; i < numElements; i ++) { ctx.fireChannelRead(msgs.getUnsafe(i)); } }
//ChannelHandlerContext
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
//ChannelInboundInvoker
/** * A {@link Channel} received a message. * * This will result in having the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} * method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. */ ChannelInboundInvoker fireChannelRead(Object msg);
2.b
decodeRemovalReentryProtection(ctx, in, out);
/** * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input * {@link ByteBuf}. *解码字节buf中的数据,为消息对象,并放入out中。此方法直到字节buf中没有任何数据可读时,停止解码。 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to * @param in the {@link ByteBuf} from which to read data 字节buf,读取数据 * @param out the {@link List} to which decoded messages should be added 存放解码消息 * @throws Exception is thrown if an error occurs */ final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { decodeState = STATE_CALLING_CHILD_DECODE;//解码器状态为正在解码 try { //委托为decode方法,待子类实现 decode(ctx, in, out); } finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;//通道处理器正在移除 decodeState = STATE_INIT;//恢复解码器状态为初始化状态 if (removePending) { //处理handler从通道处理器移除事件 handlerRemoved(ctx); } } } /** * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input * {@link ByteBuf}. *解码字节buf中的数据,为消息对象,并放入out中。此方法直到字节buf中没有任何数据可读时,停止解码。 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to * @param in the {@link ByteBuf} from which to read data 字节buf,读取数据 * @param out the {@link List} to which decoded messages should be added 存放解码消息 * @throws Exception is thrown if an error occurs */ protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
再来看下面一句:
//处理handler从通道处理器移除事件 handlerRemoved(ctx);
@Override public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception { if (decodeState == STATE_CALLING_CHILD_DECODE) { //如果正在解码,则解码器状态为正在移除,并返回 decodeState = STATE_HANDLER_REMOVED_PENDING; return; } ByteBuf buf = cumulation; if (buf != null) { //如果累计buf不为空,置空,所以我们必须保证我们不再再其他地方访问 // Directly set this to null so we are sure we not access it in any other method here anymore. cumulation = null; int readable = buf.readableBytes(); if (readable > 0) { //如果累计buf中,仍有数据没被读取, ByteBuf bytes = buf.readBytes(readable); buf.release(); //通知通道处理上下文,所属的Channle管道线上的下一通道Handler消费数据。 ctx.fireChannelRead(bytes); } else { buf.release(); } //已读字节数置零 numReads = 0; //触发读操作完成事件 ctx.fireChannelReadComplete(); } //委托给handlerRemoved0完成实际的handler移除工作,待子类实现 handlerRemoved0(ctx); } /** * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle * events anymore. 在解码器从实际上下移除时,不再处理任何事件,调用此方法。 */ protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
在来看这一句:
//触发读操作完成事件 ctx.fireChannelReadComplete();
//ChannelHandlerContext
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
//ChannelInboundInvoker
/** * Triggers an {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)} * event to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. */ ChannelInboundInvoker fireChannelReadComplete();
3.
if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 //如果读取的字节数量大于discardAfterReads,则设置读取字节数为0 numReads = 0; //丢弃一些字节 discardSomeReadBytes(); }
p
rotected final void discardSomeReadBytes() { if (cumulation != null && !first && cumulation.refCnt() == 1) { //如果累计buf不为空,且非第一次累计消息,累计buf引用为1 // discard some bytes if possible to make more room in the // buffer but only if the refCnt == 1 as otherwise the user may have // used slice().retain() or duplicate().retain(). //当前仅当用户使用slice().retain() or duplicate().retain()函数,累计buf引用为1 //为了是buf可以有更多的空间有用,丢弃一些字节 // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 cumulation.discardSomeReadBytes(); } }
//ByteBuf
/** * Similar to {@link ByteBuf#discardReadBytes()} except that this method might discard * some, all, or none of read bytes depending on its internal implementation to reduce * overall memory bandwidth consumption at the cost of potentially additional memory * consumption. */ public abstract ByteBuf discardSomeReadBytes();
4.
int size = out.size();//获取当前解码消息集合size decodeWasNull = !out.insertSinceRecycled();//设置解码消息成功标志位
//CodecOutputList
/** * Returns {@code true} if any elements where added or set. This will be reset once {@link #recycle()} was called. 是否有元素插入到解码消息集合中 */ boolean insertSinceRecycled() { return insertSinceRecycled; }
5.
//转发解码消息集合中的消息,到Channle管道线上的下一个通道处理器 fireChannelRead(ctx, out, size); out.recycle();//回收解码消息集合
/** * Get {@code numElements} out of the {@link List} and forward these through the pipeline. */ static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) { if (msgs instanceof CodecOutputList) { fireChannelRead(ctx, (CodecOutputList) msgs, numElements); } else { for (int i = 0; i < numElements; i++) { ctx.fireChannelRead(msgs.get(i)); } } } /** * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline. 遍历解码消息集合,转发消息到Channle管道线上的下一个通道处理器 */ static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) { for (int i = 0; i < numElements; i ++) { ctx.fireChannelRead(msgs.getUnsafe(i)); } }
来看解码消息集合回收操作
//CodecOutputList
/** * Recycle the array which will clear it and null out all entries in the internal storage. //回收解码消息集合,实际为清空集合,并置空 */ void recycle() { for (int i = 0 ; i < size; i ++) { array[i] = null; } clear(); insertSinceRecycled = false; handle.recycle(this); }
小节一下读操作:
读取操作首先判断消息是否为字节buf,是则,创建解码消息List集合out,
如果第一次累计字节buf,则累计buf为,消息buf,否则累计器,累计消息buf数据,
然后调用解码器#callDecode解码累计buf中的数据,并将解码后的消息添加到out集合中,并
遍历解码消息集合,转发消息到Channle管道线上的下一个通道处理器。如果消息类型不是字节buf,直接通知Channle管道线上的下一个通道处理器消息消息。在解码的过程中,如果解码器从通道处理器上下文移除,则处理移除事件。移除解码器,首先判断 解码器状态,如果解码器处于正在解码状态,则解码器状态置为正在移除,并返回,否则判断累计buf是否为空,如果为空,则置空,否则通知通道处理上下文,所属的Channle管道线上的下一通道Handler消费数据。
委托给handlerRemoved0方法完成实际的handler移除工作。
再来看channelReadComplete事件操作
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //重置读取字节数,释放buf空间 numReads = 0; discardSomeReadBytes(); if (decodeWasNull) {//如果解码消息集合中没有消息 decodeWasNull = false; if (!ctx.channel().config().isAutoRead()) {//通道非自动读取, //执行通道处理器上下文read操作 ctx.read(); } } ctx.fireChannelReadComplete(); }
//ChannelHandlerContext
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
//ChannelOutboundInvoker
/** * Request to Read data from the {@link Channel} into the first inbound buffer, triggers an * {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} event if data was * read, and triggers a * {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext) channelReadComplete} event so the * handler can decide to continue reading. If there's a pending read operation already, this method does nothing. 请求从通道读取数据到第一个Inbound字节buf中,如果读取到数据,触发ChannelInboundHandler#channelRead操作; 并触发一个ChannelInboundHandler#channelReadComplete时间以便,处理能够决定是否继续读取数据。 如果一个读操作正在放生,则此方法不做什么事情。 * <p> * This will result in having the * {@link ChannelOutboundHandler#read(ChannelHandlerContext)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. 此方法将会通知,管道中的下一个通道处理器的read方法。 */ ChannelOutboundInvoker read();
//ChannelInboundInvoker
/** * Triggers an {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)} * event to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. 触发管道中下一个通道处理器的fireChannelReadComplete事件。 */ ChannelInboundInvoker fireChannelReadComplete();
从上面可以看出,解码器的channelReadComplete方法,主要是执行通道处理器上下文read操作,
请求从通道读取数据到第一个Inbound字节buf中,如果读取到数据,触发ChannelInboundHandler#channelRead操作;并触发一个ChannelInboundHandler#channelReadComplete时间以便,处理能够决定是否继续读取数据,如果当前解码完成,则通知,管道中的下一个通道处理器的read方法处理数据,如果一个读操作正在放生,则此方法不做什么事情。并触发管道中下一个通道处理器的fireChannelReadComplete事件。
再来看通道处于非激活状态channelInactive,的相关操作:
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //关闭通道输入流,并触发通道处理器上下文fireChannelInactive事件 channelInputClosed(ctx, true); }
//关闭通道输入流 private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception { //创建解码消息集合 CodecOutputList out = CodecOutputList.newInstance(); try { // channelInputClosed(ctx, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { try { if (cumulation != null) { //释放累计buf,并置空 cumulation.release(); cumulation = null; } int size = out.size(); //通知管道中的下一个通道处理器,消费解码消息集合中的消息 fireChannelRead(ctx, out, size); if (size > 0) { // Something was read, call fireChannelReadComplete() //触发通道处理器读操作完成事件 ctx.fireChannelReadComplete(); } if (callChannelInactive) { //如果需要触发通道处理器上下文fireChannelInactive事件,则触发 ctx.fireChannelInactive(); } } finally { // Recycle in all cases,回收解码消息集合 out.recycle(); } } }
/** * Called when the input of the channel was closed which may be because it changed to inactive or because of * {@link ChannelInputShutdownEvent}. 当通道输入流关闭,因为通道关闭事件,通道将要处于非激活状态,调用此方法 */ void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception { if (cumulation != null) { //解码字节数,为消息对象,并放入解码消息集合out中,管道中的下一个通道处理器,消费解码消息集合中的消息 callDecode(ctx, cumulation, out); decodeLast(ctx, cumulation, out); } else { decodeLast(ctx, Unpooled.EMPTY_BUFFER, out); }
/** * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the * {@link #channelInactive(ChannelHandlerContext)} was triggered. *在通道处理器上下文即将处于非激活状态是,调用,及#channelInactive事件触发。 * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may * override this for some special cleanup operation. 默认,调用#decode解码字节数据,子类可以重写做一些清理工作 */ protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.isReadable()) { // Only call decode() if there is something left in the buffer to decode. // See https://github.com/netty/netty/issues/4386 decodeRemovalReentryProtection(ctx, in, out); } }
从上面来看channelInactive事件操作,主要是关闭通道输入流,在关闭之前,如果累计buf不为空,调用callDecode方法,解码字节数,为消息对象,并放入解码消息集合out中,管道中的下一个通道处理器,消费解码消息集合中的消息;最后调用decodeRemovalReentryProtection做最后的解码工作和通道移除工作。
再来看触发用户事件
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof ChannelInputShutdownEvent) { // The decodeLast method is invoked when a channelInactive event is encountered. // This method is responsible for ending requests in some situations and must be called // when the input has been shutdown. //如果事件为关闭通道输入流事件,当channelInactive事件发生时,调用decodeLast方法, //主要负责结束一些场景的请求。 channelInputClosed(ctx, false); } super.userEventTriggered(ctx, evt); }
总结:
消息解码器ByteToMessageDecoder,内部有两个buf累计器,分别为MERGE_CUMULATOR累计器buf,累计buf的过程为,首先判断当前累计buf空间不够存储,需要整合的in buf,或当前buf的引用数大于1,或累计buf只可读,三个条件中,有一个满足,则扩展累计buf容量,然后写in buf字节序列到累计器,释放in buf;COMPOSITE_CUMULATOR累计器是将需要整合的buf放在,内部的Component集合中,每个Component描述一个buf信息。
解码器有一个累计buf cumulation用于存放接收的数据;一个累计器Cumulator,默认为
MERGE_CUMULATOR,累计接收的字节数据到cumulatio;first表示是否第一次累计字节到累计buf;decodeWasNull表示当前解码器是否成功解码消息,后续调用ByteBuf#discardSomeReadBytes方法;singleDecode表示解码器是否只能解码一次;numReads表示当前已经读取的字节数;discardAfterReads表示在读取多少个字节后,调用ByteBuf#discardSomeReadBytes方法,释放buf空间;解码器有三种状态,初始化状态STATE_INIT,还没有解码,正在解码状态STATE_CALLING_CHILD_DECODE,解码器正在从通道处理器上下文移除状态STATE_HANDLER_REMOVED_PENDING。
需要注意的是,解码器不可共享。
读取操作首先判断消息是否为字节buf,是则,创建解码消息List集合out,如果第一次累计字节buf,则累计buf为,消息buf,否则累计器,累计消息buf数据,然后调用解码器
#callDecode解码累计buf中的数据,并将解码后的消息添加到out集合中,并遍历解码消息集合,转发消息到Channle管道线上的下一个通道处理器。如果消息类型不是字节buf,直接通知Channle管道线上的下一个通道处理器消息消息。在解码的过程中,如果解码器从通道处理器上下文移除,则处理移除事件。移除解码器,首先判断 解码器状态,如果解码器处于正在解码状态,则解码器状态置为正在移除,并返回,否则判断累计buf是否为空,如果为空,则置空,否则通知通道处理上下文,所属的Channle管道线上的下一通道Handler消费数据。委托给handlerRemoved0方法完成实际的handler移除工作。
解码器的channelReadComplete方法,主要是执行通道处理器上下文read操作,请求从通道读取数据到第一个Inbound字节buf中,如果读取到数据,触发ChannelInboundHandler#channelRead操作;并触发一个ChannelInboundHandler#channelReadComplete时间以便,处理能够决定是否继续读取数据,如果当前解码完成,则通知,管道中的下一个通道处理器的read方法处理数据,如果一个读操作正在放生,则此方法不做什么事情。并触发管道中下一个通道处理器的fireChannelReadComplete事件。
channelInactive事件操作,主要是关闭通道输入流,在关闭之前,如果累计buf不为空,调用callDecode方法,解码字节数,为消息对象,并放入解码消息集合out中,管道中的下一个通道处理器,消费解码消息集合中的消息;最后调用decodeRemovalReentryProtection做最后的解码工作和通道移除工作。
消息解码器ByteToMessageDecoder实际上为Inbound通道处理器,这个与Mina中的消息解码器是有区别的,Mina中的消息解码器要和编码器组装成编解码工厂过滤器添加到过滤链上,且编解码工厂过滤器,在过滤链上是有先后顺序的,通道Mina中解码器和通道Handler是两个概念。
附:
下面的看看就行,有兴趣的可以研究一下:
//DelimiterBasedFrameDecoder
/** * A decoder that splits the received {@link ByteBuf}s by one or more * delimiters. It is particularly useful for decoding the frames which ends * with a delimiter such as {@link Delimiters#nulDelimiter() NUL} or * {@linkplain Delimiters#lineDelimiter() newline characters}. * * <h3>Predefined delimiters</h3> * <p> * {@link Delimiters} defines frequently used delimiters for convenience' sake. * * <h3>Specifying more than one delimiter</h3> * <p> * {@link DelimiterBasedFrameDecoder} allows you to specify more than one * delimiter. If more than one delimiter is found in the buffer, it chooses * the delimiter which produces the shortest frame. For example, if you have * the following data in the buffer: * <pre> * +--------------+ * | ABC\nDEF\r\n | * +--------------+ * </pre> * a {@link DelimiterBasedFrameDecoder}({@link Delimiters#lineDelimiter() Delimiters.lineDelimiter()}) * will choose {@code '\n'} as the first delimiter and produce two frames: * <pre> * +-----+-----+ * | ABC | DEF | * +-----+-----+ * </pre> * rather than incorrectly choosing {@code '\r\n'} as the first delimiter: * <pre> * +----------+ * | ABC\nDEF | * +----------+ * </pre> */ public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder { private final ByteBuf[] delimiters; private final int maxFrameLength; private final boolean stripDelimiter; private final boolean failFast; private boolean discardingTooLongFrame; private int tooLongFrameLength; /** Set only when decoding with "\n" and "\r\n" as the delimiter. */ private final LineBasedFrameDecoder lineBasedDecoder; ... }
//FixedLengthFrameDecoder
/** * A decoder that splits the received {@link ByteBuf}s by the fixed number * of bytes. For example, if you received the following four fragmented packets: * <pre> * +---+----+------+----+ * | A | BC | DEFG | HI | * +---+----+------+----+ * </pre> * A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the * following three packets with the fixed length: * <pre> * +-----+-----+-----+ * | ABC | DEF | GHI | * +-----+-----+-----+ * </pre> */ public class FixedLengthFrameDecoder extends ByteToMessageDecoder { private final int frameLength; ... }
//LineBasedFrameDecoder,可以理解为分隔符帧解码器的特殊形式
/** * A decoder that splits the received {@link ByteBuf}s on line endings. * <p> * Both {@code "\n"} and {@code "\r\n"} are handled. * For a more general delimiter-based decoder, see {@link DelimiterBasedFrameDecoder}. */ public class LineBasedFrameDecoder extends ByteToMessageDecoder { /** Maximum length of a frame we're willing to decode. */ private final int maxLength; /** Whether or not to throw an exception as soon as we exceed maxLength. */ private final boolean failFast; private final boolean stripDelimiter; /** True if we're discarding input because we're already over maxLength. */ private boolean discarding; private int discardedBytes; ... }
//LengthFieldBasedFrameDecoder
/** * A decoder that splits the received {@link ByteBuf}s dynamically by the * value of the length field in the message. It is particularly useful when you * decode a binary message which has an integer header field that represents the * length of the message body or the whole message. * <p> * {@link LengthFieldBasedFrameDecoder} has many configuration parameters so * that it can decode any message with a length field, which is often seen in * proprietary client-server protocols. Here are some example that will give * you the basic idea on which option does what. * * <h3>2 bytes length field at offset 0, do not strip header</h3> * * The value of the length field in this example is <tt>12 (0x0C)</tt> which * represents the length of "HELLO, WORLD". By default, the decoder assumes * that the length field represents the number of the bytes that follows the * length field. Therefore, it can be decoded with the simplistic parameter * combination. * <pre> * <b>lengthFieldOffset</b> = <b>0</b> * <b>lengthFieldLength</b> = <b>2</b> * lengthAdjustment = 0 * initialBytesToStrip = 0 (= do not strip header) * * BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes) * +--------+----------------+ +--------+----------------+ * | Length | Actual Content |----->| Length | Actual Content | * | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" | * +--------+----------------+ +--------+----------------+ * </pre> * * <h3>2 bytes length field at offset 0, strip header</h3> * * Because we can get the length of the content by calling * {@link ByteBuf#readableBytes()}, you might want to strip the length * field by specifying <tt>initialBytesToStrip</tt>. In this example, we * specified <tt>2</tt>, that is same with the length of the length field, to * strip the first two bytes. * <pre> * lengthFieldOffset = 0 * lengthFieldLength = 2 * lengthAdjustment = 0 * <b>initialBytesToStrip</b> = <b>2</b> (= the length of the Length field) * * BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes) * +--------+----------------+ +----------------+ * | Length | Actual Content |----->| Actual Content | * | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" | * +--------+----------------+ +----------------+ * </pre> * * <h3>2 bytes length field at offset 0, do not strip header, the length field * represents the length of the whole message</h3> * * In most cases, the length field represents the length of the message body * only, as shown in the previous examples. However, in some protocols, the * length field represents the length of the whole message, including the * message header. In such a case, we specify a non-zero * <tt>lengthAdjustment</tt>. Because the length value in this example message * is always greater than the body length by <tt>2</tt>, we specify <tt>-2</tt> * as <tt>lengthAdjustment</tt> for compensation. * <pre> * lengthFieldOffset = 0 * lengthFieldLength = 2 * <b>lengthAdjustment</b> = <b>-2</b> (= the length of the Length field) * initialBytesToStrip = 0 * * BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes) * +--------+----------------+ +--------+----------------+ * | Length | Actual Content |----->| Length | Actual Content | * | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" | * +--------+----------------+ +--------+----------------+ * </pre> * * <h3>3 bytes length field at the end of 5 bytes header, do not strip header</h3> * * The following message is a simple variation of the first example. An extra * header value is prepended to the message. <tt>lengthAdjustment</tt> is zero * again because the decoder always takes the length of the prepended data into * account during frame length calculation. * <pre> * <b>lengthFieldOffset</b> = <b>2</b> (= the length of Header 1) * <b>lengthFieldLength</b> = <b>3</b> * lengthAdjustment = 0 * initialBytesToStrip = 0 * * BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes) * +----------+----------+----------------+ +----------+----------+----------------+ * | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content | * | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" | * +----------+----------+----------------+ +----------+----------+----------------+ * </pre> * * <h3>3 bytes length field at the beginning of 5 bytes header, do not strip header</h3> * * This is an advanced example that shows the case where there is an extra * header between the length field and the message body. You have to specify a * positive <tt>lengthAdjustment</tt> so that the decoder counts the extra * header into the frame length calculation. * <pre> * lengthFieldOffset = 0 * lengthFieldLength = 3 * <b>lengthAdjustment</b> = <b>2</b> (= the length of Header 1) * initialBytesToStrip = 0 * * BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes) * +----------+----------+----------------+ +----------+----------+----------------+ * | Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content | * | 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" | * +----------+----------+----------------+ +----------+----------+----------------+ * </pre> * * <h3>2 bytes length field at offset 1 in the middle of 4 bytes header, * strip the first header field and the length field</h3> * * This is a combination of all the examples above. There are the prepended * header before the length field and the extra header after the length field. * The prepended header affects the <tt>lengthFieldOffset</tt> and the extra * header affects the <tt>lengthAdjustment</tt>. We also specified a non-zero * <tt>initialBytesToStrip</tt> to strip the length field and the prepended * header from the frame. If you don't want to strip the prepended header, you * could specify <tt>0</tt> for <tt>initialBytesToSkip</tt>. * <pre> * lengthFieldOffset = 1 (= the length of HDR1) * lengthFieldLength = 2 * <b>lengthAdjustment</b> = <b>1</b> (= the length of HDR2) * <b>initialBytesToStrip</b> = <b>3</b> (= the length of HDR1 + LEN) * * BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes) * +------+--------+------+----------------+ +------+----------------+ * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | * +------+--------+------+----------------+ +------+----------------+ * </pre> * * <h3>2 bytes length field at offset 1 in the middle of 4 bytes header, * strip the first header field and the length field, the length field * represents the length of the whole message</h3> * * Let's give another twist to the previous example. The only difference from * the previous example is that the length field represents the length of the * whole message instead of the message body, just like the third example. * We have to count the length of HDR1 and Length into <tt>lengthAdjustment</tt>. * Please note that we don't need to take the length of HDR2 into account * because the length field already includes the whole header length. * <pre> * lengthFieldOffset = 1 * lengthFieldLength = 2 * <b>lengthAdjustment</b> = <b>-3</b> (= the length of HDR1 + LEN, negative) * <b>initialBytesToStrip</b> = <b> 3</b> * * BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes) * +------+--------+------+----------------+ +------+----------------+ * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | * | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | * +------+--------+------+----------------+ +------+----------------+ * </pre> * @see LengthFieldPrepender */ public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { private final ByteOrder byteOrder; private final int maxFrameLength; private final int lengthFieldOffset; private final int lengthFieldLength; private final int lengthFieldEndOffset; private final int lengthAdjustment; private final int initialBytesToStrip; private final boolean failFast; private boolean discardingTooLongFrame; private long tooLongFrameLength; private long bytesToDiscard; ... }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1321netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2057netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2444netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1317netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1310netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1594netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1844netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1397netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2834netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2177netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2037netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1078netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1877netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1219netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1202netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 958netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1309netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2189netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1077netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1855netty 管道线定义-ChannelPipeline:htt ...
相关推荐
9. **高效的编码解码器**:Netty提供了丰富的编码解码器,如ByteToMessageDecoder和MessageToByteEncoder,用于将网络数据转换为应用程序可处理的对象。 10. **ChannelFuture和Promise**:这些概念是Netty异步编程...
2. **高效的数据编码与解码**:Netty提供了多种编解码器,如ByteToMessageDecoder和MessageToByteEncoder,以及各种协议的预定义编解码器,如HTTP、TCP、UDP等,简化了数据传输的处理。 3. **高度可定制的事件处理...
例如,解码器(Decoder)、编码器(Encoder)和业务逻辑处理器等。 在 "netty-demo-pipeline-channelHandler" 示例中,我们可能看到如何配置和使用 ChannelPipeline 和 ChannelHandler。通常,这个例子会包括以下...
4. **编码解码器**:根据协议需求选择合适的编码解码器,如ByteToMessageDecoder和MessageToByteEncoder,以正确处理网络数据传输。 5. **性能优化**:利用Netty的零拷贝特性,减少数据复制,提高性能。 6. **异常...
2. **高度可定制性**: Netty提供了多种编解码器,如LengthFieldBasedFrameDecoder用于处理固定长度或可变长度的数据帧,ByteToMessageDecoder和MessageToByteEncoder则可以自定义数据解析和序列化方式。 3. **零...
7. **编码和解码**:Netty提供了一系列的编解码器,如ByteToMessageDecoder和MessageToByteEncoder,方便用户自定义协议的解析和组装。 8. **心跳机制**:Netty可以通过添加心跳Handler来实现客户端和服务端之间的...
- **编解码器**:提供了一系列的编码器和解码器,如ByteToMessageDecoder、MessageToByteEncoder,方便对各种数据格式进行转换。 - **灵活性**:Netty支持多种协议栈,如HTTP、WebSocket、FTP等,且易于扩展新协议...
解码器通常会实现 `LengthFieldBasedFrameDecoder` 或者 `ByteToMessageDecoder`,而编码器则可以实现 `LengthFieldPrepender` 或者 `MessageToByteEncoder`。在解码器中,我们需要解析输入缓冲区并构建出 `Custom...
2. **高度可定制的编解码器**:Netty提供了丰富的编解码器,如ByteToMessageDecoder和MessageToByteEncoder,用于将原始字节流转换为有意义的应用层消息,反之亦然。用户还可以自定义编解码器以适应特定的协议需求。...
4. `netty-codec`: 提供了各种编解码器,如ByteToMessageDecoder和MessageToByteEncoder。 5. `netty-codec-http`和`netty-codec-http2`: 对HTTP/1.x和HTTP/2协议的支持。 6. `netty-handler`: 包含了处理网络事件的...
- Netty 提供多种预定义的编解码器,如 ByteToMessageDecoder 和 MessageToByteEncoder,用于将原始字节流转换为业务对象,反之亦然。 6. **HTTP/HTTPS 支持**: - Netty 支持 HTTP 协议,包括 HTTP/1.x 和 ...
2. **netty-codec-4.1.19.Final-sources.jar**:这是Netty的编码解码模块,提供了各种协议的编码器和解码器,例如ByteToMessageDecoder和MessageToByteEncoder,它们帮助我们将数据转换为可以发送在网络上的格式,并...
8. **编解码器**:Netty提供了许多预定义的编解码器,如ByteToMessageDecoder和MessageToByteEncoder,用于将字节流转换为对象,以及将对象转换回字节流。 9. **Future和Promise**:Netty中的异步编程模型,Future...
Netty 提供了多种预定义的编码器和解码器,如 ByteToMessageDecoder 和 MessageToByteEncoder,它们分别用于处理字节流到消息对象和消息对象到字节流的转换。例如,我们可能需要将Java对象编码为字节流发送到网络,...
5. **编码与解码**: Netty提供多种编解码器,如ByteToMessageDecoder和MessageToByteEncoder,用于处理不同格式的数据,如文本、二进制、protobuf等。 6. **自定义协议支持**: Netty的灵活性在于可以根据需求定制...
3. **编码与解码**:Netty提供了丰富的编码器和解码器,如ByteToMessageDecoder和MessageToByteEncoder,用于在网络通信中进行数据转换。了解如何自定义编码解码器以适应特定的协议格式。 4. **协议支持**:Netty...
Netty 提供了一系列预定义的 ChannelHandler,如 ByteToMessageDecoder 和 MessageToByteEncoder,用于解码和编码网络数据。这些处理器使得数据在网络中的传输变得简单,例如,它可以轻松地处理 HTTP 请求和响应,...
无论是内置的编解码器,如ByteToMessageDecoder和MessageToByteEncoder,还是用户自定义的编码器,都是Netty高效通信的关键组成部分。 通过本章的学习,读者将深入理解Netty如何处理网络通信中的数据问题,并掌握...
对于这些协议,Netty 提供了相应的编解码器,如 HttpObjectDecoder 和 WebSocketFrameDecoder,它们使得处理这些复杂协议变得非常简单。 此外,Netty 的性能优化也是其受欢迎的原因之一。例如,零拷贝技术通过避免...
在Netty中,编解码器是处理数据转换的关键组件,它们将原始的字节流转换为应用程序可理解的消息格式,反之亦然。本文将深入探讨Netty中的编解码器框架。 首先,我们需要理解编码器和解码器的基本概念。编码器负责将...