Practical Netty (3) 在Netty中使用Protobuf
- 作者:柳大·Poechant(钟超)
- 邮箱:zhongchao.ustc#gmail.com(# -> @)
- 博客:Blog.CSDN.net/Poechant
- 微博:weibo.com/lauginhom
- 日期:June 8th, 2012
1. Netty 核心概念之一:Upstream 与 Downstream
学过 OSI 的 5 层或 TCP/IP 的 7 层网络模型知道,应用层位于传输层之上。所以从传输层的 Socket 读取数据,就是从下层向上层传输,就是 Upstream;反过来向 Socket 写数据,就是从应用层向传输层发送数据,就是 Downstream,即向 Socket 写。
从LocalTime 实例中 ServerBootstrap 是惯常用法,其中要注意的是:
bootstrap.setPipelineFactory(new LocalTimeServerPipelineFactory());
下面是LocalTimeServer
的实现:
28 public class LocalTimeServer {
29
30 private final int port;
31
32 public LocalTimeServer(int port) {
33 this.port = port;
34 }
35
36 public void run() {
37 // Configure the server.
38 ServerBootstrap bootstrap = new ServerBootstrap(
39 new NioServerSocketChannelFactory(
40 Executors.newCachedThreadPool(),
41 Executors.newCachedThreadPool()));
42
43 // Set up the event pipeline factory.
44 bootstrap.setPipelineFactory(new LocalTimeServerPipelineFactory());
45
46 // Bind and start to accept incoming connections.
47 bootstrap.bind(new InetSocketAddress(port));
48 }
49
50 public static void main(String[] args) throws Exception {
51 int port;
52 if (args.length > 0) {
53 port = Integer.parseInt(args[0]);
54 } else {
55 port = 8080;
56 }
57 new LocalTimeServer(port).run();
58 }
59 }
LocalTimeServerPipelineFactory
的实现如下。在ChannelPipelineFactory
的实现类的getPipeline
函数中需要创建一个管道(pipeline),然后在管道中按照顺序添加 Upstream Handlers 和 Downstream Handlers。这其中的规则,如果用图表示的话,如下。
I/O Request
via Channel or
ChannelHandlerContext
|
+----------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +----------------------+ +-----------+------------+ |
| | Upstream Handler N | | Downstream Handler 1 | |
| +----------+-----------+ +-----------+------------+ |
| /|\ | |
| | \|/ |
| +----------+-----------+ +-----------+------------+ |
| | Upstream Handler N-1 | | Downstream Handler 2 | |
| +----------+-----------+ +-----------+------------+ |
| /|\ . |
| . . |
| [ sendUpstream() ] [ sendDownstream() ] |
| [ + INBOUND data ] [ + OUTBOUND data ] |
| . . |
| . \|/ |
| +----------+-----------+ +-----------+------------+ |
| | Upstream Handler 2 | | Downstream Handler M-1 | |
| +----------+-----------+ +-----------+------------+ |
| /|\ | |
| | \|/ |
| +----------+-----------+ +-----------+------------+ |
| | Upstream Handler 1 | | Downstream Handler M | |
| +----------+-----------+ +-----------+------------+ |
| /|\ | |
+-------------+--------------------------+---------------+
| \|/
+-------------+--------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+--------------------------------------------------------+
2. Netty 核心概念之二:Pipeline 与 Upstream、Downstream 如何组织
Upstream 和 Downstream 都是在 Pipeline 中“流动”的,所以影响 Upstream 和 Downstream 行为的 UpstreamHandler 和 DownstreamHandler,也要被放到 Pipeline 里,吼吼。所以呢,就有如下的代码:
ChannelPipeline p = Channels.pipeline();
p.addLast("1", new UpstreamHandlerA());
p.addLast("2", new UpstreamHandlerB());
p.addLast("3", new DownstreamHandlerA());
p.addLast("4", new DownstreamHandlerB());
p.addLast("5", new UpstreamHandlerX());
则实际的 Upstream 和 Downstream 执行顺序是:
upstream: 1, 2, 5
downstream: 4, 3
当然其中任何一个 Handler 也可以兼有 Upstream 和 Downstream 的功能。下面是 Netty 中使用 Protobuf 的经典方式的实例,即官方的 LocalServer 的用法。
27 public class LocalTimeServerPipelineFactory implements ChannelPipelineFactory {
28
29 public ChannelPipeline getPipeline() throws Exception {
30 ChannelPipeline p = pipeline();
31 p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
32 p.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.Locations.getDefaultInstance()));
33
34 p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
35 p.addLast("protobufEncoder", new ProtobufEncoder());
36
37 p.addLast("handler", new LocalTimeServerHandler());
38 return p;
39 }
40 }
- 首先添加
ProtobufVarint32FrameDecoder
和ProtobufDecoder
用于解码 Protobuf package,注意顺序,前者是解码标识长度的 32 位数据,后者才是解码包内容。他们都是 Upstream Handlers,顺序为自上而下。 - 然后添加
ProtobufVarint32LengthFieldPrepender
和ProtobufEncoder
用于编码 Protobuf package,注意顺序,前者是编码标识长度的 32 位数据,后者才是编码包内容的。他们都是 Downstream Handlers,顺序为自下而上。 -
LocalTimeServerHandler
是自定义的,也是一个 Upstream Handler。所以这里 Upstream(从Socket读到数据后)的执行顺序为ProtobufVarint32FrameDecoder
、ProtobufDecoder
和LocalTimeServerHandler
,Downstream(向Socket写数据前)的执行顺序为protobufEncoder
和frameEncoder
。
这里要说的是,Netty 中提供的关于 Protobuf 的类只有这四个:
org.jboss.netty.handler.codec.protobuf
- ProtobufDecoder
- ProtobufEncoder
- ProtobufVariant32FrameDecoder
- ProtobufVariant32FrameEncoder
3. Netty 中如何处理 Protobuf 数据包
3.1. Netty 官方示例 LocalTime 中的 Protobuf 使用方式一览
那接下来我们就看看LocalTimeServerHandler
的实现方式吧。这个类继承自SimpleChannelUpstreamHandler
,说明只有从 Socket 接收数据时(Upstream)才会响应这个类的方法。它覆盖了四个方法,如下:
public class LocalTimeServerHandler extends SimpleChannelUpstreamHandler {
public void handleUpstream(…){…}
public void messageReceived(…){…}
public void exceptionCaught(…){…}
private static String toString(…){…}
}
除了messageReceived
之外,其他的都不是重点。看看messageReceived
吧。
54 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
55
56 Locations locations = (Locations) e.getMessage();
57 long currentTime = System.currentTimeMillis();
58
59 LocalTimes.Builder builder = LocalTimes.newBuilder();
60 for (Location l: locations.getLocationList()) {
61 TimeZone tz = TimeZone.getTimeZone(
62 toString(l.getContinent()) + '/' + l.getCity());
63 Calendar calendar = getInstance(tz);
64 calendar.setTimeInMillis(currentTime);
65
66 builder.addLocalTime(LocalTime.newBuilder().
67 setYear(calendar.get(YEAR)).
68 setMonth(calendar.get(MONTH) + 1).
69 setDayOfMonth(calendar.get(DAY_OF_MONTH)).
70 setDayOfWeek(DayOfWeek.valueOf(calendar.get(DAY_OF_WEEK))).
71 setHour(calendar.get(HOUR_OF_DAY)).
72 setMinute(calendar.get(MINUTE)).
73 setSecond(calendar.get(SECOND)).build());
74 }
75
76 e.getChannel().write(builder.build());
77 }
你可能会问这个 Locations 是神马。这是 Protobuf 生成的一个 Java 类,留在后面说,你现在要知道的,就是 Locations 就是一个由 Protobuf 反序列化(Deserialize 或 Unmarshall)之后的东东。
3.2. 如何生成 Protobuf 的 Java 源文件
如果我们不讲讲 Protobuf 生成 Java 类的内容,似乎有点说不过去了,那就说说吧。
下面是一个 Protobuf 的 Protocol 定义文件的内容,文件名为LocalTimeProtocol.proto
(关于 Protobuf 的入门实例,可以参考《Google Protobuf——实现跨平台跨语言的序列化/反序列化》)。
package org.jboss.netty.example.localtime;
option optimize_for = SPEED;
enum Continent {
AFRICA = 0;
AMERICA = 1;
ANTARCTICA = 2;
ARCTIC = 3;
ASIA = 4;
ATLANTIC = 5;
AUSTRALIA = 6;
EUROPE = 7;
INDIAN = 8;
MIDEAST = 9;
PACIFIC = 10;
}
message Location {
required Continent continent = 1;
required string city = 2;
}
message Locations {
repeated Location location = 1;
}
enum DayOfWeek {
SUNDAY = 1;
MONDAY = 2;
TUESDAY = 3;
WEDNESDAY = 4;
THURSDAY = 5;
FRIDAY = 6;
SATURDAY = 7;
}
message LocalTime {
required uint32 year = 1;
required uint32 month = 2;
required uint32 dayOfMonth = 4;
required DayOfWeek dayOfWeek = 5;
required uint32 hour = 6;
required uint32 minute = 7;
required uint32 second = 8;
}
message LocalTimes {
repeated LocalTime localTime = 1;
}
可能有些朋友还不太了解 Protobuf,所以我在这里细说下。Protobuf 是 Google 开源的一个平台无关、语言无关的结构化数据的序列化与反序列化工具,将上面的内容保存到文件后,在控制台下输入命令(当然你要事先按照 Google Protocol Buffers 或者《Google Protobuf——实现跨平台跨语言的序列化/反序列化》一文的指导安装 Protobuf):
$ protoc LocalTimeProtocol.proto --java_out=.
就可以在当前目录下生成 Java 类了(并且包括相应的目录结构)。在上面的 Protobuf 定义文件中,出现了Location
、Locations
、LocalTime
和LocalTimes
这几个message
。其中Location
的定义如下:
message Location {
required Continent continent = 1;
required string city = 2;
}
其中required
限定符表示是这个Location
message中必须包含的字段。那么再看Locations
:
message Locations {
repeated Location location = 1;
}
这里的repeated
限定符表示这是一个多元字段,即包含一组内容。后面的Location
表明其中的元素是Location
message。LocalTime
和LocalTimes
的关系与其类似。
另外还要注意到的是Continent
和DayOfWeek
两个枚举类型(enum),它同 C++、Java 中的枚举定义类似(与 C++ 更类似些),这两个枚举类型分别被用在了Location
和LocalTime
两个 messages 中。
3.3. 如何使用 Protobuf 生成的 Java 类
3.3.1. 获取一个普通消息
-
上面我们看到了,一个普通的 message,比如
Locations
,就叫Locations
。全称是org.jboss.netty.example.localtime.LocalTimeProtocol.Locations
在SimpleChannelUpstreamHandler.messageReceived()
中是如下使用的:
Locations locations = (Locations) e.getMessage(); // e is an instance of MessageEvent
3.3.2. 从消息中 get 一个 required 成员
Location
中的Continent
和City
是required
字段,他们是最普通不过的,get
如下:
l.getContinent()) + '/' + l.getCity()
如果 get 出来的是一个由 Protobuf 消息自身定义的 enum,则其值是定义中的对应的 int 值。
3.3.3. 从消息中 get 一个 repeated 成员
-
message
定义中如果有repeated
字段,如Locations
有repeated Location location
,则这个字段的值的取出方法是:Locations locations = (Locations) e.getMessage(); for (Location l: locations.getLocationList()) { … }
3.3.4. 生成一个普通消息
LocalTime localTime = LocalTime.newBuilder().build();
build()
的返回值就是一个 LocalTime 实例。但是这样创建的一个 LocalTime,它的各 required 字段还没有设置,所以请看 3.3.5。
3.3.5. 向消息中 set 一个 required 成员
在 3.3.4. 中创建的一个 LocalTime,可以如下链式操作:
LocalTime.newBuilder().
setYear(calendar.get(YEAR)).
setMonth(calendar.get(MONTH) + 1).
setDayOfMonth(calendar.get(DAY_OF_MONTH)).
setDayOfWeek(DayOfWeek.valueOf(calendar.get(DAY_OF_WEEK))).
setHour(calendar.get(HOUR_OF_DAY)).
setMinute(calendar.get(MINUTE)).
setSecond(calendar.get(SECOND)).build()
LocalTime localTime =
newBuilder
返回的是一个Builder
,setYear
、setMonth
等等返回的也是一个Builder
,这样很方便链式写法。
3.3.5. 向消息中 set 一个 repeated 成员
LocalTimes
包含一个 repeated 成员,所以要如下 set:
LocalTimes.Builder builder = LocalTimes.newBuilder();
builder.addLocalTime(localTime1);
builder.addLocalTime(localTime2);
builder.addLocalTime(localTime3);
LocalTimes localTimes = builder.build();
其中localTime1
、localTime3
、localTime3
都是通过 3.3.4. 中的方式得到的。
-
转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant,微博:weibo.com/lauginhom
-
相关推荐
3. 配置Spring Boot:在Spring Boot的配置文件中添加Netty和protobuf的相关依赖,配置服务器端口和protobuf的编码解码器。 4. 创建Netty Server:编写Netty服务器端代码,使用protobuf的Decoder和Encoder处理进来的...
在Unity中使用ProtoBuf-net的步骤如下: 1. **安装ProtoBuf-net**:首先,你需要在Unity项目中引入ProtoBuf-net库。这可以通过NuGet包管理器或直接下载源码添加到项目中完成。 2. **定义数据模型**:定义需要进行...
2. 编译protobuf3消息:使用protobuf编译器将.proto文件转换为Java和C#代码,生成的消息类可以直接在Netty服务器和Unity客户端中使用。 3. 创建Netty ChannelHandler:实现自定义的ChannelInboundHandler和...
接下来,我们来看看如何在 Netty 中使用 Protobuf。Netty 提供了对 Protobuf 的支持,我们可以使用 Protobuf 的 `.proto` 文件生成相应的 Java 类,然后在 Netty 中处理这些类。以下是一般步骤: 1. **定义 ...
在该系统中,Netty作为网络通信的底层框架,负责处理网络连接、数据传输等底层细节,而Protobuf则作为数据序列化工具,将文件内容转换为二进制格式,降低网络传输的开销。 在文件传输过程中,客户端(rrkd-file-...
Protobuf编解码器在Netty中的应用,使得基于Protobuf的数据在网络传输中变得更加便捷和高效。 首先,理解Protobuf的基本概念。Protobuf定义了一种语言中立、平台中立的数据表示格式,通过.proto文件定义消息结构。...
ProtoBuf的使用步骤通常包括定义.proto文件来描述数据结构,然后使用protobuf编译器生成对应的Java类,最后在程序中使用这些类进行序列化和反序列化操作。 在这个项目中,startClient和startServer两个mainClass...
在本项目中,Android手机摄像头捕捉的视频数据被编码为Protobuf消息,然后发送到服务器。 **Android视频采集** 在Android设备上,项目可能使用了Android的MediaRecorder或者Camera2 API来捕获摄像头的视频流。...
在使用netty进行网络通信协议传输使用protobuf时protobuf编译.proto文件生成JAVA类.zip 包括测试proto3.proto文件,自动protobuf编译.proto文件生成JAVA类
在接收到消息时,使用protobuf的`Message.toByteArray()`方法将Java对象序列化为字节数组,然后通过Netty的ByteBuf传递给客户端。反之,对于客户端,接收到字节数组后,使用protobuf的`Message.parseFrom(byte[])`...
Netty、HTTP与Protobuf是三个在IT领域中至关重要的技术组件,它们分别在不同的层面上为高性能网络应用提供服务。下面将详细解释这三个概念及其相互结合的应用。 **Netty** Netty是一个开源的Java NIO(非阻塞I/O)...
在本示例中,我们将深入探讨如何利用 Netty 和 Google 的 Protocol Buffers(protobuf)来构建一个简单的服务端和客户端通信系统。 Protocol Buffers 是 Google 提供的一种数据序列化协议,它可以将结构化数据序列...
在本资源中,"NettyProtobufTcpServer"是使用Netty实现的TCP服务器,它负责接收客户端的连接并处理基于Protobuf编码的数据。TCP协议提供了可靠的、面向连接的通信,适合于需要保证数据完整性和顺序的游戏通信场景。 ...
3. **创建ProtoBuf编码解码器**:在Netty中,我们需要自定义编码器和解码器来处理ProtoBuf格式的数据。可以继承`ByteToMessageDecoder`和`MessageToByteEncoder`,重写其方法。编码器将Java对象转换为ByteBuf,解码...
protobuf-netty-Demo
在《netty+protobuf 整合实战》中,作者通过实际的源代码展示了如何将这两个技术结合使用。首先,我们需要理解 Protobuf 的工作原理。 Protobuf 提供了语言无关的 .proto 文件来定义数据结构,然后通过 protoc ...
Netty 集成了Protobuf的支持,提供了 `ProtobufDecoder` 和 `ProtobufEncoder`,使得我们在Netty中可以方便地处理Protobuf消息。使用Protobuf,开发者可以定义结构化的数据模型,并在Java、C++、Python等多语言之间...
然后,使用protobuf编译器(protoc)将.proto文件转换为各种目标语言(如Java、Python、C++等)的源代码,生成的消息类可以直接在代码中使用。在protobuf文件夹下,可能包含了一些.proto文件、编译生成的Java代码,...
3. **消息类型**: Protobuf中的基本数据结构,可以包含多个字段,每个字段都有唯一的标签和数据类型。 4. **效率**: Protobuf序列化的数据比JSON更紧凑,解析速度更快,适合网络传输。 **开发聊天室步骤** 1. **...
通过这个项目,开发者不仅可以学习到如何使用Netty构建网络应用,还能掌握protobuf在实际项目中的应用,理解数据序列化和反序列化的流程。同时,这个聊天系统也可以作为进一步开发分布式系统、实时通信应用的基础,...