package bhz.netty.runtime; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import java.util.concurrent.TimeUnit; /** * Best Do It */ public class Client { private static class SingletonHolder { static final Client instance = new Client(); } public static Client getInstance(){ return SingletonHolder.instance; } private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf ; private Client(){ group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭响应的通道,主要为减小服务端资源占用) //5秒要跟服务器设置的一致 sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ClientHandler()); } }); } public void connect(){ try { this.cf = b.connect("127.0.0.1", 8765).sync(); System.out.println("远程服务器已经连接, 可以进行数据交换.."); } catch (Exception e) { e.printStackTrace(); } } public ChannelFuture getChannelFuture(){ if(this.cf == null){ this.connect(); } if(!this.cf.channel().isActive()){ this.connect(); } return this.cf; } public static void main(String[] args) throws Exception{ final Client c = Client.getInstance(); ChannelFuture cf = c.getChannelFuture(); for(int i = 1; i <= 3; i++ ){ Request request = new Request(); request.setId("" + i); request.setName("pro" + i); request.setRequestMessage("数据信息" + i); cf.channel().writeAndFlush(request); TimeUnit.SECONDS.sleep(4); } //超过五秒之后就断开连接 cf.channel().closeFuture().sync();//程序阻塞在这里,断开连接时才继续往下走 //断开之后,重连服务器再发送 new Thread(new Runnable() { @Override public void run() { try { System.out.println("进入子线程..."); //重连 ChannelFuture cf = c.getChannelFuture(); System.out.println(cf.channel().isActive()); System.out.println(cf.channel().isOpen()); //再次发送数据 Request request = new Request(); request.setId("" + 4); request.setName("pro" + 4); request.setRequestMessage("数据信息" + 4); cf.channel().writeAndFlush(request); cf.channel().closeFuture().sync(); System.out.println("子线程结束."); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); System.out.println("断开连接,主线程结束.."); } }
package bhz.netty.runtime; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Response resp = (Response)msg; System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage()); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
package bhz.netty.runtime; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * Marshalling工厂 * @author(alienware) * @since 2014-12-16 */ public final class MarshallingCodeCFactory { /** * 创建Jboss Marshalling解码器MarshallingDecoder * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //创建了MarshallingConfiguration对象,配置了版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根据marshallerFactory和configuration创建provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } /** * 创建Jboss Marshalling编码器MarshallingEncoder * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
package bhz.netty.runtime; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //设置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //五秒后没有连接,断开。 sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
package bhz.netty.runtime; import java.io.Serializable; public class Response implements Serializable{ private static final long serialVersionUID = 1L; private String id; private String name; private String responseMessage; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getResponseMessage() { return responseMessage; } public void setResponseMessage(String responseMessage) { this.responseMessage = responseMessage; } }
package bhz.netty.runtime; import java.io.Serializable; public class Request implements Serializable{ private static final long SerialVersionUID = 1L; private String id ; private String name ; private String requestMessage ; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getRequestMessage() { return requestMessage; } public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; } }
package bhz.netty.runtime; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request = (Request)msg; System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage()); Response response = new Response(); response.setId(request.getId()); response.setName("response" + request.getId()); response.setResponseMessage("响应内容" + request.getId()); ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
相关推荐
netty使用自带工具类实现断线重连和心跳包
配置maven添加io.netty 在com.zhao的包下的文件,可以自行修改使用
自动重连”这个主题中,我们将深入探讨 Netty 如何实现长连接以及自动重连的机制。 首先,让我们理解什么是长连接。在传统的 TCP/IP 协议中,每次通信都需要建立一次连接(三次握手),完成数据交换后断开连接(四...
当心跳超时,它会关闭当前的 Channel 并触发重连。 4. **实现重连逻辑**: 当检测到连接断开或心跳超时时,使用 `ChannelFutureListener` 注册到 `Channel.closeFuture()`,在 Channel 关闭后执行重连操作。 5. **...
Netty 断线重连解决方案 Netty 是一个高性能的 NIO 框架,用于构建高性能的网络应用程序。然而,在使用 Netty 实现长连接服务时,可能会遇到断线的问题,即客户端和服务端之间的连接断开。这种情况可能是由于网络...
在本文中,我们将深入探讨如何利用 Netty 和 WebSocket 实现心跳检测和断线重连机制。 首先,我们需要理解 WebSocket 协议。WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它为客户端和服务器提供了低...
在分布式系统和网络编程中,保持连接的稳定性是至关重要的,因此"Netty 断线重连"成为了关键特性。下面将详细介绍如何使用Netty实现心跳检测以及断线后的自动重连功能。 1. **心跳机制** - 心跳包:在网络通信中,...
8. **异常处理和连接管理**:Netty提供了优雅的异常处理机制,并能自动处理连接的关闭和重连。 9. **实例代码**:通常会包含如何配置和启动一个支持长连接的Netty服务器,以及客户端如何连接和发送数据的示例代码。...
使用netty4,客户端断线重连,使用message收发数据,重写方法
本文将深入探讨Netty4在构建长连接、实现断开重连、心跳检测以及Msgpack编码解码方面的知识。 首先,我们要理解什么是长连接。在TCP/IP通信中,长连接是指在完成一次数据传输后,连接不被立即关闭,而是保持一段...
功能 #### 自定义协议 | 魔数 2byte | 指令类型 1byte | 数据长度 4byte ... 设置重连次数来判断是否重连,重连次数耗尽则停止重连并关闭客户端 - 多长时间重连 短时间内频繁重连既消耗资源又没有必要,好的重连
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个“netty聊天小例子”提供了一个简单的服务端和客户端应用,可以帮助我们理解Netty如何在实际场景中工作。...
在实际应用中,我们可能还需要处理心跳、异常、超时等问题。Netty提供了WebSocketFrameDecoder和WebSocketFrameEncoder帮助解码和编码WebSocket帧,确保数据正确传输。 标签“源码”提示我们关注Netty和WebSocket...
netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724845&idx=1&sn=8631c590ff4876ba0b7af64df16fc54b&scene=19#wechat_redirect
用netty实现长连接和心跳监测的示例代码
标题中的“基于netty实现的支持长连接的rpc”是指利用Netty框架构建一个远程过程调用(RPC)系统,该系统能够维持长时间的连接状态,提高通信效率。Netty是一个高性能、异步事件驱动的网络应用程序框架,适用于开发...
这个入门项目是学习WebSocket与Netty结合的一个好起点,通过实际操作,你可以更深入地理解WebSocket协议的工作原理,以及如何使用Netty构建高效稳定的WebSocket服务器。同时,对于前端开发人员,这也是一个了解...
学习netty框架编写的maven项目,使用netty4.1.1.Final版本实现的netty客户端自动重连,检测链路空闲时自动发送心跳包,如没有收到返回则自动断开连接重连。先运行NettyServerBootstrap类启动Server端,再启动Netty...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,常用于开发服务器和客户端的通信应用,尤其在处理高并发、低延迟的场景下表现出色。本教程将围绕"Netty实现长连接通讯"展开,重点讲解如何使用Netty框架构建...
springboot不用多说,火的不行,netty在很多实际项目中都有运用,非常好的一个框架,开发非常方便。 本文主要内容是: springboot与netty的整合 netty简单的长连接 pom文件 <groupId>org.springframework.boot ...