`
zhaohaolin
  • 浏览: 1017390 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

netty的粘包 解包问题【转】

 
阅读更多

 

  1. 1、简介  
  2. Java1.4提供了NIO使开发者可以使用Java编写高性能的服务端程序,但使用原生的NIO API就像Linux C中网络编程一样,还是需要做IO处理、协议处理等低层次工作。所以,就像C服务端程序大量使用libevent作为网络应用框架一样,Java社区也不断涌现出基于NIO的网络应用框架。在这其中,Jboss出品的Netty就是个中翘楚。Netty是个异步的事件驱动网络应用框架,具有高性能、高扩展性等特性。Netty提供了统一的底层协议接口,使得开发者从底层的网络协议(比如TCP/IP、UDP)中解脱出来。就使用来说,开发者只要参考 Netty提供的若干例子和它的指南文档,就可以放手开发基于Netty的服务端程序了。  
  3.   
  4. 在Java社区,最知名的开源Java NIO框架要属Mina和Netty,而且两者渊源颇多,对两者的比较自然不少。实际上,Netty的作者原来就是Mina作者之一,所以可以想到,Netty和Mina在设计理念上会有很多共同点。我对Mina没什么研究,但其作者介绍,Netty的设计对开发者有更友好的扩展性,并且性能方面要优于Mina,而Netty完善的文档也很吸引人。所以,如果你在寻找Java NIO框架,Netty是个很不错的选择。本文的内容就是围绕一个demo介绍使用Netty的点点滴滴。  
  5.   
  6. 2、服务端程序  
  7. 2.1、ChannelHandler  
  8. 服务端程序通常的处理过程是:解码请求数据、业务逻辑处理、编码响应。从框架角度来说,可以提供3个接口来控制并调度该处理过程;从更通用的角度来说,并不特化处理其中的每一步,而把每一步当做过滤器链中的一环,这也是Netty的做法。Netty对请求处理过程实现了过滤器链模式(ChannelPipeline),每个过滤器实现了ChannelHandler接口。Netty中有两种请求事件流类型也做了细分:  
  9.   
  10. 1)downstream event:其对应的ChannelHandler子接口是ChannelDownstreamHandler。downstream event是说从头到尾执行ChannelPipeline中的ChannelDownstreamHandler,这一过程相当于向外发送数据的过程。 downstream event有:”write”、”bind”、”unbind”、 “connect”、 “disconnect”、”close”。  
  11.   
  12. 2)upstream event:其对应的ChannelHandler子接口是ChannelUpstreamHandler。upstream event处理的事件方向和downstream event相反,这一过程相当于接收处理外来请求的过程。upstream event有:”messageReceived”、 “exceptionCaught”、”channelOpen”、”channelClosed”、 “channelBound”、”channelUnbound”、 “channelConnected”、”writeComplete”、”channelDisconnected”、”channelInterestChanged”。  
  13.   
  14. Netty中有个注释@interface ChannelPipelineCoverage,它表示被注释的ChannelHandler是否能添加到多个ChannelPipeline中,其可选的值是”all”和”one”。”all”表示ChannelHandler是无状态的,可被多个ChannelPipeline共享,而”one”表示ChannelHandler只作用于单个ChannelPipeline中。但ChannelPipelineCoverage只是个注释而已,并没有实际的检查作用。对于ChannelHandler是”all”还是”one”,还是根据逻辑需要而定。比如,像解码请求handler,因为可能解码的数据不完整,需要等待下一次读事件来了之后再继续解析,所以解码请求handler就需要是”one”的(否则多个Channel共享数据就乱了)。而像业务逻辑处理hanlder通常是”all”的。  
  15.   
  16. 下面以一个简单的例子说明如何编写“解码请求数据、业务逻辑处理、编码响应”这一过程中涉及的ChannelHandler。该例子实现的协议格式很简单,请求和响应流中头4个字节表示后面跟的内容长度,根据该长度可得到内容体。  
  17.   
  18. 首先看下解码器的实现:  
  19.   
  20. public class MessageDecoder extends FrameDecoder {  
  21.    
  22.     @Override  
  23.     protected Object decode(  
  24.             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {  
  25.         if (buffer.readableBytes() < 4) {  
  26.             return null;//(1)  
  27.         }  
  28.         int dataLength = buffer.getInt(buffer.readerIndex());  
  29.         if (buffer.readableBytes() < dataLength + 4) {  
  30.             return null;//(2)  
  31.         }  
  32.    
  33.         buffer.skipBytes(4);//(3)  
  34.         byte[] decoded = new byte[dataLength];  
  35.         buffer.readBytes(decoded);  
  36.         String msg = new String(decoded);//(4)  
  37.         return msg;  
  38.     }  
  39. }  
  40. MessageDecoder继承自FrameDecoder,FrameDecoder是Netty codec包中的辅助类,它是个ChannelUpstreamHandler,decode方法是FrameDecoder子类需要实现的。在上面的代码中,有:  
  41.   
  42. (1)检查ChannelBuffer中的字节数,如果ChannelBuffer可读的字节数少于4,则返回null等待下次读事件。  
  43. (2)继续检查ChannelBuffer中的字节数,如果ChannelBuffer可读的字节数少于dataLength + 4,则返回null等待下次读事件。  
  44. (3)越过dataLength的字节。  
  45. (4)构造解码的字符串返回。  
  46.   
  47. @ChannelPipelineCoverage("all")  
  48. public class MessageServerHandler extends SimpleChannelUpstreamHandler {  
  49.    
  50.     private static final Logger logger = Logger.getLogger(  
  51.             MessageServerHandler.class.getName());  
  52.    
  53.     @Override  
  54.     public void messageReceived(  
  55.             ChannelHandlerContext ctx, MessageEvent e) {  
  56.         if (!(e.getMessage() instanceof String)) {  
  57.             return;//(1)  
  58.         }  
  59.         String msg = (String) e.getMessage();  
  60.         System.err.println("got msg:"+msg);  
  61.         e.getChannel().write(msg);//(2)  
  62.     }  
  63.    
  64.     @Override  
  65.     public void exceptionCaught(  
  66.             ChannelHandlerContext ctx, ExceptionEvent e) {  
  67.         logger.log(  
  68.                 Level.WARNING,  
  69.                 "Unexpected exception from downstream.",  
  70.                 e.getCause());  
  71.         e.getChannel().close();  
  72.     }  
  73. }  
  74. MessageServerHandler是服务端业务处理handler,其继承自SimpleChannelUpstreamHandler,并主要实现messageReceived事件。关于该类,有如下注解:  
  75.   
  76. (1)该upstream事件流中,首先经过MessageDecoder,其会将decode返回的解码后的数据构造成 MessageEvent.getMessage(),所以在handler上下文关系中,MessageEvent.getMessage()并不一定都返回ChannelBuffer类型的数据。  
  77. (2)MessageServerHandler只是简单的将得到的msg再写回给客户端。e.getChannel().write(msg);操作将触发DownstreamMessageEvent事件,也就是调用下面的MessageEncoder将编码的数据返回给客户端。  
  78.   
  79. @ChannelPipelineCoverage("all")  
  80. public class MessageEncoder extends OneToOneEncoder {  
  81.    
  82.     @Override  
  83.     protected Object encode(  
  84.             ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {  
  85.         if (!(msg instanceof String)) {  
  86.             return msg;//(1)  
  87.         }  
  88.    
  89.         String res = (String)msg;  
  90.         byte[] data = res.getBytes();  
  91.         int dataLength = data.length;  
  92.         ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2)  
  93.         buf.writeInt(dataLength);  
  94.         buf.writeBytes(data);  
  95.         return buf;//(3)  
  96.     }  
  97. }  
  98. MessageEncoder是个ChannelDownstreamHandler。对该类的注解如下:  
  99.   
  100. (1)如果编码的msg不是合法类型,就直接返回该msg,之后OneToOneEncoder会调用 ctx.sendDownstream(evt);来调用下一个ChannelDownstreamHandler。对于该例子来说,这种情况是不应该出现的。  
  101. (2)开发者创建ChannelBuffer的用武之地就是这儿了,通常使用dynamicBuffer即可,表示得到的ChannelBuffer可动态增加大小。  
  102. (3)返回编码后的ChannelBuffer之后,OneToOneEncoder会调用Channels.write将数据写回客户端。  
  103.   
  104. 2.2、MessageServerPipelineFactory  
  105. 创建了3个ChannelHandler,需要将他们注册到ChannelPipeline,而ChannelPipeline又是和Channel对应的(是全局单例还是每个Channel对应一个ChannelPipeline实例依赖于实现)。可以实现ChannelPipeline的工厂接口 ChannelPipelineFactory实现该目的。MessageServerPipelineFactory的代码如下:  
  106.   
  107. public class MessageServerPipelineFactory implements  
  108.         ChannelPipelineFactory {  
  109.    
  110.     public ChannelPipeline getPipeline() throws Exception {  
  111.         ChannelPipeline pipeline = pipeline();  
  112.    
  113.         pipeline.addLast("decoder"new MessageDecoder());  
  114.         pipeline.addLast("encoder"new MessageEncoder());  
  115.         pipeline.addLast("handler"new MessageServerHandler());  
  116.    
  117.         return pipeline;  
  118.     }  
  119. }  
  120. 2.3、MessageServer  
  121. 服务端程序就剩下启动代码了,使用Netty的ServerBootstrap三下五除二完成之。  
  122.   
  123. public class MessageServer {  
  124.    
  125.     public static void main(String[] args) throws Exception {  
  126.         // Configure the server.  
  127.         ServerBootstrap bootstrap = new ServerBootstrap(  
  128.                 new NioServerSocketChannelFactory(  
  129.                         Executors.newCachedThreadPool(),  
  130.                         Executors.newCachedThreadPool()));  
  131.    
  132.         // Set up the default event pipeline.  
  133.         bootstrap.setPipelineFactory(new MessageServerPipelineFactory());  
  134.    
  135.         // Bind and start to accept incoming connections.  
  136.         bootstrap.bind(new InetSocketAddress(8080));  
  137.    
  138.     }  
  139. }  
  140. 稍加补充的是,该Server程序并不完整,它没有处理关闭时的资源释放,尽管暴力的来看并不一定需要做这样的善后工作。  
  141.   
  142. 3、客户端程序  
  143. 客户端程序和服务端程序处理模型上是很相似的,这里还是付上代码并作简要说明。  
  144.   
  145. 3.1、 ChannelHandler  
  146. 客户端是先发送数据到服务端(downstream事件流),然后是处理从服务端接收的数据(upstream事件流)。这里有个问题是,怎么把需要发送的数据送到downstream事件流里呢?这就用到了ChannelUpstreamHandler的channelConnected事件了。实现的 MessageClientHandler代码如下:  
  147.   
  148. @ChannelPipelineCoverage("all")  
  149. public class MessageClientHandler extends SimpleChannelUpstreamHandler {  
  150.    
  151.     private static final Logger logger = Logger.getLogger(  
  152.             MessageClientHandler.class.getName());  
  153.    
  154.    
  155.     @Override  
  156.     public void channelConnected(  
  157.             ChannelHandlerContext ctx, ChannelStateEvent e) {  
  158.         String message = "hello kafka0102";  
  159.         e.getChannel().write(message);  
  160.     }  
  161.    
  162.     @Override  
  163.     public void messageReceived(  
  164.             ChannelHandlerContext ctx, MessageEvent e) {  
  165.         // Send back the received message to the remote peer.  
  166.         System.err.println("messageReceived send message "+e.getMessage());  
  167.         try {  
  168.             Thread.sleep(1000*3);  
  169.         } catch (Exception ex) {  
  170.             ex.printStackTrace();  
  171.         }  
  172.         e.getChannel().write(e.getMessage());  
  173.     }  
  174.    
  175.     @Override  
  176.     public void exceptionCaught(  
  177.             ChannelHandlerContext ctx, ExceptionEvent e) {  
  178.         // Close the connection when an exception is raised.  
  179.         logger.log(  
  180.                 Level.WARNING,  
  181.                 "Unexpected exception from downstream.",  
  182.                 e.getCause());  
  183.         e.getChannel().close();  
  184.     }  
  185. }  
  186. 对于编码和解码Handler,复用MessageEncoder和MessageDecoder即可。  
  187.   
  188. 3.2、 MessageClientPipelineFactory  
  189.     public class MessageClientPipelineFactory implements  
  190.         ChannelPipelineFactory {  
  191.    
  192.     public ChannelPipeline getPipeline() throws Exception {  
  193.         ChannelPipeline pipeline = pipeline();  
  194.    
  195.         pipeline.addLast("decoder"new MessageDecoder());  
  196.         pipeline.addLast("encoder"new MessageEncoder());  
  197.         pipeline.addLast("handler"new MessageClientHandler());  
  198.    
  199.         return pipeline;  
  200.     }  
  201. }  
  202. 3.3、MessageClient  
  203. public class MessageClient {  
  204.    
  205.     public static void main(String[] args) throws Exception {  
  206.         // Parse options.  
  207.         String host = "127.0.0.1";  
  208.         int port = 8080;  
  209.         // Configure the client.  
  210.         ClientBootstrap bootstrap = new ClientBootstrap(  
  211.                 new NioClientSocketChannelFactory(  
  212.                         Executors.newCachedThreadPool(),  
  213.                         Executors.newCachedThreadPool()));  
  214.         // Set up the event pipeline factory.  
  215.         bootstrap.setPipelineFactory(new MessageClientPipelineFactory());  
  216.         // Start the connection attempt.  
  217.         ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));  
  218.         // Wait until the connection is closed or the connection attempt fails.  
  219.         future.getChannel().getCloseFuture().awaitUninterruptibly();  
  220.         // Shut down thread pools to exit.  
  221.         bootstrap.releaseExternalResources();  
  222.     }  
  223. }  
  224. 在写客户端例子时,我想像的代码并不是这样的,对客户端的代码我也没做过多的研究,所以也可能没有找到更好的解决方案。在上面的例子中,bootstrap.connect方法中会触发实际的连接操作,接着触发 MessageClientHandler.channelConnected,使整个过程运转起来。但是,我想要的是一个连接池,并且如何写数据也不应该在channelConnected中,这样对于动态的数据,只能在构造函数中传递需要写的数据了。但到现在,我还不清楚如何将连接池和 ChannelPipeline有效的结合起来。或许,这样的需求可以跨过Netty来实现。  
  225.   
  226. 4、总结  
  227. 关于Netty的初步使用,尚且总结到这里。关于这篇文章,写得断断续续,以至于到后来我都没兴趣把内容都整理出来。当然,这多少也是因为我是先整理 Netty原理方面的东西所致。我也只能卑微的期望,该文对Netty入门者会有些许帮助。  
  228.   
  229.   
  230. =============================== 华丽的终止符 ================================   

 

分享到:
评论

相关推荐

    使用Netty解决TCP粘包和拆包问题过程详解

    使用Netty解决TCP粘包和拆包问题过程详解 Netty是一个流行的Java网络编程框架,提供了简洁、灵活的API来处理网络编程的各种问题。其中,解决TCP粘包和拆包问题是Netty的一个重要应用场景。本文将详细介绍使用Netty...

    Netty 粘包/半包原理与拆包实战 【源代码 新】

    - 本实例是《Netty 粘包/半包原理与拆包实战》 一文的源代码工程。 大家好,我是作者尼恩。 在前面的文章中,完成了一个高性能的 Java 聊天程序,尼恩已经再一次的进行了通讯协议的选择。放弃了大家非常熟悉的json...

    Netty粘包拆包问题解决方案

    Netty 粘包拆包问题解决方案 Netty 是一个基于 Java 的网络编程框架,它提供了一个便捷的方式来处理网络数据的读写操作。然而,在使用 Netty 进行网络编程时,经常会遇到粘包和拆包的问题。所谓粘包和拆包,就是指...

    Netty 粘包/半包原理与拆包实战 源码

    本实例是《Netty 粘包/半包原理与拆包实战》 一文的源代码工程。 大家好,我是作者尼恩。 在前面的文章中,完成了一个...在开始聊天器实战开发之前,还有一个非常基础的问题,需要解决:这就是通讯的粘包和半包问题。

    Netty粘包分包现象及解决方案实战,防socket攻击

    Netty粘包分包现象及解决方案实战,防socket攻击水。

    Netty粘包分包服务器端客户端完整例子

    在使用Netty进行网络通信时,"粘包"和"分包"是两个常见的问题,这两个概念在标题和描述中被提及。 粘包和分包问题通常出现在数据传输过程中。当发送端连续发送多个小包时,接收端可能会一次性接收到这些小包的组合...

    netty拆包粘包解决方案示例

    本文将深入探讨Netty中解决拆包粘包问题的策略,以及客户端断线重连的实现方式。 首先,我们需要理解什么是“拆包”和“粘包”。在TCP/IP通信中,由于TCP协议的流式传输特性,数据可能会被分片或者合并,这就可能...

    Netty粘包与拆包源码

    对于粘包和拆包问题,Netty提供了ByteToMessageDecoder和MessageToByteEncoder等基础类,开发者可以通过继承它们并实现自己的解码和编码逻辑。 1. ByteToMessageDecoder:这个类的主要任务是将接收到的字节数组拆分...

    Netty粘包-拆包应用案例及解决方案分析.docx

    Netty 作为一个高性能的网络通信框架,提供了多种解决 TCP 粘包/拆包问题的方法。 一、TCP 粘包/拆包的基础知识 TCP 是一个“流”协议,没有界限的一串数据。TCP 底层并不知道上层业务逻辑,它会根据 TCP 缓冲区...

    Netty粘包拆包解决方案.docx

    总的来说,Netty提供的解码器机制使得开发者无需深入理解TCP底层细节,也能有效地解决粘包和拆包问题,提高了网络通信的可靠性和效率。通过选择合适的解码器并合理配置,我们可以构建出健壮的网络应用。

    unity实现Socket通讯(内含tcp粘包/拆包解决)

    - 协议框架:设计一套完整的协议框架,如Netty或protobuf,它们提供自动的粘包/拆包处理。 五、实现步骤 1. 设计消息结构:定义消息的格式,如固定长度、包头+包体或特殊分隔符。 2. 创建数据序列化/反序列化函数:...

    Netty精粹之TCP粘包拆包问题

    ### Netty精粹之TCP粘包拆包问题详解 #### 一、引言 在网络通信领域,尤其是在基于TCP协议的应用程序开发中,经常会遇到“粘包”和“拆包”的问题。这些问题虽然属于较为底层的技术细节,但对于保障数据传输的准确...

    springboot整合netty的实践操作(解决粘包拆包问题)

    springboot整合netty的实践操作(解决粘包拆包问题)

    Socket粘包问题终极解决方案-Netty版.docx

    Socket 粘包问题终极解决方案-Netty 版 Socket 粘包问题是指在使用 TCP 协议进行网络通讯时,由于 TCP 协议本身没有边界的概念,导致在传输数据时无法确定消息的边界,从而产生粘包和半包问题。本文将详细介绍 ...

    Netty进制转换乱码问题

    在使用Netty进行TCP/IP通信时,我们可能会遇到数据进制转换导致的乱码问题。这个问题通常是由于数据编码不一致或者处理方式不当所引起的。在本文中,我们将深入探讨Netty中的进制转换和字符编码,并提供解决方案。 ...

    Netty中粘包和拆包的解决方案.docx

    Netty框架提供了多种内置的解码器来处理粘包和拆包问题,方便开发者根据实际需求选择: 1. **FixedLengthFrameDecoder**:用于处理固定长度的数据包,每次读取指定长度的数据作为完整的消息。 2. **...

    基于javatcpsocket通信的拆包和装包源码-NettyTree:网状树

    TCP粘包、拆包 2)编解码技术 1)Java序列化 2)业界主流的编解码框架 Thrift Protobuf 3) Websocket 5)Netty协议栈功能设计 6)Netty源码分析 ByteBuf工作原理 Channel, Unsafe ChannelPipline, ChannelHandler ...

    netty消息拆包粘包编解码(java代码)

    3. **Netty中的编解码处理**:Netty 提供了多种ChannelHandler(处理器)来解决这个问题,其中`LengthFieldBasedFrameDecoder`和`LengthFieldPrepender`是处理粘包和拆包的常用工具。 - `...

    netty 数据分包、组包、粘包处理机制(部分)1

    Netty 数据分包、组包、粘包处理机制 Netty 中的 LengthFieldBasedFrameDecoder 是一种常用的处理大数据分包传输问题的解决类。该类提供了多种参数来调整帧的解码方式,从而满足不同的应用场景。 1. maxFrame...

    netty解析报文,解决粘包拆包

    注:下载前请查看本人博客文章,看是否...里面包含模拟TCP客户端发送报文工具,硬件厂商提供的协议,服务端(springboot+netty)解析报文源码,源码里整合了redis,不需要可自行删除,如有需要客户端代码,可联系我。

Global site tag (gtag.js) - Google Analytics