`
hbxflihua
  • 浏览: 679382 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

基于Google Protobuf的Netty编解码技术

阅读更多

Google的Protobuf在业界非常流行,很多商业项目都选择Protobuf作为编解码框架,以下为Protobuf的一些优点:

    (1)在谷歌内长期使用,产品成熟度高。

    (2)跨语言,支持包括C++、Java、Python在内的多重语言。

    (3)编码后的码流小,便于存储和传输。

    (4)编解码性能高。

    (5)支持不同协议向前兼容。

    (6)支持定义可选和必选字段。

 

一、Protobuf开发环境搭建

    1、下载Protobuf的Windows版,网址如下:https://developers.google.com/protocol-buffers/docs/downloads?hl=zh-cn,本示例基于protoc-2.6.1-win32.zip

    2、下载Protobuf Java语言所需的jar包,网址如下:http://repo2.maven.org/maven2/com/google/protobuf/protobuf-java/2.6.1/,本示例基于protobuf-java-2.6.1.jar

    3、新建请求响应所需的proto文件

    SubscribeReq.proto

package netty;
option java_package = "com.serial.java.protobuf";
option java_outer_classname = "SubscribeReqProto";

message SubscribeReq{
	required int32 subReqID = 1;
	required string userName = 2;
	required string productName = 3;
	repeated string address = 4;
}

    SubscribeRespProto.proto

package netty;
option java_package = "com.serial.java.protobuf";
option java_outer_classname = "SubscribeRespProto";
message SubscribeResp{
	required int32 subReqID = 1;
	required string respCode = 2;
	required string desc = 3;
}

    4、通过Protoc.exe生成所需的Java编解码POJO文件,命令行如下。

C:\Users\Administrator>d:
D:\>cd "Program Files\protoc-2.6.1-win32"
D:\Program Files\protoc-2.6.1-win32>protoc.exe --java_out=.\src .\netty\Subscrib
eReq.proto
D:\Program Files\protoc-2.6.1-win32>protoc.exe --java_out=.\src .\netty\Subscrib
eResp.proto
D:\Program Files\protoc-2.6.1-win32>

    5、将生成的Java POJO文件拷贝到项目中,注意Protobuf所需的jar包也需包含在项目中,不然会报错。

    6、创建测试类,测试Protobuf的编解码功能。

    TestSubscribeReq.java

package com.serial.java.test;

import java.util.ArrayList;
import java.util.List;

import com.google.protobuf.InvalidProtocolBufferException;
import com.serial.java.protobuf.SubscribeReqProto;

public class TestSubscribeReq {

	private static byte [] encode(SubscribeReqProto.SubscribeReq req){
		return req.toByteArray();
	}
	
	private static SubscribeReqProto.SubscribeReq decode(byte [] body) 
			throws InvalidProtocolBufferException{
		return SubscribeReqProto.SubscribeReq.parseFrom(body);
	}
	
	private static SubscribeReqProto.SubscribeReq createSubscribeReq(){
		SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
		builder.setSubReqID(1);
		builder.setUserName("leeka");
		builder.setProductName("Netty book");
		
		List<String> address = new ArrayList<String>();
		address.add("Nanjing");
		address.add("Beijing");
		address.add("Hangzhou");
		builder.addAllAddress(address);
		return builder.build();
	}
	
	
	public static void main(String[] args)throws Exception {		
		SubscribeReqProto.SubscribeReq req = createSubscribeReq();
		System.out.println("before encode:"+ req.toString());		
		SubscribeReqProto.SubscribeReq req2 = decode(encode(req));		
		System.out.println("after encode:"+ req2.toString());		
		System.out.println("Assert equal: " + req2.equals(req));
		
	}
	
}

    7、运行测试类,查看测试结果,控制台输出如下信息:

before encode:subReqID: 1
userName: "leeka"
productName: "Netty book"
address: "Nanjing"
address: "Beijing"
address: "Hangzhou"

after encode:subReqID: 1
userName: "leeka"
productName: "Netty book"
address: "Nanjing"
address: "Beijing"
address: "Hangzhou"

Assert equal: true

 

 二、Netty的Protobuf服务端和客户端开发

     服务端入口

package com.serial.java;

import com.serial.java.protobuf.SubscribeReqProto;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;


public class SubReqServer {

	public void bind(int port)throws Exception{
		
		//配置服务端NIO线程组
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try{
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
				.channel(NioServerSocketChannel.class)
				.option(ChannelOption.SO_BACKLOG, 1024)
				.handler(new LoggingHandler(LogLevel.INFO))
				.childHandler(new ChannelInitializer<SocketChannel>() {

					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ch.pipeline()
						.addLast(new ProtobufVarint32FrameDecoder())						
						.addLast(new ProtobufDecoder(
								SubscribeReqProto.SubscribeReq.getDefaultInstance()))						
						.addLast(new ProtobufVarint32LengthFieldPrepender())						
						.addLast(new ProtobufEncoder())						
						.addLast(new SubReqServerHandler());						
					}
					
				});
			//绑定端口,同步等待成功
			ChannelFuture f = b.bind(port).sync();
			//等待服务端监听端口关闭
			f.channel().closeFuture().sync();
			
		}finally{
			//退出时释放资源
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}		
	}
	
	public static void main(String[] args) throws Exception{
		int port = 8085;
		if(args!=null && args.length > 0){
			port = Integer.valueOf(args[0]);
		}
		new SubReqServer().bind(port);		
	}
}

 

    服务端处理类

package com.serial.java;

import com.serial.java.protobuf.SubscribeReqProto;
import com.serial.java.protobuf.SubscribeRespProto;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class SubReqServerHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq)msg;
		//System.out.println("SubReqServerHandler channelRead:"+ req.getUserName());
		if("leeka".equalsIgnoreCase(req.getUserName())){
			System.out.println("service accept client subscribe req:["+ req +"]");
			ctx.writeAndFlush(resp(req.getSubReqID()));		
		}
	}
	
	private SubscribeRespProto.SubscribeResp resp(int subReqID){
		SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
		builder.setSubReqID(subReqID);
		builder.setRespCode("0");
		builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
		return builder.build();
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
	
}

    

    客户端入口

package com.serial.java;

import com.serial.java.protobuf.SubscribeRespProto;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class SubReqClient {
	
	public void connect(int port,String host)throws Exception{
		
		//配置客户端NIO线程组
		EventLoopGroup group = new NioEventLoopGroup();
		
		try{
			Bootstrap b = new Bootstrap();
			b.group(group).channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ch.pipeline()
						.addLast(new ProtobufVarint32FrameDecoder())						
						.addLast(new ProtobufDecoder(
								SubscribeRespProto.SubscribeResp.getDefaultInstance()))						
						.addLast(new ProtobufVarint32LengthFieldPrepender())						
						.addLast(new ProtobufEncoder())						
						.addLast(new SubReqClientHandler());
					};
				});
			
			//发起异步连接操作
			ChannelFuture f = b.connect(host,port).sync();
			//等待客户端链路关闭
			f.channel().closeFuture().sync();
		}finally{
			//退出,释放资源
			group.shutdownGracefully();
		}
		
	}
	
	public static void main(String[] args)throws Exception {
		int port = 8085;
		if(args!=null && args.length > 0){
			port = Integer.valueOf(args[0]);
		}
		new SubReqClient().connect(port, "127.0.0.1");		
	}
}

 

    客户端处理类

package com.serial.java;

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

import com.serial.java.protobuf.SubscribeReqProto;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class SubReqClientHandler extends ChannelHandlerAdapter {

	private static final Logger logger = Logger.getLogger(SubReqClientHandler.class.getName());
	
	public SubReqClientHandler() {	
		
	}
	
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		for (int i = 0; i < 10; i++) {
			ctx.write(req(i));
		}
		ctx.flush();
	}
	
	private SubscribeReqProto.SubscribeReq req(int i){
		SubscribeReqProto.SubscribeReq.Builder r = SubscribeReqProto.SubscribeReq.newBuilder();
		r.setSubReqID(i);
		r.setProductName("Netty Book"+i);
		r.setUserName("leeka");
		
		List<String> address = new ArrayList<String>();
		address.add("Nanjing");
		address.add("Beijing");
		r.addAllAddress(address);		
		return r.build();
	}
	
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		//super.channelReadComplete(ctx);
		ctx.flush();
	}
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		System.out.println("receive server response:["+msg+"]");
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		logger.warning("unexpected exception from downstream:"+ cause.getMessage());
		ctx.close();
	}
	
}

 

OVER

分享到:
评论
1 楼 quanwsx 2016-09-19  
   

相关推荐

    Netty+自定义Protobuf编解码器

    在提供的描述中提到,这个自定义编解码器能够发送多个protobuf对象,这意味着它可能采用了某种机制来区分不同类型的protobuf消息,比如添加一个消息类型标识或使用多路复用技术。 对于Protobuf本身,它是通过定义....

    Netty中Protobuf编解码应用

    Protobuf编解码器在Netty中的应用,使得基于Protobuf的数据在网络传输中变得更加便捷和高效。 首先,理解Protobuf的基本概念。Protobuf定义了一种语言中立、平台中立的数据表示格式,通过.proto文件定义消息结构。...

    基于netty与protobuf的Android手机视频实时传输

    本项目“基于Netty与Protobuf的Android手机视频实时传输”利用了高性能的网络库Netty和高效的序列化框架Protobuf,实现了高效、低延迟的视频流传输。下面将详细介绍这两个关键技术及其在该项目中的应用。 **Netty**...

    基于netty和protobuf的聊天系统,客户端+服务器

    这个项目“基于netty和protobuf的聊天系统,客户端+服务器”就是这样一个实例,它展示了如何利用Java语言结合Netty框架和Protocol Buffers(protobuf)来搭建一个高性能、低延迟的聊天应用。 Netty是一个开源的异步...

    springboot集成netty,使用protobuf作为数据交换格式,可以用于智能终端云端服务脚手架

    本项目“springboot-netty-protobuf-master”旨在提供一个基础架构,它利用了Spring Boot的便捷性以及Netty的高效网络通信能力,同时采用Google的Protocol Buffers(protobuf)作为数据交换格式,确保数据传输的高效...

    netty基于protobuf的简单示例

    在本示例中,我们将深入探讨如何利用 Netty 和 Google 的 Protocol Buffers(protobuf)来构建一个简单的服务端和客户端通信系统。 Protocol Buffers 是 Google 提供的一种数据序列化协议,它可以将结构化数据序列...

    netty学习文件,实现http,websocket,protobuf

    通过以上步骤,我们可以利用Netty实现基于HTTP、WebSocket和Protobuf的通信。Netty的强大之处在于其灵活性和高度定制性,允许开发者根据具体需求调整和扩展网络通信的各个层面。无论是在大规模分布式系统还是小型...

    netty+protobuf入门案例

    3. **创建 Netty 通道处理器**:在 Netty 中,我们需要创建一个自定义的 `ChannelInboundHandler` 或 `ChannelOutboundHandler` 来处理 Protobuf 消息的编码和解码。可以使用 ` ProtobufVarint32FrameDecoder` 和 `...

    netty http protobuf

    Netty、HTTP与Protobuf是三个在IT领域中至关重要的技术组件,它们分别在不同的层面上为高性能网络应用提供服务。下面将详细解释这三个概念及其相互结合的应用。 **Netty** Netty是一个开源的Java NIO(非阻塞I/O)...

    java快速上手的网络IO框架,基于netty, google protobuf 数据传输协议.zip

    综上所述,"java快速上手的网络IO框架,基于netty, google protobuf 数据传输协议"的主题涵盖了Java网络编程的关键技术。通过学习和实践,开发者可以构建出高性能、低延迟的网络应用,实现数据的高效传输。这个...

    Netty中Marshalling编解码应用

    在Netty中,编码(Encoding)和解码(Decoding)是网络通信中处理数据转换的关键步骤,而Marshalling就是一种常见的序列化和反序列化技术,它在Netty中被用于将Java对象转化为字节流以便在网络间传输,同时也能将...

    采用netty与protobuf进行文件传输

    服务端(rrkd-file-server)接收到数据后,同样利用Netty的事件驱动模型,将接收到的字节流转交给Protobuf的解码器,还原成原始文件内容。这个过程可能涉及多个网络包的合并,Netty的流式处理能力在此处得到体现,...

    Netty发送protoBuf格式数据

    3. **创建ProtoBuf编码解码器**:在Netty中,我们需要自定义编码器和解码器来处理ProtoBuf格式的数据。可以继承`ByteToMessageDecoder`和`MessageToByteEncoder`,重写其方法。编码器将Java对象转换为ByteBuf,解码...

    通信与协议Netty+Protobuf-游戏设计与开发(1)配套代码

    在本资源中,"NettyProtobufTcpServer"是使用Netty实现的TCP服务器,它负责接收客户端的连接并处理基于Protobuf编码的数据。TCP协议提供了可靠的、面向连接的通信,适合于需要保证数据完整性和顺序的游戏通信场景。 ...

    netty+protobuf (整合源代码)

    在《netty+protobuf 整合实战》中,作者通过实际的源代码展示了如何将这两个技术结合使用。首先,我们需要理解 Protobuf 的工作原理。 Protobuf 提供了语言无关的 .proto 文件来定义数据结构,然后通过 protoc ...

    使用Netty进行编解码的操作过程详解

    为了解决这些问题,需要使用专门的编解码框架,如Google的Protobuf、Facebok的Thrift、Jboss Marshalling、MessagePack等。 MessagePack是一个高效的二进制序列化框架,它像JSON一样支持不同的语言间的数据交换,...

    93个netty高并发教学视频下载.txt

    85_Netty编解码器剖析与入站出站处理器详解;86_Netty自定义编解码器与TCP粘包拆包问题;87_Netty编解码器执行流程深入分析;88_ReplayingDecoder源码分析与特性解读;89_Netty常见且重要编解码器详解;90_TCP粘包与...

    基于Netty+Redis+protobuf开发的即时通讯服务器.zip

    本项目"基于Netty+Redis+protobuf开发的即时通讯服务器"利用了三种技术:Netty作为网络通信库,Redis作为数据缓存与消息中间件,以及protobuf作为序列化协议,实现了高性能的实时通信解决方案。 首先,Netty是一个...

    游戏开发中的protobuf自动解码DEMO

    在游戏开发中,protobuf(Protocol Buffers)是一种广泛使用的数据序列化协议,它由Google开发,旨在提供一种高效、简洁的方式来定义数据结构,并且能在多种编程语言之间进行数据交换。protobuf的主要优势在于其紧凑...

Global site tag (gtag.js) - Google Analytics