- 浏览: 5161112 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
silence19841230:
先拿走看看
SpringBoot2.0开发WebSocket应用完整示例 -
wallimn:
masuweng 写道发下源码下载地址吧!三个相关文件打了个包 ...
SpringBoot2.0开发WebSocket应用完整示例 -
masuweng:
发下源码下载地址吧!
SpringBoot2.0开发WebSocket应用完整示例 -
masuweng:
SpringBoot2.0开发WebSocket应用完整示例 -
wallimn:
水淼火 写道你好,我使用以后,图标不显示,应该怎么引用呢,谢谢 ...
前端框架iviewui使用示例之菜单+多Tab页布局
部分UDP通信场景中,需要客户端定期发送心跳信息,以获取终端的状态,并获取终端IP,以便服务器主动发送控制命令。如移动通信,内网穿越等。
使用TCP方式通信,心跳是比较容易实现的,使用IdleStateHandler监控channel,然后在自定义的Handler中处理几个对应的事件就可以了。但是对于UDP,就不灵了。
学习研究netty,做了一个简单而完善的例子:通过UDP通信,客户端上线,发送一条信息,服务器响应(不在Handler中响应,在其他线程中处理)。服务器主动向客户端发问候消息,监控到无心跳后,踢掉客户端。
程序逻辑比较简单,不多解释,请看注释。
一、辅助类
二、客户端代码
三、服务器端代码
使用TCP方式通信,心跳是比较容易实现的,使用IdleStateHandler监控channel,然后在自定义的Handler中处理几个对应的事件就可以了。但是对于UDP,就不灵了。
学习研究netty,做了一个简单而完善的例子:通过UDP通信,客户端上线,发送一条信息,服务器响应(不在Handler中响应,在其他线程中处理)。服务器主动向客户端发问候消息,监控到无心跳后,踢掉客户端。
程序逻辑比较简单,不多解释,请看注释。
一、辅助类
package com.wallimn.iteye.netty.heart; import java.net.InetSocketAddress; import io.netty.util.AttributeKey; /** * 记录一些常量。真正的应用要从配置文件中读取。 * * <br> * <br>时间:2019年9月14日 下午11:41:26,作者:wallimn */ public class Config { private Config(){}; public static final AttributeKey<InetSocketAddress> SERVER_ADDR_ATTR=AttributeKey.newInstance("SERVER_ADDR_ATTR"); //原来打算将客户端的ID记录在Channel的属性中,后来发现对于UDP不适用。 //public static final AttributeKey<String> CLIENT_ID=AttributeKey.newInstance("CLIENT_ID"); public static final int IDLE_TIME=5;//允许的发呆时间 public static final int SERVER_PORT=8585; public static final String SERVER_IP="localhost"; public static final long CLIENT_VALID_THRESHOLD=5000;//客户端地址有效的时间阀值。单位为毫秒。 }
package com.wallimn.iteye.netty.heart; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; /** * 用来模拟持有数据 * * <br> * <br>时间:2019年9月14日 下午7:58:40,作者:wallimn */ public class DataHolder { private DataHolder(){} /** * 记录客户端的消息 */ public static ConcurrentLinkedQueue<ClientMessage> clientMessageQueue = new ConcurrentLinkedQueue<ClientMessage>(); /** * 记录由心跳获取的客户端地址,用于服务器主动给客户端发消息 */ public static ConcurrentMap<String, ClientInformation> clientInformationMap = new ConcurrentHashMap<String, ClientInformation>(); }
package com.wallimn.iteye.netty.heart; import java.net.InetSocketAddress; import java.util.Date; import lombok.Data; /** * 客户端信息 * * <br> * <br>时间:2019年9月14日 下午8:55:36,作者:wallimn */ @Data public class ClientInformation { /** * 客户端唯一标识 */ private String id; /** * 收到时间 */ private Date recordTime; /** * 客户端地址, */ private InetSocketAddress address; }
package com.wallimn.iteye.netty.heart; import lombok.Data; /** * 客户端发来的消息 * * <br> * <br>时间:2019年9月14日 下午8:55:36,作者:wallimn */ @Data public class ClientMessage { /** * 消息 */ private String message; /** * 客户端信息 */ private ClientInformation client; }
二、客户端代码
package com.wallimn.iteye.netty.heart; import java.net.InetSocketAddress; import java.util.UUID; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import io.netty.util.CharsetUtil; /** * 客户端处理器(handler),并无太多逻辑。仅发了一条访问时间的信息(time)、读取服务器信息并显示。 * * <br> * <br>时间:2019年9月14日 下午9:09:47,作者:wallimn */ public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { String msg = packet.content().toString(CharsetUtil.UTF_8); System.out.println(msg); //如果收到exit信息,关闭channel if("exit".equals(msg)){ ctx.close(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress addr = ctx.channel().attr(Config.SERVER_ADDR_ATTR).get(); String clientId; String message; //发送1条心跳 clientId = UUID.randomUUID().toString().toUpperCase().replace("-", ""); // message = clientId+";"+"heart";//发送的信息 // ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message,CharsetUtil.UTF_8),addr)); // System.out.println("发送一条心跳");//不用专门发心跳信息,任何发到服务器的信息都可以用于服务器更新心跳记录 message = clientId+";"+"time";//发送的信息 ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(message,CharsetUtil.UTF_8),addr)); System.out.println("发送对时信息"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
package com.wallimn.iteye.netty.heart; import java.net.InetSocketAddress; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; /** * 客户端程序 * 启动命令:java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.heart.ClientApp * * <br> * <br>时间:2019年9月14日 上午8:58:13,作者:wallimn */ public class ClientApp { public static void main(String[] args) { int port = Config.SERVER_PORT; new ClientApp().run(port); } public void run(int port){ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioDatagramChannel.class) //.option(ChannelOption.SO_BROADCAST, true) .handler(new ClientHandler()); InetSocketAddress addr = new InetSocketAddress(Config.SERVER_IP,port); b.attr(Config.SERVER_ADDR_ATTR, addr); try { Channel ch = b.bind(0).sync().channel();//使用一个随机端口 //最长运行30秒 if(!ch.closeFuture().await(30000)){ System.out.println("操作超时"); } System.out.println("退出"); } catch (InterruptedException e) { e.printStackTrace(); } finally{ group.shutdownGracefully(); } } }
三、服务器端代码
package com.wallimn.iteye.netty.heart; import java.util.Date; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import io.netty.util.CharsetUtil; /** * 服务器处理器(handler) * * <br> * <br>时间:2019年9月15日 上午8:37:15,作者:wallimn */ public class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { System.out.println("读取信息,channelShortId="+ctx.channel().id().asShortText()); String msg = packet.content().toString(CharsetUtil.UTF_8); System.out.println("host: " + packet.sender().getHostString()); System.out.println("port: " + packet.sender().getPort()); System.out.println("content: " + msg); String[] fields = msg.split(";"); if (fields.length != 2) { return; } ClientInformation client = new ClientInformation(); client.setId(fields[0]); client.setRecordTime(new Date()); client.setAddress(packet.sender()); ClientMessage message = new ClientMessage(); message.setClient(client); message.setMessage(fields[1]); System.out.println("加入待处理数据队列"); //标注客户端的ID // 不对消息进行处理,只是加入队列,由其他线程进行处理 if(!"heart".equals(message.getMessage())){//如果不是心跳消息 DataHolder.clientMessageQueue.add(message); } //不管什么消息,更新客户端的信息 DataHolder.clientInformationMap.put(client.getId(), client); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } //这个方法对于UDP没有什么意义 // @Override // public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // if (evt instanceof IdleStateEvent) { // IdleStateEvent e = (IdleStateEvent) evt; // switch (e.state()) { // case READER_IDLE: // System.out.println("READER_IDLE"); // break; // case WRITER_IDLE: // System.out.println("WRITER_IDLE"); // break; // case ALL_IDLE: // System.out.println("ALL_IDLE"); // break; // default: // break; // } // } // } }
package com.wallimn.iteye.netty.heart; import java.util.Date; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.util.CharsetUtil; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; /** * 服务器应用 * 启动命令:java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.heart.ServerApp * <br> * <br> * 时间:2019年9月14日 上午9:47:29,作者:wallimn */ public class ServerApp { //public static ConcurrentMap<String, ClientMessage> clientMessageMap = new ConcurrentHashMap<String, ClientMessage>(); // 内部放置多个Task, public static HashedWheelTimer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 1, // tick一下的时间 TimeUnit.SECONDS, 3);// 放置Timer的数量 public static Channel channel = null; public static final long RESPONSE_TIMEER_DELAY = 1L; public static final long CHECK_TIMEER_DELAY = 5L; public static final long HELLO_TIMEER_DELAY = 2L; /** * 处理客户端发来的消息 */ public static TimerTask responseTasker = new TimerTask() { public void run(Timeout timeout) throws Exception { timer.newTimeout(this, RESPONSE_TIMEER_DELAY, TimeUnit.SECONDS); if (channel == null){ System.out.println("channel is null"); return; } if (channel.isActive() == false) { System.out.println("channel is inactive"); timeout.cancel(); return; } //System.out.println("responseTasker run, size is "+DataHolder.clientMessageQueue.size()); ClientMessage message; for (Iterator<ClientMessage> iterator=DataHolder.clientMessageQueue.iterator();iterator.hasNext();) { message = iterator.next(); System.out.println(message.getMessage()); if("time".equals(message.getMessage())){ channel.writeAndFlush( new DatagramPacket(Unpooled.copiedBuffer("18:18", CharsetUtil.UTF_8), message.getClient().getAddress())); } else{ ; } //处理完后清除 iterator.remove(); } } }; /** * 用于检查客户端是否有效 */ public static TimerTask checkTasker = new TimerTask() { public void run(Timeout timeout) throws Exception { timer.newTimeout(this, CHECK_TIMEER_DELAY, TimeUnit.SECONDS); ClientInformation client = null; long now = new Date().getTime(); for (Entry<String, ClientInformation> entry : DataHolder.clientInformationMap.entrySet()) { client = entry.getValue(); if (now - client.getRecordTime().getTime() > Config.CLIENT_VALID_THRESHOLD) { System.out.println("client kick : " + client.getId()); DataHolder.clientInformationMap.remove(entry.getKey()); } } } }; /** * 用于模拟主动向客户端发送消息 */ public static TimerTask helloTimer = new TimerTask(){ public void run(Timeout timeout) throws Exception { if (channel == null){ System.out.println("channel is null"); return; } if (channel.isActive() == false) { System.out.println("channel is inactive"); timeout.cancel(); return; } timer.newTimeout(this, HELLO_TIMEER_DELAY, TimeUnit.SECONDS); ClientInformation client = null; for (Entry<String, ClientInformation> entry : DataHolder.clientInformationMap.entrySet()) { client = entry.getValue(); System.out.println("helloTimer run. send to "+client.getId()); channel.writeAndFlush( new DatagramPacket(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8), client.getAddress())); } } }; public static void main(String[] args) throws Exception { int port = Config.SERVER_PORT; timer.newTimeout(responseTasker, RESPONSE_TIMEER_DELAY, TimeUnit.SECONDS); timer.newTimeout(checkTasker, CHECK_TIMEER_DELAY, TimeUnit.SECONDS); timer.newTimeout(helloTimer, HELLO_TIMEER_DELAY, TimeUnit.SECONDS); new ServerApp().run(port); } public void run(int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioDatagramChannel.class) // .option(ChannelOption.SO_BROADCAST, true) .handler(new ChannelInitializer<Channel>(){ @Override protected void initChannel(Channel ch) throws Exception { //ch.pipeline().addLast(new IdleStateHandler(5, 0, 0)); ch.pipeline().addLast(new ServerHandler()); } }); ChannelFuture future = b.bind(port).sync(); channel = future.channel(); System.out.println("服务器准备就绪,channelShortId="+channel.id().asShortText()); channel.closeFuture().await(); group.shutdownGracefully(); } }
发表评论
-
gradle编译错误:Could not find method compile() for arguments
2020-09-19 10:50 18510编译(IDEA+Gradle)一个别人的工程,出现一个 ... -
解决tomcat部署两个SpringBoot应用提示InstanceAlreadyExistsException
2019-06-30 11:49 3391两个SpringBoot应用部署在一个Tomcat中,单独 ... -
Eclipse配置MyBatis代码自动化功能
2019-06-29 10:16 17711.安装插件 Eclipse中,Help->Ecli ... -
vue.js中使用qrcode生成二维码
2019-05-20 00:00 7655一、安装包 npm install qrcodejs2 --s ... -
MySQL插入数据报错: Incorrect string value: '\xFD\xDE'
2019-03-31 23:19 1253我MySQL数据库用的uft-8字符集,插入数据一直很正常 ... -
vue自定义组件并双向绑定属性
2019-03-08 22:46 3258做了两个子组件,原理基本一样,一个是使用原生的select ... -
vue-router简单示例
2019-03-05 00:32 1152写个基本完整、稍有借鉴意义的示例,防止自己忘记。 &l ... -
“联通充值系统繁忙”轻松应对
2019-02-06 11:03 3973大过年的,联通充个值一直报“充值系统繁忙”。昨天晚上试了几 ... -
electron.js数据库应用---导航菜单(element-ui+mysql)
2019-02-05 21:33 2364一、环境搭建 略, ... -
electron.js数据库应用---入门(mysql+element-ui)
2019-01-27 23:19 7503我的机器:Windows10,64 ... -
SpringMVC 在controller层中注入成员变量request,是否线程安全
2018-12-17 21:17 2748@RestController public class ... -
VueJS 组件参数名命名与组件属性转化
2018-12-03 00:00 2075转自:https://www.cnblogs.com/meiy ... -
vue-resource拦截器实现token发送及检验自动化
2018-11-16 22:38 3077用了很长时间vue-resource,最近思考$http发 ... -
element-ui试用手记
2018-10-29 20:25 1747element-ui、iviewui都以vue.js为基础 ... -
iviewui中表格控件中render的使用示例
2018-07-07 16:46 9786示例了如何在表格中显示按钮,如何将代码转化为文字。 i ... -
Tomcat错误“Alias name tomcat does not identify a key entry”解决
2018-07-05 21:39 6574申请到了阿里云的证书后,下载、按照说明生成jks格式证书、 ... -
阿里云免费证书“fileauth.txt内容配置错误”解决
2018-07-05 20:43 5300最近研究微信小程序开发,上阿里云申请了个证书,使用文件验证 ... -
springboot2.0跨域配置
2018-07-04 22:11 5285springboot2.0跨域配置: 一、代码 ... -
微信小程序使用code换openid的方法(JAVA、SpringBoot)
2018-07-01 21:52 10398微信小程序序的代码中提示,使用code换取openid,但 ... -
SpringBoot2.0启用https协议
2018-06-28 23:00 7777SpringBoot2.0之后,启用https协议的方式与 ...
相关推荐
3. 使用Netty的ChannelHandlerContext发送心跳包,并设置相应的超时检查和处理机制。 4. 如何处理Socket连接,发送和接收自定义协议的数据。 5. 示例代码展示如何在Java中编写和运行Netty应用。 这个demo将是一个很...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络服务,如TCP和UDP应用。在长连接场景下,保持连接的稳定性至关重要,这就涉及到了心跳检测机制。 心跳检测机制是网络通信中...
本文将深入探讨如何使用SpringBoot整合Netty4实现即时通讯,并结合Zookeeper进行服务发现与管理,同时包含心跳检测和下线重连机制。 **一、SpringBoot与Netty集成** SpringBoot以其简化Spring应用初始搭建以及开发...
例如,可以创建一个 HeartbeatHandler,设置一个定时任务定期发送心跳,并在 channelRead() 方法中检查是否接收到的是心跳响应。 六、Netty-chat 示例项目 "netty-chat" 文件可能是一个简单的 Netty 聊天应用示例,...
9. **心跳与保持连接**:了解如何在 Netty 中实现心跳机制,以检测连接是否断开,以及如何维持长连接。 10. **安全性**:研究 SSL/TLS 的配置和使用,以实现安全的网络通信。 通过学习和实践这些知识点,你可以...
Netty 提供了丰富的 ChannelHandler 接口,开发者可以根据需求自定义数据的编码解码、连接管理、心跳检测等功能。 以下是使用 Netty 开发 IoT 应用的关键步骤: 1. **配置 Netty 服务器**:创建 ServerBootstrap,...
7. **心跳机制**:Netty提供了心跳包处理,用于检测连接的活跃状态,防止连接长时间无交互导致的失效。 8. **异常处理**:Netty提供了一套完整的异常处理机制,使得在发生错误时能够优雅地恢复或关闭连接。 9. **...
9. **心跳与空闲检测**:Netty提供心跳机制和空闲检测,以保持连接的活跃状态并及时发现死连接。 10. **文档与社区支持**:Netty有详尽的官方文档和活跃的社区,为开发者提供强大的支持。 下载并解压“netty-netty...
6. 快速故障恢复:通过心跳检测和自动重连机制,提高了系统的稳定性和可靠性。 三、Netty核心组件 1. Channel:表示网络连接,负责数据的发送和接收。 2. EventLoop:事件循环,负责处理I/O事件和执行回调任务。 3...
除此之外,Netty还提供了强大的心跳检测机制,可以防止因网络延迟或故障导致的连接僵死。其优雅的关闭机制也能确保在系统关闭时,所有正在处理的连接都能得到妥善处理。 总之,《深入浅出Netty》会带你深入理解...
本实例主要探讨如何利用Java、Spring Boot和Netty来创建一个带有心跳机制的物联网系统,并提供完整的源代码供参考。 **Java在物联网中的角色** Java作为跨平台的编程语言,拥有广泛的应用场景,特别是在物联网(IoT)...
我们需要在Netty中实现心跳检测和回应机制,确保连接不会因长时间无数据交换而断开。 5. **异常处理**:在处理网络通信时,必须考虑各种可能出现的异常情况,如网络中断、数据解析错误等。Netty提供了`...
- **心跳机制**:Netty 可以轻松实现心跳检测,保证连接的有效性。 - **编解码**:例如,Netty 提供的 LengthFieldBasedFrameDecoder 可以解析带有长度字段的协议,适合即时通讯中的消息传输。 - **WebSocket ...
Netty的强大之处在于它能够轻松地处理自定义协议。通过示例,我们可以学习如何定义自己的消息结构,创建相应的编解码器,并将其整合进Netty的Pipeline中。 10. **异常处理** Netty提供了全面的异常处理机制,通过...
- **丰富的组件库**: 提供大量预定义的Handler,如解码器、编码器、心跳检测等,简化开发工作。 - **强大的错误处理**: 自动处理异常,提供优雅的故障恢复机制。 3. **Netty 4.1的新特性和改进** - **ByteBuf...
文档还会介绍如何进行连接管理、读写操作、心跳检测、流量控制等网络编程的关键技术。 在实际开发中,了解并掌握Netty的API和设计理念,不仅可以提高网络应用的性能,还能极大地提升开发效率,降低维护成本。因此,...
6. **心跳与空闲检测**:Netty 内置了心跳机制和空闲状态检测,可以防止网络连接长时间无交互导致的问题。 7. **强大的异常处理**:Netty 提供了统一的异常处理机制,可以优雅地处理网络通信过程中的各种异常。 8....
- **心跳机制**:学习如何在Netty中实现心跳检查,保持连接的活跃性。 - **异常处理**:理解Netty的异常处理机制,如何优雅地处理网络异常。 - **性能优化**:学习如何通过配置和调整参数来提升Netty应用的性能。 ...
8. **心跳机制**:Netty提供心跳检测功能,用于维持长连接的活跃性,防止因网络问题导致的连接断开。 9. **WebSocket和HTTP支持**:Netty提供了WebSocket和HTTP/HTTPS协议的支持,使得开发Web服务更加方便。 10. *...
5. **TCP和UDP通信**:Netty支持TCP和UDP协议,你可以找到处理TCP连接和UDP数据报的示例。 6. **SSL/TLS加密通信**:Netty提供了SSL支持,示例会展示如何在服务器和客户端之间建立安全的HTTPS连接。 7. **编解码器...