`
nicegege
  • 浏览: 591071 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

netty实现tcp长连接和心跳检测

 
阅读更多

       通过netty实现服务端与客户端的长连接通讯,及心跳检测。

       基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。

        环境JDK1.8 和netty5

        以下是具体的代码实现和介绍:

1公共的Share部分(主要包含消息协议类型的定义)

设计消息类型:

public enum  MsgType {
    PING,ASK,REPLY,LOGIN
}

 Message基类:

 

//必须实现序列,serialVersionUID 一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!
public abstract class BaseMsg  implements Serializable {
    private static final long serialVersionUID = 1L;
    private MsgType type;
    //必须唯一,否者会出现channel调用混乱
    private String clientId;
 
    //初始化客户端id
    public BaseMsg() {
        this.clientId = Constants.getClientId();
    }
 
    public String getClientId() {
        return clientId;
    }
 
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
 
    public MsgType getType() {
        return type;
    }
 
    public void setType(MsgType type) {
        this.type = type;
    }
}

 常量设置:

 

public class Constants {
    private static String clientId;
    public static String getClientId() {
        return clientId;
    }
    public static void setClientId(String clientId) {
        Constants.clientId = clientId;
    }
}

 登录类型消息:

 

public class LoginMsg extends BaseMsg {
    private String userName;
    private String password;
    public LoginMsg() {
        super();
        setType(MsgType.LOGIN);
    }
 
    public String getUserName() {
        return userName;
    }
 
    public void setUserName(String userName) {
        this.userName = userName;
    }
 
    public String getPassword() {
        return password;
    }
 
    public void setPassword(String password) {
        this.password = password;
    }
}

 心跳检测Ping类型消息:

public class PingMsg extends BaseMsg {
    public PingMsg() {
        super();
        setType(MsgType.PING);
    }
}

 请求类型消息:

public class AskMsg extends BaseMsg {
    public AskMsg() {
        super();
        setType(MsgType.ASK);
    }
    private AskParams params;
 
    public AskParams getParams() {
        return params;
    }
 
    public void setParams(AskParams params) {
        this.params = params;
    }
}
//请求类型参数
//必须实现序列化接口
public class AskParams implements Serializable {
    private static final long serialVersionUID = 1L;
    private String auth;
 
    public String getAuth() {
        return auth;
    }
 
    public void setAuth(String auth) {
        this.auth = auth;
    }
}

 响应类型消息:

public class ReplyMsg extends BaseMsg {
    public ReplyMsg() {
        super();
        setType(MsgType.REPLY);
    }
    private ReplyBody body;
 
    public ReplyBody getBody() {
        return body;
    }
 
    public void setBody(ReplyBody body) {
        this.body = body;
    }
}
//相应类型body对像
public class ReplyBody implements Serializable {
    private static final long serialVersionUID = 1L;
}
public class ReplyClientBody extends ReplyBody {
    private String clientInfo;
 
    public ReplyClientBody(String clientInfo) {
        this.clientInfo = clientInfo;
    }
 
    public String getClientInfo() {
        return clientInfo;
    }
 
    public void setClientInfo(String clientInfo) {
        this.clientInfo = clientInfo;
    }
}
public class ReplyServerBody extends ReplyBody {
    private String serverInfo;
    public ReplyServerBody(String serverInfo) {
        this.serverInfo = serverInfo;
    }
    public String getServerInfo() {
        return serverInfo;
    }
    public void setServerInfo(String serverInfo) {
        this.serverInfo = serverInfo;
    }
}

 2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.

Map:

public class NettyChannelMap {
    private static Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>();
    public static void add(String clientId,SocketChannel socketChannel){
        map.put(clientId,socketChannel);
    }
    public static Channel get(String clientId){
       return map.get(clientId);
    }
    public static void remove(SocketChannel socketChannel){
        for (Map.Entry entry:map.entrySet()){
            if (entry.getValue()==socketChannel){
                map.remove(entry.getKey());
            }
        }
    }
 
}

 Handler

public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //channel失效,从Map中移除
        NettyChannelMap.remove((SocketChannel)ctx.channel());
    }
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
 
        if(MsgType.LOGIN.equals(baseMsg.getType())){
            LoginMsg loginMsg=(LoginMsg)baseMsg;
            if("robin".equals(loginMsg.getUserName())&&"yao".equals(loginMsg.getPassword())){
                //登录成功,把channel存到服务端的map中
                NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());
                System.out.println("client"+loginMsg.getClientId()+" 登录成功");
            }
        }else{
            if(NettyChannelMap.get(baseMsg.getClientId())==null){
                    //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录
                    LoginMsg loginMsg=new LoginMsg();
                    channelHandlerContext.channel().writeAndFlush(loginMsg);
            }
        }
        switch (baseMsg.getType()){
            case PING:{
                PingMsg pingMsg=(PingMsg)baseMsg;
                PingMsg replyPing=new PingMsg();
                NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);
            }break;
            case ASK:{
                //收到客户端的请求
                AskMsg askMsg=(AskMsg)baseMsg;
                if("authToken".equals(askMsg.getParams().getAuth())){
                    ReplyServerBody replyBody=new ReplyServerBody("server info $$$$ !!!");
                    ReplyMsg replyMsg=new ReplyMsg();
                    replyMsg.setBody(replyBody);
                    NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);
                }
            }break;
            case REPLY:{
                //收到客户端回复
                ReplyMsg replyMsg=(ReplyMsg)baseMsg;
                ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();
                System.out.println("receive client msg: "+clientBody.getClientInfo());
            }break;
            default:break;
        }
        ReferenceCountUtil.release(baseMsg);
    }
}

 ServerBootstrap:

public class NettyServerBootstrap {
    private int port;
    private SocketChannel socketChannel;
    public NettyServerBootstrap(int port) throws InterruptedException {
        this.port = port;
        bind();
    }
 
    private void bind() throws InterruptedException {
        EventLoopGroup boss=new NioEventLoopGroup();
        EventLoopGroup worker=new NioEventLoopGroup();
        ServerBootstrap bootstrap=new ServerBootstrap();
        bootstrap.group(boss,worker);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        //保持长连接状态
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline p = socketChannel.pipeline();
                p.addLast(new ObjectEncoder());
                p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                p.addLast(new NettyServerHandler());
            }
        });
        ChannelFuture f= bootstrap.bind(port).sync();
        if(f.isSuccess()){
            System.out.println("server start---------------");
        }
    }
    public static void main(String []args) throws InterruptedException {
        NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999);
        while (true){
            SocketChannel channel=(SocketChannel)NettyChannelMap.get("001");
            if(channel!=null){
                AskMsg askMsg=new AskMsg();
                channel.writeAndFlush(askMsg);
            }
            TimeUnit.SECONDS.sleep(5);
        }
    }
}

 3 Client端:包含发起登录,发送心跳,及对应消息处理

handler

public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> {
    //利用写空闲发送心跳检测消息
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case WRITER_IDLE:
                    PingMsg pingMsg=new PingMsg();
                    ctx.writeAndFlush(pingMsg);
                    System.out.println("send ping to server----------");
                    break;
                default:
                    break;
            }
        }
    }
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
        MsgType msgType=baseMsg.getType();
        switch (msgType){
            case LOGIN:{
                //向服务器发起登录
                LoginMsg loginMsg=new LoginMsg();
                loginMsg.setPassword("yao");
                loginMsg.setUserName("robin");
                channelHandlerContext.writeAndFlush(loginMsg);
            }break;
            case PING:{
                System.out.println("receive ping from server----------");
            }break;
            case ASK:{
                ReplyClientBody replyClientBody=new ReplyClientBody("client info **** !!!");
                ReplyMsg replyMsg=new ReplyMsg();
                replyMsg.setBody(replyClientBody);
                channelHandlerContext.writeAndFlush(replyMsg);
            }break;
            case REPLY:{
                ReplyMsg replyMsg=(ReplyMsg)baseMsg;
                ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();
                System.out.println("receive client msg: "+replyServerBody.getServerInfo());
            }
            default:break;
        }
        ReferenceCountUtil.release(msgType);
    }
}

 bootstrap

public class NettyClientBootstrap {
    private int port;
    private String host;
    private SocketChannel socketChannel;
    private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);
    public NettyClientBootstrap(int port, String host) throws InterruptedException {
        this.port = port;
        this.host = host;
        start();
    }
    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
        bootstrap.group(eventLoopGroup);
        bootstrap.remoteAddress(host,port);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));
                socketChannel.pipeline().addLast(new ObjectEncoder());
                socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                socketChannel.pipeline().addLast(new NettyClientHandler());
            }
        });
        ChannelFuture future =bootstrap.connect(host,port).sync();
        if (future.isSuccess()) {
            socketChannel = (SocketChannel)future.channel();
            System.out.println("connect server  成功---------");
        }
    }
    public static void main(String[]args) throws InterruptedException {
        Constants.setClientId("001");
        NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,"localhost");
 
        LoginMsg loginMsg=new LoginMsg();
        loginMsg.setPassword("yao");
        loginMsg.setUserName("robin");
        bootstrap.socketChannel.writeAndFlush(loginMsg);
        while (true){
            TimeUnit.SECONDS.sleep(3);
            AskMsg askMsg=new AskMsg();
            AskParams askParams=new AskParams();
            askParams.setAuth("authToken");
            askMsg.setParams(askParams);
            bootstrap.socketChannel.writeAndFlush(askMsg);
        }
    }
}

具体的例子和相应pom.xml 见 https://github.com/WangErXiao/ServerClient

转发请注明来源:http://my.oschina.net/robinyao/blog/399060

总结:

java实现tcp的序列化和反序列化的时候,最好使用json字符串传递数据。这样确保客户端反序列化成功。如果客户端和服务端都是使用java实现,则使用对象序列化和反序列化可以的。如果客户端使用非java语言实现的,不能使用java的对象序列化和反序列化。最好使用字符串传递数据。推荐使用json字符串。

 

分享到:
评论

相关推荐

    netty 实现长连接

    2. **Netty中的Channel和EventLoop**:Netty使用NIO(非阻塞I/O)模型,Channel代表一个连接,EventLoop负责处理I/O事件,它们是实现长连接的关键组件。 3. **心跳机制**:为了检测长连接是否依然有效,通常会引入...

    Netty4长连接(服务端+客户端)

    在Netty4中,通过Channel和ChannelHandlerContext实现长连接,它们是Netty的核心组件,Channel代表网络连接,而ChannelHandlerContext则用于处理与该连接相关的事件。 断开重连机制是确保网络通信稳定的关键。在...

    netty+websocket实现心跳和断线重连

    在本文中,我们将深入探讨如何利用 Netty 和 WebSocket 实现心跳检测和断线重连机制。 首先,我们需要理解 WebSocket 协议。WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它为客户端和服务器提供了低...

    springboot netty4 及时通讯(带心跳检测,zookeeper)

    本文将深入探讨如何使用SpringBoot整合Netty4实现即时通讯,并结合Zookeeper进行服务发现与管理,同时包含心跳检测和下线重连机制。 **一、SpringBoot与Netty集成** SpringBoot以其简化Spring应用初始搭建以及开发...

    netty基于http socket websocke及心跳包机制t的demo

    WebSocket是一种在单个TCP连接上进行全双工通信的协议,它极大地简化了浏览器和服务器之间的数据传输。Netty提供了WebSocket Server和WebSocket Client的实现,使得开发者可以方便地创建WebSocket服务端和客户端应用...

    基于springboot+netty实现的心跳检测-源码

    在心跳检测中,Netty提供了低级的网络通信能力,如TCP/IP连接的建立、管理,以及数据传输的封装。 3. **心跳检测机制** 心跳检测是网络通信中的一个重要组成部分,主要用于确认网络连接是否畅通。在Spring Boot和...

    netty通信框架tcp+oracle

    - **心跳机制**: 实现心跳检测,保持TCP连接活跃,检测并处理断开的连接。 通过以上步骤,Netty 可以高效地处理TCP连接,并将接收到的实时数据安全地保存到Oracle数据库中。同时,通过优化和错误处理策略,确保了...

    jt808-tcp-netty_nettyjt808_jt808_

    这个项目"jt808-tcp-netty"显然是一个基于Java的实现,利用Netty框架来处理JT808协议的TCP通信。 Netty是一个高性能、异步事件驱动的网络应用框架,适用于开发可伸缩且高度并发的服务器和客户端。在JT808协议的实现...

    netty通信以及心跳demo

    心跳机制在长连接通信中非常重要,用于检测网络连接的健康状态,防止因网络延迟或故障导致的连接失效。Netty 中实现心跳通常有以下步骤: 1. **定义心跳消息**:创建一个特定的 ByteBuf 或其他消息类型作为心跳包。...

    自己学习netty的笔记和Demo

    7. **心跳检测和空闲连接处理**:笔记-7讲述了如何在Netty中实现心跳检测,以检查远程连接的活性,防止死连接占用资源。同时,还可能涉及超时设置和连接关闭策略。 8. **客户端断线重连**:在实际网络环境中,...

    netty实现微信聊天.zip

    在Netty中,这可以通过使用心跳机制、分包和粘包处理以及状态管理来实现。 1. 心跳机制:为了检测连接是否存活,我们可以定期发送心跳包。如果一段时间内没有收到对方的心跳回应,就可以认为连接已经断开,从而进行...

    使用netty3建立的JT809处理基本工程

    我们需要在Netty中实现心跳检测和回应机制,确保连接不会因长时间无数据交换而断开。 5. **异常处理**:在处理网络通信时,必须考虑各种可能出现的异常情况,如网络中断、数据解析错误等。Netty提供了`...

    面试官:Netty心跳检测机制是什么,怎么自定义检测间隔时间?.doc

    在长连接场景下,保持连接的稳定性至关重要,这就涉及到了心跳检测机制。 心跳检测机制是网络通信中确保连接有效性的常见策略。它通过在客户端和服务端之间周期性地发送特定的消息(通常称为“心跳包”)来确认双方...

    基于netty实现采用自定义协议方式通讯,同时支持心跳机制和重连机制

    协议的编码、解码工作由`MyMessageEncoder`,`MyMessageDecoder`两个类完成,在tcp传输过程中的拆、粘包问题使用`LengthFieldBasedFrameDecoder`类解决。 #### 心跳机制 - 服务端采用 `IdleStateHandler`,在一段...

    netty5实现的socket服务器

    在 Netty 中,我们可以通过保持 Channel 活跃状态来实现长连接。当客户端无数据传输时,服务器端可以发送心跳包以检测连接是否仍然有效。 在实际项目中,还需要考虑如异常处理、资源释放、连接管理和性能优化等多...

    xunyu_cmpp:cmpp+netty实现网关系统用来收发短(彩)信

    4. 心跳机制:利用Netty的IdleStateHandler,可以自动实现心跳检测,防止连接超时。 三、网关系统架构设计 1. 连接管理模块:负责与SCP建立和维护TCP连接,处理心跳包,以及异常恢复机制。 2. 消息处理模块:接收...

    自用Android长连接

    "自用Android长连接"项目涉及的是如何在Android平台上实现一个具备心跳检测功能的长连接服务。下面将详细阐述相关知识点。 一、Android Socket长连接 Socket是网络编程的基础,它提供了进程间通过网络进行通信的...

    深入浅出Netty_netty_

    除此之外,Netty还提供了强大的心跳检测机制,可以防止因网络延迟或故障导致的连接僵死。其优雅的关闭机制也能确保在系统关闭时,所有正在处理的连接都能得到妥善处理。 总之,《深入浅出Netty》会带你深入理解...

    Netty HelloWorld + HeartBeat Demo

    心跳机制在长连接中至关重要,它确保了连接的活跃性和可靠性。在Netty中,我们可以自定义心跳包并实现相应的发送和处理逻辑。 1. **Heartbeat Frame**:定义一个特殊的数据帧,例如空消息或特定标识符,用来表示...

    netty+4G DTU

    Netty 提供了丰富的 ChannelHandler 接口,开发者可以根据需求自定义数据的编码解码、连接管理、心跳检测等功能。 以下是使用 Netty 开发 IoT 应用的关键步骤: 1. **配置 Netty 服务器**:创建 ServerBootstrap,...

Global site tag (gtag.js) - Google Analytics