java Netty 之 字符串消息收发(ChannelBuffer)
ChannelBuffer
Netty中的消息传递,都必须以字节的形式,以ChannelBuffer为载体传递。简单的说,就是你想直接写个字符串过去,对不起,抛异常。虽然,Netty定义的writer的接口参数是Object的,这可能也是会给新上手的朋友容易造成误会的地方。Netty源码中,是这样判断的:
SendBuffer acquire(Object message) { if (message instanceof ChannelBuffer) { return acquire((ChannelBuffer) message); } else if (message instanceof FileRegion) { return acquire((FileRegion) message); } throw new IllegalArgumentException("unsupported message type: " + message.getClass()); }
所以,我们要想传递字符串,那么就必须转换成ChannelBuffer。明确了这一点,接下来我们上代码:
public class MessageServer { public static void main(String args[]) { // Server服务启动器 ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // 设置一个处理客户端消息和各种消息事件的类(Handler) bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new MessageServerHandler()); } }); // 开放8000端口供客户端访问。 bootstrap.bind(new InetSocketAddress(8000)); } private static class MessageServerHandler extends SimpleChannelHandler { /** * 用户接受客户端发来的消息,在有客户端消息到达时触发 * */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); System.out.println(buffer.toString(Charset.defaultCharset())); } } } public class MessageClient { public static void main(String args[]) { // Client服务启动器 ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // 设置一个处理服务端消息和各种消息事件的类(Handler) bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new MessageClientHandler()); } }); // 连接到本地的8000端口的服务端 bootstrap.connect(new InetSocketAddress("127.0.0.1", 8000)); } private static class MessageClientHandler extends SimpleChannelHandler { /** * 当绑定到服务端的时候触发,给服务端发消息。 */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { // 将字符串,构造成ChannelBuffer,传递给服务端 String msg = "Hello, I'm client."; ChannelBuffer buffer = ChannelBuffers.buffer(msg.length()); buffer.writeBytes(msg.getBytes()); e.getChannel().write(buffer); } } }
与 前面“Hello World” 样例代码不同的是,客户端在channel连通后,不是在本地打印,而是将消息转换成ChannelBuffer传递给服务端,服务端接受到ChannelBuffer后,解码成字符串打印出来。
数据的读和写
在TCP/IP这种基于流传递的协议中。他识别的不是你每一次发送来的消息,不是分包的。而是,只认识一个整体的流,即使分三次分别发送三段话:ABC、DEF、GHI。在传递的过程中,他就是一个具有整体长度的流。在读流的过程中,如果我一次读取的长度选择的不是三个,我可以收到类似AB、CDEFG、H、I这样的信息。这显然是我们不想看到的。所以说,在你写的消息收发的系统里,需要预先定义好这种解析机制,规定每帧(次)读取的长度。通过代码来理解一下:
public class ServerBufferHandler extends SimpleChannelHandler { /** * 用户接受客户端发来的消息,在有客户端消息到达时触发 * */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); // 五位读取 while (buffer.readableBytes() >= 5) { ChannelBuffer tempBuffer = buffer.readBytes(5); System.out.println(tempBuffer.toString(Charset.defaultCharset())); } // 读取剩下的信息 System.out.println(buffer.toString(Charset.defaultCharset())); } }
public class ClientBufferHandler extends SimpleChannelHandler { /** * 当绑定到服务端的时候触发,给服务端发消息。 * */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { // 分段发送信息 sendMessageByFrame(e); } /** * 将<b>"Hello, I'm client."</b>分成三份发送。 * Hello, * I'm * client. * * @param e Netty事件 */ private void sendMessageByFrame(ChannelStateEvent e) { String msgOne = "Hello, "; String msgTwo = "I'm "; String msgThree = "client."; e.getChannel().write(tranStr2Buffer(msgOne)); e.getChannel().write(tranStr2Buffer(msgTwo)); e.getChannel().write(tranStr2Buffer(msgThree)); } /** * 将字符串转换成{@link ChannelBuffer},私有方法不进行字符串的非空判断。 * * @param str 待转换字符串,要求非null * * @return 转换后的ChannelBuffer */ private ChannelBuffer tranStr2Buffer(String str) { ChannelBuffer buffer = ChannelBuffers.buffer(str.length()); buffer.writeBytes(str.getBytes()); return buffer; } }
输出结果:
Hello
, I'm
clie
nt.
注意:
这里其实,服务端是否分段发送并不会影响输出结果,也就是说,你一次性的把"Hi, I'm client."这段信息发送过来,输出的结果也是一样的。这就是开头说的,传输的是流,不分包。而只在于你如何分段读写。
ChannelBuffer的结构
ChannelBuffer是Netty中比较常用的一个类,其功能类似于字符数组,可以对其进行读写操作。
ChannelBuffer的模型图如下:
如上图所示,一个ChannelBuffer被划分为三个部分:
- discardable:表示已经读过的内容缓冲区
- readable:表示可读的内容缓冲区
- writable:表示可写的内容缓冲区
ChannelBuffer的这三个缓冲区由2个内部控制指针来控制:
- readerIndex:控制读缓冲区首地址
- writerIndex:控制写缓冲区首地址
因此,ChannelBuffer提供的大部分操作都是围绕readerIndex和writerIndex来进行的。
ChannelBuffer的常用方法
1、
read()/skip()
从readerIndex读出或跳过指定数目的字节,同时readerIndex = readerIndex + byteNumber.如果readerIndex > capacity,表示读取下标越界,会抛出IndexOutOfBoundsException异常
readable():boolean
如果buffer有可读内容(此时writerIndex > readerIndex),则返回true,否则返回false
readerIndex():int
返回readerIndex
readableBytes():int
返回可读的字节数目(writerIndex - readerIndex)
2、
write();
写入指定数目的字节,同时writerIndex = writerIndex + byteNumber. 如果writerIndex > capacity,表示写入下标越界,会抛出IndexOutOfBoundsException异常
writable():boolean
如果buffer有可写入空间(此时writerIndex < capacity),则返回true,否则返回false。
writerIndex(): int
返回writerIndex
writeableIndex():int
返回可写入的字节数(capacity - writerIndex)。
3、
discardReadBytes()
丢弃已读的内容。其执行过程如下:
4、
clear()
丢弃所有的数据,并将readerIndex和writerIndex重置为0。
5、
markReaderIndex()
markWriterIndex()
保存readerIndex或writerIndex的状态
6、
resetReaderIndex()
resetWriterIndex()
重置readerIndex或writerIndex为最后一次保存的状态,如果没有保存过,则置为0
7、
duplicate()
slice()
拷贝和源目标共享buffer的数据区域,但是拷贝有自己的readerIndex和writerIndex以及markIndex,实际上只是拷贝了控制指针,数据区还是与源buffer共享。
8、
copy()
拷贝整个buffer,包括控制指针和数据区
测试代码:
package test; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; public class ChannelBufferTest { public static void main( String[] args ) { // TODO Auto-generated method stub ChannelBuffer buffer = ChannelBuffers.buffer( 10 ); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); buffer.writeInt( 10 ); System.out.println("after write one integer"); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); buffer.writeInt( 10 ); System.out.println("after write two integer"); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); int i = buffer.readInt( ); System.out.println("after read one integer: " + i); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); buffer.discardReadBytes( ); System.out.println("after discard read bytes"); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); buffer.resetReaderIndex( ); System.out.println("after reset reader index"); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); buffer.resetWriterIndex( ); System.out.println("after reset writer index"); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); } }
结果:
readable bytes: 0
readable index: 0
writable bytes: 10
writable index: 0
after write one integer
readable bytes: 4
readable index: 0
writable bytes: 6
writable index: 4
after write two integer
readable bytes: 8
readable index: 0
writable bytes: 2
writable index: 8
after read one integer: 10
readable bytes: 4
readable index: 4
writable bytes: 2
writable index: 8
after discard read bytes
readable bytes: 4
readable index: 0
writable bytes: 6
writable index: 4
after reset reader index
readable bytes: 4
readable index: 0
writable bytes: 6
writable index: 4
after reset writer index
readable bytes: 0
readable index: 0
writable bytes: 10
writable index: 0
相关推荐
Java Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在标题中提到的“JAVA Netty 获取串口数据并且下发数据”,这意味着我们将探讨如何利用Netty来处理串行...
java netty接收串口数据 开启windows串口工具 发送串口数据调试助手
基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于...
在解析JSON时,我们可以使用Jackson或Gson库,将接收到的字符串转换为对应的Java对象。 SCANFISH-II型声呐系统数据接口协议可能定义了特定的数据格式和字段,这些信息需要在解析过程中得到理解和处理。这可能包括声...
在Java中,Netty提供了一个名为ProtobufDecoder和ProtobufEncoder的类,它们可以帮助我们自动地对protobuf消息进行解码和编码。 当C++客户端需要向Java服务器发送数据时,先序列化protobuf消息,然后通过socket发送...
标题中的“Netty实现Java服务端和C#客户端联通”是指使用Netty作为Java服务器框架,与C#客户端(使用DotNetty库)进行通信的一种技术实现。这涉及到跨平台的网络通信,以及两个不同编程语言间的交互。 Netty是Java...
在Java编程环境中,Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于构建可伸缩、高并发的服务器。本示例关注的是如何利用Netty实现一个基于UDP(User Datagram Protocol)的数据接收服务,这在需要进行...
标题"Java Netty客户端与服务器"表明我们将探讨如何使用Netty构建一个简单的服务器和客户端系统,其中客户端能够发送消息到服务器,而服务器则会给予响应。这种通信模式在许多分布式系统中非常常见,例如聊天应用、...
Java Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这本书“Java Netty权威指南”是深入理解并掌握Netty的关键资源,包含详细的目录,使得学习和查阅更为...
Java Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个"Java Netty基于对象数据传输Demo"应该是演示了如何使用Netty进行对象序列化和反序列化,以便在网络...
在这个“基于Netty的服务器客户端收发消息代码”中,我们可以看到一个简单的实现,它演示了如何使用Netty进行双向通信,即服务器与客户端之间的消息交换。 首先,我们从服务器端(ChatServer)入手。服务器端通常...
例如,你可以定义一个 `CustomMessage` 类,包含一个整型的序列号、一个字符串类型的消息内容以及其他可能需要的字段。确保为每个字段提供 getter 和 setter 方法以便于访问和设置。 接下来,我们需要编写自定义的...
Netty 是一个基于 Java 的高性能、异步事件驱动的网络应用程序框架,专为开发协议服务器和客户端而设计。它的核心是基于 NIO(非阻塞 I/O)模型,提供了丰富的 API 和高度可定制的组件,使得开发者能够快速、高效地...
Java应用程序中的Netty框架是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty广泛应用于分布式系统、云计算、大数据处理等领域,它的核心特性包括非阻塞I/O、...
netty-mqtt是一个基于Java开发的MQTT 3.1.1协议服务端与客户端,包含113个文件,其中包括87个Java源文件、8个XML文件、7个Iml文件、3个YAML文件、3个JKS文件、2个Factories文件、1个LICENSE文件和1个Markdown文件。...
JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA...
Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端,是基于Java NIO的异步非阻塞的网络编程框架。Netty在内部实现了自己的线程模型,支持多种协议,包括UDP、TCP、...
java netty需要的包。。。。。。。。。。。。java netty需要的包,netty-3.2.10.Final.jar一个就行
JAVA netty完整示例代码。里面包括整个项目和所需的JAR包。示例以:TCP/IP自定义报文协议进行解析分析,基于帧头HEAD_DATA=0x76解析过程的示例代码,并对数据进行粘包分离的处理。粘包处理方式有两种:1.自定义报文...
Java Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在Java世界中,Netty由于其高效、灵活和强大的特性,已经成为网络编程的事实标准。本主题将深入探讨Netty...