netty为protobuf提供了两个编码器(ProtobufEncoder,ProtobufVarint32LengthFieldPrepender),两个解码器(ProtobufVarint32FrameDecoder,ProtobufDecoder)
[注]所谓的编码就是把应用程序使用的数据类型编码成在网络上传输的二进制字节流,反之同理。
看一个netty官网上提供的一个使用protobuf的例子:
LocalTimeProtocol.proto文件:
- package org.jboss.netty.example.localtime;
- option optimize_for = SPEED;
- enum Continent {
- AFRICA = 0;
- AMERICA = 1;
- ANTARCTICA = 2;
- ARCTIC = 3;
- ASIA = 4;
- ATLANTIC = 5;
- AUSTRALIA = 6;
- EUROPE = 7;
- INDIAN = 8;
- MIDEAST = 9;
- PACIFIC = 10;
- }
- message Location {
- required Continent continent = 1;
- required string city = 2;
- }
- message Locations {
- repeated Location location = 1;
- }
- enum DayOfWeek {
- SUNDAY = 1;
- MONDAY = 2;
- TUESDAY = 3;
- WEDNESDAY = 4;
- THURSDAY = 5;
- FRIDAY = 6;
- SATURDAY = 7;
- }
- message LocalTime {
- required uint32 year = 1;
- required uint32 month = 2;
- required uint32 dayOfMonth = 4;
- required DayOfWeek dayOfWeek = 5;
- required uint32 hour = 6;
- required uint32 minute = 7;
- required uint32 second = 8;
- }
- message LocalTimes {
- repeated LocalTime localTime = 1;
- }
客户端:
- public class LocalTimeClient {
- public static void main(String[] args) throws Exception {
- // Parse options.
- String host = "localhost";
- int port = 8080;
- Collection<String> cities = new ArrayList<String>(){
- private static final long serialVersionUID = 1L;
- {
- add("America/New_York");
- add("Asia/Seoul");
- }
- };
- // Set up.
- ClientBootstrap bootstrap = new ClientBootstrap(
- new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
- // Configure the event pipeline factory.
- bootstrap.setPipelineFactory(new LocalTimeClientPipelineFactory());
- // Make a new connection.
- ChannelFuture connectFuture =
- bootstrap.connect(new InetSocketAddress(host, port));
- // Wait until the connection is made successfully.
- Channel channel = connectFuture.awaitUninterruptibly().getChannel();
- // Get the handler instance to initiate the request.
- LocalTimeClientHandler handler =
- channel.getPipeline().get(LocalTimeClientHandler.class);
- // Request and get the response.
- List<String> response = handler.getLocalTimes(cities);
- // Close the connection.
- channel.close().awaitUninterruptibly();
- // Shut down all thread pools to exit.
- bootstrap.releaseExternalResources();
- // Print the response at last but not least.
- Iterator<String> i1 = cities.iterator();
- Iterator<String> i2 = response.iterator();
- while (i1.hasNext()) {
- System.out.format("%28s: %s%n", i1.next(), i2.next());
- }
- }
- }
- public class LocalTimeClientPipelineFactory implements ChannelPipelineFactory {
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline p = pipeline();
- //解码用
- p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
- //构造函数传递要解码成的类型
- p.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.LocalTimes.getDefaultInstance()));
- //编码用
- p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
- p.addLast("protobufEncoder", new ProtobufEncoder());
- //业务逻辑用
- p.addLast("handler", new LocalTimeClientHandler());
- return p;
- }
- }
- public class LocalTimeClientHandler extends SimpleChannelUpstreamHandler {
- private static final Logger logger = Logger.getLogger(
- LocalTimeClientHandler.class.getName());
- // Stateful properties
- private volatile Channel channel;
- //用来存储服务端返回的结果
- private final BlockingQueue<LocalTimes> answer = new LinkedBlockingQueue<LocalTimes>();
- public List<String> getLocalTimes(Collection<String> cities) {
- Locations.Builder builder = Locations.newBuilder();
- //构造传输给服务端的Locations对象
- for (String c: cities) {
- String[] components = c.split("/");
- builder.addLocation(Location.newBuilder().
- setContinent(Continent.valueOf(components[0].toUpperCase())).
- setCity(components[1]).build());
- }
- channel.write(builder.build());
- LocalTimes localTimes;
- boolean interrupted = false;
- for (;;) {
- try {
- //从queue里面得到的,也就是服务端传过来的LocalTimes。
- localTimes = answer.take();
- break;
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- List<String> result = new ArrayList<String>();
- for (LocalTime lt: localTimes.getLocalTimeList()) {
- result.add(
- new Formatter().format(
- "%4d-%02d-%02d %02d:%02d:%02d %s",
- lt.getYear(),
- lt.getMonth(),
- lt.getDayOfMonth(),
- lt.getHour(),
- lt.getMinute(),
- lt.getSecond(),
- lt.getDayOfWeek().name()).toString());
- }
- return result;
- }
- @Override
- public void handleUpstream(
- ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
- if (e instanceof ChannelStateEvent) {
- logger.info(e.toString());
- }
- super.handleUpstream(ctx, e);
- }
- @Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
- throws Exception {
- channel = e.getChannel();
- super.channelOpen(ctx, e);
- }
- @Override
- public void messageReceived(
- ChannelHandlerContext ctx, final MessageEvent e) {
- //收到服务端返回的消息,已经解码成了LocalTimes类型
- boolean offered = answer.offer((LocalTimes) e.getMessage());
- assert offered;
- }
- @Override
- public void exceptionCaught(
- ChannelHandlerContext ctx, ExceptionEvent e) {
- logger.log(
- Level.WARNING,
- "Unexpected exception from downstream.",
- e.getCause());
- e.getChannel().close();
- }
- }
服务端的处理:
- public class LocalTimeServer {
- public static void main(String[] args) throws Exception {
- // Configure the server.
- ServerBootstrap bootstrap = new ServerBootstrap(
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
- // Set up the event pipeline factory.
- bootstrap.setPipelineFactory(new LocalTimeServerPipelineFactory());
- // Bind and start to accept incoming connections.
- bootstrap.bind(new InetSocketAddress(8080));
- }
- }
- public class LocalTimeServerPipelineFactory implements ChannelPipelineFactory {
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline p = pipeline();
- //解码
- p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
- //构造函数传递要解码成的类型
- p.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.Locations.getDefaultInstance()));
- //编码
- p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
- p.addLast("protobufEncoder", new ProtobufEncoder());
- //业务逻辑处理
- p.addLast("handler", new LocalTimeServerHandler());
- return p;
- }
- }
- public class LocalTimeServerHandler extends SimpleChannelUpstreamHandler {
- private static final Logger logger = Logger.getLogger(
- LocalTimeServerHandler.class.getName());
- @Override
- public void handleUpstream(
- ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
- if (e instanceof ChannelStateEvent) {
- logger.info(e.toString());
- }
- super.handleUpstream(ctx, e);
- }
- @Override
- public void messageReceived(
- ChannelHandlerContext ctx, MessageEvent e) {
- //收到的消息是Locations
- Locations locations = (Locations) e.getMessage();
- long currentTime = System.currentTimeMillis();
- LocalTimes.Builder builder = LocalTimes.newBuilder();
- for (Location l: locations.getLocationList()) {
- TimeZone tz = TimeZone.getTimeZone(
- toString(l.getContinent()) + '/' + l.getCity());
- Calendar calendar = Calendar.getInstance(tz);
- calendar.setTimeInMillis(currentTime);
- builder.addLocalTime(LocalTime.newBuilder().
- setYear(calendar.get(YEAR)).
- setMonth(calendar.get(MONTH) + 1).
- setDayOfMonth(calendar.get(DAY_OF_MONTH)).
- setDayOfWeek(DayOfWeek.valueOf(calendar.get(DAY_OF_WEEK))).
- setHour(calendar.get(HOUR_OF_DAY)).
- setMinute(calendar.get(MINUTE)).
- setSecond(calendar.get(SECOND)).build());
- }
- //返回LocalTimes
- e.getChannel().write(builder.build());
- }
- @Override
- public void exceptionCaught(
- ChannelHandlerContext ctx, ExceptionEvent e) {
- logger.log(
- Level.WARNING,
- "Unexpected exception from downstream.",
- e.getCause());
- e.getChannel().close();
- }
- private static String toString(Continent c) {
- return "" + c.name().charAt(0) + c.name().toLowerCase().substring(1);
- }
- }
从这个例子中也可以看出来,netty已经把所有的protobuf的细节给封装过了,我们现在就看一下netty是如何发送和接受protobuf数据的。
先看一下ProtobufEncoder,
- @Override
- protected Object encode(
- ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
- if (!(msg instanceof MessageLite)) {
- return msg;
- }
- return wrappedBuffer(((MessageLite) msg).toByteArray());
- }
encode方法很简单,实际上它会调用protobuf的api,把消息编码成protobuf格式的字节数组。
然后看一下ProtobufVarint32LengthFieldPrepender:
它会在原来的数据的前面,追加一个使用Base 128 Varints编码过的length:
BEFORE DECODE (300 bytes) AFTER DECODE (302 bytes)
+---------------+ +--------+---------------+
| Protobuf Data |-------------->| Length | Protobuf Data |
| (300 bytes) | | 0xAC02 | (300 bytes) |
+---------------+ +--------+---------------+
因此,netty实际上只做了这么一点工作,其余的全部都是protobuf自己完成的。
- @Override
- protected Object encode(ChannelHandlerContext ctx, Channel channel,
- Object msg) throws Exception {
- if (!(msg instanceof ChannelBuffer)) {
- return msg;
- }
- ChannelBuffer body = (ChannelBuffer) msg;
- int length = body.readableBytes();
- //header使用跟body同样的字节序,容量是length这个整数所占的字节数
- ChannelBuffer header =
- channel.getConfig().getBufferFactory().getBuffer(
- body.order(),
- CodedOutputStream.computeRawVarint32Size(length));
- CodedOutputStream codedOutputStream = CodedOutputStream
- .newInstance(new ChannelBufferOutputStream(header));
- //把length按照Base 128 Varints的方式写入header里面
- codedOutputStream.writeRawVarint32(length);
- codedOutputStream.flush();
- //把header和body组合到一块
- return wrappedBuffer(header, body);
- }
唯一值得一看的是:
- //计算一个整数在varint编码下所占的字节数,
- public static int computeRawVarint32Size(final int value) {
- if ((value & (0xffffffff << 7)) == 0) return 1;
- if ((value & (0xffffffff << 14)) == 0) return 2;
- if ((value & (0xffffffff << 21)) == 0) return 3;
- if ((value & (0xffffffff << 28)) == 0) return 4;
- return 5;
- }
- public void writeRawVarint32(int value) throws IOException {
- while (true) {
- if ((value & ~0x7F) == 0) {//如果最高位是0
- writeRawByte(value);//直接写
- return;
- } else {
- writeRawByte((value & 0x7F) | 0x80);//先写入低7位,最高位置1
- value >>>= 7;//再写高7位
- }
- }
- }
解码的过程无非就是先读出length来,根据length读取出所有的数据来,交给protobuf就能还原消息出来。
看一下具体的解码过程:
ProtobufVarint32FrameDecoder:
- protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
- buffer.markReaderIndex();
- final byte[] buf = new byte[5];//存放长度,最多也就5个字节
- for (int i = 0; i < buf.length; i ++) {
- if (!buffer.readable()) {
- buffer.resetReaderIndex();
- return null;
- }
- buf[i] = buffer.readByte();
- if (buf[i] >= 0) {
- //读取长度
- int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();
- if (length < 0) {
- throw new CorruptedFrameException("negative length: " + length);
- }
- if (buffer.readableBytes() < length) {
- buffer.resetReaderIndex();
- return null;
- } else {
- //读取数据
- return buffer.readBytes(length);
- }
- }
- }
- // Couldn't find the byte whose MSB is off.
- throw new CorruptedFrameException("length wider than 32-bit");
- }
看一下readRawVarint32:
- public int readRawVarint32() throws IOException {
- byte tmp = readRawByte();//先读一个字节
- if (tmp >= 0) {
- return tmp;
- }
- int result = tmp & 0x7f;//长度大于一个字节
- if ((tmp = readRawByte()) >= 0) {
- result |= tmp << 7;
- } else {
- result |= (tmp & 0x7f) << 7;
- if ((tmp = readRawByte()) >= 0) {
- result |= tmp << 14;
- } else {
- result |= (tmp & 0x7f) << 14;
- if ((tmp = readRawByte()) >= 0) {
- result |= tmp << 21;
- } else {
- result |= (tmp & 0x7f) << 21;
- result |= (tmp = readRawByte()) << 28;
- if (tmp < 0) {
- // Discard upper 32 bits.
- for (int i = 0; i < 5; i++) {
- if (readRawByte() >= 0) {
- return result;
- }
- }
- throw InvalidProtocolBufferException.malformedVarint();
- }
- }
- }
- }
- return result;
- }
- ProtobufDecoder:
- @Override
- protected Object decode(
- ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
- if (!(msg instanceof ChannelBuffer)) {
- return msg;
- }
- ChannelBuffer buf = (ChannelBuffer) msg;
- if (buf.hasArray()) {
- final int offset = buf.readerIndex();
- if(extensionRegistry == null) {
- //从字节数组里面还原出消息,上层收到的就是构造函数里面传递进来的类型
- return prototype.newBuilderForType().mergeFrom(
- buf.array(), buf.arrayOffset() + offset, buf.readableBytes()).build();
- } else {
- return prototype.newBuilderForType().mergeFrom(
- buf.array(), buf.arrayOffset() + offset, buf.readableBytes(), extensionRegistry).build();
- }
- } else {
- if (extensionRegistry == null) {
- return prototype.newBuilderForType().mergeFrom(
- new ChannelBufferInputStream((ChannelBuffer) msg)).build();
- } else {
- return prototype.newBuilderForType().mergeFrom(
- new ChannelBufferInputStream((ChannelBuffer) msg), extensionRegistry).build();
- }
- }
- }
相关推荐
这个项目“基于netty和protobuf的聊天系统,客户端+服务器”就是这样一个实例,它展示了如何利用Java语言结合Netty框架和Protocol Buffers(protobuf)来搭建一个高性能、低延迟的聊天应用。 Netty是一个开源的异步...
Netty4+ProtoBuf通信框架是一种高效的网络应用框架,它结合了Netty的高性能和Google的Protocol Buffers(ProtoBuf)的数据序列化能力,用于构建可伸缩、高并发的网络应用程序。在这个项目中,客户端和服务端之间的...
总结来说,"通信与协议Netty+Protobuf-游戏设计与开发(1)配套代码"是一个关于如何利用Netty进行网络通信以及如何使用Protobuf进行数据序列化的实例。通过学习这个资源,开发者可以更好地理解和掌握这两种技术在游戏...
在本文中,我们将深入探讨如何使用Netty和Protobuf来开发一个实时聊天室实例。Netty是一个高性能、异步事件驱动的网络应用框架,适用于Java平台,它简化了TCP、UDP和HTTP等协议的服务器和客户端应用开发。而Protobuf...
总的来说,Netty发送protoBuf格式数据的过程包括定义数据结构、生成Java类、编写编码解码器、配置Netty管道以及在服务器和客户端之间发送和接收消息。这种结合提供了强大而高效的网络通信解决方案,适用于多种应用...
在服务端,Netty 会创建一个 ServerBootstrap 实例,配置 NIO 事件循环组、处理器链以及绑定的端口。处理器链中,我们可以自定义一个 ChannelInboundHandler 来处理接收到的 Protobuf 数据,并进行解码。同样,...
客户端会创建一个 Netty 客户端实例,连接到服务端,编码 protobuf 消息并发送,然后接收并解码响应: ```java public class ProtobufClient { public static void main(String[] args) throws Exception { ...
在这个“Netty Protobuf3 测试服务器”项目中,开发者使用Netty框架来构建一个服务器,该服务器与Unity游戏引擎中的protobuf3(Protocol Buffers版本3)进行通信。protobuf3是Google开发的一种数据序列化协议,它...
在Java世界里,Netty和Protobuf是两个非常重要的技术组件。Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于构建高并发、低延迟的网络服务。而Protobuf(Protocol Buffers)是Google推出的一种数据序列化...
在Java编程环境中,Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于构建可伸缩、高并发的服务器。本示例关注的是如何利用Netty实现一个基于UDP(User Datagram Protocol)的数据接收服务,这在需要进行...
Netty和Protobuf是两种在IT领域中广泛使用的开源技术,尤其在开发高效、高性能的网络应用程序时。本文将深入探讨这两个技术,并结合一个入门案例,帮助初学者理解如何将它们结合起来使用。 Netty是一个高性能、异步...
在本文中,我们将深入探讨Netty在实际应用中的实例——对象传递调用,以及如何解决TCP粘包问题。同时,我们还会讨论Java序列化方案在Netty中的编解码对比。 首先,让我们来看看TCP粘包问题。在TCP协议中,由于其...
93个netty高并发全面的教学视频下载,每个视频在400-700M,一到两个小时时长的视频,无机器码和解压密码,下载下来的就是MP4格式视频。点击即可观看学习。下载txt文档,里面有永久分享的连接。包括01_学习的要义;02...
在Netty中,我们创建一个`ServerBootstrap`实例,配置服务端的线程模型、处理链(ChannelPipeline)以及绑定的端口。服务器的主要任务是接收连接请求,建立连接,并在连接建立后将数据通道(Channel)添加到管道中,...
在Android平台上实现即时通讯功能,Netty和Protobuf(Protocol Buffers)的结合是一个高效、可靠的解决方案。本文将深入探讨如何使用这两个技术构建一个客户端应用程序。 Netty是Java领域的一款高性能、异步事件...
54_Netty服务端初始化过程与反射在其中的应用分析 55_Netty提供的Future与ChannelFuture优势分析与源码讲解 56_Netty服务器地址绑定底层源码分析 57_Reactor模式透彻理解及其在Netty中的应用 58_Reactor模式与Netty...
在Netty应用中,这些protobuf生成的Java类可以作为消息类型进行网络通信。Netty的ByteToMessageDecoder和MessageToByteEncoder等编码解码器类可以帮助我们在ChannelHandlerContext中处理这些消息。例如,你可以创建...
第54讲:Netty服务端初始化过程与反射在其中的应用分析 第55讲:Netty提供的Future与ChannelFuture优势分析与源码讲解 第56讲:Netty服务器地址绑定底层源码分析 第57讲:Reactor模式透彻理解及其在Netty中的应用...
总的来说,这个项目提供了使用C#和Netty进行跨语言网络通信的实例,尤其是涉及到使用Protobuf作为数据交换格式,这在分布式系统和微服务架构中非常常见。学习这个项目可以深入了解C#中的网络编程,以及如何与Java...
在实际应用中,这些报告数据可能被Kafka消费者读取,经过处理后,通过Netty服务器推送到前端展示,帮助监控系统的访问情况或者进行数据分析。 总的来说,Netty4推送与Kafka消费的集成提供了一种高效、可扩展的方式...