`

netty心跳检查之UDP篇

阅读更多
  部分UDP通信场景中,需要客户端定期发送心跳信息,以获取终端的状态,并获取终端IP,以便服务器主动发送控制命令。如移动通信,内网穿越等。
  使用TCP方式通信,心跳是比较容易实现的,使用IdleStateHandler监控channel,然后在自定义的Handler中处理几个对应的事件就可以了。但是对于UDP,就不灵了。

  学习研究netty,做了一个简单而完善的例子:通过UDP通信,客户端上线,发送一条信息,服务器响应(不在Handler中响应,在其他线程中处理)。服务器主动向客户端发问候消息,监控到无心跳后,踢掉客户端。
  程序逻辑比较简单,不多解释,请看注释。

一、辅助类
package com.wallimn.iteye.netty.heart;

import java.net.InetSocketAddress;

import io.netty.util.AttributeKey;

/**
 * 记录一些常量。真正的应用要从配置文件中读取。
 * 
 * <br>
 * <br>时间:2019年9月14日 下午11:41:26,作者:wallimn
 */
public class Config {
	private Config(){};
	
	public static final AttributeKey<InetSocketAddress> SERVER_ADDR_ATTR=AttributeKey.newInstance("SERVER_ADDR_ATTR");
	
	//原来打算将客户端的ID记录在Channel的属性中,后来发现对于UDP不适用。
	//public static final AttributeKey<String> CLIENT_ID=AttributeKey.newInstance("CLIENT_ID");
	public static final int IDLE_TIME=5;//允许的发呆时间
	
	public static final int SERVER_PORT=8585;
	public static final String SERVER_IP="localhost";
	
	public static final long CLIENT_VALID_THRESHOLD=5000;//客户端地址有效的时间阀值。单位为毫秒。
}


package com.wallimn.iteye.netty.heart;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

/**
 * 用来模拟持有数据
 * 
 * <br>
 * <br>时间:2019年9月14日 下午7:58:40,作者:wallimn
 */
public class DataHolder {
	
	private DataHolder(){}
	
	/**
	 * 记录客户端的消息
	 */
	public static ConcurrentLinkedQueue<ClientMessage> clientMessageQueue = new ConcurrentLinkedQueue<ClientMessage>();
	
	/**
	 * 记录由心跳获取的客户端地址,用于服务器主动给客户端发消息
	 */
	public static ConcurrentMap<String, ClientInformation> clientInformationMap = new ConcurrentHashMap<String, ClientInformation>();
}


package com.wallimn.iteye.netty.heart;

import java.net.InetSocketAddress;
import java.util.Date;

import lombok.Data;

/**
 * 客户端信息
 * 
 * <br>
 * <br>时间:2019年9月14日 下午8:55:36,作者:wallimn
 */

@Data
public class ClientInformation {
	/**
	 * 客户端唯一标识
	 */
	private String id;
	/**
	 * 收到时间
	 */
	private Date recordTime;
	/**
	 * 客户端地址,
	 */
	private InetSocketAddress address;
}


package com.wallimn.iteye.netty.heart;

import lombok.Data;

/**
 * 客户端发来的消息
 * 
 * <br>
 * <br>时间:2019年9月14日 下午8:55:36,作者:wallimn
 */

@Data
public class ClientMessage {
	/**
	 * 消息
	 */
	private String message;
	
	/**
	 * 客户端信息
	 */
	private ClientInformation client;
}


二、客户端代码
package com.wallimn.iteye.netty.heart;

import java.net.InetSocketAddress;
import java.util.UUID;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;

/**
 * 客户端处理器(handler),并无太多逻辑。仅发了一条访问时间的信息(time)、读取服务器信息并显示。
 * 
 * <br>
 * <br>时间:2019年9月14日 下午9:09:47,作者:wallimn
 */
public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
		String msg = packet.content().toString(CharsetUtil.UTF_8);
		System.out.println(msg);
		
		//如果收到exit信息,关闭channel
		if("exit".equals(msg)){
			ctx.close();
		}
	}

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		InetSocketAddress addr = ctx.channel().attr(Config.SERVER_ADDR_ATTR).get();
		String clientId;
		String message;
		
		//发送1条心跳
		clientId = UUID.randomUUID().toString().toUpperCase().replace("-", "");
//		message = clientId+";"+"heart";//发送的信息
//		ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message,CharsetUtil.UTF_8),addr));
//		System.out.println("发送一条心跳");//不用专门发心跳信息,任何发到服务器的信息都可以用于服务器更新心跳记录


		message = clientId+";"+"time";//发送的信息
		ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message,CharsetUtil.UTF_8),addr));
		System.out.println("发送对时信息");
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		
		ctx.close();
	}


}


package com.wallimn.iteye.netty.heart;

import java.net.InetSocketAddress;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;

/**
 * 客户端程序
 * 启动命令:java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.heart.ClientApp
 * 
 * <br>
 * <br>时间:2019年9月14日 上午8:58:13,作者:wallimn
 */
public class ClientApp {
	

	public static void main(String[] args) {
		int port = Config.SERVER_PORT;
		new ClientApp().run(port);
	}

	
	public void run(int port){
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group)
			.channel(NioDatagramChannel.class)
			//.option(ChannelOption.SO_BROADCAST, true)
			.handler(new ClientHandler());
		InetSocketAddress addr = new InetSocketAddress(Config.SERVER_IP,port);
		b.attr(Config.SERVER_ADDR_ATTR, addr);
		
		try {
			Channel ch = b.bind(0).sync().channel();//使用一个随机端口
			
			//最长运行30秒
			if(!ch.closeFuture().await(30000)){
				System.out.println("操作超时");
			}
			
			System.out.println("退出");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		finally{
			group.shutdownGracefully();
		}
	}
}


三、服务器端代码
package com.wallimn.iteye.netty.heart;

import java.util.Date;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
/**
 * 服务器处理器(handler)
 * 
 * <br>
 * <br>时间:2019年9月15日 上午8:37:15,作者:wallimn
 */
public class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
		System.out.println("读取信息,channelShortId="+ctx.channel().id().asShortText());
		String msg = packet.content().toString(CharsetUtil.UTF_8);
		System.out.println("host: " + packet.sender().getHostString());
		System.out.println("port: " + packet.sender().getPort());
		System.out.println("content: " + msg);
		String[] fields = msg.split(";");
		if (fields.length != 2) {
			return;
		}
		ClientInformation client = new ClientInformation();
		client.setId(fields[0]);
		client.setRecordTime(new Date());
		client.setAddress(packet.sender());
		ClientMessage message = new ClientMessage();
		message.setClient(client);
		message.setMessage(fields[1]);
		
		System.out.println("加入待处理数据队列");
		
		//标注客户端的ID

		// 不对消息进行处理,只是加入队列,由其他线程进行处理
		if(!"heart".equals(message.getMessage())){//如果不是心跳消息
			DataHolder.clientMessageQueue.add(message);		
		}
		
		//不管什么消息,更新客户端的信息
		DataHolder.clientInformationMap.put(client.getId(), client);
		
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		super.exceptionCaught(ctx, cause);
	}
	
//这个方法对于UDP没有什么意义
//	@Override
//	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//		if (evt instanceof IdleStateEvent) {
//			IdleStateEvent e = (IdleStateEvent) evt;
//			switch (e.state()) {
//			case READER_IDLE:
//				System.out.println("READER_IDLE");
//				break;
//			case WRITER_IDLE:
//				System.out.println("WRITER_IDLE");
//				break;
//			case ALL_IDLE:
//				System.out.println("ALL_IDLE");
//				break;
//			default:
//				break;
//			}
//		}
//	}

}


package com.wallimn.iteye.netty.heart;

import java.util.Date;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

/**
 * 服务器应用
 * 启动命令:java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.heart.ServerApp
 * <br>
 * <br>
 * 时间:2019年9月14日 上午9:47:29,作者:wallimn
 */
public class ServerApp {


	//public static ConcurrentMap<String, ClientMessage> clientMessageMap = new ConcurrentHashMap<String, ClientMessage>();

	// 内部放置多个Task,
	public static HashedWheelTimer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 1, // tick一下的时间
			TimeUnit.SECONDS, 3);// 放置Timer的数量

	public static Channel channel = null;

	public static final long RESPONSE_TIMEER_DELAY = 1L;
	public static final long CHECK_TIMEER_DELAY = 5L;
	public static final long HELLO_TIMEER_DELAY = 2L;

	/**
	 * 处理客户端发来的消息
	 */
	public static TimerTask responseTasker = new TimerTask() {
		public void run(Timeout timeout) throws Exception {
			timer.newTimeout(this, RESPONSE_TIMEER_DELAY, TimeUnit.SECONDS);
			if (channel == null){
				System.out.println("channel is null");
				return;
			}
			if (channel.isActive() == false) {
				System.out.println("channel is inactive");
				timeout.cancel();
				return;
			}
			//System.out.println("responseTasker run, size is "+DataHolder.clientMessageQueue.size());
			
			ClientMessage message;
			for (Iterator<ClientMessage> iterator=DataHolder.clientMessageQueue.iterator();iterator.hasNext();) {
				message = iterator.next();
				System.out.println(message.getMessage());
				if("time".equals(message.getMessage())){
					channel.writeAndFlush(
							new DatagramPacket(Unpooled.copiedBuffer("18:18", CharsetUtil.UTF_8), message.getClient().getAddress()));
				}
				else{
					;
				}
				//处理完后清除
				iterator.remove();
			}

		}
	};

	/**
	 * 用于检查客户端是否有效
	 */
	public static TimerTask checkTasker = new TimerTask() {
		public void run(Timeout timeout) throws Exception {
			timer.newTimeout(this, CHECK_TIMEER_DELAY, TimeUnit.SECONDS);
			ClientInformation client = null;
			long now = new Date().getTime();
			for (Entry<String, ClientInformation> entry : DataHolder.clientInformationMap.entrySet()) {
				client = entry.getValue();
				if (now - client.getRecordTime().getTime() > Config.CLIENT_VALID_THRESHOLD) {

					System.out.println("client kick : " + client.getId());
					DataHolder.clientInformationMap.remove(entry.getKey());
				}
			}

		}

	};
	
	/**
	 * 用于模拟主动向客户端发送消息
	 */
	public static TimerTask helloTimer = new TimerTask(){

		public void run(Timeout timeout) throws Exception {
			if (channel == null){
				System.out.println("channel is null");
				return;
			}
			if (channel.isActive() == false) {
				System.out.println("channel is inactive");
				timeout.cancel();
				return;
			}
			timer.newTimeout(this, HELLO_TIMEER_DELAY, TimeUnit.SECONDS);
			ClientInformation client = null;
			for (Entry<String, ClientInformation> entry : DataHolder.clientInformationMap.entrySet()) {
				client = entry.getValue();
				System.out.println("helloTimer run. send to "+client.getId());
				
				channel.writeAndFlush(
						new DatagramPacket(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8), client.getAddress()));
			}
		}
		
	};
	
	
	public static void main(String[] args) throws Exception {
		int port = Config.SERVER_PORT;
		timer.newTimeout(responseTasker, RESPONSE_TIMEER_DELAY, TimeUnit.SECONDS);
		timer.newTimeout(checkTasker, CHECK_TIMEER_DELAY, TimeUnit.SECONDS);
		timer.newTimeout(helloTimer, HELLO_TIMEER_DELAY, TimeUnit.SECONDS);
		new ServerApp().run(port);

	}

	public void run(int port) throws Exception {
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();

		b.group(group).channel(NioDatagramChannel.class)
				// .option(ChannelOption.SO_BROADCAST, true)
				.handler(new ChannelInitializer<Channel>(){

					@Override
					protected void initChannel(Channel ch) throws Exception {
						//ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
						ch.pipeline().addLast(new ServerHandler());
					}
					
				});

		ChannelFuture future = b.bind(port).sync();
		channel = future.channel();
		System.out.println("服务器准备就绪,channelShortId="+channel.id().asShortText());
		channel.closeFuture().await();

		group.shutdownGracefully();
	}
}



2
0
分享到:
评论

相关推荐

    netty基于http socket websocke及心跳包机制t的demo

    3. 使用Netty的ChannelHandlerContext发送心跳包,并设置相应的超时检查和处理机制。 4. 如何处理Socket连接,发送和接收自定义协议的数据。 5. 示例代码展示如何在Java中编写和运行Netty应用。 这个demo将是一个很...

    面试官:Netty心跳检测机制是什么,怎么自定义检测间隔时间?.doc

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络服务,如TCP和UDP应用。在长连接场景下,保持连接的稳定性至关重要,这就涉及到了心跳检测机制。 心跳检测机制是网络通信中...

    springboot netty4 及时通讯(带心跳检测,zookeeper)

    本文将深入探讨如何使用SpringBoot整合Netty4实现即时通讯,并结合Zookeeper进行服务发现与管理,同时包含心跳检测和下线重连机制。 **一、SpringBoot与Netty集成** SpringBoot以其简化Spring应用初始搭建以及开发...

    netty通信以及心跳demo

    例如,可以创建一个 HeartbeatHandler,设置一个定时任务定期发送心跳,并在 channelRead() 方法中检查是否接收到的是心跳响应。 六、Netty-chat 示例项目 "netty-chat" 文件可能是一个简单的 Netty 聊天应用示例,...

    netty-4.1_javaNetty_netty_服务器_

    9. **心跳与保持连接**:了解如何在 Netty 中实现心跳机制,以检测连接是否断开,以及如何维持长连接。 10. **安全性**:研究 SSL/TLS 的配置和使用,以实现安全的网络通信。 通过学习和实践这些知识点,你可以...

    netty+4G DTU

    Netty 提供了丰富的 ChannelHandler 接口,开发者可以根据需求自定义数据的编码解码、连接管理、心跳检测等功能。 以下是使用 Netty 开发 IoT 应用的关键步骤: 1. **配置 Netty 服务器**:创建 ServerBootstrap,...

    Netty实战 电子版.pdf_java_netty_服务器_

    7. **心跳机制**:Netty提供了心跳包处理,用于检测连接的活跃状态,防止连接长时间无交互导致的失效。 8. **异常处理**:Netty提供了一套完整的异常处理机制,使得在发生错误时能够优雅地恢复或关闭连接。 9. **...

    netty-netty-4.1.69.Final.tar.gz

    9. **心跳与空闲检测**:Netty提供心跳机制和空闲检测,以保持连接的活跃状态并及时发现死连接。 10. **文档与社区支持**:Netty有详尽的官方文档和活跃的社区,为开发者提供强大的支持。 下载并解压“netty-netty...

    网络编程之Netty一站式精讲.rar

    6. 快速故障恢复:通过心跳检测和自动重连机制,提高了系统的稳定性和可靠性。 三、Netty核心组件 1. Channel:表示网络连接,负责数据的发送和接收。 2. EventLoop:事件循环,负责处理I/O事件和执行回调任务。 3...

    深入浅出Netty_netty_

    除此之外,Netty还提供了强大的心跳检测机制,可以防止因网络延迟或故障导致的连接僵死。其优雅的关闭机制也能确保在系统关闭时,所有正在处理的连接都能得到妥善处理。 总之,《深入浅出Netty》会带你深入理解...

    物联网之java实现(springboot + netty + 心跳,附完整源码)

    本实例主要探讨如何利用Java、Spring Boot和Netty来创建一个带有心跳机制的物联网系统,并提供完整的源代码供参考。 **Java在物联网中的角色** Java作为跨平台的编程语言,拥有广泛的应用场景,特别是在物联网(IoT)...

    使用netty3建立的JT809处理基本工程

    我们需要在Netty中实现心跳检测和回应机制,确保连接不会因长时间无数据交换而断开。 5. **异常处理**:在处理网络通信时,必须考虑各种可能出现的异常情况,如网络中断、数据解析错误等。Netty提供了`...

    netty 代码.rar

    - **心跳机制**:Netty 可以轻松实现心跳检测,保证连接的有效性。 - **编解码**:例如,Netty 提供的 LengthFieldBasedFrameDecoder 可以解析带有长度字段的协议,适合即时通讯中的消息传输。 - **WebSocket ...

    netty5.0官方自带的demo

    Netty的强大之处在于它能够轻松地处理自定义协议。通过示例,我们可以学习如何定义自己的消息结构,创建相应的编解码器,并将其整合进Netty的Pipeline中。 10. **异常处理** Netty提供了全面的异常处理机制,通过...

    learning-netty.zip

    - **丰富的组件库**: 提供大量预定义的Handler,如解码器、编码器、心跳检测等,简化开发工作。 - **强大的错误处理**: 自动处理异常,提供优雅的故障恢复机制。 3. **Netty 4.1的新特性和改进** - **ByteBuf...

    Netty-API-文档中文版

    文档还会介绍如何进行连接管理、读写操作、心跳检测、流量控制等网络编程的关键技术。 在实际开发中,了解并掌握Netty的API和设计理念,不仅可以提高网络应用的性能,还能极大地提升开发效率,降低维护成本。因此,...

    Netty实战-Netty.zip

    6. **心跳与空闲检测**:Netty 内置了心跳机制和空闲状态检测,可以防止网络连接长时间无交互导致的问题。 7. **强大的异常处理**:Netty 提供了统一的异常处理机制,可以优雅地处理网络通信过程中的各种异常。 8....

    netty资料.rar

    - **心跳机制**:学习如何在Netty中实现心跳检查,保持连接的活跃性。 - **异常处理**:理解Netty的异常处理机制,如何优雅地处理网络异常。 - **性能优化**:学习如何通过配置和调整参数来提升Netty应用的性能。 ...

    netty权威指南和源码

    8. **心跳机制**:Netty提供心跳检测功能,用于维持长连接的活跃性,防止因网络问题导致的连接断开。 9. **WebSocket和HTTP支持**:Netty提供了WebSocket和HTTP/HTTPS协议的支持,使得开发Web服务更加方便。 10. *...

    netty官方例子

    5. **TCP和UDP通信**:Netty支持TCP和UDP协议,你可以找到处理TCP连接和UDP数据报的示例。 6. **SSL/TLS加密通信**:Netty提供了SSL支持,示例会展示如何在服务器和客户端之间建立安全的HTTPS连接。 7. **编解码器...

Global site tag (gtag.js) - Google Analytics