- 浏览: 147311 次
- 性别:
- 来自: 北京
文章分类
最新评论
netty长连接实例
通过netty实现服务端与客户端的长连接通讯,及心跳检测。
基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。
环境JDK1.8 和netty5
以下是具体的代码实现和介绍:
1公共的Share部分(主要包含消息协议类型的定义)
设计消息类型:
1
2
3
|
public enum MsgType{
PING,ASK,REPLY,LOGIN
}
|
Message基类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
//必须实现序列,serialVersionUID一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!
public abstract class BaseMsg implements Serializable{
private static final long serialVersionUID=1L;
private MsgTypetype;
//必须唯一,否者会出现channel调用混乱
private StringclientId;
//初始化客户端id
public BaseMsg(){
this .clientId=Constants.getClientId();
}
public StringgetClientId(){
return clientId;
}
public void setClientId(StringclientId){
this .clientId=clientId;
}
public MsgTypegetType(){
return type;
}
public void setType(MsgTypetype){
this .type=type;
}
}
|
常量设置:
1
2
3
4
5
6
7
8
9
|
public class Constants{
private static StringclientId;
public static StringgetClientId(){
return clientId;
}
public static void setClientId(StringclientId){
Constants.clientId=clientId;
}
}
|
登录类型消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public class LoginMsg extends BaseMsg{
private StringuserName;
private Stringpassword;
public LoginMsg(){
super ();
setType(MsgType.LOGIN);
}
public StringgetUserName(){
return userName;
}
public void setUserName(StringuserName){
this .userName=userName;
}
public StringgetPassword(){
return password;
}
public void setPassword(Stringpassword){
this .password=password;
}
}
|
心跳检测Ping类型消息:
1
2
3
4
5
6
|
public class PingMsg extends BaseMsg{
public PingMsg(){
super ();
setType(MsgType.PING);
}
}
|
请求类型消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public class AskMsg extends BaseMsg{
public AskMsg(){
super ();
setType(MsgType.ASK);
}
private AskParamsparams;
public AskParamsgetParams(){
return params;
}
public void setParams(AskParamsparams){
this .params=params;
}
}
//请求类型参数
//必须实现序列化接口
public class AskParams implements Serializable{
private static final long serialVersionUID=1L;
private Stringauth;
public StringgetAuth(){
return auth;
}
public void setAuth(Stringauth){
this .auth=auth;
}
}
|
响应类型消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
public class ReplyMsg extends BaseMsg{
public ReplyMsg(){
super ();
setType(MsgType.REPLY);
}
private ReplyBodybody;
public ReplyBodygetBody(){
return body;
}
public void setBody(ReplyBodybody){
this .body=body;
}
}
//相应类型body对像
public class ReplyBody implements Serializable{
private static final long serialVersionUID=1L;
}
public class ReplyClientBody extends ReplyBody{
private StringclientInfo;
public ReplyClientBody(StringclientInfo){
this .clientInfo=clientInfo;
}
public StringgetClientInfo(){
return clientInfo;
}
public void setClientInfo(StringclientInfo){
this .clientInfo=clientInfo;
}
}
public class ReplyServerBody extends ReplyBody{
private StringserverInfo;
public ReplyServerBody(StringserverInfo){
this .serverInfo=serverInfo;
}
public StringgetServerInfo(){
return serverInfo;
}
public void setServerInfo(StringserverInfo){
this .serverInfo=serverInfo;
}
}
|
2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.
Map:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public class NettyChannelMap{
private static Map<String,SocketChannel>map= new ConcurrentHashMap<String,SocketChannel>();
public static void add(StringclientId,SocketChannelsocketChannel){
map.put(clientId,socketChannel);
}
public static Channelget(StringclientId){
return map.get(clientId);
}
public static void remove(SocketChannelsocketChannel){
for (Map.Entryentry:map.entrySet()){
if (entry.getValue()==socketChannel){
map.remove(entry.getKey());
}
}
}
}
|
Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg>{
@Override
public void channelInactive(ChannelHandlerContextctx) throws Exception{
//channel失效,从Map中移除
NettyChannelMap.remove((SocketChannel)ctx.channel());
}
@Override
protected void messageReceived(ChannelHandlerContextchannelHandlerContext,BaseMsgbaseMsg) throws Exception{
if (MsgType.LOGIN.equals(baseMsg.getType())){
LoginMsgloginMsg=(LoginMsg)baseMsg;
if ( "robin" .equals(loginMsg.getUserName())&& "yao" .equals(loginMsg.getPassword())){
//登录成功,把channel存到服务端的map中
NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());
System.out.println( "client" +loginMsg.getClientId()+ "登录成功" );
}
} else {
if (NettyChannelMap.get(baseMsg.getClientId())== null ){
//说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录
LoginMsgloginMsg= new LoginMsg();
channelHandlerContext.channel().writeAndFlush(loginMsg);
}
}
switch (baseMsg.getType()){
case PING:{
PingMsgpingMsg=(PingMsg)baseMsg;
PingMsgreplyPing= new PingMsg();
NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);
} break ;
case ASK:{
//收到客户端的请求
AskMsgaskMsg=(AskMsg)baseMsg;
if ( "authToken" .equals(askMsg.getParams().getAuth())){
ReplyServerBodyreplyBody= new ReplyServerBody( "serverinfo$$$$!!!" );
ReplyMsgreplyMsg= new ReplyMsg();
replyMsg.setBody(replyBody);
NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);
}
} break ;
case REPLY:{
//收到客户端回复
ReplyMsgreplyMsg=(ReplyMsg)baseMsg;
ReplyClientBodyclientBody=(ReplyClientBody)replyMsg.getBody();
System.out.println( "receiveclientmsg:" +clientBody.getClientInfo());
} break ;
default : break ;
}
ReferenceCountUtil.release(baseMsg);
}
}
|
ServerBootstrap:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
public class NettyServerBootstrap{
private int port;
private SocketChannelsocketChannel;
public NettyServerBootstrap( int port) throws InterruptedException{
this .port=port;
bind();
}
private void bind() throws InterruptedException{
EventLoopGroupboss= new NioEventLoopGroup();
EventLoopGroupworker= new NioEventLoopGroup();
ServerBootstrapbootstrap= new ServerBootstrap();
bootstrap.group(boss,worker);
bootstrap.channel(NioServerSocketChannel. class );
bootstrap.option(ChannelOption.SO_BACKLOG, 128 );
//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
bootstrap.option(ChannelOption.TCP_NODELAY, true );
//保持长连接状态
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true );
bootstrap.childHandler( new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannelsocketChannel) throws Exception{
ChannelPipelinep=socketChannel.pipeline();
p.addLast( new ObjectEncoder());
p.addLast( new ObjectDecoder(ClassResolvers.cacheDisabled( null )));
p.addLast( new NettyServerHandler());
}
});
ChannelFuturef=bootstrap.bind(port).sync();
if (f.isSuccess()){
System.out.println( "serverstart---------------" );
}
}
public static void main(String[]args) throws InterruptedException{
NettyServerBootstrapbootstrap= new NettyServerBootstrap( 9999 );
while ( true ){
SocketChannelchannel=(SocketChannel)NettyChannelMap.get( "001" );
if (channel!= null ){
AskMsgaskMsg= new AskMsg();
channel.writeAndFlush(askMsg);
}
TimeUnit.SECONDS.sleep( 5 );
}
}
}
|
3 Client端:包含发起登录,发送心跳,及对应消息处理
handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg>{
//利用写空闲发送心跳检测消息
@Override
public void userEventTriggered(ChannelHandlerContextctx,Objectevt) throws Exception{
if (evt instanceof IdleStateEvent){
IdleStateEvente=(IdleStateEvent)evt;
switch (e.state()){
case WRITER_IDLE:
PingMsgpingMsg= new PingMsg();
ctx.writeAndFlush(pingMsg);
System.out.println( "sendpingtoserver----------" );
break ;
default :
break ;
}
}
}
@Override
protected void messageReceived(ChannelHandlerContextchannelHandlerContext,BaseMsgbaseMsg) throws Exception{
MsgTypemsgType=baseMsg.getType();
switch (msgType){
case LOGIN:{
//向服务器发起登录
LoginMsgloginMsg= new LoginMsg();
loginMsg.setPassword( "yao" );
loginMsg.setUserName( "robin" );
channelHandlerContext.writeAndFlush(loginMsg);
} break ;
case PING:{
System.out.println( "receivepingfromserver----------" );
} break ;
case ASK:{
ReplyClientBodyreplyClientBody= new ReplyClientBody( "clientinfo****!!!" );
ReplyMsgreplyMsg= new ReplyMsg();
replyMsg.setBody(replyClientBody);
channelHandlerContext.writeAndFlush(replyMsg);
} break ;
case REPLY:{
ReplyMsgreplyMsg=(ReplyMsg)baseMsg;
ReplyServerBodyreplyServerBody=(ReplyServerBody)replyMsg.getBody();
System.out.println( "receiveclientmsg:" +replyServerBody.getServerInfo());
}
default : break ;
}
ReferenceCountUtil.release(msgType);
}
}
|
bootstrap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
public class NettyClientBootstrap{
private int port;
private Stringhost;
private SocketChannelsocketChannel;
private static final EventExecutorGroupgroup= new DefaultEventExecutorGroup( 20 );
public NettyClientBootstrap( int port,Stringhost) throws InterruptedException{
this .port=port;
this .host=host;
start();
}
private void start() throws InterruptedException{
EventLoopGroupeventLoopGroup= new NioEventLoopGroup();
Bootstrapbootstrap= new Bootstrap();
bootstrap.channel(NioSocketChannel. class );
bootstrap.option(ChannelOption.SO_KEEPALIVE, true );
bootstrap.group(eventLoopGroup);
bootstrap.remoteAddress(host,port);
bootstrap.handler( new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannelsocketChannel) throws Exception{
socketChannel.pipeline().addLast( new IdleStateHandler( 20 , 10 , 0 ));
socketChannel.pipeline().addLast( new ObjectEncoder());
socketChannel.pipeline().addLast( new ObjectDecoder(ClassResolvers.cacheDisabled( null )));
socketChannel.pipeline().addLast( new NettyClientHandler());
}
});
ChannelFuturefuture=bootstrap.connect(host,port).sync();
if (future.isSuccess()){
socketChannel=(SocketChannel)future.channel();
System.out.println( "connectserver成功---------" );
}
}
public static void main(String[]args) throws InterruptedException{
Constants.setClientId( "001" );
NettyClientBootstrapbootstrap= new NettyClientBootstrap( 9999 , "localhost" );
LoginMsgloginMsg= new LoginMsg();
loginMsg.setPassword( "yao" );
loginMsg.setUserName( "robin" );
bootstrap.socketChannel.writeAndFlush(loginMsg);
while ( true ){
TimeUnit.SECONDS.sleep( 3 );
AskMsgaskMsg= new AskMsg();
AskParamsaskParams= new AskParams();
askParams.setAuth( "authToken" );
askMsg.setParams(askParams);
bootstrap.socketChannel.writeAndFlush(askMsg);
}
}
}
|
具体的例子和相应pom.xml 见https://github.com/WangErXiao/ServerClient
转发请注明来源:http://my.oschina.net/robinyao/blog/399060
相关推荐
9. **实例代码**:通常会包含如何配置和启动一个支持长连接的Netty服务器,以及客户端如何连接和发送数据的示例代码。 由于没有具体的博客内容,以上是基于常见情况的分析。实际的博客可能会详细解释这些概念,并...
在“netty5长连接.自动重连”这个主题中,我们将深入探讨 Netty 如何实现长连接以及自动重连的机制。 首先,让我们理解什么是长连接。在传统的 TCP/IP 协议中,每次通信都需要建立一次连接(三次握手),完成数据...
Socket长连接和Netty框架是网络编程中的重要概念,尤其在Android应用开发和服务器通信中扮演着关键角色。本文将深入探讨这两个主题,并通过提供的源码进行实例解析。 首先,让我们了解一下Socket。Socket是网络通信...
在“Netty聊天室实例”中,我们将深入理解Netty如何被用来构建一个实时的、多用户交互的聊天系统。这个实例将涵盖以下几个关键知识点: 1. **Netty基本架构**: - Netty的核心是其`Reactor`模型,它基于非阻塞I/O...
Netty提供了对WebSocket协议的全面支持,允许开发者直接在Netty服务器上处理WebSocket连接,提高了性能和灵活性。 在实际项目中,Spring与Netty的结合可以发挥各自的优势。Spring负责业务逻辑和路由控制,而Netty...
在描述中提到的"netty并发服务器实例代码",意味着这个压缩包可能包含了使用Netty框架搭建的一个服务器端应用的源码。通常,这样的服务器会处理大量的并发连接,并且能够高效地处理来自客户端的请求。Netty的非阻塞I...
通过分析和运行"netty_server"实例代码,你可以更好地理解Netty框架如何与WebSocket协议结合,以及如何处理网络通信的各个阶段,包括连接建立、数据交换和连接关闭。此外,你还可以学习到如何自定义处理器以适应特定...
当有新的连接建立时,Netty 将为每个连接创建一个新的 Channel 实例,并在该 Channel 上设置我们配置的 Pipeline。 4. **连接到服务器**:在客户端,我们使用 Bootstrap 连接到服务器的指定地址和端口。同样,...
- `Server端`:Java代码,使用Netty实现WebSocket服务器,处理客户端连接和消息收发。 - `Client端`:HTML文件,包含JavaScript代码,负责建立WebSocket连接,发送和接收消息。 5. **HTML前端** HTML文件通常会...
在Netty中,我们创建一个ServerBootstrap实例来配置服务器,并设置一个处理器Pipeline,用于处理进来的连接和消息。然后,我们调用bind方法启动服务器并等待客户端连接。当接收到客户端消息时,我们可以在...
为了实现长连接,我们需要配置`Bootstrap`实例,设置心跳机制以保持连接活跃。心跳消息可以在`MyBusinessHandler`中定时发送,当收到对端的心跳回应时,确认连接依然有效。 ```java public class MyBusinessHandler...
4. **Netty处理Client连接**:当Netty服务器接收到新的连接请求时,它会创建一个新的Channel实例,该实例将关联到接收到连接的Socket。然后,服务器可以使用ChannelHandlerContext发送消息到客户端。 5. **数据传输...
这个“netty5完整配置实例”显然旨在帮助开发者理解和使用Netty 5版本,考虑到Netty的不同版本间的确存在显著差异,这个实例应该包含了Netty 5的关键配置和示例代码。 首先,Netty 5可能已经不被广泛使用,因为最新...
Netty UDP协议网络打洞实例是利用Netty框架在UDP(User Datagram Protocol)协议基础上实现的一种穿透NAT(Network Address Translation)的技术。NAT技术在现代互联网中广泛使用,它允许内部网络中的设备共享一个...
1. **异步IO模型**:Netty基于NIO(非阻塞I/O)模型,提供了一种高效的事件驱动的编程模型,能够处理大量并发连接。 2. **高度可定制**:Netty提供了丰富的ChannelHandler接口,允许开发者自定义处理各种网络事件,...
通过这些实例,你可以了解到Netty的Bootstrap、Channel、Handler、Pipeline等核心组件的工作方式,以及如何处理网络事件如连接建立、数据读写、异常处理等。这些实例将帮助你理解Netty的非阻塞I/O模型和事件驱动架构...
在本文中,我们将深入探讨如何利用Netty 4构建一个简单的服务端和客户端实例,以及如何在IntelliJ IDEA 2018.1.3这个强大的Java开发环境中设置和运行这些实例。 首先,Netty 4相较于之前的版本,引入了更多的性能...
当客户端连接到服务器并发送一个用户名时,Netty服务器会通过MyBatis从数据库查询相应的用户信息,并将结果回传给客户端。这个简单的示例展示了这三个框架的强大结合,可以在实际项目中构建高效、灵活的网络应用。在...
在本文中,我们将深入探讨 Netty 3 的配置实例,这对于理解 Netty 的基本工作原理以及如何在实际项目中使用它是至关重要的。 首先,Netty 3 版本相较于更现代的版本,例如 Netty 4,存在一些显著的差异。例如,...
总之,"android netty使用demo"提供了一个基础的Android Netty应用实例,展示了如何在Android环境中使用Netty实现长连接通信。然而,实际开发中,还需要考虑更多的细节和优化,以满足复杂的网络应用场景。