Google的Protobuf在业界非常流行,很多商业项目都选择Protobuf作为编解码框架,以下为Protobuf的一些优点:
(1)在谷歌内长期使用,产品成熟度高。
(2)跨语言,支持包括C++、Java、Python在内的多重语言。
(3)编码后的码流小,便于存储和传输。
(4)编解码性能高。
(5)支持不同协议向前兼容。
(6)支持定义可选和必选字段。
一、Protobuf开发环境搭建
1、下载Protobuf的Windows版,网址如下:https://developers.google.com/protocol-buffers/docs/downloads?hl=zh-cn,本示例基于protoc-2.6.1-win32.zip
2、下载Protobuf Java语言所需的jar包,网址如下:http://repo2.maven.org/maven2/com/google/protobuf/protobuf-java/2.6.1/,本示例基于protobuf-java-2.6.1.jar。
3、新建请求响应所需的proto文件
SubscribeReq.proto
package netty; option java_package = "com.serial.java.protobuf"; option java_outer_classname = "SubscribeReqProto"; message SubscribeReq{ required int32 subReqID = 1; required string userName = 2; required string productName = 3; repeated string address = 4; }
SubscribeRespProto.proto
package netty; option java_package = "com.serial.java.protobuf"; option java_outer_classname = "SubscribeRespProto"; message SubscribeResp{ required int32 subReqID = 1; required string respCode = 2; required string desc = 3; }
4、通过Protoc.exe生成所需的Java编解码POJO文件,命令行如下。
C:\Users\Administrator>d: D:\>cd "Program Files\protoc-2.6.1-win32" D:\Program Files\protoc-2.6.1-win32>protoc.exe --java_out=.\src .\netty\Subscrib eReq.proto D:\Program Files\protoc-2.6.1-win32>protoc.exe --java_out=.\src .\netty\Subscrib eResp.proto D:\Program Files\protoc-2.6.1-win32>
5、将生成的Java POJO文件拷贝到项目中,注意Protobuf所需的jar包也需包含在项目中,不然会报错。
6、创建测试类,测试Protobuf的编解码功能。
TestSubscribeReq.java
package com.serial.java.test; import java.util.ArrayList; import java.util.List; import com.google.protobuf.InvalidProtocolBufferException; import com.serial.java.protobuf.SubscribeReqProto; public class TestSubscribeReq { private static byte [] encode(SubscribeReqProto.SubscribeReq req){ return req.toByteArray(); } private static SubscribeReqProto.SubscribeReq decode(byte [] body) throws InvalidProtocolBufferException{ return SubscribeReqProto.SubscribeReq.parseFrom(body); } private static SubscribeReqProto.SubscribeReq createSubscribeReq(){ SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder(); builder.setSubReqID(1); builder.setUserName("leeka"); builder.setProductName("Netty book"); List<String> address = new ArrayList<String>(); address.add("Nanjing"); address.add("Beijing"); address.add("Hangzhou"); builder.addAllAddress(address); return builder.build(); } public static void main(String[] args)throws Exception { SubscribeReqProto.SubscribeReq req = createSubscribeReq(); System.out.println("before encode:"+ req.toString()); SubscribeReqProto.SubscribeReq req2 = decode(encode(req)); System.out.println("after encode:"+ req2.toString()); System.out.println("Assert equal: " + req2.equals(req)); } }
7、运行测试类,查看测试结果,控制台输出如下信息:
before encode:subReqID: 1 userName: "leeka" productName: "Netty book" address: "Nanjing" address: "Beijing" address: "Hangzhou" after encode:subReqID: 1 userName: "leeka" productName: "Netty book" address: "Nanjing" address: "Beijing" address: "Hangzhou" Assert equal: true
二、Netty的Protobuf服务端和客户端开发
服务端入口
package com.serial.java; import com.serial.java.protobuf.SubscribeReqProto; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class SubReqServer { public void bind(int port)throws Exception{ //配置服务端NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder( SubscribeReqProto.SubscribeReq.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new SubReqServerHandler()); } }); //绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); }finally{ //退出时释放资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception{ int port = 8085; if(args!=null && args.length > 0){ port = Integer.valueOf(args[0]); } new SubReqServer().bind(port); } }
服务端处理类
package com.serial.java; import com.serial.java.protobuf.SubscribeReqProto; import com.serial.java.protobuf.SubscribeRespProto; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class SubReqServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq)msg; //System.out.println("SubReqServerHandler channelRead:"+ req.getUserName()); if("leeka".equalsIgnoreCase(req.getUserName())){ System.out.println("service accept client subscribe req:["+ req +"]"); ctx.writeAndFlush(resp(req.getSubReqID())); } } private SubscribeRespProto.SubscribeResp resp(int subReqID){ SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder(); builder.setSubReqID(subReqID); builder.setRespCode("0"); builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address"); return builder.build(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端入口
package com.serial.java; import com.serial.java.protobuf.SubscribeRespProto; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; public class SubReqClient { public void connect(int port,String host)throws Exception{ //配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder( SubscribeRespProto.SubscribeResp.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new SubReqClientHandler()); }; }); //发起异步连接操作 ChannelFuture f = b.connect(host,port).sync(); //等待客户端链路关闭 f.channel().closeFuture().sync(); }finally{ //退出,释放资源 group.shutdownGracefully(); } } public static void main(String[] args)throws Exception { int port = 8085; if(args!=null && args.length > 0){ port = Integer.valueOf(args[0]); } new SubReqClient().connect(port, "127.0.0.1"); } }
客户端处理类
package com.serial.java; import java.util.ArrayList; import java.util.List; import java.util.logging.Logger; import com.serial.java.protobuf.SubscribeReqProto; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class SubReqClientHandler extends ChannelHandlerAdapter { private static final Logger logger = Logger.getLogger(SubReqClientHandler.class.getName()); public SubReqClientHandler() { } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { ctx.write(req(i)); } ctx.flush(); } private SubscribeReqProto.SubscribeReq req(int i){ SubscribeReqProto.SubscribeReq.Builder r = SubscribeReqProto.SubscribeReq.newBuilder(); r.setSubReqID(i); r.setProductName("Netty Book"+i); r.setUserName("leeka"); List<String> address = new ArrayList<String>(); address.add("Nanjing"); address.add("Beijing"); r.addAllAddress(address); return r.build(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //super.channelReadComplete(ctx); ctx.flush(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("receive server response:["+msg+"]"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.warning("unexpected exception from downstream:"+ cause.getMessage()); ctx.close(); } }
OVER
相关推荐
在提供的描述中提到,这个自定义编解码器能够发送多个protobuf对象,这意味着它可能采用了某种机制来区分不同类型的protobuf消息,比如添加一个消息类型标识或使用多路复用技术。 对于Protobuf本身,它是通过定义....
Protobuf编解码器在Netty中的应用,使得基于Protobuf的数据在网络传输中变得更加便捷和高效。 首先,理解Protobuf的基本概念。Protobuf定义了一种语言中立、平台中立的数据表示格式,通过.proto文件定义消息结构。...
本项目“基于Netty与Protobuf的Android手机视频实时传输”利用了高性能的网络库Netty和高效的序列化框架Protobuf,实现了高效、低延迟的视频流传输。下面将详细介绍这两个关键技术及其在该项目中的应用。 **Netty**...
这个项目“基于netty和protobuf的聊天系统,客户端+服务器”就是这样一个实例,它展示了如何利用Java语言结合Netty框架和Protocol Buffers(protobuf)来搭建一个高性能、低延迟的聊天应用。 Netty是一个开源的异步...
本项目“springboot-netty-protobuf-master”旨在提供一个基础架构,它利用了Spring Boot的便捷性以及Netty的高效网络通信能力,同时采用Google的Protocol Buffers(protobuf)作为数据交换格式,确保数据传输的高效...
在本示例中,我们将深入探讨如何利用 Netty 和 Google 的 Protocol Buffers(protobuf)来构建一个简单的服务端和客户端通信系统。 Protocol Buffers 是 Google 提供的一种数据序列化协议,它可以将结构化数据序列...
通过以上步骤,我们可以利用Netty实现基于HTTP、WebSocket和Protobuf的通信。Netty的强大之处在于其灵活性和高度定制性,允许开发者根据具体需求调整和扩展网络通信的各个层面。无论是在大规模分布式系统还是小型...
3. **创建 Netty 通道处理器**:在 Netty 中,我们需要创建一个自定义的 `ChannelInboundHandler` 或 `ChannelOutboundHandler` 来处理 Protobuf 消息的编码和解码。可以使用 ` ProtobufVarint32FrameDecoder` 和 `...
Netty、HTTP与Protobuf是三个在IT领域中至关重要的技术组件,它们分别在不同的层面上为高性能网络应用提供服务。下面将详细解释这三个概念及其相互结合的应用。 **Netty** Netty是一个开源的Java NIO(非阻塞I/O)...
综上所述,"java快速上手的网络IO框架,基于netty, google protobuf 数据传输协议"的主题涵盖了Java网络编程的关键技术。通过学习和实践,开发者可以构建出高性能、低延迟的网络应用,实现数据的高效传输。这个...
在Netty中,编码(Encoding)和解码(Decoding)是网络通信中处理数据转换的关键步骤,而Marshalling就是一种常见的序列化和反序列化技术,它在Netty中被用于将Java对象转化为字节流以便在网络间传输,同时也能将...
服务端(rrkd-file-server)接收到数据后,同样利用Netty的事件驱动模型,将接收到的字节流转交给Protobuf的解码器,还原成原始文件内容。这个过程可能涉及多个网络包的合并,Netty的流式处理能力在此处得到体现,...
3. **创建ProtoBuf编码解码器**:在Netty中,我们需要自定义编码器和解码器来处理ProtoBuf格式的数据。可以继承`ByteToMessageDecoder`和`MessageToByteEncoder`,重写其方法。编码器将Java对象转换为ByteBuf,解码...
在本资源中,"NettyProtobufTcpServer"是使用Netty实现的TCP服务器,它负责接收客户端的连接并处理基于Protobuf编码的数据。TCP协议提供了可靠的、面向连接的通信,适合于需要保证数据完整性和顺序的游戏通信场景。 ...
在《netty+protobuf 整合实战》中,作者通过实际的源代码展示了如何将这两个技术结合使用。首先,我们需要理解 Protobuf 的工作原理。 Protobuf 提供了语言无关的 .proto 文件来定义数据结构,然后通过 protoc ...
为了解决这些问题,需要使用专门的编解码框架,如Google的Protobuf、Facebok的Thrift、Jboss Marshalling、MessagePack等。 MessagePack是一个高效的二进制序列化框架,它像JSON一样支持不同的语言间的数据交换,...
85_Netty编解码器剖析与入站出站处理器详解;86_Netty自定义编解码器与TCP粘包拆包问题;87_Netty编解码器执行流程深入分析;88_ReplayingDecoder源码分析与特性解读;89_Netty常见且重要编解码器详解;90_TCP粘包与...
本项目"基于Netty+Redis+protobuf开发的即时通讯服务器"利用了三种技术:Netty作为网络通信库,Redis作为数据缓存与消息中间件,以及protobuf作为序列化协议,实现了高性能的实时通信解决方案。 首先,Netty是一个...
在游戏开发中,protobuf(Protocol Buffers)是一种广泛使用的数据序列化协议,它由Google开发,旨在提供一种高效、简洁的方式来定义数据结构,并且能在多种编程语言之间进行数据交换。protobuf的主要优势在于其紧凑...