`

netty 超时重连列子

 
阅读更多
package bhz.netty.runtime;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.util.concurrent.TimeUnit;


/**
 * Best Do It
 */
public class Client {
	
	private static class SingletonHolder {
		static final Client instance = new Client();
	}
	
	public static Client getInstance(){
		return SingletonHolder.instance;
	}
	
	private EventLoopGroup group;
	private Bootstrap b;
	private ChannelFuture cf ;
	
	private Client(){
			group = new NioEventLoopGroup();
			b = new Bootstrap();
			b.group(group)
			 .channel(NioSocketChannel.class)
			 .handler(new LoggingHandler(LogLevel.INFO))
			 .handler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel sc) throws Exception {
						sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
						sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
						//超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭响应的通道,主要为减小服务端资源占用)
						//5秒要跟服务器设置的一致
						sc.pipeline().addLast(new ReadTimeoutHandler(5)); 
						sc.pipeline().addLast(new ClientHandler());
					}
		    });
	}
	
	public void connect(){
		try {
			this.cf = b.connect("127.0.0.1", 8765).sync();
			System.out.println("远程服务器已经连接, 可以进行数据交换..");				
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public ChannelFuture getChannelFuture(){
		
		if(this.cf == null){
			this.connect();
		}
		if(!this.cf.channel().isActive()){
			this.connect();
		}
		
		return this.cf;
	}
	
	public static void main(String[] args) throws Exception{
		final Client c = Client.getInstance();

		
		ChannelFuture cf = c.getChannelFuture();
		for(int i = 1; i <= 3; i++ ){
			Request request = new Request();
			request.setId("" + i);
			request.setName("pro" + i);
			request.setRequestMessage("数据信息" + i);
			cf.channel().writeAndFlush(request);
			TimeUnit.SECONDS.sleep(4);
		}
		//超过五秒之后就断开连接
		cf.channel().closeFuture().sync();//程序阻塞在这里,断开连接时才继续往下走
		
		//断开之后,重连服务器再发送
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("进入子线程...");
					//重连
					ChannelFuture cf = c.getChannelFuture();
					System.out.println(cf.channel().isActive());
					System.out.println(cf.channel().isOpen());
					
					//再次发送数据
					Request request = new Request();
					request.setId("" + 4);
					request.setName("pro" + 4);
					request.setRequestMessage("数据信息" + 4);
					cf.channel().writeAndFlush(request);					
					cf.channel().closeFuture().sync();
					System.out.println("子线程结束.");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();
		
		System.out.println("断开连接,主线程结束..");
		
	}
	
	
	
}

 

package bhz.netty.runtime;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter{
	
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {

	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		try {
			Response resp = (Response)msg;
			System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());			
		} finally {
			ReferenceCountUtil.release(msg);
		}
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}
	
}

 

package bhz.netty.runtime;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Marshalling工厂
 * @author(alienware)
 * @since 2014-12-16
 */
public final class MarshallingCodeCFactory {

    /**
     * 创建Jboss Marshalling解码器MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
    	//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
		//创建了MarshallingConfiguration对象,配置了版本号为5 
		final MarshallingConfiguration configuration = new MarshallingConfiguration();
		configuration.setVersion(5);
		//根据marshallerFactory和configuration创建provider
		UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
		//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
		MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
		return decoder;
    }

    /**
     * 创建Jboss Marshalling编码器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
		final MarshallingConfiguration configuration = new MarshallingConfiguration();
		configuration.setVersion(5);
		MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
		//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
		MarshallingEncoder encoder = new MarshallingEncoder(provider);
		return encoder;
    }
}

 

package bhz.netty.runtime;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class Server {

	public static void main(String[] args) throws Exception{
		
		EventLoopGroup pGroup = new NioEventLoopGroup();
		EventLoopGroup cGroup = new NioEventLoopGroup();
		
		ServerBootstrap b = new ServerBootstrap();
		b.group(pGroup, cGroup)
		 .channel(NioServerSocketChannel.class)
		 .option(ChannelOption.SO_BACKLOG, 1024)
		 //设置日志
		 .handler(new LoggingHandler(LogLevel.INFO))
		 .childHandler(new ChannelInitializer<SocketChannel>() {
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
				//五秒后没有连接,断开。
				sc.pipeline().addLast(new ReadTimeoutHandler(5)); 
				sc.pipeline().addLast(new ServerHandler());
			}
		});
		
		ChannelFuture cf = b.bind(8765).sync();
		
		cf.channel().closeFuture().sync();
		pGroup.shutdownGracefully();
		cGroup.shutdownGracefully();
		
	}
}

 

package bhz.netty.runtime;

import java.io.Serializable;

public class Response implements Serializable{
	
	private static final long serialVersionUID = 1L;
	
	private String id;
	private String name;
	private String responseMessage;
	
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public String getResponseMessage() {
		return responseMessage;
	}
	public void setResponseMessage(String responseMessage) {
		this.responseMessage = responseMessage;
	}
	

}

 

package bhz.netty.runtime;

import java.io.Serializable;

public class Request implements Serializable{

	private static final long  SerialVersionUID = 1L;
	
	private String id ;
	private String name ;
	private String requestMessage ;
	
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public String getRequestMessage() {
		return requestMessage;
	}
	public void setRequestMessage(String requestMessage) {
		this.requestMessage = requestMessage;
	}


}

 

package bhz.netty.runtime;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter{

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {

	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		Request request = (Request)msg;
		System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
		Response response = new Response();
		response.setId(request.getId());
		response.setName("response" + request.getId());
		response.setResponseMessage("响应内容" + request.getId());
		ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE);
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}

	
	
}

 

分享到:
评论

相关推荐

    netty 断线重连+心跳

    netty使用自带工具类实现断线重连和心跳包

    Netty 超时_心跳机制_断线重连 demo

    配置maven添加io.netty 在com.zhao的包下的文件,可以自行修改使用

    netty5长连接.自动重连

    自动重连”这个主题中,我们将深入探讨 Netty 如何实现长连接以及自动重连的机制。 首先,让我们理解什么是长连接。在传统的 TCP/IP 协议中,每次通信都需要建立一次连接(三次握手),完成数据交换后断开连接(四...

    netty断线重连机制及心跳机制.rar

    当心跳超时,它会关闭当前的 Channel 并触发重连。 4. **实现重连逻辑**: 当检测到连接断开或心跳超时时,使用 `ChannelFutureListener` 注册到 `Channel.closeFuture()`,在 Channel 关闭后执行重连操作。 5. **...

    Netty断线重连解决方案.docx

    Netty 断线重连解决方案 Netty 是一个高性能的 NIO 框架,用于构建高性能的网络应用程序。然而,在使用 Netty 实现长连接服务时,可能会遇到断线的问题,即客户端和服务端之间的连接断开。这种情况可能是由于网络...

    netty+websocket实现心跳和断线重连

    在本文中,我们将深入探讨如何利用 Netty 和 WebSocket 实现心跳检测和断线重连机制。 首先,我们需要理解 WebSocket 协议。WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它为客户端和服务器提供了低...

    netty 断线重连

    在分布式系统和网络编程中,保持连接的稳定性是至关重要的,因此"Netty 断线重连"成为了关键特性。下面将详细介绍如何使用Netty实现心跳检测以及断线后的自动重连功能。 1. **心跳机制** - 心跳包:在网络通信中,...

    netty 实现长连接

    8. **异常处理和连接管理**:Netty提供了优雅的异常处理机制,并能自动处理连接的关闭和重连。 9. **实例代码**:通常会包含如何配置和启动一个支持长连接的Netty服务器,以及客户端如何连接和发送数据的示例代码。...

    netty4长连接重连机制,心跳包

    使用netty4,客户端断线重连,使用message收发数据,重写方法

    Netty4长连接(服务端+客户端)

    本文将深入探讨Netty4在构建长连接、实现断开重连、心跳检测以及Msgpack编码解码方面的知识。 首先,我们要理解什么是长连接。在TCP/IP通信中,长连接是指在完成一次数据传输后,连接不被立即关闭,而是保持一段...

    基于netty实现采用自定义协议方式通讯,同时支持心跳机制和重连机制

    功能 #### 自定义协议 | 魔数 2byte | 指令类型 1byte | 数据长度 4byte ... 设置重连次数来判断是否重连,重连次数耗尽则停止重连并关闭客户端 - 多长时间重连 短时间内频繁重连既消耗资源又没有必要,好的重连

    netty聊天小列子

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个“netty聊天小例子”提供了一个简单的服务端和客户端应用,可以帮助我们理解Netty如何在实际场景中工作。...

    基于netty的websocket开发小结

    在实际应用中,我们可能还需要处理心跳、异常、超时等问题。Netty提供了WebSocketFrameDecoder和WebSocketFrameEncoder帮助解码和编码WebSocket帧,确保数据正确传输。 标签“源码”提示我们关注Netty和WebSocket...

    netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》源码

    netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724845&idx=1&sn=8631c590ff4876ba0b7af64df16fc54b&scene=19#wechat_redirect

    用netty实现长连接和心跳监测的示例代码

    用netty实现长连接和心跳监测的示例代码

    基于netty实现的支持长连接的rpc

    标题中的“基于netty实现的支持长连接的rpc”是指利用Netty框架构建一个远程过程调用(RPC)系统,该系统能够维持长时间的连接状态,提高通信效率。Netty是一个高性能、异步事件驱动的网络应用程序框架,适用于开发...

    WebSocket利用netty连接入门项目

    这个入门项目是学习WebSocket与Netty结合的一个好起点,通过实际操作,你可以更深入地理解WebSocket协议的工作原理,以及如何使用Netty构建高效稳定的WebSocket服务器。同时,对于前端开发人员,这也是一个了解...

    netty4Test

    学习netty框架编写的maven项目,使用netty4.1.1.Final版本实现的netty客户端自动重连,检测链路空闲时自动发送心跳包,如没有收到返回则自动断开连接重连。先运行NettyServerBootstrap类启动Server端,再启动Netty...

    netty客户端连接多个服务端

    服务器上部署TCP客户端程序,主动连接下属的各个终端,终端上面跑TCP服务端程序。

    Netty实现长连接通讯-连接协议为了简单json封装

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,常用于开发服务器和客户端的通信应用,尤其在处理高并发、低延迟的场景下表现出色。本教程将围绕"Netty实现长连接通讯"展开,重点讲解如何使用Netty框架构建...

Global site tag (gtag.js) - Google Analytics