`

Netty的超时机制

 
阅读更多
原文地址:http://blog.163.com/linfenliang@126/blog/static/127857195201467112958326/


Netty超时机制学习

技术点描述

ReadTimeoutHandler读取数据超时处理
WriteTimeoutHandler写数据超时处理
IdleStateHandler状态空闲处理


通过以上三种方式,可以轻松实现SOCKET长连接的心跳包机制。

另要注意的是这里所说的超时是指逻辑上的超时,而非TCP连接超时。

实现方案

ReadTimeoutHandler,WriteTimeoutHandler, IdleStateHandler这三种方式都是通过在数据处理管道ChannelPipelineFactory的实现类里配置相关代码来实现。

一般使用根据需求使用一种方式即可。



它主要使用Timer定时器用自定义的时间,去定时检测相应的状态,检测到后会根据选择的处理方式,是报错或回调指定的接口实现类处理。



参考源码包



主要包

IdleStateHandler 空闲状态的处理代码


源码:

/**

* Creates a new instance.

*

* @param timer

* the Timer that is used to trigger the scheduled event.

* The recommended Timer implementation is HashedWheelTimer.

* @param readerIdleTimeSeconds

* an IdleStateEvent whose state is IdleState.READER_IDLE

* will be triggered when no read was performed for the specified

* period of time. Specify 0 to disable.

* @param writerIdleTimeSeconds

* an IdleStateEvent whose state is IdleState.WRITER_IDLE

* will be triggered when no write was performed for the specified

* period of time. Specify {@code 0} to disable.

* @param allIdleTimeSeconds

* an IdleStateEvent whose state is IdleState.ALL_IDLE

* will be triggered when neither read nor write was performed

* for the specified period of time. Specify 0 to disable.

*/

public IdleStateHandler(

Timer timer,

int readerIdleTimeSeconds,

int writerIdleTimeSeconds,

int allIdleTimeSeconds) {



this(timer, readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);

}



ReadTimeoutHandler 读取数据超时的处理代码


源码:

/**

* Creates a new instance.

*

* @param timer

* the Timer that is used to trigger the scheduled event.

* The recommended Timer implementation is HashedWheelTimer.

* @param timeoutSeconds

* read timeout in seconds

*/

public ReadTimeoutHandler(Timer timer, int timeoutSeconds) {

this(timer, timeoutSeconds, TimeUnit.SECONDS);

}



WriteTimeoutHandler 写数据超时的处理代码


源码:

/**

* Creates a new instance.

*

* @param timer

* the Timer that is used to trigger the scheduled event.

* The recommended Timer implementation is HashedWheelTimer.

* @param timeoutSeconds

* write timeout in seconds

*/

public WriteTimeoutHandler(Timer timer, int timeoutSeconds) {

this(timer, timeoutSeconds, TimeUnit.SECONDS);

}



辅助包

IdleStateEvent 空闲事件的接口类


源码:

/**

* A ChannelEvent that is triggered when a Channel has been idle

* for a while.

* @apiviz.landmark

* @apiviz.has org.jboss.netty.handler.timeout.IdleState oneway - -

*/

public interface IdleStateEvent extends ChannelEvent {

/**

* Returns the detailed idle state.

*/

IdleState getState();



/**

* Returns the last time when I/O occurred in milliseconds.

*/

long getLastActivityTimeMillis();

}



DefaultIdleStateEvent空闲事件的实现类
IdleStateAwareChannelHandler 处理空闲事件的接口.


源码:

/**

* An extended SimpleChannelHandler that adds the handler method for

* an IdleStateEvent.

* @apiviz.uses org.jboss.netty.handler.timeout.IdleStateEvent

*/

public class IdleStateAwareChannelHandler extends SimpleChannelHandler {



/**

* Creates a new instance.

*/

public IdleStateAwareChannelHandler() {

super();

}



@Override

public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {

if (e instanceof IdleStateEvent) {

channelIdle(ctx, (IdleStateEvent) e);

} else {

super.handleUpstream(ctx, e);

}

}



/**

* Invoked when a Channel has been idle for a while.

*/

public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception {

ctx.sendUpstream(e);

}

}



IdleStateAwareChannelUpstreamHandler处理空闲事件的接口.
IdleState 定义了三种空闲状态的枚举类


源码:

/**

* An Enum that represents the idle state of a Channel.

*/

public enum IdleState {

/**

* No data was received for a while.

*/

READER_IDLE,

/**

* No data was sent for a while.

*/

WRITER_IDLE,

/**

* No data was either received or sent for a while.

*/

ALL_IDLE;

}



ReadTimeoutException 读取超时异常类
WriteTimeoutException 写数据超时异常类
TimeoutException 读取数据和写数据异常类的父类


Demo实现

以下是关于三种方式的关键代码实现讲解。



IdleStateHandler



//第一步,在服务启动类中设置数据处理管道.

...

bootstrap.setPipelineFactory(new TCPServerPipelineFactory());

...



//第二步,在数据处理管道实现类里配置空闲状态处理代码.

public class TCPServerPipelineFactory implements ChannelPipelineFactory {

   

    @Override

    public ChannelPipeline getPipeline() throws Exception {

        // Create a default pipeline implementation.

        ChannelPipeline pipeline = Channels.pipeline();

        //设置空闲状态处理操作

pipeline.addLast("idlehandler", new IdleStateHandler(new HashedWheelTimer(), 10, 5 , 0));



pipeline.addLast("hearbeat", new Heartbeat());

        pipeline.addLast("handler", new TCPServerHandler());

        return pipeline;

    }



//第三步,实现方式1:自定义IdleStateAwareChannelHandler的实现类,

//当Channel处理空闲状态时,会触发此方法.

public class Heartbeat extends IdleStateAwareChannelHandler {

   

    @Override

public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception {

        super.channelIdle(ctx, e);       

        if (e.getState() == IdleState.READER_IDLE){

             byte[] test = " ac0bce0490007050006".getBytes();

ChannelBuffer channelBuffer = ChannelBuffers.buffer(test.length);

            channelBuffer.writeBytes(upData);

            //发送超时数据到终端.

            e.getChannel().write(channelBuffer);

        }

}



//第三步, 实现方式2:在扩展SimpleChannelHandler的handler类里,如下操作:

//记得注释掉第二步中的代码pipeline.addLast("hearbeat", new Heartbeat());

@Override

public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {

if (e instanceof IdleStateEvent) {

     IdleStateEvent ise = (IdleStateEvent)e;

     if (ise.getState() == IdleState.READER_IDLE){

     byte[] test = "超时 ...".getBytes();

ChannelBuffer channelBuffer = ChannelBuffers.buffer(test.length);

     channelBuffer.writeBytes(test);

         //发送超时数据到终端.

     ctx.getChannel().write(channelBuffer);

     }   

}

super.handleUpstream(ctx, e);

}



ReadTimeoutHandler



//第一步,在服务启动类中设置数据处理管道.

...

bootstrap.setPipelineFactory(new TCPServerPipelineFactory());

...



//第二步,在数据处理管道实现类里配置空闲状态处理代码.

public class TCPServerPipelineFactory implements ChannelPipelineFactory {

   

    @Override

    public ChannelPipeline getPipeline() throws Exception {

        // Create a default pipeline implementation.

        ChannelPipeline pipeline = Channels.pipeline();

        //设置读取数据超时处理

pipeline.addLast("readTimeOut",new ReadTimeoutHandler(new HashedWheelTimer(),10));

        pipeline.addLast("handler", new TCPServerHandler());

        return pipeline;

    }



//第三步, 在扩展SimpleChannelHandler的handler类里,如下操作:



@Override

public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {

    //对读取超时异常进行判断

    if (e.getCause() instanceof ReadTimeoutException) {

     byte[] test = "超时 ...".getBytes();

ChannelBuffer channelBuffer = ChannelBuffers.buffer(test.length);

     channelBuffer.writeBytes(test);

     ctx.getChannel().write(channelBuffer);

    } else {

logger.log(Level.WARN, "Unexpected exception from downstream.",e.getCause());

    }

}

WriteTimeoutHandler

略,原理同ReadTimeoutHandler。
分享到:
评论

相关推荐

    Netty 超时_心跳机制_断线重连 demo

    配置maven添加io.netty 在com.zhao的包下的文件,可以自行修改使用

    netty4中文用户手册

    特别地,手册还介绍了Netty实现聊天功能和WebSocket聊天功能的方法,以及Netty超时机制和心跳程序的实现。这些功能的实现案例,为用户提供了实用的网络通信应用的构建框架。 用户手册中还提到了Netty的另一个重要...

    Netty 4.x User Guide 中文翻译《Netty 4.x 用户指南》

    最后,用户指南中还会介绍Netty的其他高级特性,如实现WebSocket聊天功能,以及Netty超时机制和心跳程序的实现。这些高级特性对于构建复杂的网络应用程序至关重要。 总之,Netty用户指南通过全面的介绍和实例演示,...

    netty4.X用户指南.docx

    【Netty 超时机制及心跳程序实现】(1.5.1-1.5.3)讲解了如何处理连接超时和保持连接活跃的心跳机制,这对于长连接服务至关重要。 最后,【Architectural Overview】部分总结了Netty的整体架构,帮助读者从宏观角度...

    netty断线重连机制及心跳机制.rar

    在本文中,我们将深入探讨 Netty 的断线重连机制和心跳机制,这两个特性对于维持稳定可靠的网络通信至关重要。 首先,让我们了解**断线重连机制**。在分布式系统中,网络连接可能会因为各种原因中断,如网络抖动、...

    netty的timeout

    在本文中,我们将深入探讨 Netty 的超时机制,这是在网络编程中非常重要的一个概念,因为它能够有效地处理连接延迟或无响应的情况。 在 Netty 中,超时的概念通常与 ChannelHandler 和 ChannelHandlerContext 关联...

    netty基于http socket websocke及心跳包机制t的demo

    3. 使用Netty的ChannelHandlerContext发送心跳包,并设置相应的超时检查和处理机制。 4. 如何处理Socket连接,发送和接收自定义协议的数据。 5. 示例代码展示如何在Java中编写和运行Netty应用。 这个demo将是一个很...

    基于Java+netty内置时间轮工具处理大批量定时或超时任务工具源码.zip

    在这个项目中,我们关注的是如何利用Netty内置的时间轮(TimeWheel)工具来处理大批量的定时或超时任务。 时间轮是一种高效的数据结构,常用于实现定时器和延迟队列。它是由一系列的槽(Bucket)组成,每个槽代表一...

    netty+websocket实现心跳和断线重连

    在 Netty 中,我们还需要处理各种可能出现的异常,如连接超时、网络中断等。这些异常可以通过实现 `ExceptionCaughtHandler` 并添加到 `ChannelPipeline` 中来捕获和处理。 6. **优化与性能** 为了提高性能和稳定...

    Netty3.x 源码解析

    此外,还需要对TCP连接进行管理,包括处理连接池、选择可用连接、连接超时和关闭机制等。这些任务的实现往往复杂且容易出错。Netty的出现,通过封装底层的网络操作和多线程细节,使得开发者能够更加专注于业务逻辑的...

    springboot整合netty的demo

    对于性能优化,可以调整Netty的线程模型、缓冲区大小、心跳机制等参数。 总的来说,SpringBoot整合Netty的实践涉及到SpringBoot的应用构建、Netty的服务器和客户端创建、响应式编程模型、以及两者的交互和配置优化...

    用netty实现文件传输

    - **心跳机制**:保持连接活跃,防止因长时间无数据交换导致的连接超时断开。 4. **Netty 与大文件传输** - **零拷贝**:Netty 利用 NIO 的 FileChannel.transferTo 方法,实现了从文件到网络的直接传输,减少了...

    netty+4G DTU

    4. **异常处理**:定义异常处理器,捕获并处理可能出现的网络异常,如连接断开、超时等。 5. **数据处理**:根据业务需求,解析接收到的数据并进行相应的业务逻辑处理,如存储数据、触发报警、执行远程控制等。 6....

    netty权威指南 第二版 李林锋pdf

    这本书针对Netty5版本的源码进行了详尽的解读,旨在帮助开发者深入理解Netty的设计原理和实现机制。 1. **Netty的基本概念**:Netty是一个基于NIO(非阻塞I/O)的Java框架,它简化了网络编程,提供了线程模型、缓冲...

    NettyRPC-master

    6. **异常处理和重试机制**: 网络通信中难免会出现异常,如网络抖动、超时等。NettyRPC应包含错误处理和重试策略,以确保服务的稳定性和健壮性。 7. **线程模型**: Netty的EventLoop线程模型可以有效处理大量并发...

    Netty HelloWorld + HeartBeat Demo

    在这个“Netty HelloWorld + HeartBeat Demo”中,我们将深入理解Netty的基本用法以及如何实现心跳机制。 首先,让我们从Netty HelloWorld Demo开始。这个Demo通常用于展示如何使用Netty创建一个简单的TCP服务器和...

    Spring Boot 整合 Netty + WebSocket 实时消息推送

    Spring Security可以与WebSocket整合,提供基于JWT或OAuth的认证机制,确保只有经过验证的用户才能访问WebSocket端点。 6. **前端实现**:在Web端,我们可以使用JavaScript的WebSocket API来建立连接,并使用Stomp...

    NettySocket同步数据获取实现

    心跳检测是确保网络连接活性的重要机制。在Netty中,我们可以在处理器Pipeline中添加一个HeartbeatHandler。每当达到预设的时间间隔,该处理器就会向客户端发送心跳包。如果服务器未在规定时间内收到回应,可以认为...

    Netty UDP协议网络打洞实例

    5. **心跳维持**: 为了保持NAT映射的有效性,通常需要定期发送心跳包,确保通信通道不被NAT超时关闭。 6. **异常处理**: 设计适当的异常处理机制,例如重试、断线重连等,以增强系统稳定性。 在实际应用中,可能还...

    Netty权威指南 (第2版)

    - 长连接管理:心跳检测和超时策略 6. **Netty实战应用** - 构建Web服务器和客户端 - 实现聊天室和消息推送系统 - 客户端和服务端的负载均衡和故障恢复 - 高并发场景下的性能测试和调优 7. **Netty与其他技术...

Global site tag (gtag.js) - Google Analytics