在上一篇博客(netty入门实现简单的echo程序)中,我们知道了如何使用netty发送一个简单的消息,但是这远远是不够的。在这篇博客中,我们来使用netty发送一个java bean对象的消息,但是要发送对象类型的消息,必然要将java对象进行序列化,在java中序列化框架有很多中,此处我们使用protostuff来进行序列化,不了解protostuff的可以先看一下这篇博客(protostuff序列化)了解一下简单的用法。
需求:
客户端在连接上服务器端时,向服务器发送100个Person对象。
服务器端接收到消息后在控制台上打印即可。
消息发送的协议:4个字节的长度(长度是后面对象的长度,不包含自身的4个字节),后面是实际的要发送的数据
实现思路:
一、服务器端:
1、服务器端先用LengthFieldBasedFrameDecoder进行解码,获取到一个完整的ByteBuf消息
2、然后编写ProtostuffDecoder解码器将上一步解码出来的消息,转换成一个Person对象
3、编写ProtoStuffServerHandler类用于将上一步解码出来的Person对象输出出来
二、客户端
1、编写ProtostuffClientHandler类用于向服务器端发送Person对象
2、编写ProtostuffEncoder来进行将Person对象转换成字节数组
3、借助netty的LengthFieldPrepender来向上一步的字节数组前增加4个字节的消息长度
半包的处理主要是借助netty提交的LengthFieldBasedFrameDecoder来进行处理
注意:
new LengthFieldPrepender(4) ==> 会在发送的数据前增加4个字节表示消息的长度
new LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 4) ==> 10240表示如果此次读取的字节长度比这个大说明可能是别人伪造socket攻击,将会抛出异常,第一个4表示读取四个字节表示此次 消息的长度,后面一个4表示丢弃四个字节,然后读取业务数据。
实现步骤:
一、引入maven依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.huan.netty</groupId> <artifactId>netty-study</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>netty-study</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.18</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.7</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.1.7</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>1.1.7</version> </dependency> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-api</artifactId> </dependency> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-core</artifactId> </dependency> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-bom</artifactId> <version>1.4.4</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> </project>
二、编写实体类Person(客户端将会发送这个对象,服务器端接收这个对象)
@Data @Builder @NoArgsConstructor @AllArgsConstructor public class Person { private int id; private String name; }
三、编写ProtostuffDecoder用将ByteBuf中的数据转换成Person对象
public class ProtostuffDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Schema<Person> schema = RuntimeSchema.getSchema(Person.class); Person person = schema.newMessage(); byte[] array = new byte[in.readableBytes()]; in.readBytes(array); ProtobufIOUtil.mergeFrom(array, person, schema); out.add(person); } }
四、编写ProtoStuffServerHandler用于将接收到的数据输出到控制台
@Slf4j public class ProtoStuffServerHandler extends ChannelInboundHandlerAdapter { private int counter = 0; /** * 接收到数据的时候调用 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Person person = (Person) msg; log.info("当前是第[{}]次获取到客户端发送过来的person对象[{}].", ++counter, person); } /** 当发生了异常时,次方法调用 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("error:", cause); ctx.close(); } }
五、编写netty的服务端,用于启动netty的服务
@Slf4j public class NettyServer { public static void main(String[] args) throws Exception { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker)// .channel(NioServerSocketChannel.class)// 对应的是ServerSocketChannel类 .option(ChannelOption.SO_BACKLOG, 128)// .handler(new LoggingHandler(LogLevel.TRACE))// .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 4)); ch.pipeline().addLast(new ProtostuffDecoder()); ch.pipeline().addLast(new ProtoStuffServerHandler()); } }); ChannelFuture future = bootstrap.bind(9090).sync(); log.info("server start in port:[{}]", 9090); future.channel().closeFuture().sync(); boss.shutdownGracefully(); worker.shutdownGracefully(); } }
此处需要注意解码器的顺序:
必须要先是LengthFieldBasedFrameDecoder,然后是ProtostuffDecoder,在是ProtoStuffServerHandler
六、编写客户端的handler处理器,用于向服务器端发送Person对象
@Slf4j public class ProtostuffClientHandler extends ChannelInboundHandlerAdapter { /** * 客户端和服务器端TCP链路建立成功后,此方法被调用 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Person person; for (int i = 0; i < 100; i++) { person = new Person(); person.setId(i); person.setName("张三" + i); ctx.writeAndFlush(person); } } /** * 发生异常时调用 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("client error:", cause); ctx.close(); } }
七、编写ProtostuffEncoder编码器,用于将Person对象编码成字节数组
public class ProtostuffEncoder extends MessageToByteEncoder<Person> { @Override protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception { LinkedBuffer buffer = LinkedBuffer.allocate(1024); Schema<Person> schema = RuntimeSchema.getSchema(Person.class); byte[] array = ProtobufIOUtil.toByteArray(msg, schema, buffer); out.writeBytes(array); } }
八、编写客户端
@Slf4j public class NettyClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group)// .channel(NioSocketChannel.class)// .option(ChannelOption.TCP_NODELAY, true)// .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new ProtostuffEncoder()); ch.pipeline().addLast(new ProtostuffClientHandler()); } }); ChannelFuture future = bootstrap.connect("127.0.0.1", 9090).sync(); log.info("client connect server."); future.channel().closeFuture().sync(); group.shutdownGracefully(); } }
注意:此处也需要注意initChannel方法中的编码器加入的顺序
ProtostuffEncoder->将Person对象转换成字节数组
LengthFieldPrepender->在上一步的字节数组前加入4个字节的长度
九、启动服务器端和客户端进行测试
相关推荐
在这个“netty传输对象源码”的主题中,我们将深入理解如何使用Netty来传输自定义的对象,同时结合Spring和Maven进行项目构建。首先,让我们了解一下Netty的基本概念。 Netty的核心是其Channel和EventLoop系统。...
Spring Netty项目是一个整合了Netty 4和Spring 5框架的应用示例,它展示了如何在Netty服务器中使用Spring的特性,特别是针对UDP通信、MyBatis持久化以及通过Netty Handler访问Spring Bean来实现数据库操作。...
在Java生态系统中,Netty因其高效的NIO模型而被广泛应用于高并发的TCP/UDP网络通信,如游戏服务器、聊天服务器等。在这里,Netty可能作为底层通信层,负责处理客户端的连接请求和数据传输。 MyBatis是一个优秀的...
1. **NIO基础**:Netty基于Java NIO,允许非阻塞的I/O操作,提高了并发处理能力。 2. **Channel**:Netty的I/O操作都围绕着Channel进行,它可以读写数据并管理连接状态。 3. **EventLoop**:每个Channel都与一个...
6. **集成SpringBoot应用**:将Netty服务器作为SpringBoot的一个组件,通过`@Bean`注解暴露出去。在SpringBoot的主类中启动Netty服务器。 7. **测试与部署**:编写测试用例验证整合后的系统是否正常工作,然后打包...
@Component public class RabbitmqConfig { private final static String message = "web.socket.message";...完整代码,下载有详细说明,使用于长报文通讯,将报文长度截取一定字节发送,便于网速传输中丢包
- **Spring Bean容器**:Netty服务器启动时,可以利用Spring的Bean容器加载并管理ChannelInitializer和ChannelHandler实例,实现服务端的初始化。 - **Spring Event**:Netty的事件可以映射为Spring的...
Netty和Spring是两个在Java开发领域非常重要的框架。Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于创建高并发、低延迟的网络服务,如TCP、UDP等协议的应用。Spring则是一个全面的企业级应用开发框架,...
### Java中级面试题:Spring和Netty相关问题详解 #### Spring框架相关问题解析 ##### BeanFactory与ApplicationContext的区别 - **BeanFactory**: - **概念**:BeanFactory 可被视为包含 Bean 集合的工厂类。它...
4. 将Netty服务器作为Spring Boot的一个Bean注入到Spring容器中,以便于管理和控制。 5. 使用Spring的Service或Repository层处理业务逻辑,与Netty服务器交互。 通过这个示例项目,开发者可以学习到如何在Spring ...
Resteasy + Spring + Netty sample Inject resteasy provider / controllers as spring bean Authentication Run at Main.java Test http://localhost:8082/resteasy/hello/world 教程 jax-rs规范用法: ...
在物联网领域,Java技术常被用来构建服务器端和客户端应用,尤其在结合Spring Boot和Netty框架时,能实现高效、稳定的数据传输。本实例主要探讨如何利用Java、Spring Boot和Netty来创建一个带有心跳机制的物联网系统...
Netty 是一个 Java 开源框架,专为处理 I/O 而设计,提供了高效的异步事件驱动的网络应用程序框架,适用于 TCP、UDP 和文件传输等网络通信场景。而 Spring 是一个广泛使用的 Java 应用开发框架,尤其在企业级应用和...
在Netty中,UDT模块扩展了其网络通信的能力,使得开发者能够在需要高速数据传输和低延迟的应用场景中,如金融交易系统、实时流媒体服务等,充分利用UDT的优势。 Netty的UDT模块提供了高效的双向数据通道,它通过多...
Netty是由JBOSS提供的一个Java开源框架,它提供了一个高效的、富有表现力的API,使得开发者能够快速构建出高性能、高可靠性的网络应用程序。Netty的核心优势在于其异步非阻塞I/O模型,这使得它在处理高并发场景时...
Socket.IO则是一种在浏览器和服务器之间实现实时双向通信的库,它兼容多种传输协议,如WebSocket、Long Polling等。 首先,我们需要在Spring Boot项目中引入Netty-Socket.IO的相关依赖。在`pom.xml`文件中添加如下...
这个"netty-demo-boot.zip"文件显然包含了使用 Netty 构建的一个演示应用,它很可能是基于 Spring Boot 的,因为 "boot" 通常与 Spring Boot 相关联,这是一个流行的 Java 应用开发框架,简化了微服务和云应用的起步...
Netty-socketio是一个Java实现的Socket.IO服务器,它提供了与Node.js版本Socket.IO服务器类似的功能,可以方便地与各种客户端(如JavaScript、iOS、Android等)进行交互。 在多人聊天室的实现中,核心功能包括: 1...
在有状态SessionBean中,用累加器,以对话状态存储起来,创建EJB对象,并将当前的计数器初始化,调用每一个EJB对象的count()方法,保证Bean正常被激活和钝化,EJB对象是用完毕,从内存中清除…… Java Socket 聊天...