`
Poechant
  • 浏览: 229449 次
博客专栏
Bebe66e7-3a30-3fc9-aeea-cfa3b474b591
Nginx高性能Web服务...
浏览量:24307
5738817b-23a1-3a32-86de-632d7da73b1e
Cumulus实时媒体服务...
浏览量:22095
社区版块
存档分类
最新评论

Practical Netty (3) 在Netty中使用Protobuf

 
阅读更多

Practical Netty (3) 在Netty中使用Protobuf

1. Netty 核心概念之一:Upstream 与 Downstream

学过 OSI 的 5 层或 TCP/IP 的 7 层网络模型知道,应用层位于传输层之上。所以从传输层的 Socket 读取数据,就是从下层向上层传输,就是 Upstream;反过来向 Socket 写数据,就是从应用层向传输层发送数据,就是 Downstream,即向 Socket 写。

从LocalTime 实例中 ServerBootstrap 是惯常用法,其中要注意的是:

  • bootstrap.setPipelineFactory(new LocalTimeServerPipelineFactory());

下面是LocalTimeServer的实现:


28  public class LocalTimeServer {
29  
30      private final int port;
31  
32      public LocalTimeServer(int port) {
33          this.port = port;
34      }
35  
36      public void run() {
37          // Configure the server.
38          ServerBootstrap bootstrap = new ServerBootstrap(
39                  new NioServerSocketChannelFactory(
40                          Executors.newCachedThreadPool(),
41                          Executors.newCachedThreadPool()));
42  
43          // Set up the event pipeline factory.
44          bootstrap.setPipelineFactory(new LocalTimeServerPipelineFactory());
45  
46          // Bind and start to accept incoming connections.
47          bootstrap.bind(new InetSocketAddress(port));
48      }
49  
50      public static void main(String[] args) throws Exception {
51          int port;
52          if (args.length > 0) {
53              port = Integer.parseInt(args[0]);
54          } else {
55              port = 8080;
56          }
57          new LocalTimeServer(port).run();
58      }
59  }

LocalTimeServerPipelineFactory的实现如下。在ChannelPipelineFactory的实现类的getPipeline函数中需要创建一个管道(pipeline),然后在管道中按照顺序添加 Upstream Handlers 和 Downstream Handlers。这其中的规则,如果用图表示的话,如下。


                                          I/O Request
                                         via Channel or
                                     ChannelHandlerContext
                                               |
      +----------------------------------------+---------------+
      |                  ChannelPipeline       |               |
      |                                       \|/              |
      |  +----------------------+  +-----------+------------+  |
      |  | Upstream Handler  N  |  | Downstream Handler  1  |  |
      |  +----------+-----------+  +-----------+------------+  |
      |            /|\                         |               |
      |             |                         \|/              |
      |  +----------+-----------+  +-----------+------------+  |
      |  | Upstream Handler N-1 |  | Downstream Handler  2  |  |
      |  +----------+-----------+  +-----------+------------+  |
      |            /|\                         .               |
      |             .                          .               |
      |     [ sendUpstream() ]        [ sendDownstream() ]     |
      |     [ + INBOUND data ]        [ + OUTBOUND data  ]     |
      |             .                          .               |
      |             .                         \|/              |
      |  +----------+-----------+  +-----------+------------+  |
      |  | Upstream Handler  2  |  | Downstream Handler M-1 |  |
      |  +----------+-----------+  +-----------+------------+  |
      |            /|\                         |               |
      |             |                         \|/              |
      |  +----------+-----------+  +-----------+------------+  |
      |  | Upstream Handler  1  |  | Downstream Handler  M  |  |
      |  +----------+-----------+  +-----------+------------+  |
      |            /|\                         |               |
      +-------------+--------------------------+---------------+
                    |                         \|/
      +-------------+--------------------------+---------------+
      |             |                          |               |
      |     [ Socket.read() ]          [ Socket.write() ]      |
      |                                                        |
      |  Netty Internal I/O Threads (Transport Implementation) |
      +--------------------------------------------------------+

2. Netty 核心概念之二:Pipeline 与 Upstream、Downstream 如何组织

Upstream 和 Downstream 都是在 Pipeline 中“流动”的,所以影响 Upstream 和 Downstream 行为的 UpstreamHandler 和 DownstreamHandler,也要被放到 Pipeline 里,吼吼。所以呢,就有如下的代码:


ChannelPipeline p = Channels.pipeline();
p.addLast("1", new UpstreamHandlerA());
p.addLast("2", new UpstreamHandlerB());
p.addLast("3", new DownstreamHandlerA());
p.addLast("4", new DownstreamHandlerB());
p.addLast("5", new UpstreamHandlerX());

则实际的 Upstream 和 Downstream 执行顺序是:

upstream: 1, 2, 5
downstream: 4, 3

当然其中任何一个 Handler 也可以兼有 Upstream 和 Downstream 的功能。下面是 Netty 中使用 Protobuf 的经典方式的实例,即官方的 LocalServer 的用法。


27  public class LocalTimeServerPipelineFactory implements ChannelPipelineFactory {
28  
29      public ChannelPipeline getPipeline() throws Exception {
30          ChannelPipeline p = pipeline();
31          p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
32          p.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.Locations.getDefaultInstance()));
33  
34          p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
35          p.addLast("protobufEncoder", new ProtobufEncoder());
36  
37          p.addLast("handler", new LocalTimeServerHandler());
38          return p;
39      }
40  }
  • 首先添加ProtobufVarint32FrameDecoderProtobufDecoder用于解码 Protobuf package,注意顺序,前者是解码标识长度的 32 位数据,后者才是解码包内容。他们都是 Upstream Handlers,顺序为自上而下。
  • 然后添加ProtobufVarint32LengthFieldPrependerProtobufEncoder用于编码 Protobuf package,注意顺序,前者是编码标识长度的 32 位数据,后者才是编码包内容的。他们都是 Downstream Handlers,顺序为自下而上。
  • LocalTimeServerHandler是自定义的,也是一个 Upstream Handler。所以这里 Upstream(从Socket读到数据)的执行顺序为ProtobufVarint32FrameDecoderProtobufDecoderLocalTimeServerHandler,Downstream(向Socket写数据)的执行顺序为protobufEncoderframeEncoder

这里要说的是,Netty 中提供的关于 Protobuf 的类只有这四个:

org.jboss.netty.handler.codec.protobuf
    - ProtobufDecoder
    - ProtobufEncoder
    - ProtobufVariant32FrameDecoder
    - ProtobufVariant32FrameEncoder

3. Netty 中如何处理 Protobuf 数据包

3.1. Netty 官方示例 LocalTime 中的 Protobuf 使用方式一览

那接下来我们就看看LocalTimeServerHandler的实现方式吧。这个类继承自SimpleChannelUpstreamHandler,说明只有从 Socket 接收数据时(Upstream)才会响应这个类的方法。它覆盖了四个方法,如下:


public class LocalTimeServerHandler extends SimpleChannelUpstreamHandler {
    public void handleUpstream(…){…}
    public void messageReceived(…){…}
    public void exceptionCaught(…){…}
    private static String toString(…){…}
}

除了messageReceived之外,其他的都不是重点。看看messageReceived吧。


54      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
55  
56          Locations locations = (Locations) e.getMessage();
57          long currentTime = System.currentTimeMillis();
58  
59          LocalTimes.Builder builder = LocalTimes.newBuilder();
60          for (Location l: locations.getLocationList()) {
61              TimeZone tz = TimeZone.getTimeZone(
62                      toString(l.getContinent()) + '/' + l.getCity());
63              Calendar calendar = getInstance(tz);
64              calendar.setTimeInMillis(currentTime);
65  
66              builder.addLocalTime(LocalTime.newBuilder().
67                      setYear(calendar.get(YEAR)).
68                      setMonth(calendar.get(MONTH) + 1).
69                      setDayOfMonth(calendar.get(DAY_OF_MONTH)).
70                      setDayOfWeek(DayOfWeek.valueOf(calendar.get(DAY_OF_WEEK))).
71                      setHour(calendar.get(HOUR_OF_DAY)).
72                      setMinute(calendar.get(MINUTE)).
73                      setSecond(calendar.get(SECOND)).build());
74          }
75  
76          e.getChannel().write(builder.build());
77      }

你可能会问这个 Locations 是神马。这是 Protobuf 生成的一个 Java 类,留在后面说,你现在要知道的,就是 Locations 就是一个由 Protobuf 反序列化(Deserialize 或 Unmarshall)之后的东东。

3.2. 如何生成 Protobuf 的 Java 源文件

如果我们不讲讲 Protobuf 生成 Java 类的内容,似乎有点说不过去了,那就说说吧。

下面是一个 Protobuf 的 Protocol 定义文件的内容,文件名为LocalTimeProtocol.proto(关于 Protobuf 的入门实例,可以参考《Google Protobuf——实现跨平台跨语言的序列化/反序列化》)。


package org.jboss.netty.example.localtime;

option optimize_for = SPEED;
enum Continent {
    AFRICA = 0;
    AMERICA = 1;
    ANTARCTICA = 2;
    ARCTIC = 3;
    ASIA = 4;
    ATLANTIC = 5;
    AUSTRALIA = 6;
    EUROPE = 7;
    INDIAN = 8;
    MIDEAST = 9;
    PACIFIC = 10;
}

message Location {
    required Continent continent = 1;
    required string city = 2;
}

message Locations {
    repeated Location location = 1;
}

enum DayOfWeek {
    SUNDAY = 1;
    MONDAY = 2;
    TUESDAY = 3;
    WEDNESDAY = 4;
    THURSDAY = 5;
    FRIDAY = 6;
    SATURDAY = 7;
}

message LocalTime {
    required uint32 year = 1;
    required uint32 month = 2;
    required uint32 dayOfMonth = 4;
    required DayOfWeek dayOfWeek = 5;
    required uint32 hour = 6;
    required uint32 minute = 7;
    required uint32 second = 8;
}

message LocalTimes {
    repeated LocalTime localTime = 1;
}

可能有些朋友还不太了解 Protobuf,所以我在这里细说下。Protobuf 是 Google 开源的一个平台无关、语言无关的结构化数据的序列化与反序列化工具,将上面的内容保存到文件后,在控制台下输入命令(当然你要事先按照 Google Protocol Buffers 或者《Google Protobuf——实现跨平台跨语言的序列化/反序列化》一文的指导安装 Protobuf):


$ protoc LocalTimeProtocol.proto --java_out=.

就可以在当前目录下生成 Java 类了(并且包括相应的目录结构)。在上面的 Protobuf 定义文件中,出现了LocationLocationsLocalTimeLocalTimes这几个message。其中Location的定义如下:


message Location {
    required Continent continent = 1;
    required string city = 2;
}

其中required限定符表示是这个Locationmessage中必须包含的字段。那么再看Locations


message Locations {
    repeated Location location = 1;
}

这里的repeated限定符表示这是一个多元字段,即包含一组内容。后面的Location表明其中的元素是Locationmessage。LocalTimeLocalTimes的关系与其类似。

另外还要注意到的是ContinentDayOfWeek两个枚举类型(enum),它同 C++、Java 中的枚举定义类似(与 C++ 更类似些),这两个枚举类型分别被用在了LocationLocalTime两个 messages 中。

3.3. 如何使用 Protobuf 生成的 Java 类

3.3.1. 获取一个普通消息

  • 上面我们看到了,一个普通的 message,比如Locations,就叫Locations。全称是

      org.jboss.netty.example.localtime.LocalTimeProtocol.Locations
    

SimpleChannelUpstreamHandler.messageReceived()中是如下使用的:

Locations locations = (Locations) e.getMessage(); // e is an instance of MessageEvent

3.3.2. 从消息中 get 一个 required 成员

Location中的ContinentCityrequired字段,他们是最普通不过的,get如下:

l.getContinent()) + '/' + l.getCity()

如果 get 出来的是一个由 Protobuf 消息自身定义的 enum,则其值是定义中的对应的 int 值。

3.3.3. 从消息中 get 一个 repeated 成员

  • message定义中如果有repeated字段,如Locationsrepeated Location location,则这个字段的值的取出方法是:

      Locations locations = (Locations) e.getMessage();
      for (Location l: locations.getLocationList()) {
          …
      }
    

3.3.4. 生成一个普通消息

LocalTime localTime = LocalTime.newBuilder().build();

build()的返回值就是一个 LocalTime 实例。但是这样创建的一个 LocalTime,它的各 required 字段还没有设置,所以请看 3.3.5。

3.3.5. 向消息中 set 一个 required 成员

在 3.3.4. 中创建的一个 LocalTime,可以如下链式操作:

LocalTime.newBuilder().
                setYear(calendar.get(YEAR)).
                setMonth(calendar.get(MONTH) + 1).
                setDayOfMonth(calendar.get(DAY_OF_MONTH)).
                setDayOfWeek(DayOfWeek.valueOf(calendar.get(DAY_OF_WEEK))).
                setHour(calendar.get(HOUR_OF_DAY)).
                setMinute(calendar.get(MINUTE)).
                setSecond(calendar.get(SECOND)).build()
LocalTime localTime = 

newBuilder返回的是一个BuildersetYearsetMonth等等返回的也是一个Builder,这样很方便链式写法。

3.3.5. 向消息中 set 一个 repeated 成员

LocalTimes包含一个 repeated 成员,所以要如下 set:

LocalTimes.Builder builder = LocalTimes.newBuilder();
builder.addLocalTime(localTime1);
builder.addLocalTime(localTime2);
builder.addLocalTime(localTime3);
LocalTimes localTimes = builder.build();

其中localTime1localTime3localTime3都是通过 3.3.4. 中的方式得到的。

-

转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant,微博:weibo.com/lauginhom

-

分享到:
评论

相关推荐

    springboot集成netty,使用protobuf作为数据交换格式,可以用于智能终端云端服务脚手架

    3. 配置Spring Boot:在Spring Boot的配置文件中添加Netty和protobuf的相关依赖,配置服务器端口和protobuf的编码解码器。 4. 创建Netty Server:编写Netty服务器端代码,使用protobuf的Decoder和Encoder处理进来的...

    Unity与Netty进行ProtoBuf通信__

    在Unity中使用ProtoBuf-net的步骤如下: 1. **安装ProtoBuf-net**:首先,你需要在Unity项目中引入ProtoBuf-net库。这可以通过NuGet包管理器或直接下载源码添加到项目中完成。 2. **定义数据模型**:定义需要进行...

    Netty Protobuf3 测试服务器

    2. 编译protobuf3消息:使用protobuf编译器将.proto文件转换为Java和C#代码,生成的消息类可以直接在Netty服务器和Unity客户端中使用。 3. 创建Netty ChannelHandler:实现自定义的ChannelInboundHandler和...

    netty+protobuf入门案例

    接下来,我们来看看如何在 Netty 中使用 Protobuf。Netty 提供了对 Protobuf 的支持,我们可以使用 Protobuf 的 `.proto` 文件生成相应的 Java 类,然后在 Netty 中处理这些类。以下是一般步骤: 1. **定义 ...

    采用netty与protobuf进行文件传输

    在该系统中,Netty作为网络通信的底层框架,负责处理网络连接、数据传输等底层细节,而Protobuf则作为数据序列化工具,将文件内容转换为二进制格式,降低网络传输的开销。 在文件传输过程中,客户端(rrkd-file-...

    Netty中Protobuf编解码应用

    Protobuf编解码器在Netty中的应用,使得基于Protobuf的数据在网络传输中变得更加便捷和高效。 首先,理解Protobuf的基本概念。Protobuf定义了一种语言中立、平台中立的数据表示格式,通过.proto文件定义消息结构。...

    Netty4+ProtoBuf通信框架

    ProtoBuf的使用步骤通常包括定义.proto文件来描述数据结构,然后使用protobuf编译器生成对应的Java类,最后在程序中使用这些类进行序列化和反序列化操作。 在这个项目中,startClient和startServer两个mainClass...

    基于netty与protobuf的Android手机视频实时传输

    在本项目中,Android手机摄像头捕捉的视频数据被编码为Protobuf消息,然后发送到服务器。 **Android视频采集** 在Android设备上,项目可能使用了Android的MediaRecorder或者Camera2 API来捕获摄像头的视频流。...

    在使用netty进行网络通信协议传输使用protobuf时protobuf编译.proto文件生成JAVA类.zip

    在使用netty进行网络通信协议传输使用protobuf时protobuf编译.proto文件生成JAVA类.zip 包括测试proto3.proto文件,自动protobuf编译.proto文件生成JAVA类

    netty protobuf序列化 推送 android客户端

    在接收到消息时,使用protobuf的`Message.toByteArray()`方法将Java对象序列化为字节数组,然后通过Netty的ByteBuf传递给客户端。反之,对于客户端,接收到字节数组后,使用protobuf的`Message.parseFrom(byte[])`...

    netty http protobuf

    Netty、HTTP与Protobuf是三个在IT领域中至关重要的技术组件,它们分别在不同的层面上为高性能网络应用提供服务。下面将详细解释这三个概念及其相互结合的应用。 **Netty** Netty是一个开源的Java NIO(非阻塞I/O)...

    netty基于protobuf的简单示例

    在本示例中,我们将深入探讨如何利用 Netty 和 Google 的 Protocol Buffers(protobuf)来构建一个简单的服务端和客户端通信系统。 Protocol Buffers 是 Google 提供的一种数据序列化协议,它可以将结构化数据序列...

    通信与协议Netty+Protobuf-游戏设计与开发(1)配套代码

    在本资源中,"NettyProtobufTcpServer"是使用Netty实现的TCP服务器,它负责接收客户端的连接并处理基于Protobuf编码的数据。TCP协议提供了可靠的、面向连接的通信,适合于需要保证数据完整性和顺序的游戏通信场景。 ...

    Netty发送protoBuf格式数据

    3. **创建ProtoBuf编码解码器**:在Netty中,我们需要自定义编码器和解码器来处理ProtoBuf格式的数据。可以继承`ByteToMessageDecoder`和`MessageToByteEncoder`,重写其方法。编码器将Java对象转换为ByteBuf,解码...

    protobuf-netty-Demo

    protobuf-netty-Demo

    netty+protobuf (整合源代码)

    在《netty+protobuf 整合实战》中,作者通过实际的源代码展示了如何将这两个技术结合使用。首先,我们需要理解 Protobuf 的工作原理。 Protobuf 提供了语言无关的 .proto 文件来定义数据结构,然后通过 protoc ...

    netty学习文件,实现http,websocket,protobuf

    Netty 集成了Protobuf的支持,提供了 `ProtobufDecoder` 和 `ProtobufEncoder`,使得我们在Netty中可以方便地处理Protobuf消息。使用Protobuf,开发者可以定义结构化的数据模型,并在Java、C++、Python等多语言之间...

    Netty+自定义Protobuf编解码器

    然后,使用protobuf编译器(protoc)将.proto文件转换为各种目标语言(如Java、Python、C++等)的源代码,生成的消息类可以直接在代码中使用。在protobuf文件夹下,可能包含了一些.proto文件、编译生成的Java代码,...

    netty+protobuf开发一个聊天室实例

    3. **消息类型**: Protobuf中的基本数据结构,可以包含多个字段,每个字段都有唯一的标签和数据类型。 4. **效率**: Protobuf序列化的数据比JSON更紧凑,解析速度更快,适合网络传输。 **开发聊天室步骤** 1. **...

    基于netty和protobuf的聊天系统,客户端+服务器

    通过这个项目,开发者不仅可以学习到如何使用Netty构建网络应用,还能掌握protobuf在实际项目中的应用,理解数据序列化和反序列化的流程。同时,这个聊天系统也可以作为进一步开发分布式系统、实时通信应用的基础,...

Global site tag (gtag.js) - Google Analytics