`

netty心跳检测

 
阅读更多

 

package bhz.netty.heartBeat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {

	
	public static void main(String[] args) throws Exception{
		
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group)
		 .channel(NioSocketChannel.class)
		 .handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
				sc.pipeline().addLast(new ClienHeartBeattHandler());
			}
		});
		
		ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

		cf.channel().closeFuture().sync();
		group.shutdownGracefully();
	}
}

 

package bhz.netty.heartBeat;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;
import org.hyperic.sigar.Swap;


public class ClienHeartBeattHandler extends ChannelHandlerAdapter {

    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
    private ScheduledFuture<?> heartBeat;
	//主动向服务器发送认证信息
    private InetAddress addr ;
    
    private static final String SUCCESS_KEY = "auth_success_key";

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		addr = InetAddress.getLocalHost();
        String ip = addr.getHostAddress();
		String key = "1234";
		//证书
		String auth = ip + "," + key;
		ctx.writeAndFlush(auth);
	}
	
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    	try {
        	if(msg instanceof String){
        		String ret = (String)msg;
        		if(SUCCESS_KEY.equals(ret)){
        	    	// 握手成功,主动发送心跳消息
        	    	this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS);
        		    System.out.println(msg);    			
        		}
        		else {
        			System.out.println(msg);
        		}
        	}
		} finally {
			ReferenceCountUtil.release(msg);
		}
    }

    private class HeartBeatTask implements Runnable {
    	private final ChannelHandlerContext ctx;

		public HeartBeatTask(final ChannelHandlerContext ctx) {
		    this.ctx = ctx;
		}
	
		@Override
		public void run() {
			try {
			    RequestInfo info = new RequestInfo();
			    //ip
			    info.setIp(addr.getHostAddress());
		        Sigar sigar = new Sigar();
		        //cpu prec
		        CpuPerc cpuPerc = sigar.getCpuPerc();
		        HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
		        cpuPercMap.put("combined", cpuPerc.getCombined());
		        cpuPercMap.put("user", cpuPerc.getUser());
		        cpuPercMap.put("sys", cpuPerc.getSys());
		        cpuPercMap.put("wait", cpuPerc.getWait());
		        cpuPercMap.put("idle", cpuPerc.getIdle());
		        // memory
		        Mem mem = sigar.getMem();
				HashMap<String, Object> memoryMap = new HashMap<String, Object>();
				memoryMap.put("total", mem.getTotal() / 1024L);
				memoryMap.put("used", mem.getUsed() / 1024L);
				memoryMap.put("free", mem.getFree() / 1024L);
				info.setCpuPercMap(cpuPercMap);
			    info.setMemoryMap(memoryMap);
			    ctx.writeAndFlush(info);
			    
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
	    	cause.printStackTrace();
			if (heartBeat != null) {
			    heartBeat.cancel(true);
			    heartBeat = null;
			}
			ctx.fireExceptionCaught(cause);
	    }
	    
	}
}

 

package bhz.netty.heartBeat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class Server {

	public static void main(String[] args) throws Exception{
		
		EventLoopGroup pGroup = new NioEventLoopGroup();
		EventLoopGroup cGroup = new NioEventLoopGroup();
		
		ServerBootstrap b = new ServerBootstrap();
		b.group(pGroup, cGroup)
		 .channel(NioServerSocketChannel.class)
		 .option(ChannelOption.SO_BACKLOG, 1024)
		 //设置日志
		 .handler(new LoggingHandler(LogLevel.INFO))
		 .childHandler(new ChannelInitializer<SocketChannel>() {
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
				sc.pipeline().addLast(new ServerHeartBeatHandler());
			}
		});
		
		ChannelFuture cf = b.bind(8765).sync();
		
		cf.channel().closeFuture().sync();
		pGroup.shutdownGracefully();
		cGroup.shutdownGracefully();
		
	}
}

 

package bhz.netty.heartBeat;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.HashMap;

public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
    
	/** key:ip value:auth */
	private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
	private static final String SUCCESS_KEY = "auth_success_key";
	
	static {
		AUTH_IP_MAP.put("192.168.1.194", "1234");
	}
	
	private boolean auth(ChannelHandlerContext ctx, Object msg){
			//System.out.println(msg);
			String [] ret = ((String) msg).split(",");
			String auth = AUTH_IP_MAP.get(ret[0]);
			if(auth != null && auth.equals(ret[1])){
				ctx.writeAndFlush(SUCCESS_KEY);
				return true;
			} else {
				ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
				return false;
			}
	}
	
	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		if(msg instanceof String){
			auth(ctx, msg);
		} else if (msg instanceof RequestInfo) {
			
			RequestInfo info = (RequestInfo) msg;
			System.out.println("--------------------------------------------");
			System.out.println("当前主机ip为: " + info.getIp());
			System.out.println("当前主机cpu情况: ");
			HashMap<String, Object> cpu = info.getCpuPercMap();
			System.out.println("总使用率: " + cpu.get("combined"));
			System.out.println("用户使用率: " + cpu.get("user"));
			System.out.println("系统使用率: " + cpu.get("sys"));
			System.out.println("等待率: " + cpu.get("wait"));
			System.out.println("空闲率: " + cpu.get("idle"));
			
			System.out.println("当前主机memory情况: ");
			HashMap<String, Object> memory = info.getMemoryMap();
			System.out.println("内存总量: " + memory.get("total"));
			System.out.println("当前内存使用量: " + memory.get("used"));
			System.out.println("当前内存剩余量: " + memory.get("free"));
			System.out.println("--------------------------------------------");
			
			ctx.writeAndFlush("info received!");
		} else {
			ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
		}
    }


}

 

package bhz.netty.heartBeat;

import java.io.Serializable;
import java.util.HashMap;

public class RequestInfo implements Serializable {

	private String ip ;
	private HashMap<String, Object> cpuPercMap ;
	private HashMap<String, Object> memoryMap;
	//.. other field
	
	public String getIp() {
		return ip;
	}
	public void setIp(String ip) {
		this.ip = ip;
	}
	public HashMap<String, Object> getCpuPercMap() {
		return cpuPercMap;
	}
	public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
		this.cpuPercMap = cpuPercMap;
	}
	public HashMap<String, Object> getMemoryMap() {
		return memoryMap;
	}
	public void setMemoryMap(HashMap<String, Object> memoryMap) {
		this.memoryMap = memoryMap;
	}
	
	
}

 

package bhz.netty.heartBeat;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Marshalling工厂
 * @author(alienware)
 * @since 2014-12-16
 */
public final class MarshallingCodeCFactory {

    /**
     * 创建Jboss Marshalling解码器MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
    	//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
		//创建了MarshallingConfiguration对象,配置了版本号为5 
		final MarshallingConfiguration configuration = new MarshallingConfiguration();
		configuration.setVersion(5);
		//根据marshallerFactory和configuration创建provider
		UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
		//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
		MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
		return decoder;
    }

    /**
     * 创建Jboss Marshalling编码器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
		final MarshallingConfiguration configuration = new MarshallingConfiguration();
		configuration.setVersion(5);
		MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
		//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
		MarshallingEncoder encoder = new MarshallingEncoder(provider);
		return encoder;
    }
}

 

 

分享到:
评论

相关推荐

    springboot整合 netty做心跳检测

    springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合...

    spring整合netty心跳检测

    下面我们将深入探讨"spring整合netty心跳检测"这一主题。 首先,我们要理解心跳检测的基本概念。心跳检测是指在通信双方之间周期性地发送一个简单的消息,用来确认连接是否仍然有效,以及数据传输是否正常。这对于...

    基于netty_4.0.26的心跳检测

    7. **跨平台兼容性**:本示例提及的Netty心跳检测技术在Java端和Android上都已测试通过,说明Netty的API设计是高度统一和兼容的,能够很好地适应多平台环境。 总结起来,Netty的心跳检测机制是通过自定义处理器和...

    断网断电心跳检测

    本文将深入探讨“断网断电心跳检测”这一技术主题,以及它与Netty心跳检测的关联。 WebSocket协议允许服务器和客户端之间建立持久连接,允许双向数据传输,极大地简化了实时应用的开发。然而,当网络中断或设备断电...

    netty 心跳检测

    通过以上步骤,我们可以创建一个完整的Netty心跳检测系统。在实际应用中,还需要考虑性能优化,如避免过多的心跳包导致的网络拥堵,以及合理设置心跳间隔和超时时间,以平衡检测效率和资源消耗。 这个"netty-test...

    Netty心跳检测机制.zip

    Netty心跳检测机制

    面试官:Netty心跳检测机制是什么,怎么自定义检测间隔时间?.doc

    在 Netty 中,心跳检测可以通过 `IdleStateHandler` 实现。`IdleStateHandler` 是一个处理空闲状态的处理器,它可以监控连接的空闲时间并触发相应的事件。当我们创建 `IdleStateHandler` 实例时,需要提供三个参数:...

    netty+websocket实现心跳和断线重连

    在本文中,我们将深入探讨如何利用 Netty 和 WebSocket 实现心跳检测和断线重连机制。 首先,我们需要理解 WebSocket 协议。WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它为客户端和服务器提供了低...

    SpringBoot整合Netty心跳机制过程详解

    SpringBoot 整合 Netty 心跳机制过程详解 SpringBoot 整合 Netty 心跳机制过程详解是指通过 SpringBoot 框架整合 Netty 网络框架来实现心跳机制的过程。心跳机制是指客户端和服务端之间的连接维持机制,当客户端和...

    netty 心跳实现

    - **心跳检测**:服务器端可以设置定时任务,检测上一次收到客户端心跳的时间,如果超过预设阈值,认为客户端断线,关闭连接。 ### 5. Netty 心跳示例代码 ```java public class HeartbeatHandler extends ...

    基于springboot+netty实现的心跳检测源码+项目说明文档.zip

    基于springboot+netty实现的心跳检测源码+项目说明文档.zip (1),NioEventLoopGroup是一个处理I / O操作的多线程事件循环。 Netty为不同类型的传输提供各种EventLoopGroup实现。我们在此示例中实现了服务器端应用程序...

    netty基于http socket websocke及心跳包机制t的demo

    心跳包机制在网络通信中扮演着重要角色,特别是在长时间无数据交换或需要检测连接状态的场景下。在Netty中,心跳包可以是自定义的协议消息,服务器和客户端定期交换这些消息以确认连接的活跃性。如果在预设时间内...

    基于springboot+netty实现的心跳检测-源码

    本项目是基于Spring Boot和Netty框架实现的心跳检测功能,它能够帮助开发者在分布式系统或微服务架构中检测服务是否正常运行,及时发现并处理网络故障。下面将详细解释这个项目涉及的关键知识点。 1. **Spring Boot...

    springboot netty4 及时通讯(带心跳检测,zookeeper)

    本文将深入探讨如何使用SpringBoot整合Netty4实现即时通讯,并结合Zookeeper进行服务发现与管理,同时包含心跳检测和下线重连机制。 **一、SpringBoot与Netty集成** SpringBoot以其简化Spring应用初始搭建以及开发...

    用netty实现长连接和心跳监测的示例代码

    用netty实现长连接和心跳监测的示例代码

    Netty空闲检测&Keepalive.pdf

    // 在这里可以发送心跳检测信息或关闭连接 ctx.close(); } } } ``` #### 三、Keepalive机制 ##### 3.1 基本概念 - **Keepalive** 是一种基于 TCP/IP 协议栈提供的机制,依赖于操作系统的实现。 - 当 TCP 连接...

    netty通信以及心跳demo

    心跳机制在长连接通信中非常重要,用于检测网络连接的健康状态,防止因网络延迟或故障导致的连接失效。Netty 中实现心跳通常有以下步骤: 1. **定义心跳消息**:创建一个特定的 ByteBuf 或其他消息类型作为心跳包。...

    netty断线重连机制及心跳机制.rar

    在本文中,我们将深入探讨 Netty 的断线重连机制和心跳机制,这两个特性对于维持稳定可靠的网络通信至关重要。 首先,让我们了解**断线重连机制**。在分布式系统中,网络连接可能会因为各种原因中断,如网络抖动、...

    netty学习demo(初学代码结构+固定消息+自定义分隔符+自定义协议+心跳+http+序列化压缩+自动断线)

    你可以创建一个`IdleStateHandler`,当检测到连接长时间无数据交换时,发送心跳包。 HTTP 支持是Netty的一个强大特性,Netty提供了`HttpServerCodec`和`HttpClientCodec`来处理HTTP请求和响应。结合`...

    netty学习之心跳.rar

    心跳机制在网络通信中扮演着至关重要的角色,它确保了连接的活跃性,并能及时检测到网络故障。 心跳机制的基本概念是:在网络连接的两端,节点间定期发送小量数据(心跳包),以确认对方是否还在线并能够正常接收...

Global site tag (gtag.js) - Google Analytics