`

netty 对 protobuf的封装

 
阅读更多

netty 默认支持protobuf 的封装与解码,如果通信双方都使用netty则没有什么障碍,但如果客户端是其它语言(C#)则需要自己仿写与netty一致的方式(解码+封装),提前是必须很了解netty是如何进行封装与解码的。这里主要通过读源码主要类ProtobufVarint32FrameDecoder(解码)+ProtobufVarint32LengthFieldPrepender(封装) 来解析其原理与实现。

文章来源http://www.cnblogs.com/tankaixiong

一,支持protobuf 协议的默认实现

//配置服务端NIO线程组  
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        try{  
            ServerBootstrap b = new ServerBootstrap();  
            b.group(bossGroup, workerGroup)  
                .channel(NioServerSocketChannel.class)  
                .option(ChannelOption.SO_BACKLOG, 1024)  
                .handler(new LoggingHandler(LogLevel.INFO))  
                .childHandler(new ChannelInitializer<SocketChannel>() {  
  
                    @Override  
                    protected void initChannel(SocketChannel ch) throws Exception {  
                        ch.pipeline()  
                        .addLast(new ProtobufVarint32FrameDecoder())                          
                        .addLast(new ProtobufDecoder(  
                                SubscribeReqProto.SubscribeReq.getDefaultInstance()))                         
                        .addLast(new ProtobufVarint32LengthFieldPrepender())                          
                        .addLast(new ProtobufEncoder())                       
                        .addLast(new SubReqServerHandler());                          
                    }  
                      
                });  
            //绑定端口,同步等待成功  
            ChannelFuture f = b.bind(port).sync();  
            //等待服务端监听端口关闭  
            f.channel().closeFuture().sync();  
              
        }finally{  
            //退出时释放资源  
            bossGroup.shutdownGracefully();  
            workerGroup.shutdownGracefully();  
        }

 

以上是提供的默认实现。关键在于ProtobufVarint32FrameDecoder,ProtobufVarint32LengthFieldPrepender类。

二,ProtobufVarint32LengthFieldPrepender 编码类

An encoder that prepends the the Google Protocol Buffers 128 Varints integer length field.

  1.  
    * BEFORE DECODE (300 bytes) AFTER DECODE (302 bytes)
  2.  
    * +---------------+ +--------+---------------+
  3.  
    * | Protobuf Data |-------------->| Length | Protobuf Data |
  4.  
    * | (300 bytes) | | 0xAC02 | (300 bytes) |
  5.  
    * +---------------+ +--------+---------------+

从类的说明来看, proto 消息格式如:Length + Protobuf Data (消息头+消息数据) 方式,这里特别需要注意的是头长使用的是varints方式不是int ,消息头描述消息数据体的长度。为了更减少传输量,消息头采用的是varint 格式。

什么是varint?

文章来源http://www.cnblogs.com/tankaixiongVarint 是一种紧凑的表示数字的方法。它用一个或多个字节来表示一个数字,值越小的数字使用越少的字节数。这能减少用来表示数字的字节数。 Varint 中的每个 byte 的最高位 bit 有特殊的含义,如果该位为 1,表示后续的 byte 也是该数字的一部分,如果该位为 0,则结束。其他的 7 个 bit 都用来表示数字。因此小于 128 的数字都可以用一个 byte 表示。大于 128 的数字,会用两个字节。

更多可参见我上篇文章

最大的区别是消息头它不是固定长度(常见是的使用INT 4个字节固定长度),Varint它用一个或多个字节来表示一个数字决定它不是固定长度!

ProtobufVarint32LengthFieldPrepender 类的主要方法如下:

@Override
    protected void encode(
            ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        int bodyLen = msg.readableBytes();
        int headerLen = CodedOutputStream.computeRawVarint32Size(bodyLen);
        out.ensureWritable(headerLen + bodyLen);

        CodedOutputStream headerOut =
                CodedOutputStream.newInstance(new ByteBufOutputStream(out), headerLen);
        headerOut.writeRawVarint32(bodyLen);
        headerOut.flush();

        out.writeBytes(msg, msg.readerIndex(), bodyLen);
    }

 

CodedOutputStream 主要是针对与varints相关操作类。 先看是如何写消息头的,得到bodyLen 消息体长度然后调用computeRawVarint32Size()计算需要多少个字节,

public static int computeRawVarint32Size(final int value) {
    if ((value & (0xffffffff <<  7)) == 0) return 1;
    if ((value & (0xffffffff << 14)) == 0) return 2;
    if ((value & (0xffffffff << 21)) == 0) return 3;
    if ((value & (0xffffffff << 28)) == 0) return 4;
    return 5;
  }

 

0xffffffff << 7 二进制表示11111111111111111111111110000000 ,当与value &计算=0则表示value最大只会是000000000000000000000001111111,一个字节足以。

通过&运算得出使用多少个字节就可以表示当前数字。左移7位是与Varint定义相关,第一位需要保留给标识(1表示后续的 byte 也是该数字的一部分,0则结束)。要表示 int 32位 和多加的每个字节第一个标识位,多出了4位,所以就最大会有5个字节。

得到了varints值,然后如何写入out? 再看关键方法writeRawVarint32()。

public void writeRawVarint32(int value) throws IOException {
    while (true) {
      //0x7F为127
      if ((value & ~0x7F) == 0) {//是否小于127,小于则一个字节就可以表示了
        writeRawByte(value);
        return;
      } else {
        writeRawByte((value & 0x7F) | 0x80);//因不于小127,加一高位标识
        value >>>= 7;//右移7位,再递归
      }
    }
  }
    /** Write a single byte. */
  public void writeRawByte(final byte value) throws IOException {
    if (position == limit) {
      refreshBuffer();
    }

    buffer[position++] = value;
  }
  
  private void refreshBuffer() throws IOException {
    if (output == null) {
      // We're writing to a single buffer.
      throw new OutOfSpaceException();
    }

    // Since we have an output stream, this is our buffer
    // and buffer offset == 0
    output.write(buffer, 0, position);
    position = 0;
  }

 

byte 的取值(-128~127) , 0x7F为127 , 0x80为128

循环取后7位,如果小于127则结束,不小于第一位加标识位1。 因为循环右移所以,实际位置颠倒了,解码时需要倒过来再拼接。

消息头因为是varint32可变字节,所以比较复杂些,消息体简单直接writeBytes即可。

二,ProtobufVarint32FrameDecoder 解码类

同样对应CodedOutputStream有CodedInputStream类,操作解码时的varints。

@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        in.markReaderIndex();
        final byte[] buf = new byte[5];
        for (int i = 0; i < buf.length; i ++) {
            if (!in.isReadable()) {
                in.resetReaderIndex();
                return;
            }

            buf[i] = in.readByte();
            if (buf[i] >= 0) {
                int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();
                if (length < 0) {
                    throw new CorruptedFrameException("negative length: " + length);
                }

                if (in.readableBytes() < length) {
                    in.resetReaderIndex();
                    return;
                } else {
                    out.add(in.readBytes(length));
                    return;
                }
            }
        }

        // Couldn't find the byte whose MSB is off.
        throw new CorruptedFrameException("length wider than 32-bit");
    }

 

前面说明了最大长度为5个字节所以这里声明了5个长度的字节来读取消息头。

buf[i] >= 0 这里为什么是>0然后就可以解码了呢?

还是这句话:varints第一位表示后续的byte是否是该数字的一部分!

如果字节第一位为1则表示后续还有字节是表示消息头,当这个字节的第一位为1则这个字节肯定是负数(字节最高位表示正负),大于等于0表示描述消息体长度的数字已经读完了。

然后调用readRawVarint32() 还原成int ,与之前 writeRawVarint32()反其道而行。

public int readRawVarint32() throws IOException {
   byte tmp = readRawByte();
   if (tmp >= 0) {
     return tmp;
   }
   int result = tmp & 0x7f;
   if ((tmp = readRawByte()) >= 0) {
     result |= tmp << 7;
   } else {
     result |= (tmp & 0x7f) << 7;
     if ((tmp = readRawByte()) >= 0) {
       result |= tmp << 14;
     } else {
       result |= (tmp & 0x7f) << 14;
       if ((tmp = readRawByte()) >= 0) {
         result |= tmp << 21;
       } else {
         result |= (tmp & 0x7f) << 21;
         result |= (tmp = readRawByte()) << 28;
         if (tmp < 0) {
           // Discard upper 32 bits.
           for (int i = 0; i < 5; i++) {
             if (readRawByte() >= 0) {
               return result;
             }
           }
           throw InvalidProtocolBufferException.malformedVarint();
         }
       }
     }
   }
   return result;
 }

 

取第N字节左移7*N位或|第一个字节拼接,实现了倒序拼接,最后得到了消息体长度。然后根据得到的消息体长度读取数据,如果消息体长度不够则回滚到markReaderIndex,等待数据。

四,总结

文章来源http://www.cnblogs.com/tankaixiong本文主要详细介绍了netty 对 protobuf 协议的解码与包装。重点在消息头 varint32的 算法表示上进行了说明。了解了varint32在协议中的实现,方便应用在其语言对接。

除了varint ,还可以使用LengthFieldPrepender and LengthFieldBasedFrameDecoder

分享到:
评论

相关推荐

    netty http protobuf

    客户端首先将业务对象转换为Protobuf格式的字节流,然后将其封装在HTTP请求中。 3. **数据传输**:HTTP协议负责将客户端发送的数据可靠地传输到服务器,而Protobuf保证了数据在传输过程中的高效编码和解码。 4. **...

    Android-基于NettyTCPProtobuf实现的AndroidIM库

    基于Netty TCP Protobuf实现的Android IM库,包含Protobuf序列化、TCP拆包与粘包、长连接握手认证、心跳机制、断线重连机制、消息重发机制、读写超时机制、离线消息、线程池等功能。

    gimmicks-math-0.95.zip

    4. **Netty客户端**:封装了与服务器通信的逻辑,使用Netty的异步I/O能力发起RPC调用,并处理返回的结果。 5. **序列化/反序列化**:Protobuf提供了高效的序列化和反序列化机制,使得数据能在网络间高效传输。 6. ...

    c++ netty.zip

    【标题】:“C++ Netty.zip”涉及到的关键技术点包括Java的Netty服务器、TCP通信、C++套接字编程以及Google的Protocol Buffers(protobuf)在Java和C++之间的跨语言通信应用。 【描述】:这个压缩包文件提供了一个...

    Unity与Netty通信Demo

    在Unity中进行网络通信,通常会涉及到对TCP/IP或UDP协议的封装。而本Demo中,Unity部分可能使用了名为`TestSocket.unitypackage`的资源包。这个包可能包含了一些预设(Prefabs)、脚本(Scripts)、场景(Scenes)或...

    基于netty3.5的游戏服务器端框架 消息封装,编解码结构提供扩展,请求消息队列处理,基于protobuf的实例已经完成.7z

    基于netty3.5的游戏服务器端框架 消息封装,编解码结构提供扩展,请求消息队列处理,基于protobuf的实例已经完成.7z

    Netty4 使用

    为了方便用户使用,本Demo可能对Netty的通信过程进行了封装。这可能包括: 1. 创建ServerBootstrap和Bootstrap,分别用于服务端和客户端的启动配置。 2. 配置Pipeline,添加自定义的处理器或预定义的编解码器。 3. ...

    netty技术实现即时通信方案.pdf

    在实现即时通讯方案中,Netty可以与Google ProtoBuf自定义的消息协议结合使用,实现高效的消息处理机制。同时,Netty的可靠Socket支持和高度可定制的线程模型也可以确保消息处理的可靠性和高效性。 Netty技术是实现...

    Java Netty面试题

    * 封装好:Netty封装了NIO操作的很多细节,提供了易于使用的调用接口。 Netty的优势有哪些? Netty的优势包括: * 使用简单:封装了NIO的很多细节,使用更简单。 * 功能强大:预置了多种编解码功能,支持多种主流...

    如何用Netty写一个自己的RPC框架

    Netty通过封装Java NIO,简化了网络编程。 5. Linux Epoll:在Linux系统上,Epoll是一个高效的I/O事件通知机制。Netty使用Epoll模型来提高网络处理性能,特别是当处理成千上万的连接时。 6. Java IO API与Netty的...

    Netty面试题(2020最新版).pdf

    * 定制能力强:Netty可以通过ChannelHandler对通信框架进行灵活的扩展。 * 性能高:Netty的综合性能最优。 * 稳定:Netty修复了已经发现的所有NIO的bug,开发人员可以专注于业务本身。 * 社区活跃:Netty是活跃的...

    Netty 35道面试题和答案.docx

    Netty 是基于 NIO 的,它封装了 JDK 的 NIO,让我们使用起来更加灵活。Netty 的特点包括高并发、传输快、封装好等。Netty 的优势包括使用简单、功能强大、定制能力强、性能高、稳定、社区活跃等。Netty 的应用场景有...

    16-Netty面试题.docx

    * 封装好:Netty 封装了 NIO 操作的很多细节,提供了易于使用调用接口。 Netty 的优势 * 使用简单:封装了 NIO 的很多细节,使用更简单。 * 功能强大:预置了多种编解码功能,支持多种主流协议。 * 定制能力强:...

    Netty 25道面试题你和答案.docx

    - 隐藏复杂的IO操作:如OPACCEPT、OP_READ、OP_WRITE等,Netty进行了封装,用户无需直接关注。 - 多线程模型:使用预配置的boss和worker EventLoopGroup,实现多Reactor多线程模型,简化线程管理。 - 编解码器:内置...

    Netty 20道面试题和答案.docx

    3. 封装好:Netty 封装了 NIO 操作的很多细节,提供了易于使用的调用接口。 什么是 Netty 的零拷贝? Netty 的零拷贝主要包含三个方面: 1. Netty 的接收和发送 ByteBuffer 采用 DIRECT BUFFERS,使用堆外直接...

    netty即时通讯程序

    Netty对NIO进行了封装,提供了更加简洁易用的API,使得开发者无需深入理解NIO的底层细节就能编写出高效的网络应用。 在即时通讯系统中,Netty主要负责以下关键组件: 1. **Bootstrap**: 它是Netty中的启动类,用于...

    netty 心跳实现

    - **心跳消息定义**:首先定义心跳包的消息类型,可以是自定义的 ByteBuf 或者 Protobuf 消息。 - **心跳发送**:在客户端的 ChannelOutboundHandler 中,设置定时任务,如使用 ScheduledExecutorService 定期发送...

Global site tag (gtag.js) - Google Analytics