`

netty + protobuf 传输多个类

 
阅读更多

在protobuf序列化的前面,加上一个自定义的头,这个头包含序列化的长度和它的类型。在解压的时候根据包头来反序列化。

 

假设socket上要传输2个类型的数据,股票行情信息和期权行情信息:

股票的.proto定义:

复制代码
syntax = "proto3";

package test.model.protobuf;

option java_package = "test.model.protobuf";

message StockTick {
    string stockId = 1;
    int price = 2;
}
复制代码

 

期权的.proto定义:

复制代码
syntax = "proto3";

package test.model.protobuf;

option java_package = "test.model.protobuf";

message OptionTick {
    string optionId = 1;
    string securityId = 2;
    int price = 3;
}
复制代码

 

netty4官方事实上已经实现了protobuf的编解码的插件,但是只能用于传输单一类型的protobuf序列化。我这里截取一段netty代码,熟悉netty的同学马上就能理解它的作用:

复制代码
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new ProtobufVarint32FrameDecoder());
            pipeline.addLast(new ProtobufDecoder(StockTickOuterClass.StockTick.getDefaultInstance()));
            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
            pipeline.addLast(new ProtobufEncoder());
            pipeline.addLast(new CustomProtoServerHandler());
        }
复制代码

 

看以上代码高亮部分,netty4官方的编解码器必须指定单一的protobuf类型才行。具体每个类的作用:

ProtobufEncoder:用于对Probuf类型序列化。
ProtobufVarint32LengthFieldPrepender:用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
ProtobufVarint32FrameDecoder:用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)
ProtobufDecoder:反序列化指定的Probuf字节数组为protobuf类型。

 

我们可以参考以上官方的编解码代码,将实现我们客户化的protobuf编解码插件,但是要支持多种不同类型protobuf数据在一个socket上传输:

 

编码器CustomProtobufEncoder:

复制代码
import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 参考ProtobufVarint32LengthFieldPrepender 和 ProtobufEncoder
 */
@Sharable
public class CustomProtobufEncoder extends MessageToByteEncoder<MessageLite> {
    
HangqingEncoder hangqingEncoder;
    
    public CustomProtobufEncoder(HangqingEncoder hangqingEncoder)
    {
        this.hangqingEncoder = hangqingEncoder;
    }
    
    @Override
    protected void encode(
            ChannelHandlerContext ctx, MessageLite msg, ByteBuf out) throws Exception {
        

        byte[] body = msg.toByteArray();
        byte[] header = encodeHeader(msg, (short)body.length);
        
        out.writeBytes(header);
        out.writeBytes(body);
        
        return;
    }
    
    private byte[] encodeHeader(MessageLite msg, short bodyLength) {
        byte messageType = 0x0f;
        
        if (msg instanceof StockTickOuterClass.StockTick) {
            messageType = 0x00;
        } else if (msg instanceof OptionTickOuterClass.OptionTick) {
            messageType = 0x01;
        }
        
        byte[] header = new byte[4];
        header[0] = (byte) (bodyLength & 0xff);
        header[1] = (byte) ((bodyLength >> 8) & 0xff);
        header[2] = 0; // 保留字段
        header[3] = messageType;

        return header;

    }
}
复制代码

 

CustomProtobufEncoder序列化传入的protobuf类型,并且为它创建了一个4个字节的包头,格式如下

 

body长度(low) body长度
(high)
保留字节 类型

 

其中的encodeHeader方法具体的实现要根据你要传输哪些protobuf类型来修改代码,也可以稍加设计避免使用太多的if…else。

 

解码器CustomProtobufDecoder:

复制代码
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
import com.google.protobuf.MessageLite;

/**
 * 参考ProtobufVarint32FrameDecoder 和 ProtobufDecoder
 */

public class CustomProtobufDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() > 4) { // 如果可读长度小于包头长度,退出。
            in.markReaderIndex();

            // 获取包头中的body长度
            byte low = in.readByte();
            byte high = in.readByte();
            short s0 = (short) (low & 0xff);
            short s1 = (short) (high & 0xff);
            s1 <<= 8;
            short length = (short) (s0 | s1);

            // 获取包头中的protobuf类型
            in.readByte();
            byte dataType = in.readByte();

            // 如果可读长度小于body长度,恢复读指针,退出。
            if (in.readableBytes() < length) {
                in.resetReaderIndex();
                return;
            }

            // 读取body
            ByteBuf bodyByteBuf = in.readBytes(length);

            byte[] array;
            int offset;

            int readableLen= bodyByteBuf.readableBytes();
            if (bodyByteBuf.hasArray()) {
                array = bodyByteBuf.array();
                offset = bodyByteBuf.arrayOffset() + bodyByteBuf.readerIndex();
            } else {
                array = new byte[readableLen];
                bodyByteBuf.getBytes(bodyByteBuf.readerIndex(), array, 0, readableLen);
                offset = 0;
            }
            
            //反序列化
            MessageLite result = decodeBody(dataType, array, offset, readableLen);
            out.add(result);
        }
    }

    public MessageLite decodeBody(byte dataType, byte[] array, int offset, int length) throws Exception {
        if (dataType == 0x00) {
            return StockTickOuterClass.StockTick.getDefaultInstance().
                    getParserForType().parseFrom(array, offset, length);

        } else if (dataType == 0x01) {
            return OptionTickOuterClass.OptionTick.getDefaultInstance().
                    getParserForType().parseFrom(array, offset, length);
        }

        return null; // or throw exception
    }
}
复制代码

 

CustomProtobufDecoder实现了2个功能,1)通过包头中的长度信息来解决半包和粘包。 2)把消息body反序列化为对应的protobuf类型(根据包头中的类型信息)。

其中的decodeBody方法具体的实现要根据你要传输哪些protobuf类型来修改代码,也可以稍加设计避免使用太多的if…else。

 

在Netty服务器上应用编解码器

如何把我们自定义的编解码用于netty Server:

复制代码
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("decoder",new CustomProtobufDecoder());
            pipeline.addLast("encoder",new CustomProtobufEncoder());
            pipeline.addLast(new CustomProtoServerHandler());
        }
复制代码

 

分享到:
评论

相关推荐

    netty+protobuf (整合源代码)

    这涉及到网络编程、序列化、事件驱动编程等多个重要知识点,对于深入理解这两项技术的结合使用非常有帮助。学习这个项目,开发者不仅可以掌握 Netty 和 Protobuf 的基本用法,还能了解到如何在实际项目中整合它们,...

    netty+protobuf开发一个聊天室实例

    3. **消息类型**: Protobuf中的基本数据结构,可以包含多个字段,每个字段都有唯一的标签和数据类型。 4. **效率**: Protobuf序列化的数据比JSON更紧凑,解析速度更快,适合网络传输。 **开发聊天室步骤** 1. **...

    采用netty与protobuf进行文件传输

    这个过程可能涉及多个网络包的合并,Netty的流式处理能力在此处得到体现,能够正确处理分片的数据包,确保文件的完整性。 为了优化传输性能,还可以考虑以下策略: 1. 使用零拷贝技术,减少内存复制,提高传输效率...

    netty+protobuf入门案例

    这种模型允许在单个线程中处理多个连接,从而提高了系统资源的利用率和并发性能。Netty提供了丰富的组件和API,如Channel、EventLoop、ByteBuf等,使得开发者可以轻松地构建复杂、高可用的网络应用。 接着,我们来...

    netty+protobuf入门案例.

    在结合Netty和Protobuf的场景中,通常用Protobuf来定义通信协议,因为它提供了一种高效的二进制编码方式,可以减少网络传输的数据量,提高性能。Netty则作为网络通信的基础框架,负责处理网络连接、读写事件,以及将...

    毕设项目:基于netty+websocket+springboot的实时聊天系统.zip

    总的来说,这个毕设项目涵盖了后端开发的多个关键知识点,包括微服务架构、实时通信、网络编程、数据库设计和前端交互,对于学习和提升这些技能是非常有价值的。通过实践这个项目,开发者可以深入理解Spring Boot的...

    Netty+自定义Protobuf编解码器

    在提供的描述中提到,这个自定义编解码器能够发送多个protobuf对象,这意味着它可能采用了某种机制来区分不同类型的protobuf消息,比如添加一个消息类型标识或使用多路复用技术。 对于Protobuf本身,它是通过定义....

    基于netty与protobuf的Android手机视频实时传输

    本项目“基于Netty与Protobuf的Android手机视频实时传输”利用了高性能的网络库Netty和高效的序列化框架Protobuf,实现了高效、低延迟的视频流传输。下面将详细介绍这两个关键技术及其在该项目中的应用。 **Netty**...

    netty+thrift高并发高性能

    - **NIO多路复用模型**:如图2-3所示,Netty通过Selector监听多个Channel的就绪状态,一旦某个Channel准备就绪,就会被Selector选中,然后由对应的线程处理。这种方式避免了传统多线程模型下的线程切换和上下文切换...

    基于netty和protobuf的聊天系统,客户端+服务器

    1. **protobuf定义**:项目应该包含一个或多个.proto文件,定义了聊天消息的结构,如用户登录、发送消息、接收消息等消息类型。 2. **Netty服务器**:这部分代码实现了服务器端的逻辑,包括监听客户端连接、接收...

    netty+websocket通讯例子

    5. **Pipeline**: 每个 Channel 都有一个处理链,由多个 Handler 组成,负责对网络事件进行处理。 在 Netty 中实现 WebSocket 服务器端: 1. 创建 `WebSocketServerBootstrap` 并配置 `ServerSocketChannel`。 2. ...

    android netty cli +probuf示例

    在Android平台上实现即时通讯功能,Netty和Protobuf(Protocol Buffers)的结合是一个高效、可靠的解决方案。本文将深入探讨如何使用这两个技术构建一个客户端应用程序。 Netty是Java领域的一款高性能、异步事件...

    netty学习文件,实现http,websocket,protobuf

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨Netty如何实现HTTP、WebSocket和Protobuf这三种通信方式,以及它们在服务器与客户端...

    使用Netty+Protocol Buffer实现聊天室

    6. **多平台兼容性**:由于protobuf的跨平台特性,同一套消息协议可以在Android、iOS、Web等多个平台上共享,简化了多端同步的复杂性。 7. **UI设计与交互**:在Android端,利用Android Studio和相关库,创建用户...

    一个基于Java的开源游戏服务器框架实现,使用了Netty、ProtoBuf、Disruptor等.zip

    在这个特定的项目中,一个基于Java的开源游戏服务器框架被实现,利用了几个关键的技术栈,包括Netty、ProtoBuf和Disruptor。这些技术都是为了优化性能、提高效率和简化通信而设计的。 Netty是一个高性能、异步事件...

    Netty中集成Protobuf实现Java对象数据传递示例代码.rar

    通过Netty的非阻塞I/O机制,我们可以轻松地处理多个并发连接,同时保持高性能。 总结起来,Netty与Protobuf的结合提供了一种高效、可靠的数据传输方案,特别适用于对性能有高要求的网络应用。通过理解这两者的原理...

    HttpProtobuf:一个项目的API测试工具(Https,protobuf)

    原本想用Jmeter进行接口测试,结果发现工作的项目用的是Protobuf的数据传输格式,一下子发现没办法用了,于是自己开始写个小工具。很多功能还不具备,使用的时候,输入请求类型,和URL,参数要用jsonObject的格式,...

    《Netty+JavaFx实战:仿桌面版微信聊天》| 本项目是作-chat.itstack.github.io.zip

    《Netty+JavaFx实战:仿桌面版微信聊天》是一个基于Netty网络框架和JavaFX图形用户界面库的项目,旨在模拟实现一个类似桌面版微信的聊天应用。这个项目不仅涵盖了网络编程的基础知识,还涉及到了GUI设计和事件处理等...

    netty传输对象源码

    EventLoop则是处理I/O事件的线程,每个Channel都关联一个或多个EventLoop。通过使用NIO(非阻塞I/O)和EPOLL(Linux下的高效I/O事件通知机制),Netty能够实现高并发和低延迟。 在传输对象时,Netty提供了一种序列...

    java服务器端(Netty_Proto)和c++客户端tcp通讯.rar

    - 异步处理:Netty服务器通常采用非阻塞I/O,通过事件循环处理多个连接。 - 错误处理:异常捕获和处理,保证服务的稳定运行。 6. **网络编程**: - IP地址与端口号:理解如何指定服务器和客户端的网络位置。 - ...

Global site tag (gtag.js) - Google Analytics