`
hbxflihua
  • 浏览: 686998 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

基于java序列化的Netty编解码技术

阅读更多

    基于Java提供的对象输入/输出流ObjectInputStream和ObjectOutputStream,可以直接把Java对象作为可存储的字节数组写入文件,也可以传输到网络上。对程序员来说,基于JDK默认的序列号机制可以避免操作底层的字节数组,从而提高开发效率。

    相信大多数Java程序员接触到的第一种序列化或者编解码技术就是Java默认序列化,只需要序列化的POJO对象实现java.io.Serializable接口,根据实际情况生成序列ID,这个类就能通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。

1、服务端开发

    (1)在服务端ChannelPipeline中添加ObjectDecoder解码器。

    (2)在服务端ChannelPipeline中添加ObjectEncoder编码器。

    (3)需要序列化的POJO类实现Serializable接口。

    (4)构造服务端处理类,处理客户端请求。

 

    请求POJO类

 

package com.serial.java.pojo;

import java.io.Serializable;

/**
 * 
 * @author leeka
 * @notice 
 *	1、实现Serializable接口
 *  2、生成默认的序列化ID
 *  3、重写toString()方法,方便输出
 */
public class SubscribeReq implements Serializable {

	private static final long serialVersionUID = 1L;
	
	private int subReqID;
	private String userName;
	private String productName;
	private String phoneNumber;
	private String address;
	public int getSubReqID() {
		return subReqID;
	}
	public void setSubReqID(int subReqID) {
		this.subReqID = subReqID;
	}
	public String getUserName() {
		return userName;
	}
	public void setUserName(String userName) {
		this.userName = userName;
	}
	public String getProductName() {
		return productName;
	}
	public void setProductName(String productName) {
		this.productName = productName;
	}
	public String getPhoneNumber() {
		return phoneNumber;
	}
	public void setPhoneNumber(String phoneNumber) {
		this.phoneNumber = phoneNumber;
	}
	public String getAddress() {
		return address;
	}
	public void setAddress(String address) {
		this.address = address;
	}
	@Override
	public String toString() {
		return "SubscribeReq [subReqID=" + subReqID + ", userName=" + userName + ", productName=" + productName
				+ ", phoneNumber=" + phoneNumber + ", address=" + address + "]";
	}
}

 

    响应POJO类

 

package com.serial.java.pojo;

import java.io.Serializable;
/**
 * 
 * @author leeka
 * @notice 
 *	1、实现Serializable接口
 *  2、生成默认的序列化ID
 *  3、重写toString()方法,方便输出
 */
public class SubscribeResp  implements Serializable{

	private static final long serialVersionUID = 1L;
	private int subReqID;
	private int respCode;
	private String desc;
	public int getSubReqID() {
		return subReqID;
	}
	public void setSubReqID(int subReqID) {
		this.subReqID = subReqID;
	}
	public int getRespCode() {
		return respCode;
	}
	public void setRespCode(int respCode) {
		this.respCode = respCode;
	}
	public String getDesc() {
		return desc;
	}
	public void setDesc(String desc) {
		this.desc = desc;
	}
	
	@Override
	public String toString() {
		return "SubscribeResp [subReqID=" + subReqID + ", respCode=" + respCode + ", desc=" + desc + "]";
	}
	
}

 

 

    服务端入口

package com.serial.java;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;


public class SubReqServer {

	public void bind(int port)throws Exception{
		
		//配置服务端NIO线程组
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try{
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
				.channel(NioServerSocketChannel.class)
				.option(ChannelOption.SO_BACKLOG, 1024)
				.childHandler(new ChannelInitializer<SocketChannel>() {

					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ch.pipeline()
						.addLast(new ObjectDecoder(1024*1024//为防止异常码流和解码错位导致的内存溢出,将对象序列化后的最大字节数组长度设为1M
								, ClassResolvers.weakCachingConcurrentResolver(//创建线程安全的WeakReferenceMap对类加载器进行缓存
										this.getClass().getClassLoader())))						
						.addLast(new ObjectEncoder())//对实现了Serializable的POJO对象进行编码
						.addLast(new SubReqServerHandler());						
					}
					
				});
			//绑定端口,同步等待成功
			ChannelFuture f = b.bind(port).sync();
			//等待服务端监听端口关闭
			f.channel().closeFuture().sync();
			
		}finally{
			//退出时释放资源
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}		
	}
	
	public static void main(String[] args) throws Exception{
		int port = 8084;
		if(args!=null && args.length > 0){
			port = Integer.valueOf(args[0]);
		}
		new SubReqServer().bind(port);		
	}
}

 

    自定义服务端处理类

package com.serial.java;

import com.serial.java.pojo.SubscribeReq;
import com.serial.java.pojo.SubscribeResp;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class SubReqServerHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		SubscribeReq req = (SubscribeReq) msg;
		if("leeka".equalsIgnoreCase(req.getUserName())){
			System.out.println("service accept client subscribe req:["+ req +"]");
			ctx.writeAndFlush(resp(req.getSubReqID()));		
		}
	}
	
	private SubscribeResp resp(int subReqID){
		SubscribeResp resp = new SubscribeResp();
		resp.setSubReqID(subReqID);
		resp.setRespCode(0);
		resp.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
		return resp;
	}
	
//	@Override
//	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//		ctx.flush();
//	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		//super.exceptionCaught(ctx, cause);
		ctx.close();
	}
	
}

 

2、客户端开发

    (1)将Netty对象编码器和解码器添加到ChannelPipeline。

    (2)客户端一次构造十个请求,最后一次性发送给服务端。

    (3)自定义客户端处理类打印服务端响应的消息。

 

    客户端入口

package com.serial.java;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class SubReqClient {
	
	public void connect(int port,String host)throws Exception{
		
		//配置客户端NIO线程组
		EventLoopGroup group = new NioEventLoopGroup();
		
		try{
			Bootstrap b = new Bootstrap();
			b.group(group).channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ch.pipeline()
						.addLast(new ObjectDecoder(1024
								, ClassResolvers.cacheDisabled(//禁止对类加载器进行缓存
										this.getClass().getClassLoader())))						
						.addLast(new ObjectEncoder())//对实现了Serializable的POJO对象进行编码
						.addLast(new SubReqClientHandler());				
					};
				});
			
			//发起异步连接操作
			ChannelFuture f = b.connect(host,port).sync();
			//等待客户端链路关闭
			f.channel().closeFuture().sync();
		}finally{
			//退出,释放资源
			group.shutdownGracefully();
		}
		
	}
	
	public static void main(String[] args)throws Exception {
		int port = 8084;
		if(args!=null && args.length > 0){
			port = Integer.valueOf(args[0]);
		}
		new SubReqClient().connect(port, "127.0.0.1");		
	}
}

 

    客户端处理类

package com.serial.java;

import java.util.logging.Logger;

import com.serial.java.pojo.SubscribeReq;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class SubReqClientHandler extends ChannelHandlerAdapter {

	private static final Logger logger = Logger.getLogger(SubReqClientHandler.class.getName());
	
	public SubReqClientHandler() {	
	}
	
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		for (int i = 0; i < 10; i++) {
			ctx.write(req(i));
		}
		ctx.flush();
	}
	
	private SubscribeReq req(int i){
		SubscribeReq r = new SubscribeReq();
		r.setSubReqID(i);
		r.setAddress("浙江省杭州市西湖区");
		r.setPhoneNumber("1516844444"+i);
		r.setProductName("Netty权威指南系列"+i);
		r.setUserName("leeka");
		return r;
	}
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		System.out.println("receive server response:["+msg+"]");
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		logger.warning("unexpected exception from downstream:"+ cause.getMessage());
		ctx.close();
	}
	
}

   

    Java序列化虽然很方便,但也有它的缺点: (1)无法跨语言;(2)序列化后码流大;(3)序列化性能低。目前业界主流的编解码框架有Google的Protobuf、Facebook的Thrift、Jboss的Marshalling等,大家不妨结合这些主流的编解码框架进行编解码操作。

 

OVER

分享到:
评论
1 楼 quanwsx 2016-09-17  
  

相关推荐

    基于Protostuff实现的Netty编解码器.docx

    总结来说,基于Protostuff的Netty编解码器是通过结合Protostuff的序列化和反序列化能力以及Netty的编解码器框架来实现的。它允许我们将复杂的Java对象安全地转换为网络友好的字节流,并在接收端恢复原状。这种机制...

    Netty中的java序列化

    在提供的描述中提到的`SubReqServer`和`SubReqClient`的Main函数,很可能是用来演示如何在Netty中使用Java序列化进行服务端和客户端之间的通信。 在Netty中,`ByteBuf`是一个高效的字节缓冲区,它比Java标准库中的`...

    Netty服务端与客户端依靠Java序列化传输数据

    对于基于Java序列化的Netty应用,单元测试和集成测试至关重要,确保数据的正确序列化和反序列化。可以使用如Mockito和WireMock等工具模拟服务端和客户端交互,验证数据传输的正确性。 综上所述,Netty服务端和...

    Netty中Marshalling编解码自定义栈应用

    Marshalling 是一种序列化技术,它能够将 Java 对象转换为字节流,便于在网络中传输,同时也可以将字节流恢复为原来的对象。在 Netty 中,MarshallingDecoder 和 MarshallingEncoder 分别用于解码和编码过程。这两个...

    Netty+自定义Protobuf编解码器

    2. 编译后的Java类:由protoc生成,用于序列化和反序列化protobuf消息。 3. 示例代码:展示如何使用自定义的Protobuf编解码器。 4. 测试用例:用于验证编解码器功能的正确性。 5. 使用说明:指导如何配置Netty管道以...

    c++客户端和java(Netty)服务器端tcp通讯

    本话题将探讨如何使用C++客户端与Java(通过Netty框架)服务器端实现TCP通讯,并涉及数据序列化工具Protocol Buffers(protobuf)在两者之间的交互。 首先,TCP(传输控制协议)是一种面向连接的、可靠的、基于字节...

    marshalling 1.3.0

    jboss marshalling 1.3.0 Java 对象序列化 netty 编解码

    Netty中Marshalling编解码应用

    需要注意的是,虽然Marshalling提供了跨平台的数据交换能力,但它的性能和效率可能不如其他轻量级的序列化方式,比如Java自带的Java序列化或protobuf。因此,在选择序列化方法时,需要权衡性能、兼容性和易用性等...

    java应用netty服务端和客户端

    在本示例中,"model对象目录必须一致"意味着服务端和客户端需要使用相同的类来序列化和反序列化数据,确保双方可以正确理解和交换消息。 服务端实现: 1. **Bootstrap**:服务端启动器,配置服务器的各种参数,如...

    Java Netty基于对象数据传输Demo

    这个"Java Netty基于对象数据传输Demo"应该是演示了如何使用Netty进行对象序列化和反序列化,以便在网络中高效地传输自定义对象。 Netty的核心在于它的`ByteBuf`类,它是字节缓冲区的高效实现,优于Java NIO的`...

    Netty中Protobuf编解码应用

    Protobuf编解码器在Netty中的应用,使得基于Protobuf的数据在网络传输中变得更加便捷和高效。 首先,理解Protobuf的基本概念。Protobuf定义了一种语言中立、平台中立的数据表示格式,通过.proto文件定义消息结构。...

    JAVA基于Netty实现的聊天室

    7. **消息编码与解码**:在Netty中,我们需要自定义编码器和解码器来处理消息的序列化和反序列化,确保数据在网络间正确传输。例如,可能有一个`ChatMessageDecoder`用于将接收到的字节流转换为聊天消息对象,而`...

    基于java GUI界面的简易netty聊天室(实现群聊和私聊)

    聊天消息以JSON格式封装,包含发送者、接收者、消息内容等字段,然后通过Netty的编码器和解码器进行序列化和反序列化,确保数据在网络中的安全传输。 【实现流程】 1. 用户通过GUI输入消息并选择聊天模式。 2. 消息...

    netty传输kryo序列化的对象基于socket传输

    本主题“netty传输kryo序列化的对象基于socket传输”探讨的是如何结合Netty和Kryo,实现高效、可靠的网络数据传输。首先,我们需要了解Kryo序列化的基本原理。Kryo通过跟踪对象引用和存储类型信息来优化序列化过程,...

    Netty之自定义编解码器.zip

    而在encode()方法中,我们需要将业务对象转换为ByteBuf,通常会涉及序列化过程。 为了保证解码的正确性,需要注意处理以下几点: 1. 处理字节顺序:如果你的协议包含多字节的数值,需要处理字节顺序(大端序或小端...

    使用Netty进行编解码的操作过程详解

    在Java中,默认的序列化机制可以将对象转换为可存储的字节数组,但这种方法存在着一些问题,如无法跨语言、序列化后的码流太大、序列化的性能太低等。为了解决这些问题,需要使用专门的编解码框架,如Google的...

    java netty通信

    在Netty中,我们使用Encoder和Decoder来处理数据的序列化和反序列化。例如,ByteToMessageDecoder和MessageToByteEncoder用于将原始字节流转换为业务对象,反之亦然。 5. **心跳机制** 心跳机制在长连接中至关...

    Netty之序列化协议

    1. **Java序列化**:Java自带的序列化机制简单易用,适用于Java对象的本地持久化或跨JVM通信。但其缺点在于序列化后的数据量大,且不适用于网络传输,因为其包含了大量元数据,效率较低。 2. **protobuf(Protocol ...

Global site tag (gtag.js) - Google Analytics