`
zhangwei_david
  • 浏览: 477285 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Netty通过心跳保持长链接

    博客分类:
  • Java
 
阅读更多

 

   Netty自带心跳检测功能,IdleStateHandler,客户端在写空闲时主动发起心跳请求,服务器接受到心跳请求后给出一个心跳响应。当客户端在一定时间范围内不能够给出响应则断开链接。

 

public class NettyClient {
	public void connect(String remoteServer, int port) throws Exception {
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			b.group(workerGroup).channel(NioSocketChannel.class).remoteAddress(remoteServer, port)
					.handler(new ChildChannelHandler());

			ChannelFuture f = b.connect();
			System.out.println("Netty time Client connected at port " + port);

			f.channel().closeFuture().sync();
		} finally {
			try {
				TimeUnit.SECONDS.sleep(5);
				try {
					System.out.println("重新链接。。。");
					connect(remoteServer, port);
				} catch (Exception e) {
					e.printStackTrace();
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

		@Override
		protected void initChannel(final SocketChannel ch) throws Exception {
			// -8表示lengthAdjustment,让解码器从0开始截取字节,并且包含消息头
			ch.pipeline().addLast(new RpcEncoder(NettyMessage.class)).addLast(new RpcDecoder(NettyMessage.class))
					.addLast(new IdleStateHandler(120, 10, 0, TimeUnit.SECONDS)).addLast(new HeartBeatReqHandler());
		}

	}

	public static void main(String[] args) {
		try {
			new NettyClient().connect("127.0.0.1", 12000);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

public class SerializationUtil {

    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();

    private static Objenesis                objenesis    = new ObjenesisStd(true);

    private static <T> Schema<T> getSchema(Class<T> clazz) {
        @SuppressWarnings("unchecked")
        Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) {
                cachedSchema.put(clazz, schema);
            }
        }
        return schema;
    }

    /**
     * 序列化
     *
     * @param obj
     * @return
     */
    public static <T> byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class<T> clazz = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(clazz);
            byte result[] = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
            return result;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * 反序列化
     *
     * @param data
     * @param clazz
     * @return
     */
    public static <T> T deserializer(byte[] data, Class<T> clazz) {
        try {
            T obj = objenesis.newInstance(clazz);
            Schema<T> schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

 

@SuppressWarnings("rawtypes")
public class RpcEncoder extends MessageToByteEncoder {

    private Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
        	System.out.println("发送的请求是:"+in);
            byte[] data = SerializationUtil.serializer(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}

 

public class RpcDecoder extends ByteToMessageDecoder {

    private Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            ctx.close();
        }
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
        }
		byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = SerializationUtil.deserializer(data, genericClass);
        System.out.println("接收到的消息是:"+obj);
        out.add(obj);
    }
}

 

public class HeartBeatReqHandler extends ChannelDuplexHandler {

	/**
	 * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext,
	 *      java.lang.Object)
	 */
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
			IdleStateEvent event = (IdleStateEvent) evt;
			if (event.state() == IdleState.READER_IDLE) {
				System.out.println("read 空闲");
				ctx.disconnect();
			} else if (event.state() == IdleState.WRITER_IDLE) {
				System.out.println("write 空闲");
				ctx.writeAndFlush(buildHeartBeat(MessageType.HEARTBEAT_REQ.getType()));
			}
		}
	}

	/**
	 * 
	 * @return
	 * @author zhangwei<wei.zw@corp.netease.com>
	 */
	private NettyMessage buildHeartBeat(byte type) {
		NettyMessage msg = new NettyMessage();
		Header header = new Header();
		header.setType(type);
		msg.setHeader(header);
		return msg;
	}

}

 

public class NettyServer {
	public void bind(int port) throws Exception {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
					.childHandler(new ChildChannelHandler());

			ChannelFuture f = b.bind(port).sync();
			System.out.println("Netty time Server started at port " + port);
			f.channel().closeFuture().sync();
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

	public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

		@Override
		protected void initChannel(final SocketChannel ch) throws Exception {
			ch.pipeline().addLast(new RpcDecoder(NettyMessage.class)).addLast(new RpcEncoder(NettyMessage.class))
					.addLast(new IdleStateHandler(120, 0, 0, TimeUnit.SECONDS)).addLast(new HeartBeatRespHandler());
		}

	}

	public static void main(String[] args) {
		try {
			new NettyServer().bind(12000);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

public enum MessageType {

	LOGIN_REQ((byte) 1), LOGIN_RESP((byte) 2), HEARTBEAT_REQ((byte) 3), HEARTBEAT_RESP((byte) 4);
	private byte type;

	/**
	 * @param type
	 */
	private MessageType(byte type) {
		this.type = type;
	}

	public byte getType() {
		return type;
	}

	public void setType(byte type) {
		this.type = type;
	}

	public static MessageType getMessageType(byte type) {
		for (MessageType b : MessageType.values()) {
			if (b.getType() == type) {
				return b;
			}
		}
		return null;
	}

}

 

public class HeartBeatRespHandler extends SimpleChannelInboundHandler<NettyMessage> {

	/**
	 * @see io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel.ChannelHandlerContext,
	 *      java.lang.Object)
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
		if (msg.getHeader() != null && msg.getHeader().getType() == MessageType.HEARTBEAT_REQ.getType()) {
			NettyMessage heartBeat = buildHeartBeat(MessageType.HEARTBEAT_RESP.getType());
			ctx.writeAndFlush(heartBeat);
		} else {
			ctx.fireChannelRead(msg);
		}
	}
	

	/**
	 * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext,
	 *      java.lang.Object)
	 */
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
			IdleStateEvent event = (IdleStateEvent) evt;
			if (event.state() == IdleState.READER_IDLE) {
				System.out.println("read 空闲 关闭链接");
				ctx.disconnect();	
			} 
		}
	}
	

	/**
	 * 
	 * @return
	 * @author zhangwei<wei.zw@corp.netease.com>
	 */
	private NettyMessage buildHeartBeat(byte type) {
		NettyMessage msg = new NettyMessage();
		Header header = new Header();
		header.setType(type);
		msg.setHeader(header);
		return msg;
	}

}

 

public class NettyMessage implements Serializable{
	
	/**  */
	private static final long serialVersionUID = 1L;

	private Header header;
	
	private Object body;

	public Header getHeader() {
		return header;
	}

	public void setHeader(Header header) {
		this.header = header;
	}

	public Object getBody() {
		return body;
	}

	public void setBody(Object body) {
		this.body = body;
	}

	/** 
	 * @see java.lang.Object#toString()
	 */
	@Override
	public String toString() {
		return "NettyMessage [header=" + header + ", body=" + body + "]";
	}
	
	
}

 

 

 

 

public class Header implements Serializable{
	/**  */
	private static final long serialVersionUID = 1L;
	private int crcCode=0xabef0101;
	private int length;
	private long sessionId;
	private byte type;
	private byte priority;
	private Map<String,Object> attachment=new HashMap<>();
	public int getCrcCode() {
		return crcCode;
	}
	public void setCrcCode(int crcCode) {
		this.crcCode = crcCode;
	}
	public int getLength() {
		return length;
	}
	public void setLength(int length) {
		this.length = length;
	}
	public long getSessionId() {
		return sessionId;
	}
	public void setSessionId(long sessionId) {
		this.sessionId = sessionId;
	}
	public byte getType() {
		return type;
	}
	public void setType(byte type) {
		this.type = type;
	}
	public byte getPriority() {
		return priority;
	}
	public void setPriority(byte priority) {
		this.priority = priority;
	}
	public Map<String, Object> getAttachment() {
		return attachment;
	}
	public void setAttachment(Map<String, Object> attachment) {
		this.attachment = attachment;
	}
	/** 
	 * @see java.lang.Object#toString()
	 */
	@Override
	public String toString() {
		return "Header [crcCode=" + crcCode + ", length=" + length + ", sessionId=" + sessionId + ", type=" + type
				+ ", priority=" + priority + ", attachment=" + attachment + "]";
	}
	
	
}

客户端的结果是:

etty time Client connected at port 12000
write 空闲
发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
write 空闲
发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
write 空闲
发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
write 空闲
发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
write 空闲
发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]

 

 

1
4
分享到:
评论

相关推荐

    netty 实现长连接

    标题中的“netty 实现长连接”指的是使用Netty框架构建能够维持长时间连接的网络通信应用。Netty是一个高性能、异步事件驱动的网络应用程序框架,适用于开发服务器和客户端的高性能、高可用性协议库。在传统的HTTP或...

    Netty4长连接(服务端+客户端)

    本文将深入探讨Netty4在构建长连接、实现断开重连、心跳检测以及Msgpack编码解码方面的知识。 首先,我们要理解什么是长连接。在TCP/IP通信中,长连接是指在完成一次数据传输后,连接不被立即关闭,而是保持一段...

    spring整合netty心跳检测

    下面我们将深入探讨"spring整合netty心跳检测"这一主题。 首先,我们要理解心跳检测的基本概念。心跳检测是指在通信双方之间周期性地发送一个简单的消息,用来确认连接是否仍然有效,以及数据传输是否正常。这对于...

    netty基于http socket websocke及心跳包机制t的demo

    心跳包机制在网络通信中扮演着重要角色,特别是在长时间无数据交换或需要检测连接状态的场景下。在Netty中,心跳包可以是自定义的协议消息,服务器和客户端定期交换这些消息以确认连接的活跃性。如果在预设时间内...

    netty 心跳实现

    在分布式系统中,心跳机制是保持连接活性和检测网络异常的关键技术。本篇文章将深入探讨如何在 Netty 中实现心跳机制。 ### 1. Netty 心跳概念 心跳机制在网络通信中扮演着重要角色,它通过定期发送小型数据包或无...

    netty断线重连机制及心跳机制.rar

    心跳机制是网络通信中一种常见的保持连接活跃性的方法。通过定期发送小规模的数据包(心跳包),双方可以检测对方是否仍然在线,以及网络连接是否畅通。在 Netty 中,我们可以创建一个定时任务来周期性地发送心跳包...

    Netty实现长连接通讯-连接协议为了简单json封装

    为了实现长连接,我们需要配置`Bootstrap`实例,设置心跳机制以保持连接活跃。心跳消息可以在`MyBusinessHandler`中定时发送,当收到对端的心跳回应时,确认连接依然有效。 ```java public class MyBusinessHandler...

    netty学习demo(初学代码结构+固定消息+自定义分隔符+自定义协议+心跳+http+序列化压缩+自动断线)

    心跳机制是保持连接活跃的重要部分,防止因网络延迟或静默期导致的连接断开。你可以创建一个`IdleStateHandler`,当检测到连接长时间无数据交换时,发送心跳包。 HTTP 支持是Netty的一个强大特性,Netty提供了`...

    断网断电心跳检测

    本文将深入探讨“断网断电心跳检测”这一技术主题,以及它与Netty心跳检测的关联。 WebSocket协议允许服务器和客户端之间建立持久连接,允许双向数据传输,极大地简化了实时应用的开发。然而,当网络中断或设备断电...

    使用netty3建立的JT809处理基本工程

    4. **心跳机制**:为了保持连接的活跃性,JT809协议通常会包含心跳报文。我们需要在Netty中实现心跳检测和回应机制,确保连接不会因长时间无数据交换而断开。 5. **异常处理**:在处理网络通信时,必须考虑各种可能...

    Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例示例代码.rar

    - 为了保持连接的活跃,可以添加心跳机制。服务器定期向客户端发送心跳包,客户端在收到心跳包后回应,反之亦然。如果在一定时间内未收到回应,则视为连接已断开。 5. **安全性** - 如果需要,可以通过添加SSL/...

    netty-socket实时在线人数统计

    在这个项目中,我们可以利用WebSocket来保持客户端与服务器之间的连接状态,从而实时统计在线人数。 **实现步骤** 1. **配置SpringBoot应用**:首先,我们需要创建一个SpringBoot项目,并添加相关的依赖,如Spring...

    netty+4G DTU

    3. **心跳机制**:为了保持连接活性,需要实现心跳包的发送和接收。可以创建一个专门的心跳 ChannelHandler 来处理心跳相关的逻辑。 4. **异常处理**:定义异常处理器,捕获并处理可能出现的网络异常,如连接断开、...

    基于springboot+netty实现的心跳检测源码+项目说明文档.zip

    基于springboot+netty实现的心跳检测源码+项目说明文档.zip ...(4),保持长连接 (5), childHandler()方法需要一个ChannelInitializer类,ChannelInitializer是一个特殊的处理程序,旨在帮助用户配置新的Channel

    netty-netty-4.1.69.Final.tar.gz

    9. **心跳与空闲检测**:Netty提供心跳机制和空闲检测,以保持连接的活跃状态并及时发现死连接。 10. **文档与社区支持**:Netty有详尽的官方文档和活跃的社区,为开发者提供强大的支持。 下载并解压“netty-netty...

    netty-netty-4.1.79.Final.tar.gz

    7. **心跳与保持连接**:Netty提供心跳机制和连接管理,可以自动检测并处理空闲连接和断开的连接,保持网络通信的稳定性。 8. **编码与解码器**:Netty的ChannelHandler接口和一系列编码解码器类,如...

    netty-4.1_javaNetty_netty_服务器_

    9. **心跳与保持连接**:了解如何在 Netty 中实现心跳机制,以检测连接是否断开,以及如何维持长连接。 10. **安全性**:研究 SSL/TLS 的配置和使用,以实现安全的网络通信。 通过学习和实践这些知识点,你可以...

    netty-websocket.zip

    2. **服务端配置心跳**:心跳机制是保持长连接稳定的关键。通过定时发送“心跳”信息,服务器可以检查客户端是否在线。这通常是通过发送一个无实际内容但能确认连接状态的小数据包来实现的。 3. **读写设置心跳时间...

    Netty实现前置系统

    在系统B中,我们可以利用Netty的ChannelHandler接口来定义业务逻辑,处理来自系统A的短连接请求,同时保持与系统C的长连接。ChannelHandlerContext对象用于在ChannelHandler之间传递消息,实现事件驱动的通信。 ...

    netty5.0官方自带的demo

    Netty提供了IdleStateHandler,它可以在连接长时间无数据交换时触发事件,从而保持连接活跃或关闭无响应的连接。 8. **SSL/TLS加密通信** Netty支持SSL/TLS协议,官方示例可能包含了如何启用HTTPS和WSS(WebSocket...

Global site tag (gtag.js) - Google Analytics