在我们使用 netty 的过程中,有时候为了高效的传输数据,经常使用 protobuf 进行数据的传输,netty默认情况下为我们实现的 protobuf 的编解码,但是默认的只能实现单个对象的编解码,但是我们在使用 netty 的过程中,可能需要传输的对象有各种各样的,那么该如何实现对protobuf多协议的解码呢?
在 protobuf 中有一种类型的字段叫做 oneof , 被 oneof 声明的字段就类似于可选字段,在同一时刻只有一个字段有值,并且它们会共享内存。
有了上述基础知识,我们来实现一个简单的功能。
需求:
客户端在连接上服务器端后,每隔 1s 向服务器端发送一个 protobuf 类型的对象(比如登录报文、创建任务报文、删除任务报文等等),服务器端接收到这个对象并打印出来。
protobuf文件的编写:
在protobuf 文件中,我们申明一个 枚举类型的字段,用来标识当前发送的 protobuf 对象的类型,比如是登录报文、创建任务报文还是别的,然后在 oneof 字段中,申明所有可能需要传递的 报文实体。
一、protobuf-java jar包的引入
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.6.1</version> </dependency>
二、proto 文件的编写
注意:
1、定义的枚举是为了标识当前发送的是什么类型的消息
2、需要发送的多个消息统一放入到 oneof 中进行申明
3、到时候给 netty 编解码的时候就编解码 TaskProtocol 对象
三、使用 protoc 命令根据 .proto 文件生成 对应的 java 代码
四、netty服务器端的编写
/** * netty protobuf server * * @author huan.fu * @date 2019/2/15 - 11:54 */ @Slf4j public class NettyProtobufServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup parentGroup = new NioEventLoopGroup(1); EventLoopGroup childGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(parentGroup, childGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) // 连接超时 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000) .handler(new LoggingHandler(LogLevel.TRACE)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(TaskProtobufWrapper.TaskProtocol.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new ServerProtobufHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture future = bootstrap.bind(9090).sync(); log.info("server start in port:[{}]", 9090); // 等待服务端链路关闭后,main线程退出 future.channel().closeFuture().sync(); // 关闭线程池资源 parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); } }
注意:
1、注意一下 netty 是如何使用那些编解码器来编解码 protobuf 的。
五、服务器端接收到客户端发送过来的消息的处理
/** * 服务器端接收到客户端发送的请求,然后随机给客户端返回一个对象 * * @author huan.fu * @date 2019/2/15 - 14:26 */ @Slf4j public class ServerProtobufHandler extends SimpleChannelInboundHandler<TaskProtobufWrapper.TaskProtocol> { @Override protected void channelRead0(ChannelHandlerContext ctx, TaskProtobufWrapper.TaskProtocol taskProtocol) { switch (taskProtocol.getPackType()) { case LOGIN: log.info("接收到一个登录类型的pack:[{}]", taskProtocol.getLoginPack().getUsername() + " : " + taskProtocol.getLoginPack().getPassword()); break; case CREATE_TASK: log.info("接收到一个创建任务类型的pack:[{}]", taskProtocol.getCreateTaskPack().getTaskId() + " : " + taskProtocol.getCreateTaskPack().getTaskName()); break; case DELETE_TASK: log.info("接收到一个删除任务类型的pack:[{}]", Arrays.toString(taskProtocol.getDeleteTaskPack().getTaskIdList().toArray())); break; default: log.error("接收到一个未知类型的pack:[{}]", taskProtocol.getPackType()); break; } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); log.error("发生异常", cause); } }
注意:
1、服务器端根据 packType 字段来判断客户端发送的是什么类型的消息
六、netty 客户端的编写
/** * netty protobuf client * * @author huan.fu * @date 2019/2/15 - 11:54 */ @Slf4j public class NettyProtobufClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(TaskProtobufWrapper.TaskProtocol.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new ClientProtobufHandler()); } }); ChannelFuture future = bootstrap.connect("127.0.0.1", 9090).sync(); log.info("client connect server."); future.channel().closeFuture().sync(); group.shutdownGracefully(); } }
七、客户端连接到服务器端时的处理
/** * 客户端连接到服务器端后,每隔1s发送一个报文到服务器端 * * @author huan.fu * @date 2019/2/15 - 14:26 */ @Slf4j public class ClientProtobufHandler extends ChannelInboundHandlerAdapter { private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); private AtomicInteger atomicInteger = new AtomicInteger(1); @Override public void channelActive(ChannelHandlerContext ctx) { executor.scheduleAtFixedRate(() -> { // 产生的pack类型 int packType = new Random().nextInt(3); switch (TaskProtobufWrapper.PackType.forNumber(packType)) { case LOGIN: TaskProtobufWrapper.LoginPack loginPack = TaskProtobufWrapper.LoginPack.newBuilder().setUsername("张三[" + atomicInteger.getAndIncrement() + "]").setPassword("123456").build(); ctx.writeAndFlush(TaskProtobufWrapper.TaskProtocol.newBuilder().setPackType(TaskProtobufWrapper.PackType.LOGIN).setLoginPack(loginPack).build()); break; case CREATE_TASK: TaskProtobufWrapper.CreateTaskPack createTaskPack = TaskProtobufWrapper.CreateTaskPack.newBuilder().setCreateTime(System.currentTimeMillis()).setTaskId("100" + atomicInteger.get()).setTaskName("任务编号" + atomicInteger.get()).build(); ctx.writeAndFlush(TaskProtobufWrapper.TaskProtocol.newBuilder().setPackType(TaskProtobufWrapper.PackType.CREATE_TASK).setCreateTaskPack(createTaskPack).build()); break; case DELETE_TASK: TaskProtobufWrapper.DeleteTaskPack deleteTaskPack = TaskProtobufWrapper.DeleteTaskPack.newBuilder().addTaskId("1001").addTaskId("1002").build(); ctx.writeAndFlush(TaskProtobufWrapper.TaskProtocol.newBuilder().setPackType(TaskProtobufWrapper.PackType.DELETE_TASK).setDeleteTaskPack(deleteTaskPack).build()); break; default: log.error("产生一个未知的包类型:[{}]", packType); break; } }, 0, 1, TimeUnit.SECONDS); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); log.error("发生异常", cause); } }
注意:
1、客户端在连接服务器端时每隔1s发送不同的消息到服务器端
八、运行结果
九、完整代码
完成代码如下:https://gitee.com/huan1993/netty-study/tree/master/src/main/java/com/huan/netty/protobuf
相关推荐
5. 实现业务逻辑:在服务端和客户端的业务代码中,使用protobuf生成的Java类进行数据操作。 6. 测试通信:编写客户端代码,连接到服务器并发送protobuf编码的数据,验证数据能否正确接收和处理。 通过这样的集成,...
在提供的描述中提到,这个自定义编解码器能够发送多个protobuf对象,这意味着它可能采用了某种机制来区分不同类型的protobuf消息,比如添加一个消息类型标识或使用多路复用技术。 对于Protobuf本身,它是通过定义....
在Unity中使用ProtoBuf-net的步骤如下: 1. **安装ProtoBuf-net**:首先,你需要在Unity项目中引入ProtoBuf-net库。这可以通过NuGet包管理器或直接下载源码添加到项目中完成。 2. **定义数据模型**:定义需要进行...
本话题聚焦于使用Netty和Protocol Buffers(Protobuf)来实现文件传输,这是一种现代且高性能的技术组合,尤其适用于大规模分布式系统。 Netty是一个开源的异步事件驱动的网络应用程序框架,用于快速开发可维护的高...
Protobuf的数据格式比JSON更紧凑,解析速度更快,尤其适合在网络通信中使用。 Protobuf的消息定义文件(.proto)是文本格式,易于阅读和编辑,且可以跨语言共享。 在Netty中集成Protobuf,主要目的是利用Protobuf...
在这个项目中,客户端和服务端之间的通信是基于ProtoBuf协议进行的,通过maven项目管理依赖,提供startClient和startServer两个主要的入口类。 Netty是一个用Java编写的异步事件驱动的网络应用框架,特别适合用于...
在Netty中, Protobuf(Protocol Buffers)是一种高效的序列化协议,由Google开发,用于数据交换。它提供了一种结构化数据的序列化方法,类似于XML、JSON,但更小、更快、更简单。Protobuf编解码器在Netty中的应用,...
相比于XML或JSON,Protobuf的数据格式更紧凑,解析速度更快,适合在网络传输中使用。在Java中,我们可以使用Protobuf编译器protoc生成Java API,通过这些API进行数据的序列化和反序列化。 结合Netty和Protobuf,...
标题中的“一个基于Nacos、Netty、Protobuf 实现的简单易懂的RCP框架”指的是一个使用了阿里巴巴的Nacos服务发现平台、高性能的网络库Netty以及高效的序列化协议Protobuf来构建的远程过程调用(RPC)框架。...
接下来,我们来看看如何在 Netty 中使用 Protobuf。Netty 提供了对 Protobuf 的支持,我们可以使用 Protobuf 的 `.proto` 文件生成相应的 Java 类,然后在 Netty 中处理这些类。以下是一般步骤: 1. **定义 ...
在本项目中,Android手机摄像头捕捉的视频数据被编码为Protobuf消息,然后发送到服务器。 **Android视频采集** 在Android设备上,项目可能使用了Android的MediaRecorder或者Camera2 API来捕获摄像头的视频流。...
2. **Netty服务器**:这部分代码实现了服务器端的逻辑,包括监听客户端连接、接收protobuf编码的消息、解析消息内容并进行相应的业务处理,如广播消息给其他在线用户。 3. **Netty客户端**:客户端代码负责与服务器...
Netty 集成了Protobuf的支持,提供了 `ProtobufDecoder` 和 `ProtobufEncoder`,使得我们在Netty中可以方便地处理Protobuf消息。使用Protobuf,开发者可以定义结构化的数据模型,并在Java、C++、Python等多语言之间...
在使用netty进行网络通信协议传输使用protobuf时protobuf编译.proto文件生成JAVA类.zip 包括测试proto3.proto文件,自动protobuf编译.proto文件生成JAVA类
5. **发送和接收ProtoBuf消息**:现在,Netty服务器和客户端可以使用生成的Java类来创建和解析消息。例如,服务器端创建一个`MyMessage`实例,通过编码器转换成ByteBuf,然后通过`ChannelHandlerContext....
当Netty与Protobuf结合时,可以在HTTP协议之上实现高效的数据传输。通常,HTTP用于在网络中传输数据,而Protobuf用于编码和解码这些数据,以提高性能和减少带宽消耗。以下是一个简单的步骤概述: 1. **服务器端**:...
在本文中,我们将深入探讨如何将Spring Boot与Netty结合,并使用Protocol Buffers(protobuf)作为数据传输格式,实现心跳检测、断开重连、数据上传以及主动推送等功能。这是一套综合性的技术栈,旨在构建高效、稳定...
在本资源中,"NettyProtobufTcpServer"是使用Netty实现的TCP服务器,它负责接收客户端的连接并处理基于Protobuf编码的数据。TCP协议提供了可靠的、面向连接的通信,适合于需要保证数据完整性和顺序的游戏通信场景。 ...
在本文中,我们将深入探讨如何使用Netty和Protobuf来开发一个实时聊天室实例。Netty是一个高性能、异步事件驱动的网络应用框架,适用于Java平台,它简化了TCP、UDP和HTTP等协议的服务器和客户端应用开发。而Protobuf...
在《netty+protobuf 整合实战》中,作者通过实际的源代码展示了如何将这两个技术结合使用。首先,我们需要理解 Protobuf 的工作原理。 Protobuf 提供了语言无关的 .proto 文件来定义数据结构,然后通过 protoc ...