`
liujiekasini0312
  • 浏览: 147311 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

netty长连接实例

 
阅读更多

通过netty实现服务端与客户端的长连接通讯,及心跳检测。

基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。

环境JDK1.8 和netty5

以下是具体的代码实现和介绍:

1公共的Share部分(主要包含消息协议类型的定义)

设计消息类型:

?
1
2
3
publicenumMsgType{
PING,ASK,REPLY,LOGIN
}

Message基类:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//必须实现序列,serialVersionUID一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!
publicabstractclassBaseMsgimplementsSerializable{
privatestaticfinallongserialVersionUID=1L;
privateMsgTypetype;
//必须唯一,否者会出现channel调用混乱
privateStringclientId;
//初始化客户端id
publicBaseMsg(){
this.clientId=Constants.getClientId();
}
publicStringgetClientId(){
returnclientId;
}
publicvoidsetClientId(StringclientId){
this.clientId=clientId;
}
publicMsgTypegetType(){
returntype;
}
publicvoidsetType(MsgTypetype){
this.type=type;
}
}

常量设置:

?
1
2
3
4
5
6
7
8
9
publicclassConstants{
privatestaticStringclientId;
publicstaticStringgetClientId(){
returnclientId;
}
publicstaticvoidsetClientId(StringclientId){
Constants.clientId=clientId;
}
}

登录类型消息:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
publicclassLoginMsgextendsBaseMsg{
privateStringuserName;
privateStringpassword;
publicLoginMsg(){
super();
setType(MsgType.LOGIN);
}
publicStringgetUserName(){
returnuserName;
}
publicvoidsetUserName(StringuserName){
this.userName=userName;
}
publicStringgetPassword(){
returnpassword;
}
publicvoidsetPassword(Stringpassword){
this.password=password;
}
}

心跳检测Ping类型消息:

?
1
2
3
4
5
6
publicclassPingMsgextendsBaseMsg{
publicPingMsg(){
super();
setType(MsgType.PING);
}
}

请求类型消息:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
publicclassAskMsgextendsBaseMsg{
publicAskMsg(){
super();
setType(MsgType.ASK);
}
privateAskParamsparams;
publicAskParamsgetParams(){
returnparams;
}
publicvoidsetParams(AskParamsparams){
this.params=params;
}
}
//请求类型参数
//必须实现序列化接口
publicclassAskParamsimplementsSerializable{
privatestaticfinallongserialVersionUID=1L;
privateStringauth;
publicStringgetAuth(){
returnauth;
}
publicvoidsetAuth(Stringauth){
this.auth=auth;
}
}

响应类型消息:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
publicclassReplyMsgextendsBaseMsg{
publicReplyMsg(){
super();
setType(MsgType.REPLY);
}
privateReplyBodybody;
publicReplyBodygetBody(){
returnbody;
}
publicvoidsetBody(ReplyBodybody){
this.body=body;
}
}
//相应类型body对像
publicclassReplyBodyimplementsSerializable{
privatestaticfinallongserialVersionUID=1L;
}
publicclassReplyClientBodyextendsReplyBody{
privateStringclientInfo;
publicReplyClientBody(StringclientInfo){
this.clientInfo=clientInfo;
}
publicStringgetClientInfo(){
returnclientInfo;
}
publicvoidsetClientInfo(StringclientInfo){
this.clientInfo=clientInfo;
}
}
publicclassReplyServerBodyextendsReplyBody{
privateStringserverInfo;
publicReplyServerBody(StringserverInfo){
this.serverInfo=serverInfo;
}
publicStringgetServerInfo(){
returnserverInfo;
}
publicvoidsetServerInfo(StringserverInfo){
this.serverInfo=serverInfo;
}
}

2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.

Map:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
publicclassNettyChannelMap{
privatestaticMap<String,SocketChannel>map=newConcurrentHashMap<String,SocketChannel>();
publicstaticvoidadd(StringclientId,SocketChannelsocketChannel){
map.put(clientId,socketChannel);
}
publicstaticChannelget(StringclientId){
returnmap.get(clientId);
}
publicstaticvoidremove(SocketChannelsocketChannel){
for(Map.Entryentry:map.entrySet()){
if(entry.getValue()==socketChannel){
map.remove(entry.getKey());
}
}
}
}

Handler

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
publicclassNettyServerHandlerextendsSimpleChannelInboundHandler<BaseMsg>{
@Override
publicvoidchannelInactive(ChannelHandlerContextctx)throwsException{
//channel失效,从Map中移除
NettyChannelMap.remove((SocketChannel)ctx.channel());
}
@Override
protectedvoidmessageReceived(ChannelHandlerContextchannelHandlerContext,BaseMsgbaseMsg)throwsException{
if(MsgType.LOGIN.equals(baseMsg.getType())){
LoginMsgloginMsg=(LoginMsg)baseMsg;
if("robin".equals(loginMsg.getUserName())&&"yao".equals(loginMsg.getPassword())){
//登录成功,把channel存到服务端的map中
NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());
System.out.println("client"+loginMsg.getClientId()+"登录成功");
}
}else{
if(NettyChannelMap.get(baseMsg.getClientId())==null){
//说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录
LoginMsgloginMsg=newLoginMsg();
channelHandlerContext.channel().writeAndFlush(loginMsg);
}
}
switch(baseMsg.getType()){
casePING:{
PingMsgpingMsg=(PingMsg)baseMsg;
PingMsgreplyPing=newPingMsg();
NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);
}break;
caseASK:{
//收到客户端的请求
AskMsgaskMsg=(AskMsg)baseMsg;
if("authToken".equals(askMsg.getParams().getAuth())){
ReplyServerBodyreplyBody=newReplyServerBody("serverinfo$$$$!!!");
ReplyMsgreplyMsg=newReplyMsg();
replyMsg.setBody(replyBody);
NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);
}
}break;
caseREPLY:{
//收到客户端回复
ReplyMsgreplyMsg=(ReplyMsg)baseMsg;
ReplyClientBodyclientBody=(ReplyClientBody)replyMsg.getBody();
System.out.println("receiveclientmsg:"+clientBody.getClientInfo());
}break;
default:break;
}
ReferenceCountUtil.release(baseMsg);
}
}

ServerBootstrap:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
publicclassNettyServerBootstrap{
privateintport;
privateSocketChannelsocketChannel;
publicNettyServerBootstrap(intport)throwsInterruptedException{
this.port=port;
bind();
}
privatevoidbind()throwsInterruptedException{
EventLoopGroupboss=newNioEventLoopGroup();
EventLoopGroupworker=newNioEventLoopGroup();
ServerBootstrapbootstrap=newServerBootstrap();
bootstrap.group(boss,worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG,128);
//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
bootstrap.option(ChannelOption.TCP_NODELAY,true);
//保持长连接状态
bootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);
bootstrap.childHandler(newChannelInitializer<SocketChannel>(){
@Override
protectedvoidinitChannel(SocketChannelsocketChannel)throwsException{
ChannelPipelinep=socketChannel.pipeline();
p.addLast(newObjectEncoder());
p.addLast(newObjectDecoder(ClassResolvers.cacheDisabled(null)));
p.addLast(newNettyServerHandler());
}
});
ChannelFuturef=bootstrap.bind(port).sync();
if(f.isSuccess()){
System.out.println("serverstart---------------");
}
}
publicstaticvoidmain(String[]args)throwsInterruptedException{
NettyServerBootstrapbootstrap=newNettyServerBootstrap(9999);
while(true){
SocketChannelchannel=(SocketChannel)NettyChannelMap.get("001");
if(channel!=null){
AskMsgaskMsg=newAskMsg();
channel.writeAndFlush(askMsg);
}
TimeUnit.SECONDS.sleep(5);
}
}
}

3 Client端:包含发起登录,发送心跳,及对应消息处理

handler

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
publicclassNettyClientHandlerextendsSimpleChannelInboundHandler<BaseMsg>{
//利用写空闲发送心跳检测消息
@Override
publicvoiduserEventTriggered(ChannelHandlerContextctx,Objectevt)throwsException{
if(evtinstanceofIdleStateEvent){
IdleStateEvente=(IdleStateEvent)evt;
switch(e.state()){
caseWRITER_IDLE:
PingMsgpingMsg=newPingMsg();
ctx.writeAndFlush(pingMsg);
System.out.println("sendpingtoserver----------");
break;
default:
break;
}
}
}
@Override
protectedvoidmessageReceived(ChannelHandlerContextchannelHandlerContext,BaseMsgbaseMsg)throwsException{
MsgTypemsgType=baseMsg.getType();
switch(msgType){
caseLOGIN:{
//向服务器发起登录
LoginMsgloginMsg=newLoginMsg();
loginMsg.setPassword("yao");
loginMsg.setUserName("robin");
channelHandlerContext.writeAndFlush(loginMsg);
}break;
casePING:{
System.out.println("receivepingfromserver----------");
}break;
caseASK:{
ReplyClientBodyreplyClientBody=newReplyClientBody("clientinfo****!!!");
ReplyMsgreplyMsg=newReplyMsg();
replyMsg.setBody(replyClientBody);
channelHandlerContext.writeAndFlush(replyMsg);
}break;
caseREPLY:{
ReplyMsgreplyMsg=(ReplyMsg)baseMsg;
ReplyServerBodyreplyServerBody=(ReplyServerBody)replyMsg.getBody();
System.out.println("receiveclientmsg:"+replyServerBody.getServerInfo());
}
default:break;
}
ReferenceCountUtil.release(msgType);
}
}

bootstrap

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
publicclassNettyClientBootstrap{
privateintport;
privateStringhost;
privateSocketChannelsocketChannel;
privatestaticfinalEventExecutorGroupgroup=newDefaultEventExecutorGroup(20);
publicNettyClientBootstrap(intport,Stringhost)throwsInterruptedException{
this.port=port;
this.host=host;
start();
}
privatevoidstart()throwsInterruptedException{
EventLoopGroupeventLoopGroup=newNioEventLoopGroup();
Bootstrapbootstrap=newBootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
bootstrap.group(eventLoopGroup);
bootstrap.remoteAddress(host,port);
bootstrap.handler(newChannelInitializer<SocketChannel>(){
@Override
protectedvoidinitChannel(SocketChannelsocketChannel)throwsException{
socketChannel.pipeline().addLast(newIdleStateHandler(20,10,0));
socketChannel.pipeline().addLast(newObjectEncoder());
socketChannel.pipeline().addLast(newObjectDecoder(ClassResolvers.cacheDisabled(null)));
socketChannel.pipeline().addLast(newNettyClientHandler());
}
});
ChannelFuturefuture=bootstrap.connect(host,port).sync();
if(future.isSuccess()){
socketChannel=(SocketChannel)future.channel();
System.out.println("connectserver成功---------");
}
}
publicstaticvoidmain(String[]args)throwsInterruptedException{
Constants.setClientId("001");
NettyClientBootstrapbootstrap=newNettyClientBootstrap(9999,"localhost");
LoginMsgloginMsg=newLoginMsg();
loginMsg.setPassword("yao");
loginMsg.setUserName("robin");
bootstrap.socketChannel.writeAndFlush(loginMsg);
while(true){
TimeUnit.SECONDS.sleep(3);
AskMsgaskMsg=newAskMsg();
AskParamsaskParams=newAskParams();
askParams.setAuth("authToken");
askMsg.setParams(askParams);
bootstrap.socketChannel.writeAndFlush(askMsg);
}
}
}

具体的例子和相应pom.xml 见https://github.com/WangErXiao/ServerClient

转发请注明来源:http://my.oschina.net/robinyao/blog/399060

分享到:
评论

相关推荐

    netty 实现长连接

    9. **实例代码**:通常会包含如何配置和启动一个支持长连接的Netty服务器,以及客户端如何连接和发送数据的示例代码。 由于没有具体的博客内容,以上是基于常见情况的分析。实际的博客可能会详细解释这些概念,并...

    netty5长连接.自动重连

    在“netty5长连接.自动重连”这个主题中,我们将深入探讨 Netty 如何实现长连接以及自动重连的机制。 首先,让我们理解什么是长连接。在传统的 TCP/IP 协议中,每次通信都需要建立一次连接(三次握手),完成数据...

    socket长连接,netty服务器与android源码

    Socket长连接和Netty框架是网络编程中的重要概念,尤其在Android应用开发和服务器通信中扮演着关键角色。本文将深入探讨这两个主题,并通过提供的源码进行实例解析。 首先,让我们了解一下Socket。Socket是网络通信...

    netty聊天室实例

    在“Netty聊天室实例”中,我们将深入理解Netty如何被用来构建一个实时的、多用户交互的聊天系统。这个实例将涵盖以下几个关键知识点: 1. **Netty基本架构**: - Netty的核心是其`Reactor`模型,它基于非阻塞I/O...

    Spring+Netty+WebSocket实例

    Netty提供了对WebSocket协议的全面支持,允许开发者直接在Netty服务器上处理WebSocket连接,提高了性能和灵活性。 在实际项目中,Spring与Netty的结合可以发挥各自的优势。Spring负责业务逻辑和路由控制,而Netty...

    netty并发服务器实例代码

    在描述中提到的"netty并发服务器实例代码",意味着这个压缩包可能包含了使用Netty框架搭建的一个服务器端应用的源码。通常,这样的服务器会处理大量的并发连接,并且能够高效地处理来自客户端的请求。Netty的非阻塞I...

    netty_server实例代码

    通过分析和运行"netty_server"实例代码,你可以更好地理解Netty框架如何与WebSocket协议结合,以及如何处理网络通信的各个阶段,包括连接建立、数据交换和连接关闭。此外,你还可以学习到如何自定义处理器以适应特定...

    netty简单实例

    当有新的连接建立时,Netty 将为每个连接创建一个新的 Channel 实例,并在该 Channel 上设置我们配置的 Pipeline。 4. **连接到服务器**:在客户端,我们使用 Bootstrap 连接到服务器的指定地址和端口。同样,...

    WebSocket利用netty连接入门项目

    - `Server端`:Java代码,使用Netty实现WebSocket服务器,处理客户端连接和消息收发。 - `Client端`:HTML文件,包含JavaScript代码,负责建立WebSocket连接,发送和接收消息。 5. **HTML前端** HTML文件通常会...

    NettySocket同步数据获取实现

    在Netty中,我们创建一个ServerBootstrap实例来配置服务器,并设置一个处理器Pipeline,用于处理进来的连接和消息。然后,我们调用bind方法启动服务器并等待客户端连接。当接收到客户端消息时,我们可以在...

    Netty实现长连接通讯-连接协议为了简单json封装

    为了实现长连接,我们需要配置`Bootstrap`实例,设置心跳机制以保持连接活跃。心跳消息可以在`MyBusinessHandler`中定时发送,当收到对端的心跳回应时,确认连接依然有效。 ```java public class MyBusinessHandler...

    netty与socket交互实例

    4. **Netty处理Client连接**:当Netty服务器接收到新的连接请求时,它会创建一个新的Channel实例,该实例将关联到接收到连接的Socket。然后,服务器可以使用ChannelHandlerContext发送消息到客户端。 5. **数据传输...

    netty5完整配置实例

    这个“netty5完整配置实例”显然旨在帮助开发者理解和使用Netty 5版本,考虑到Netty的不同版本间的确存在显著差异,这个实例应该包含了Netty 5的关键配置和示例代码。 首先,Netty 5可能已经不被广泛使用,因为最新...

    Netty UDP协议网络打洞实例

    Netty UDP协议网络打洞实例是利用Netty框架在UDP(User Datagram Protocol)协议基础上实现的一种穿透NAT(Network Address Translation)的技术。NAT技术在现代互联网中广泛使用,它允许内部网络中的设备共享一个...

    netty socketio 在线聊天程序

    1. **异步IO模型**:Netty基于NIO(非阻塞I/O)模型,提供了一种高效的事件驱动的编程模型,能够处理大量并发连接。 2. **高度可定制**:Netty提供了丰富的ChannelHandler接口,允许开发者自定义处理各种网络事件,...

    Netty最容易懂的实例和教材

    通过这些实例,你可以了解到Netty的Bootstrap、Channel、Handler、Pipeline等核心组件的工作方式,以及如何处理网络事件如连接建立、数据读写、异常处理等。这些实例将帮助你理解Netty的非阻塞I/O模型和事件驱动架构...

    netty4服务端客户端实例

    在本文中,我们将深入探讨如何利用Netty 4构建一个简单的服务端和客户端实例,以及如何在IntelliJ IDEA 2018.1.3这个强大的Java开发环境中设置和运行这些实例。 首先,Netty 4相较于之前的版本,引入了更多的性能...

    spring+netty+mybatis整合实例

    当客户端连接到服务器并发送一个用户名时,Netty服务器会通过MyBatis从数据库查询相应的用户信息,并将结果回传给客户端。这个简单的示例展示了这三个框架的强大结合,可以在实际项目中构建高效、灵活的网络应用。在...

    netty3的完整配置实例

    在本文中,我们将深入探讨 Netty 3 的配置实例,这对于理解 Netty 的基本工作原理以及如何在实际项目中使用它是至关重要的。 首先,Netty 3 版本相较于更现代的版本,例如 Netty 4,存在一些显著的差异。例如,...

    android netty使用demo

    总之,"android netty使用demo"提供了一个基础的Android Netty应用实例,展示了如何在Android环境中使用Netty实现长连接通信。然而,实际开发中,还需要考虑更多的细节和优化,以满足复杂的网络应用场景。

Global site tag (gtag.js) - Google Analytics