package com.shihuan.netty.codec.protobuf.server; import com.shihuan.netty.codec.protobuf.SubscribeReqProto; import com.shihuan.netty.codec.protobuf.SubscribeRespProto; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; @Sharable public class SubReqServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg; if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) { System.out.println("Service accept client subscribe req : [" + req.toString() + "]"); ctx.writeAndFlush(resp(req.getSubReqID())); } } private SubscribeRespProto.SubscribeResp resp(int subReqID) { SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder(); builder.setSubReqID(subReqID); builder.setRespCode(0); builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address"); return builder.build(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close();// 发生异常,关闭链路 } }
package com.shihuan.netty.codec.protobuf.server; import com.shihuan.netty.codec.protobuf.SubscribeReqProto; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class SubReqServer { public void bind(int port) throws Exception { // 配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); //处理半包粘包的消息 ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new SubReqServer().bind(port); } }
package com.shihuan.netty.codec.protobuf.client; import java.util.ArrayList; import java.util.List; import com.shihuan.netty.codec.protobuf.SubscribeReqProto; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class SubReqClientHandler extends ChannelHandlerAdapter { /** * Creates a client-side handler. */ public SubReqClientHandler() { } @Override public void channelActive(ChannelHandlerContext ctx) { for (int i = 0; i < 10; i++) { ctx.write(subReq(i)); } ctx.flush(); } private SubscribeReqProto.SubscribeReq subReq(int i) { SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder(); builder.setSubReqID(i); builder.setUserName("Lilinfeng"); builder.setProductName("Netty Book For Protobuf"); List<String> address = new ArrayList<>(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChang"); address.add("ShenZhen HongShuLin"); builder.addAllAddress(address); return builder.build(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Receive server response : [" + msg + "]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
package com.shihuan.netty.codec.protobuf.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import com.shihuan.netty.codec.protobuf.SubscribeRespProto; public class SubReqClient { public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 当代客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new SubReqClient().connect(port, ""); } }
package com.shihuan.netty.codec.protobuf.test; import java.util.ArrayList; import java.util.List; import com.google.protobuf.InvalidProtocolBufferException; import com.shihuan.netty.codec.protobuf.SubscribeReqProto; public class TestSubscribeReqProto { private static byte[] encode(SubscribeReqProto.SubscribeReq req) { return req.toByteArray(); } private static SubscribeReqProto.SubscribeReq decode(byte[] body) throws InvalidProtocolBufferException { return SubscribeReqProto.SubscribeReq.parseFrom(body); } private static SubscribeReqProto.SubscribeReq createSubscribeReq() { SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder(); builder.setSubReqID(1); builder.setUserName("Lilinfeng"); builder.setProductName("Netty Book"); List<String> address = new ArrayList<>(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChang"); address.add("ShenZhen HongShuLin"); builder.addAllAddress(address); return builder.build(); } public static void main(String[] args) throws InvalidProtocolBufferException { SubscribeReqProto.SubscribeReq req = createSubscribeReq(); System.out.println("Before encode : " + req.toString()); SubscribeReqProto.SubscribeReq req2 = decode(encode(req)); System.out.println("After decode : " + req.toString()); System.out.println("Assert equal : --> " + req2.equals(req)); } }
Netty、HTTP与Protobuf是三个在IT领域中至关重要的技术组件,它们分别在不同的层面上为高性能网络应用提供服务。下面将详细解释这三个概念及其相互结合的应用。 **Netty** Netty是一个开源的Java NIO(非阻塞I/O)...
3. **创建ProtoBuf编码解码器**:在Netty中,我们需要自定义编码器和解码器来处理ProtoBuf格式的数据。可以继承`ByteToMessageDecoder`和`MessageToByteEncoder`,重写其方法。编码器将Java对象转换为ByteBuf,解码...
处理器链中,我们可以自定义一个 ChannelInboundHandler 来处理接收到的 Protobuf 数据,并进行解码。同样,客户端也会设置一个 Bootstrap,连接到服务器,发送和接收 Protobuf 编码的数据。 总的来说,这个项目...
3. **创建 Netty 通道处理器**:在 Netty 中,我们需要创建一个自定义的 `ChannelInboundHandler` 或 `ChannelOutboundHandler` 来处理 Protobuf 消息的编码和解码。可以使用 ` ProtobufVarint32FrameDecoder` 和 `...
本文将深入探讨 Marshalling 编解码器在 Netty 中的应用及其自定义实现。 Marshalling 是一种序列化技术,它能够将 Java 对象转换为字节流,便于在网络中传输,同时也可以将字节流恢复为原来的对象。在 Netty 中,...
3. 配置Spring Boot:在Spring Boot的配置文件中添加Netty和protobuf的相关依赖,配置服务器端口和protobuf的编码解码器。 4. 创建Netty Server:编写Netty服务器端代码,使用protobuf的Decoder和Encoder处理进来的...
在本示例中,我们将深入探讨如何利用 Netty 和 Google 的 Protocol Buffers(protobuf)来构建一个简单的服务端和客户端通信系统。 Protocol Buffers 是 Google 提供的一种数据序列化协议,它可以将结构化数据序列...
本资源主要关注的是利用Netty框架和Protocol Buffers(Protobuf)进行网络通信,这在游戏开发中是一种常见的高性能解决方案。 Netty是一个Java开源框架,它提供了一个异步事件驱动的网络应用程序框架,用于快速开发...
Netty和Protobuf是两种在IT领域中广泛使用的开源技术,尤其在开发高效、高性能的网络应用程序时。本文将深入探讨这两个技术,并结合一个入门案例,帮助初学者理解如何将它们结合起来使用。 Netty是一个高性能、异步...
// 添加Protobuf解码器和编码器 p.addLast(new ProtobufVarint32FrameDecoder()); p.addLast(new ProtobufDecoder(YourMessageProto.YourMessage.getDefaultInstance())); p.addLast(new ProtobufVarint32...
在Netty中集成Protobuf,主要目的是利用Protobuf的数据序列化能力,将Java对象转换为字节流,然后通过Netty的Channel发送到网络,接收端再反序列化恢复成原来的对象。具体步骤如下: 1. 定义消息:首先创建一个....
在Spring Boot和Netty结合的场景下,protobuf可以用于编码和解码消息,确保在网络中高效地传输数据。 集成步骤通常包括以下几个关键部分: 1. **protobuf的使用**:定义protobuf消息模型,创建.proto文件,然后...