- 浏览: 984519 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Netty 通道处理器ChannelHandler和适配器定义ChannelHandlerAdapter:http://donald-draper.iteye.com/blog/2386891
Netty Inbound/Outbound通道处理器定义:http://donald-draper.iteye.com/blog/2387019
netty 简单Inbound通道处理器(SimpleChannelInboundHandler):http://donald-draper.iteye.com/blog/2387772
引言:
前一篇文章我们看了简单Inbound通道处理器(SimpleChannelInboundHandler),先来看回顾一下:
简单Inbound通道处理器SimpleChannelInboundHandler<I>,内部有连个变量一个为参数类型匹配器,用来判断通道是否可以处理消息,另一个变量autoRelease,用于控制是否在通道处理消息完毕时,释放消息。读取方法channelRead,首先判断跟定的消息类型是否可以被处理,如果是,则委托给channelRead0,channelRead0待子类实现;如果返回false,则将消息转递给Channel管道线的下一个通道处理器;最后,如果autoRelease为自动释放消息,且消息已处理则释放消息。
今天我们来看一下消息编码器MessageToByteEncoder:
我们来分析一下写消息对象
//写消息对象
有一下代码片段我们需要注意:
//ByteBuf
字节buf我们后面有时间再说。
//Unpooled
//ChannelHandlerContext
//ChannelOutboundInvoker
再来简单看一字节buf分配函数:
//ChannelHandlerContext
//ByteBufAllocator
分配字节buf,实际委托给通道处理器上下文的ByteBufAllocator。
总结:
消息编码器MessageToByteEncoder实际上为一个Outbound通道处理器,内部有一个类型参数处理器TypeParameterMatcher,用于判断消息是否可以被当前编码器处理,不能则传给Channel管道线上的下一个通道处理器;一个preferDirect参数,用于决定,当将消息编码为字节序列时,应该存储在direct类型还是heap类型的字节buffer中。消息编码器主要方法为write方法,write方法首先,判断消息是否可以被当前编码器处理,如果消息可以被编码器处理,根据通道处理器上下文和preferDirect,分配一个字节buf,委托encode方法,编码消息对象到字节buf,encode方法待子类实现;释放消息对应引用参数,如果当前buffer可读,则通道处理器上下文写buffer,否则释放buffer,写空buf,最后释放buf。消息编码器MessageToByteEncoder实际上为一个Outbound通道处理器,这个与Mina中的消息编码器是有区别的,Mina中的消息编码器要和解码器组装成编解码工厂过滤器添加到过滤链上,且编解码工厂过滤器,在过滤链上是由先后顺序的,通道Mina中编码器和通道Handler是两个概念。而Netty中编码器实际为Outbound通道处理器,主要是通过类型参数匹配器TypeParameterMatcher,来判断消息是否可以被编码器处理。
Netty Inbound/Outbound通道处理器定义:http://donald-draper.iteye.com/blog/2387019
netty 简单Inbound通道处理器(SimpleChannelInboundHandler):http://donald-draper.iteye.com/blog/2387772
引言:
前一篇文章我们看了简单Inbound通道处理器(SimpleChannelInboundHandler),先来看回顾一下:
简单Inbound通道处理器SimpleChannelInboundHandler<I>,内部有连个变量一个为参数类型匹配器,用来判断通道是否可以处理消息,另一个变量autoRelease,用于控制是否在通道处理消息完毕时,释放消息。读取方法channelRead,首先判断跟定的消息类型是否可以被处理,如果是,则委托给channelRead0,channelRead0待子类实现;如果返回false,则将消息转递给Channel管道线的下一个通道处理器;最后,如果autoRelease为自动释放消息,且消息已处理则释放消息。
今天我们来看一下消息编码器MessageToByteEncoder:
package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.TypeParameterMatcher; /** * {@link ChannelOutboundHandlerAdapter} which encodes message in a stream-like fashion from one message to an * {@link ByteBuf}. *消息编码器MessageToByteEncoder,将高层消息对象编码为底层字节流。消息编码器实际为Outbound通道处理器。 * * Example implementation which encodes {@link Integer}s to a {@link ByteBuf}. *下面是一个编码整数到字节缓冲的例子 * <pre> * public class IntegerEncoder extends {@link MessageToByteEncoder}<{@link Integer}> { * {@code @Override} * public void encode({@link ChannelHandlerContext} ctx, {@link Integer} msg, {@link ByteBuf} out) * throws {@link Exception} { * out.writeInt(msg); * } * } * </pre> */ public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { private final TypeParameterMatcher matcher;//类型参数匹配器 private final boolean preferDirect;//是否使用Direct类型buf /** * see {@link #MessageToByteEncoder(boolean)} with {@code true} as boolean parameter. */ protected MessageToByteEncoder() { this(true);//默认使用direct类型buffer,关于direct类型buffer我们在java nio相关篇有说 } /** * see {@link #MessageToByteEncoder(Class, boolean)} with {@code true} as boolean value. 根据消息类型构造消息编码器 */ protected MessageToByteEncoder(Class<? extends I> outboundMessageType) { this(outboundMessageType, true); } /** * Create a new instance which will try to detect the types to match out of the type parameter of the class. *创建一个将会尝试探测消息类型是否匹配类型参数匹配器编码实例 * @param preferDirect {@code true} if a direct {@link ByteBuf} should be tried to be used as target for * the encoded messages. If {@code false} is used it will allocate a heap * {@link ByteBuf}, which is backed by an byte array. 如果用direct类型buffer,存储消息编码后的字节流,则preferDirect为true。如果preferDirect为false将分配一个 堆类型buffer,用一个可backed的字节数组存储消息编码后的字节流。 */ protected MessageToByteEncoder(boolean preferDirect) { //这句话,看过前面的文章,应该很好理解,获取消息编码,类型参数名I对应的类型参数匹配器 matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I"); this.preferDirect = preferDirect; } /** * Create a new instance * * @param outboundMessageType The type of messages to match * @param preferDirect {@code true} if a direct {@link ByteBuf} should be tried to be used as target for * the encoded messages. If {@code false} is used it will allocate a heap * {@link ByteBuf}, which is backed by an byte array. */ protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) { //获取消息类型outboundMessageType对应的类型参数匹配器 matcher = TypeParameterMatcher.get(outboundMessageType); this.preferDirect = preferDirect; } /** * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. 如果给定的消息可以被处理,则返回true。如果返回false,则将消息传到给Channel管道线上的下一个Outbound处理器。 */ public boolean acceptOutboundMessage(Object msg) throws Exception { //通过类型参数匹配器,判断消息是否可以被处理 return matcher.match(msg); } //写消息对象 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) {//如果消息可以被编码器处理 @SuppressWarnings("unchecked") I cast = (I) msg; //根据通道处理器上下文和preferDirect,分配一个字节buf buf = allocateBuffer(ctx, cast, preferDirect); try { //编码消息对象到字节buf encode(ctx, cast, buf); } finally { //释放消息对应引用参数 ReferenceCountUtil.release(cast); } //如果当前buffer可读,则通道处理器上下文写buffer,并将结果存储在promise中 if (buf.isReadable()) { ctx.write(buf, promise); } else { //否则释放buffer,写空buf buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { //释放buf buf.release(); } } } /** * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}. * Sub-classes may override this method to return {@link ByteBuf} with a perfect matching {@code initialCapacity}. 分配一个字节buffer,用于#encode方法中ByteBuf参数,子类可以重写此方法,返回一个合适的初始化容量的Direct类型的buffer。 */ protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { if (preferDirect) { //返回一个direct类型的buffer return ctx.alloc().ioBuffer(); } else { //返回一个heap类型buffer return ctx.alloc().heapBuffer(); } } /** * Encode a message into a {@link ByteBuf}. This method will be called for each written message that can be handled * by this encoder. *编码消息到字节buf。当写消息可以被当前编码器处理时,调用此方法。 * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to 消息编码器所属的通道处理器上下文 * @param msg the message to encode 需要编码的消息 * @param out the {@link ByteBuf} into which the encoded message will be written 字节buffer * @throws Exception is thrown if an error occurs */ protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; //获取是否使用direct类型的buffer存储消息编码后的字节序列 protected boolean isPreferDirect() { return preferDirect; } }
我们来分析一下写消息对象
//写消息对象
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) {//如果消息可以被编码器处理 @SuppressWarnings("unchecked") I cast = (I) msg; //根据通道处理器上下文和preferDirect,分配一个字节buf buf = allocateBuffer(ctx, cast, preferDirect); try { //编码消息对象到字节buf encode(ctx, cast, buf); } finally { //释放消息对应引用参数 ReferenceCountUtil.release(cast); } //如果当前buffer可读,则通道处理器上下文写buffer if (buf.isReadable()) { ctx.write(buf, promise); } else { //否则释放buffer,写空buf buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { //释放buf buf.release(); } } }
有一下代码片段我们需要注意:
//如果当前buffer可读,则通道处理器上下文写buffer,并将结果存储在promise中 if (buf.isReadable()) { ctx.write(buf, promise); } else { //否则释放buffer,写空buf buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); }
//ByteBuf
/** * Returns {@code true} * if and only if {@code (this.writerIndex - this.readerIndex)} is greater * than {@code 0}. 字节buf的写索引大于读索引,则可读 */ public abstract boolean isReadable();
字节buf我们后面有时间再说。
//Unpooled
private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT; /** * A buffer whose capacity is {@code 0}. 空容量的字节buf,这个在后面跟字节buf一起将 */ public static final ByteBuf EMPTY_BUFFER = ALLOC.buffer(0, 0);
//ChannelHandlerContext
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
//ChannelOutboundInvoker
/** * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}. * This method will not request to actual flush, so be sure to call {@link #flush()} * once you want to request to flush all pending data to the actual transport. 通过Channel处理器上下文,请求写一个消息到Channle管道线。如果方法没有实际地刷新,如果你想请求 刷新所有等待发送的数据到实际transport,必须调用flush方法一次。 */ ChannelFuture write(Object msg, ChannelPromise promise);
再来简单看一字节buf分配函数:
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { if (preferDirect) { //返回一个direct类型的buffer return ctx.alloc().ioBuffer(); } else { //返回一个heap类型buffer return ctx.alloc().heapBuffer(); } }
//ChannelHandlerContext
/** * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s. */ ByteBufAllocator alloc();
//ByteBufAllocator
public interface ByteBufAllocator { ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR; /** * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O. */ ByteBuf ioBuffer(); /** * Allocate a heap {@link ByteBuf}. */ ByteBuf heapBuffer(); ... }
分配字节buf,实际委托给通道处理器上下文的ByteBufAllocator。
总结:
消息编码器MessageToByteEncoder实际上为一个Outbound通道处理器,内部有一个类型参数处理器TypeParameterMatcher,用于判断消息是否可以被当前编码器处理,不能则传给Channel管道线上的下一个通道处理器;一个preferDirect参数,用于决定,当将消息编码为字节序列时,应该存储在direct类型还是heap类型的字节buffer中。消息编码器主要方法为write方法,write方法首先,判断消息是否可以被当前编码器处理,如果消息可以被编码器处理,根据通道处理器上下文和preferDirect,分配一个字节buf,委托encode方法,编码消息对象到字节buf,encode方法待子类实现;释放消息对应引用参数,如果当前buffer可读,则通道处理器上下文写buffer,否则释放buffer,写空buf,最后释放buf。消息编码器MessageToByteEncoder实际上为一个Outbound通道处理器,这个与Mina中的消息编码器是有区别的,Mina中的消息编码器要和解码器组装成编解码工厂过滤器添加到过滤链上,且编解码工厂过滤器,在过滤链上是由先后顺序的,通道Mina中编码器和通道Handler是两个概念。而Netty中编码器实际为Outbound通道处理器,主要是通过类型参数匹配器TypeParameterMatcher,来判断消息是否可以被编码器处理。
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1324netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2063netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2453netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1320netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1319netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1600netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1848netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1400netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2840netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2186netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2042netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1082netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1880netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1222netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1205netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 960netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1313netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2193netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1085netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1859netty 管道线定义-ChannelPipeline:htt ...
相关推荐
9. **高效的编码解码器**:Netty提供了丰富的编码解码器,如ByteToMessageDecoder和MessageToByteEncoder,用于将网络数据转换为应用程序可处理的对象。 10. **ChannelFuture和Promise**:这些概念是Netty异步编程...
2. **高效的数据编码与解码**:Netty提供了多种编解码器,如ByteToMessageDecoder和MessageToByteEncoder,以及各种协议的预定义编解码器,如HTTP、TCP、UDP等,简化了数据传输的处理。 3. **高度可定制的事件处理...
例如,解码器(Decoder)、编码器(Encoder)和业务逻辑处理器等。 在 "netty-demo-pipeline-channelHandler" 示例中,我们可能看到如何配置和使用 ChannelPipeline 和 ChannelHandler。通常,这个例子会包括以下...
在文件名称列表中,"netty-all-5.0.0"是一个JAR文件,它包含了Netty框架的所有组件,包括各种处理器、编码器、解码器和事件循环等。这个单一的JAR文件简化了依赖管理,使得开发者可以方便地引入Netty的所有功能。 ...
7. **编码和解码**:Netty提供了一系列的编解码器,如ByteToMessageDecoder和MessageToByteEncoder,方便用户自定义协议的解析和组装。 8. **心跳机制**:Netty可以通过添加心跳Handler来实现客户端和服务端之间的...
- **编解码器**:提供了一系列的编码器和解码器,如ByteToMessageDecoder、MessageToByteEncoder,方便对各种数据格式进行转换。 - **灵活性**:Netty支持多种协议栈,如HTTP、WebSocket、FTP等,且易于扩展新协议...
2. **netty-codec-4.1.19.Final-sources.jar**:这是Netty的编码解码模块,提供了各种协议的编码器和解码器,例如ByteToMessageDecoder和MessageToByteEncoder,它们帮助我们将数据转换为可以发送在网络上的格式,并...
8. **编解码器**:Netty提供了许多预定义的编解码器,如ByteToMessageDecoder和MessageToByteEncoder,用于将字节流转换为对象,以及将对象转换回字节流。 9. **Future和Promise**:Netty中的异步编程模型,Future...
- Netty 提供多种预定义的编解码器,如 ByteToMessageDecoder 和 MessageToByteEncoder,用于将原始字节流转换为业务对象,反之亦然。 6. **HTTP/HTTPS 支持**: - Netty 支持 HTTP 协议,包括 HTTP/1.x 和 ...
Netty 提供了多种预定义的编码器和解码器,如 ByteToMessageDecoder 和 MessageToByteEncoder,它们分别用于处理字节流到消息对象和消息对象到字节流的转换。例如,我们可能需要将Java对象编码为字节流发送到网络,...
解码器通常会实现 `LengthFieldBasedFrameDecoder` 或者 `ByteToMessageDecoder`,而编码器则可以实现 `LengthFieldPrepender` 或者 `MessageToByteEncoder`。在解码器中,我们需要解析输入缓冲区并构建出 `Custom...
5. **编码与解码**: Netty提供多种编解码器,如ByteToMessageDecoder和MessageToByteEncoder,用于处理不同格式的数据,如文本、二进制、protobuf等。 6. **自定义协议支持**: Netty的灵活性在于可以根据需求定制...
Netty 提供了一系列预定义的 ChannelHandler,如 ByteToMessageDecoder 和 MessageToByteEncoder,用于解码和编码网络数据。这些处理器使得数据在网络中的传输变得简单,例如,它可以轻松地处理 HTTP 请求和响应,...
3. **编码与解码**:Netty提供了丰富的编码器和解码器,如ByteToMessageDecoder和MessageToByteEncoder,用于在网络通信中进行数据转换。了解如何自定义编码解码器以适应特定的协议格式。 4. **协议支持**:Netty...
Netty 的编码和解码是通过 ByteToMessageDecoder 和 MessageToByteEncoder 来实现的。它们负责将字节流转换为对象,或者将对象转换为字节流,以便在网络上传输。例如,TCP 连接中常用的 ...
- `netty-handler.jar`:包含了处理网络事件的基本处理器,如ByteToMessageDecoder和MessageToByteEncoder。 - `netty-codec-http.jar`:提供了HTTP协议的支持,包括HTTP/1.1和WebSocket。 3. **Example 使用例子...
在“EasyMedia-master”这个压缩包中,可能包含了实现上述功能的源代码,包括Netty服务器和客户端的配置、处理器、解码器和编码器的实现,以及可能的测试用例。用户可以借此学习如何将Netty应用于实时流媒体传输,...
自定义的编码器和解码器可以通过继承`MessageToByteEncoder`或`MessageToMessageEncoder`,`MessageToByteDecoder`或`MessageToMessageDecoder`来实现。 Netty的管道(Pipeline)机制是其灵活性的关键。每个`...
Netty提供了ByteToMessageDecoder和MessageToByteEncoder等类,用于自定义协议的编解码过程,确保LIS系统的各种数据格式能够正确地在网络中传输和解析。 3. **事件驱动**:Netty基于事件驱动模型,通过...
3. **编解码器**:Netty 提供了丰富的编解码器,如 ByteToMessageDecoder 和 MessageToByteEncoder,用于将原始字节流转换为有意义的对象,以及将对象编码回字节流。 4. **异常处理**:学习如何在 Pipeline 中捕获...