1.7. 流数据的传输处理
1.7.1. Socket Buffer的缺陷
对于例如TCP/IP这种基于流的传输协议实现,接收到的数据会被存储在socket的接受缓冲区内。不幸的是,这种基于流的传输缓冲区并不是一个包队列,而是一个字节队列。这意味着,即使你以两个数据包的形式发送了两条消息,操作系统却不会把它们看成是两条消息,而仅仅是一个批次的字节序 列。因此,在这种情况下我们就无法保证收到的数据恰好就是远程节点所发送的数据。例如,让我们假设一个操作系统的TCP/IP堆栈收到了三个数据包:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
由于这种流传输协议的普遍性质,在你的应用中有较高的可能会把这些数据读取为另外一种形式:
+----+-------+---+---+
| AB | CDEFG | H | I |
+----+-------+---+---+
因此对于数据的接收方,不管是服务端还是客户端,应当重构这些接收到的数据,让其变成一种可让你的应用逻辑易于理解的更有意义的数据结构。在上面所述的这个例子中,接收到的数据应当重构为下面的形式:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
1.7.2. 第一种方案
现在让我们回到时间协议服务客户端的例子中。我们在这里遇到了同样的问题。一个32位的整数是一个非常小的数据量,因此它常常不会被切分在不同的数据段内。然而,问题是它确实可以被切分在不同的数据段内,并且这种可能性随着流量的增加而提高。
最简单的方案是在程序内部创建一个可准确接收4字节数据的累积性缓冲。下面的代码是修复了这个问题后的TimeClientHandler实现。
package org.jboss.netty.example.time; import static org.jboss.netty.buffer.ChannelBuffers.*; import java.util.Date; @ChannelPipelineCoverage("one") public class TimeClientHandler extends SimpleChannelHandler { private final ChannelBuffer buf = dynamicBuffer(); @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer m = (ChannelBuffer) e.getMessage(); buf.writeBytes(m); if (buf.readableBytes() >= 4) { long currentTimeMillis = buf.readInt() * 1000L; System.out.println(new Date(currentTimeMillis)); e.getChannel().close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); e.getChannel().close(); } }
代码说明
1) 这一次我们使用“one”做为ChannelPipelineCoverage的注解值。这是由于这个修改后的TimeClientHandler不在不 在内部保持一个buffer缓冲,因此这个TimeClientHandler实例不可以再被多个Channel通道或ChannelPipeline共 享。否则这个内部的buffer缓冲将无法缓冲正确的数据内容。
2) 动态的buffer缓冲也是ChannelBuffer的一种实现,其拥有动态增加缓冲容量的能力。当你无法预估消息的数据长度时,动态的buffer缓冲是一种很有用的缓冲结构。
3) 首先,所有的数据将会被累积的缓冲至buf容器。
4) 之后,这个处理器将会检查是否收到了足够的数据然后再进行真实的业务逻辑处理,在这个例子中需要接收4字节数据。否则,Netty将重复调用messageReceived方法,直至4字节数据接收完成。
这里还有另一个地方需要进行修改。你是否还记得我们把TimeClientHandler实例添加到了这个ClientBootstrap实例的默 认ChannelPipeline管道里?这意味着同一个TimeClientHandler实例将被多个Channel通道共享,因此接受的数据也将受 到破坏。为了给每一个Channel通道创建一个新的TimeClientHandler实例,我们需要实现一个 ChannelPipelineFactory管道工厂:
package org.jboss.netty.example.time; public class TimeClientPipelineFactory implements ChannelPipelineFactory { public ChannelPipeline getPipeline() { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("handler", new TimeClientHandler()); return pipeline; } }
现在,我们需要把TimeClient下面的代码片段:
TimeClientHandler handler = new TimeClientHandler(); bootstrap.getPipeline().addLast("handler", handler);
替换为:
bootstrap.setPipelineFactory(new TimeClientPipelineFactory());
虽然这看上去有些复杂,并且由于在TimeClient内部我们只创建了一个连接(connection),因此我们在这里确实没必要引入TimeClientPipelineFactory实例。
然而,当你的应用变得越来越复杂,你就总会需要实现自己的ChannelPipelineFactory,这个管道工厂将会令你的管道配置变得更加具有灵活性。
1.7.3. 第二种方案
虽然第二种方案解决了时间协议客户端遇到的问题,但是这个修改后的处理器实现看上去却不再那么简洁。设想一种更为复杂的,由多个可变长度字段组成的协议。你的ChannelHandler实现将变得越来越难以维护。
正如你已注意到的,你可以为一个ChannelPipeline添加多个ChannelHandler,因此,为了减小应用的复杂性,你可以把这个 臃肿的ChannelHandler切分为多个独立的模块单元。例如,你可以把TimeClientHandler切分为两个独立的处理器:
TimeDecoder,解决数据分段的问题。
TimeClientHandler,原始版本的实现。
幸运的是,Netty提供了一个可扩展的类,这个类可以直接拿过来使用帮你完成TimeDecoder的开发:
package org.jboss.netty.example.time; public class TimeDecoder extends FrameDecoder { @Override protected Object decode( ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) { if (buffer.readableBytes() < 4) { return null; } return buffer.readBytes(4); } }
代码说明
1) 这里不再需要使用ChannelPipelineCoverage的注解,因为FrameDecoder总是被注解为“one”。
2) 当接收到新的数据后,FrameDecoder会调用decode方法,同时传入一个FrameDecoder内部持有的累积型buffer缓冲。
3) 如果decode返回null值,这意味着还没有接收到足够的数据。当有足够数量的数据后FrameDecoder会再次调用decode方法。
4) 如果decode方法返回一个非空值,这意味着decode方法已经成功完成一条信息的解码。FrameDecoder将丢弃这个内部的累计型缓冲。请注 意你不需要对多条消息进行解码,FrameDecoder将保持对decode方法的调用,直到decode方法返回非空对象。
如果你是一个勇于尝试的人,你或许应当使用ReplayingDecoder,ReplayingDecoder更加简化了解码的过程。为此你需要查看API手册获得更多的帮助信息。
package org.jboss.netty.example.time; public class TimeDecoder extends ReplayingDecoder<VoidEnum> { @Override protected Object decode( ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, VoidEnum state) { return buffer.readBytes(4); } }
此外,Netty还为你提供了一些可以直接使用的decoder实现,这些decoder实现不仅可以让你非常容易的实现大多数协议,并且还会帮你避免某些臃肿、难以维护的处理器实现。请参考下面的代码包获得更加详细的实例:
org.jboss.netty.example.factorial for a binary protocol, and
org.jboss.netty.example.telnet for a text line-based protocol
相关推荐
Netty入门教程文档 Netty是Java的网络编程框架,广泛应用于数据采集服务中,本文将对Netty的基本概念和应用进行详细介绍,并将其与ETL技术结合,讲解如何使用Netty进行数据流转和处理。 1. ETL概述 ETL(Extract...
### Netty 入门与实战:仿写微信 IM 即时通讯系统 #### 一、引言 随着移动互联网技术的飞速发展,即时通讯(IM)应用已成为人们日常生活中不可或缺的一部分。微信作为中国最成功的即时通讯软件之一,其背后的架构和...
### Netty入门与实战:仿写微信IM即时通讯系统 #### 一、引言 随着移动互联网技术的飞速发展,即时通讯(IM)系统已成为人们日常生活中不可或缺的一部分。微信作为国内最受欢迎的即时通讯软件之一,其高效稳定的通信...
在本文中,我们将深入探讨Netty框架,并通过实战项目——仿写微信IM即时通讯系统,来深入了解其在高性能网络应用中的应用。Netty是Java领域中一个高效的异步事件驱动的网络应用程序框架,它为快速开发可维护的高性能...
Netty 入门与实战:仿写微信 IM 即时通讯系统,掘金小册子,netty教程。章节齐全无缺失,排版非常不错。 1.仿微信IM系统简介 1 2.Netty是什么? 2 3.服务端启动流程 8 4.客户端启动流程 11 5.实战:客户端与服务端双向...
《Netty 入门与实战:仿写微信 IM 即时通讯系统》是一份专为初学者设计的高质量教程,旨在帮助读者快速掌握Netty框架并应用到实际的即时通讯系统开发中,如仿写微信IM系统。Netty是Java领域内广泛使用的高性能、异步...
根据提供的文件信息“netty入门到精通”,我们可以深入探讨Netty框架的相关知识点,包括其基本概念、核心组件、应用场景以及如何逐步掌握这项技术。 ### Netty框架简介 Netty是一款高性能、异步事件驱动的网络应用...
3. **客户端与服务端通信协议编解码** 在Netty中,通信协议的编解码是通过ChannelHandlerContext和ChannelHandler实现的。例如,你可以自定义Decoder来解析进来的字节流,将它们转换为业务对象;而Encoder则负责将...
本工程采用maven+netty4.1.0+PrefixedStringDecoder+json技术,包括客户端和服务端。先运行服务端SampleServer,再去等客户端SampleClient。示例中发的是心跳包,其中消息格式定义为msgType + msgNo + content(json...
文件"72-第2章_20-netty入门-jdk-future-480P 清晰-AVC.Cover.jpg"、"73-第2章_21-netty入门-netty-future-480P 清晰-AVC.Cover.jpg"和"74-第2章_22-netty入门-netty-promise-480P 清晰-AVC.Cover.jpg"详细解释了...
这个教程将引导我们入门 Netty 编码,让我们深入理解其核心概念和实际应用。 首先,Netty 的核心是其设计模式,即 Reactor 模式,也称为事件驱动模型。Reactor 模式允许 Netty 高效地处理大量并发连接,通过非阻塞 ...
《Netty+入门与实战:仿写微信+IM+即时通讯系统》是一本专注于使用Netty框架构建即时通讯系统的教程。Netty是一个高性能、异步事件驱动的网络应用框架,适用于开发服务器和客户端的Java应用。它极大地简化了网络编程...
Netty快速入门系列源码, 参考 https://blog.csdn.net/netcobol Netty是一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 Netty是...
这个“netty入门案例.zip”文件提供了一个简单的 Netty 应用示例,旨在帮助初学者快速理解并掌握 Netty 的基本概念和使用方法。下面将详细介绍这个入门案例中的关键知识点。 首先,Netty 的核心是其 ChannelHandler...
在学习"Netty入门Reactor示例"时,你可以按照以下步骤进行: 1. **创建服务器端:** - 首先,你需要创建一个ServerBootstrap实例,配置BossGroup和WorkerGroup。 - 然后,添加自定义的...
3.第二个示例 com.time’ Netty服务端与客户端,数据的发送与接收 使用:先运行TimeServer,在运行TimeClient。成功连接后,服务器发送一个时间给客户端。输出到客户端控制台 4.第三个示例 com.user_1 Netty将...