`
youyu4
  • 浏览: 440093 次
社区版块
存档分类
最新评论

java Netty 之 字符串消息收发(ChannelBuffer)

 
阅读更多

java Netty 之 字符串消息收发(ChannelBuffer)

 

 

ChannelBuffer



 

      Netty中的消息传递,都必须以字节的形式,以ChannelBuffer为载体传递。简单的说,就是你想直接写个字符串过去,对不起,抛异常。虽然,Netty定义的writer的接口参数是Object的,这可能也是会给新上手的朋友容易造成误会的地方。Netty源码中,是这样判断的:

 

SendBuffer acquire(Object message) {
	if (message instanceof ChannelBuffer) {
		return acquire((ChannelBuffer) message);
	} else if (message instanceof FileRegion) {
		return acquire((FileRegion) message);
	}
 
	throw new IllegalArgumentException("unsupported message type: " + message.getClass());
}

 

 

所以,我们要想传递字符串,那么就必须转换成ChannelBuffer。明确了这一点,接下来我们上代码:

 

 

public class MessageServer {
 
	public static void main(String args[]) {
		// Server服务启动器
		ServerBootstrap bootstrap = new ServerBootstrap(
			new NioServerSocketChannelFactory(
			Executors.newCachedThreadPool(),
			Executors.newCachedThreadPool()));

		// 设置一个处理客户端消息和各种消息事件的类(Handler)
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

			@Override
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(new MessageServerHandler());
			}
		});

		// 开放8000端口供客户端访问。
		bootstrap.bind(new InetSocketAddress(8000));
	}
 
	private static class MessageServerHandler extends SimpleChannelHandler {
 
		/**
		* 用户接受客户端发来的消息,在有客户端消息到达时触发
		*
		*/
		@Override
		public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
			ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
			System.out.println(buffer.toString(Charset.defaultCharset()));
		}
		
	}
}


public class MessageClient {
 
	public static void main(String args[]) {
		// Client服务启动器
		ClientBootstrap bootstrap = new ClientBootstrap(
			new NioClientSocketChannelFactory(
			Executors.newCachedThreadPool(),
			Executors.newCachedThreadPool()));
	
		// 设置一个处理服务端消息和各种消息事件的类(Handler)
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			@Override
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(new MessageClientHandler());
			}
		});

		// 连接到本地的8000端口的服务端
		bootstrap.connect(new InetSocketAddress("127.0.0.1", 8000));
	}
 
	private static class MessageClientHandler extends SimpleChannelHandler {
 
		/**
		* 当绑定到服务端的时候触发,给服务端发消息。
		*/
		@Override
		public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
			
			// 将字符串,构造成ChannelBuffer,传递给服务端
			String msg = "Hello, I'm client.";
			ChannelBuffer buffer = ChannelBuffers.buffer(msg.length());
			buffer.writeBytes(msg.getBytes());
			e.getChannel().write(buffer);
		}
	}
}
 

 

       与 前面“Hello World” 样例代码不同的是,客户端在channel连通后,不是在本地打印,而是将消息转换成ChannelBuffer传递给服务端,服务端接受到ChannelBuffer后,解码成字符串打印出来。

 

 

 

 

数据的读和写

 

      在TCP/IP这种基于流传递的协议中。他识别的不是你每一次发送来的消息,不是分包的。而是,只认识一个整体的流,即使分三次分别发送三段话:ABC、DEF、GHI。在传递的过程中,他就是一个具有整体长度的流。在读流的过程中,如果我一次读取的长度选择的不是三个,我可以收到类似AB、CDEFG、H、I这样的信息。这显然是我们不想看到的。所以说,在你写的消息收发的系统里,需要预先定义好这种解析机制,规定每帧(次)读取的长度。通过代码来理解一下:

 

 

public class ServerBufferHandler extends SimpleChannelHandler {
 
	/**
	* 用户接受客户端发来的消息,在有客户端消息到达时触发
	*
	*/
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
		
		ChannelBuffer buffer = (ChannelBuffer) e.getMessage();

		// 五位读取
		while (buffer.readableBytes() >= 5) {
			ChannelBuffer tempBuffer = buffer.readBytes(5);
			System.out.println(tempBuffer.toString(Charset.defaultCharset()));
		}

		// 读取剩下的信息
		System.out.println(buffer.toString(Charset.defaultCharset()));
	}
}
 

 

 

public class ClientBufferHandler extends SimpleChannelHandler {
 
	/**
	* 当绑定到服务端的时候触发,给服务端发消息。
	*
	*/
	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {

		// 分段发送信息
		sendMessageByFrame(e);
	}
 
	/**
	* 将<b>"Hello, I'm client."</b>分成三份发送。 
	* Hello, 
	* I'm
	* client.
	*
	* @param e	Netty事件
	*/
	private void sendMessageByFrame(ChannelStateEvent e) {
		String msgOne = "Hello, ";
		String msgTwo = "I'm ";
		String msgThree = "client.";
		e.getChannel().write(tranStr2Buffer(msgOne));
		e.getChannel().write(tranStr2Buffer(msgTwo));
		e.getChannel().write(tranStr2Buffer(msgThree));
	}
 
	/**
	* 将字符串转换成{@link ChannelBuffer},私有方法不进行字符串的非空判断。
	*
	* @param str	待转换字符串,要求非null
	*            
	* @return 转换后的ChannelBuffer
	*/
	private ChannelBuffer tranStr2Buffer(String str) {
		ChannelBuffer buffer = ChannelBuffers.buffer(str.length());
		buffer.writeBytes(str.getBytes());
		return buffer;
	}
}
 

 

输出结果:

Hello

, I'm

 clie

nt.

 

 

注意:

       这里其实,服务端是否分段发送并不会影响输出结果,也就是说,你一次性的把"Hi, I'm client."这段信息发送过来,输出的结果也是一样的。这就是开头说的,传输的是流,不分包。而只在于你如何分段读写。

 

 

 

 

ChannelBuffer的结构

 

ChannelBuffer是Netty中比较常用的一个类,其功能类似于字符数组,可以对其进行读写操作。

 

ChannelBuffer的模型图如下:



 

 

如上图所示,一个ChannelBuffer被划分为三个部分:

 

  • discardable:表示已经读过的内容缓冲区
  • readable:表示可读的内容缓冲区
  • writable:表示可写的内容缓冲区

 

 

ChannelBuffer的这三个缓冲区由2个内部控制指针来控制:

 

  • readerIndex:控制读缓冲区首地址
  • writerIndex:控制写缓冲区首地址

 

因此,ChannelBuffer提供的大部分操作都是围绕readerIndex和writerIndex来进行的。

 

 

 

 

ChannelBuffer的常用方法

 

1、

 

read()/skip()

从readerIndex读出或跳过指定数目的字节,同时readerIndex = readerIndex + byteNumber.如果readerIndex > capacity,表示读取下标越界,会抛出IndexOutOfBoundsException异常

 

 

readable():boolean

如果buffer有可读内容(此时writerIndex > readerIndex),则返回true,否则返回false

 

 

readerIndex():int

返回readerIndex

 

readableBytes():int

返回可读的字节数目(writerIndex - readerIndex)

 

 

2、

 

write();

写入指定数目的字节,同时writerIndex = writerIndex + byteNumber. 如果writerIndex > capacity,表示写入下标越界,会抛出IndexOutOfBoundsException异常

 

 

writable():boolean

如果buffer有可写入空间(此时writerIndex < capacity),则返回true,否则返回false。

 

 

writerIndex(): int

返回writerIndex

 

 

writeableIndex():int

返回可写入的字节数(capacity - writerIndex)。

 

 

3、

 

discardReadBytes()

丢弃已读的内容。其执行过程如下:



 

 

4、

 

clear()

丢弃所有的数据,并将readerIndex和writerIndex重置为0。



 

 

5、

 

markReaderIndex()

markWriterIndex()

 

保存readerIndex或writerIndex的状态

 

 

6、

 

resetReaderIndex()

resetWriterIndex()

 

重置readerIndex或writerIndex为最后一次保存的状态,如果没有保存过,则置为0

 

 

7、

 

duplicate()

slice()

 

拷贝和源目标共享buffer的数据区域,但是拷贝有自己的readerIndex和writerIndex以及markIndex,实际上只是拷贝了控制指针,数据区还是与源buffer共享。

 

 

8、

 

copy()

拷贝整个buffer,包括控制指针和数据区

 

 

 

测试代码:

 

 

package test;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

public class ChannelBufferTest {

    public static void main( String[] args ) {
        // TODO Auto-generated method stub
        ChannelBuffer buffer = ChannelBuffers.buffer( 10 );
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));
        
        buffer.writeInt( 10 );
        System.out.println("after write one integer");
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));

        buffer.writeInt( 10 );
        System.out.println("after write two integer");
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));
        
        int i = buffer.readInt( );
        System.out.println("after read one integer: " + i);
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));
        
        buffer.discardReadBytes( );
        System.out.println("after discard read bytes");
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));
        
        buffer.resetReaderIndex( );
        System.out.println("after reset reader index");
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));
        
        buffer.resetWriterIndex( );
        System.out.println("after reset writer index");
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));
    }
}
 

 

 

结果:

 

readable bytes: 0

readable index: 0

writable bytes: 10

writable index: 0

after write one integer

readable bytes: 4

readable index: 0

writable bytes: 6

writable index: 4

after write two integer

readable bytes: 8

readable index: 0

writable bytes: 2

writable index: 8

after read one integer: 10

readable bytes: 4

readable index: 4

writable bytes: 2

writable index: 8

after discard read bytes

readable bytes: 4

readable index: 0

writable bytes: 6

writable index: 4

after reset reader index

readable bytes: 4

readable index: 0

writable bytes: 6

writable index: 4

after reset writer index

readable bytes: 0

readable index: 0

writable bytes: 10

writable index: 0

  • 大小: 9.3 KB
  • 大小: 6.8 KB
  • 大小: 17.5 KB
  • 大小: 15.6 KB
分享到:
评论
1 楼 liaodongdakai 2019-05-20  
Java读源码之Netty深入剖析
网盘地址:https://pan.baidu.com/s/1UZqs2ViSURgY_DEM8Kh8Uw
提取码:6ko0

相关推荐

    JAVA netty 获取串口数据并且下发数据

    Java Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在标题中提到的“JAVA Netty 获取串口数据并且下发数据”,这意味着我们将探讨如何利用Netty来处理串行...

    netty接收串口数据代码,测试串口工具

    java netty接收串口数据 开启windows串口工具 发送串口数据调试助手

    基于java netty的udp客户端声呐数据对接

    在解析JSON时,我们可以使用Jackson或Gson库,将接收到的字符串转换为对应的Java对象。 SCANFISH-II型声呐系统数据接口协议可能定义了特定的数据格式和字段,这些信息需要在解析过程中得到理解和处理。这可能包括声...

    c++客户端和java(Netty)服务器端tcp通讯

    在Java中,Netty提供了一个名为ProtobufDecoder和ProtobufEncoder的类,它们可以帮助我们自动地对protobuf消息进行解码和编码。 当C++客户端需要向Java服务器发送数据时,先序列化protobuf消息,然后通过socket发送...

    基于 Java Netty实现的可用于内网穿透的代理工具.zip

    基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于...

    Netty实现Java服务端和C#客户端联通

    标题中的“Netty实现Java服务端和C#客户端联通”是指使用Netty作为Java服务器框架,与C#客户端(使用DotNetty库)进行通信的一种技术实现。这涉及到跨平台的网络通信,以及两个不同编程语言间的交互。 Netty是Java...

    java实现基于netty 的udp字节数据接收服务

    在Java编程环境中,Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于构建可伸缩、高并发的服务器。本示例关注的是如何利用Netty实现一个基于UDP(User Datagram Protocol)的数据接收服务,这在需要进行...

    JavaNetty客户端与服务器

    标题"Java Netty客户端与服务器"表明我们将探讨如何使用Netty构建一个简单的服务器和客户端系统,其中客户端能够发送消息到服务器,而服务器则会给予响应。这种通信模式在许多分布式系统中非常常见,例如聊天应用、...

    java netty权威指南完整版带目录

    Java Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这本书“Java Netty权威指南”是深入理解并掌握Netty的关键资源,包含详细的目录,使得学习和查阅更为...

    Java Netty基于对象数据传输Demo

    Java Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个"Java Netty基于对象数据传输Demo"应该是演示了如何使用Netty进行对象序列化和反序列化,以便在网络...

    基于netty的服务器客户端收发消息代码

    在这个“基于Netty的服务器客户端收发消息代码”中,我们可以看到一个简单的实现,它演示了如何使用Netty进行双向通信,即服务器与客户端之间的消息交换。 首先,我们从服务器端(ChatServer)入手。服务器端通常...

    Netty4编写服务器客户端,自定义编解码,发送自定义消息

    例如,你可以定义一个 `CustomMessage` 类,包含一个整型的序列号、一个字符串类型的消息内容以及其他可能需要的字段。确保为每个字段提供 getter 和 setter 方法以便于访问和设置。 接下来,我们需要编写自定义的...

    netty-4.1_javaNetty_netty_服务器_

    Netty 是一个基于 Java 的高性能、异步事件驱动的网络应用程序框架,专为开发协议服务器和客户端而设计。它的核心是基于 NIO(非阻塞 I/O)模型,提供了丰富的 API 和高度可定制的组件,使得开发者能够快速、高效地...

    java应用netty服务端和客户端

    Java应用程序中的Netty框架是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty广泛应用于分布式系统、云计算、大数据处理等领域,它的核心特性包括非阻塞I/O、...

    JAVA版基于netty的物联网高并发智能网关.zip

    JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA...

    Java Netty 分布式开发 框架

    Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端,是基于Java NIO的异步非阻塞的网络编程框架。Netty在内部实现了自己的线程模型,支持多种协议,包括UDP、TCP、...

    java netty需要的包

    java netty需要的包。。。。。。。。。。。。java netty需要的包,netty-3.2.10.Final.jar一个就行

    Java采用Netty实现基于DTU的TCP服务器 + 多端口 + 多协议

    在Java中,我们通常不直接与DTU硬件打交道,而是通过模拟DTU协议来处理与之交互的数据。 Netty是一个高性能、异步事件驱动的网络应用框架,适用于开发高并发、低延迟的网络服务。它提供了丰富的API,使得开发者能更...

    JAVA netty完整示例代码

    JAVA netty完整示例代码。里面包括整个项目和所需的JAR包。示例以:TCP/IP自定义报文协议进行解析分析,基于帧头HEAD_DATA=0x76解析过程的示例代码,并对数据进行粘包分离的处理。粘包处理方式有两种:1.自定义报文...

    java netty通信

    Java Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在Java世界中,Netty由于其高效、灵活和强大的特性,已经成为网络编程的事实标准。本主题将深入探讨Netty...

Global site tag (gtag.js) - Google Analytics