`

netty中使用protobuf实现多协议的消息

阅读更多

    在我们使用 netty 的过程中,有时候为了高效传输数据,经常使用 protobuf 进行数据的传输,netty默认情况下为我们实现的 protobuf 的编解码,但是默认的只能实现单个对象编解码,但是我们在使用 netty 的过程中,可能需要传输的对象有各种各样的,那么该如何实现对protobuf多协议的解码呢

 

    在 protobuf 中有一种类型的字段叫做  oneof , 被 oneof 声明的字段就类似于可选字段,在同一时刻只有一个字段有值,并且它们会共享内存。

 

有了上述基础知识,我们来实现一个简单的功能。

 

需求:

       客户端在连接上服务器端后,每隔 1s 向服务器端发送一个 protobuf 类型的对象(比如登录报文、创建任务报文、删除任务报文等等),服务器端接收到这个对象并打印出来。

 

protobuf文件的编写:

       在protobuf 文件中,我们申明一个 枚举类型的字段,用来标识当前发送的 protobuf 对象的类型,比如是登录报文、创建任务报文还是别的,然后在  oneof 字段中,申明所有可能需要传递的 报文实体

 

一、protobuf-java jar包的引入

<dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.6.1</version>
</dependency>

 

二、proto 文件的编写


   注意:

            1、定义的枚举是为了标识当前发送的是什么类型的消息

            2、需要发送的多个消息统一放入到 oneof 中进行申明

            3、到时候给 netty 编解码的时候就编解码 TaskProtocol 对象

 

三、使用 protoc 命令根据 .proto 文件生成 对应的 java 代码

四、netty服务器端的编写

/**
 * netty protobuf server
 *
 * @author huan.fu
 * @date 2019/2/15 - 11:54
 */
@Slf4j
public class NettyProtobufServer {

	public static void main(String[] args) throws InterruptedException {
		EventLoopGroup parentGroup = new NioEventLoopGroup(1);
		EventLoopGroup childGroup = new NioEventLoopGroup();
		ServerBootstrap bootstrap = new ServerBootstrap();
		bootstrap.group(parentGroup, childGroup)
				.channel(NioServerSocketChannel.class)
				.option(ChannelOption.SO_BACKLOG, 1024)
				// 连接超时
				.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000)
				.handler(new LoggingHandler(LogLevel.TRACE))
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) {
						ch.pipeline()
								.addLast(new ProtobufVarint32FrameDecoder())
								.addLast(new ProtobufDecoder(TaskProtobufWrapper.TaskProtocol.getDefaultInstance()))
								.addLast(new ProtobufVarint32LengthFieldPrepender())
								.addLast(new ProtobufEncoder())
								.addLast(new ServerProtobufHandler());
					}
				});
		// 绑定端口,同步等待成功
		ChannelFuture future = bootstrap.bind(9090).sync();
		log.info("server start in port:[{}]", 9090);
		// 等待服务端链路关闭后,main线程退出
		future.channel().closeFuture().sync();
		// 关闭线程池资源
		parentGroup.shutdownGracefully();
		childGroup.shutdownGracefully();
	}
}

    注意:

            1、注意一下 netty 是如何使用那些编解码器来编解码 protobuf 的。

 

五、服务器端接收到客户端发送过来的消息的处理

/**
 * 服务器端接收到客户端发送的请求,然后随机给客户端返回一个对象
 *
 * @author huan.fu
 * @date 2019/2/15 - 14:26
 */
@Slf4j
public class ServerProtobufHandler extends SimpleChannelInboundHandler<TaskProtobufWrapper.TaskProtocol> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, TaskProtobufWrapper.TaskProtocol taskProtocol) {
		switch (taskProtocol.getPackType()) {
			case LOGIN:
				log.info("接收到一个登录类型的pack:[{}]", taskProtocol.getLoginPack().getUsername() + " : " + taskProtocol.getLoginPack().getPassword());
				break;
			case CREATE_TASK:
				log.info("接收到一个创建任务类型的pack:[{}]", taskProtocol.getCreateTaskPack().getTaskId() + " : " + taskProtocol.getCreateTaskPack().getTaskName());
				break;
			case DELETE_TASK:
				log.info("接收到一个删除任务类型的pack:[{}]", Arrays.toString(taskProtocol.getDeleteTaskPack().getTaskIdList().toArray()));
				break;
			default:
				log.error("接收到一个未知类型的pack:[{}]", taskProtocol.getPackType());
				break;
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		ctx.close();
		log.error("发生异常", cause);
	}
}

    注意:

            1、服务器端根据 packType 字段来判断客户端发送的是什么类型的消息

 

六、netty 客户端的编写

/**
 * netty protobuf client
 *
 * @author huan.fu
 * @date 2019/2/15 - 11:54
 */
@Slf4j
public class NettyProtobufClient {

	public static void main(String[] args) throws InterruptedException {
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap bootstrap = new Bootstrap();
		bootstrap.group(group)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) {
						ch.pipeline()
								.addLast(new ProtobufVarint32FrameDecoder())
								.addLast(new ProtobufDecoder(TaskProtobufWrapper.TaskProtocol.getDefaultInstance()))
								.addLast(new ProtobufVarint32LengthFieldPrepender())
								.addLast(new ProtobufEncoder())
								.addLast(new ClientProtobufHandler());
					}
				});
		ChannelFuture future = bootstrap.connect("127.0.0.1", 9090).sync();
		log.info("client connect server.");
		future.channel().closeFuture().sync();
		group.shutdownGracefully();
	}
}

 

七、客户端连接到服务器端时的处理

/**
 * 客户端连接到服务器端后,每隔1s发送一个报文到服务器端
 *
 * @author huan.fu
 * @date 2019/2/15 - 14:26
 */
@Slf4j
public class ClientProtobufHandler extends ChannelInboundHandlerAdapter {

	private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

	private AtomicInteger atomicInteger = new AtomicInteger(1);

	@Override
	public void channelActive(ChannelHandlerContext ctx) {
		executor.scheduleAtFixedRate(() -> {
			// 产生的pack类型
			int packType = new Random().nextInt(3);
			switch (TaskProtobufWrapper.PackType.forNumber(packType)) {
				case LOGIN:
					TaskProtobufWrapper.LoginPack loginPack = TaskProtobufWrapper.LoginPack.newBuilder().setUsername("张三[" + atomicInteger.getAndIncrement() + "]").setPassword("123456").build();
					ctx.writeAndFlush(TaskProtobufWrapper.TaskProtocol.newBuilder().setPackType(TaskProtobufWrapper.PackType.LOGIN).setLoginPack(loginPack).build());
					break;
				case CREATE_TASK:
					TaskProtobufWrapper.CreateTaskPack createTaskPack = TaskProtobufWrapper.CreateTaskPack.newBuilder().setCreateTime(System.currentTimeMillis()).setTaskId("100" + atomicInteger.get()).setTaskName("任务编号" + atomicInteger.get()).build();
					ctx.writeAndFlush(TaskProtobufWrapper.TaskProtocol.newBuilder().setPackType(TaskProtobufWrapper.PackType.CREATE_TASK).setCreateTaskPack(createTaskPack).build());
					break;
				case DELETE_TASK:
					TaskProtobufWrapper.DeleteTaskPack deleteTaskPack = TaskProtobufWrapper.DeleteTaskPack.newBuilder().addTaskId("1001").addTaskId("1002").build();
					ctx.writeAndFlush(TaskProtobufWrapper.TaskProtocol.newBuilder().setPackType(TaskProtobufWrapper.PackType.DELETE_TASK).setDeleteTaskPack(deleteTaskPack).build());
					break;
				default:
					log.error("产生一个未知的包类型:[{}]", packType);
					break;
			}
		}, 0, 1, TimeUnit.SECONDS);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		ctx.close();
		log.error("发生异常", cause);
	}
}

    注意:

           1、客户端在连接服务器端时每隔1s发送不同的消息到服务器端

 

八、运行结果


 九、完整代码

完成代码如下https://gitee.com/huan1993/netty-study/tree/master/src/main/java/com/huan/netty/protobuf

 

 

  • 大小: 49.5 KB
  • 大小: 84.6 KB
分享到:
评论

相关推荐

    springboot集成netty,使用protobuf作为数据交换格式,可以用于智能终端云端服务脚手架

    5. 实现业务逻辑:在服务端和客户端的业务代码中,使用protobuf生成的Java类进行数据操作。 6. 测试通信:编写客户端代码,连接到服务器并发送protobuf编码的数据,验证数据能否正确接收和处理。 通过这样的集成,...

    Netty+自定义Protobuf编解码器

    在提供的描述中提到,这个自定义编解码器能够发送多个protobuf对象,这意味着它可能采用了某种机制来区分不同类型的protobuf消息,比如添加一个消息类型标识或使用多路复用技术。 对于Protobuf本身,它是通过定义....

    Unity与Netty进行ProtoBuf通信__

    在Unity中使用ProtoBuf-net的步骤如下: 1. **安装ProtoBuf-net**:首先,你需要在Unity项目中引入ProtoBuf-net库。这可以通过NuGet包管理器或直接下载源码添加到项目中完成。 2. **定义数据模型**:定义需要进行...

    采用netty与protobuf进行文件传输

    本话题聚焦于使用Netty和Protocol Buffers(Protobuf)来实现文件传输,这是一种现代且高性能的技术组合,尤其适用于大规模分布式系统。 Netty是一个开源的异步事件驱动的网络应用程序框架,用于快速开发可维护的高...

    Netty中集成Protobuf实现Java对象数据传递示例代码.rar

    Protobuf的数据格式比JSON更紧凑,解析速度更快,尤其适合在网络通信中使用。 Protobuf的消息定义文件(.proto)是文本格式,易于阅读和编辑,且可以跨语言共享。 在Netty中集成Protobuf,主要目的是利用Protobuf...

    Netty4+ProtoBuf通信框架

    在这个项目中,客户端和服务端之间的通信是基于ProtoBuf协议进行的,通过maven项目管理依赖,提供startClient和startServer两个主要的入口类。 Netty是一个用Java编写的异步事件驱动的网络应用框架,特别适合用于...

    Netty中Protobuf编解码应用

    在Netty中, Protobuf(Protocol Buffers)是一种高效的序列化协议,由Google开发,用于数据交换。它提供了一种结构化数据的序列化方法,类似于XML、JSON,但更小、更快、更简单。Protobuf编解码器在Netty中的应用,...

    java快速上手的网络IO框架,基于netty, google protobuf 数据传输协议.zip

    相比于XML或JSON,Protobuf的数据格式更紧凑,解析速度更快,适合在网络传输中使用。在Java中,我们可以使用Protobuf编译器protoc生成Java API,通过这些API进行数据的序列化和反序列化。 结合Netty和Protobuf,...

    一个基于Nacos、Netty、Protobuf 实现的简单易懂的RCP框架.zip

    标题中的“一个基于Nacos、Netty、Protobuf 实现的简单易懂的RCP框架”指的是一个使用了阿里巴巴的Nacos服务发现平台、高性能的网络库Netty以及高效的序列化协议Protobuf来构建的远程过程调用(RPC)框架。...

    netty+protobuf入门案例

    接下来,我们来看看如何在 Netty 中使用 Protobuf。Netty 提供了对 Protobuf 的支持,我们可以使用 Protobuf 的 `.proto` 文件生成相应的 Java 类,然后在 Netty 中处理这些类。以下是一般步骤: 1. **定义 ...

    基于netty与protobuf的Android手机视频实时传输

    在本项目中,Android手机摄像头捕捉的视频数据被编码为Protobuf消息,然后发送到服务器。 **Android视频采集** 在Android设备上,项目可能使用了Android的MediaRecorder或者Camera2 API来捕获摄像头的视频流。...

    基于netty和protobuf的聊天系统,客户端+服务器

    2. **Netty服务器**:这部分代码实现了服务器端的逻辑,包括监听客户端连接、接收protobuf编码的消息、解析消息内容并进行相应的业务处理,如广播消息给其他在线用户。 3. **Netty客户端**:客户端代码负责与服务器...

    netty学习文件,实现http,websocket,protobuf

    Netty 集成了Protobuf的支持,提供了 `ProtobufDecoder` 和 `ProtobufEncoder`,使得我们在Netty中可以方便地处理Protobuf消息。使用Protobuf,开发者可以定义结构化的数据模型,并在Java、C++、Python等多语言之间...

    在使用netty进行网络通信协议传输使用protobuf时protobuf编译.proto文件生成JAVA类.zip

    在使用netty进行网络通信协议传输使用protobuf时protobuf编译.proto文件生成JAVA类.zip 包括测试proto3.proto文件,自动protobuf编译.proto文件生成JAVA类

    Netty发送protoBuf格式数据

    5. **发送和接收ProtoBuf消息**:现在,Netty服务器和客户端可以使用生成的Java类来创建和解析消息。例如,服务器端创建一个`MyMessage`实例,通过编码器转换成ByteBuf,然后通过`ChannelHandlerContext....

    netty http protobuf

    当Netty与Protobuf结合时,可以在HTTP协议之上实现高效的数据传输。通常,HTTP用于在网络中传输数据,而Protobuf用于编码和解码这些数据,以提高性能和减少带宽消耗。以下是一个简单的步骤概述: 1. **服务器端**:...

    springboot集成netty,使用protobuf作为数据传输格式,包含心跳检测、断开重连等功能..zip

    在本文中,我们将深入探讨如何将Spring Boot与Netty结合,并使用Protocol Buffers(protobuf)作为数据传输格式,实现心跳检测、断开重连、数据上传以及主动推送等功能。这是一套综合性的技术栈,旨在构建高效、稳定...

    通信与协议Netty+Protobuf-游戏设计与开发(1)配套代码

    在本资源中,"NettyProtobufTcpServer"是使用Netty实现的TCP服务器,它负责接收客户端的连接并处理基于Protobuf编码的数据。TCP协议提供了可靠的、面向连接的通信,适合于需要保证数据完整性和顺序的游戏通信场景。 ...

    netty+protobuf开发一个聊天室实例

    在本文中,我们将深入探讨如何使用Netty和Protobuf来开发一个实时聊天室实例。Netty是一个高性能、异步事件驱动的网络应用框架,适用于Java平台,它简化了TCP、UDP和HTTP等协议的服务器和客户端应用开发。而Protobuf...

    netty+protobuf (整合源代码)

    在《netty+protobuf 整合实战》中,作者通过实际的源代码展示了如何将这两个技术结合使用。首先,我们需要理解 Protobuf 的工作原理。 Protobuf 提供了语言无关的 .proto 文件来定义数据结构,然后通过 protoc ...

Global site tag (gtag.js) - Google Analytics