基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent 事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。
环境JDK1.8 和netty5
以下是具体的代码实现和介绍:
1公共的Share部分(主要包含消息协议类型的定义)
设计消息类型:
public enum MsgType {
PING,ASK,REPLY,LOGIN
}
Message基类:
//必须实现序列,serialVersionUID 一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!
public abstract class BaseMsg implements Serializable {
private static final long serialVersionUID = 1L;
private MsgType type;
//必须唯一,否者会出现channel调用混乱
private String clientId;
//初始化客户端id
public BaseMsg() {
this.clientId = Constants.getClientId();
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public MsgType getType() {
return type;
}
public void setType(MsgType type) {
this.type = type;
}
}
常量设置:
public class Constants {
private static String clientId;
public static String getClientId() {
return clientId;
}
public static void setClientId(String clientId) {
Constants.clientId = clientId;
}
}
登录类型消息:
public class LoginMsg extends BaseMsg {
private String userName;
private String password;
public LoginMsg() {
super();
setType(MsgType.LOGIN);
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
心跳检测Ping类型消息:
public class PingMsg extends BaseMsg {
public PingMsg() {
super();
setType(MsgType.PING);
}
}
请求类型消息:
public class AskMsg extends BaseMsg {
public AskMsg() {
super();
setType(MsgType.ASK);
}
private AskParams params;
public AskParams getParams() {
return params;
}
public void setParams(AskParams params) {
this.params = params;
}
}
//请求类型参数
//必须实现序列化接口
public class AskParams implements Serializable {
private static final long serialVersionUID = 1L;
private String auth;
public String getAuth() {
return auth;
}
public void setAuth(String auth) {
this.auth = auth;
}
}
响应类型消息:
public class ReplyMsg extends BaseMsg {
public ReplyMsg() {
super();
setType(MsgType.REPLY);
}
private ReplyBody body;
public ReplyBody getBody() {
return body;
}
public void setBody(ReplyBody body) {
this.body = body;
}
}
//相应类型body对像
public class ReplyBody implements Serializable {
private static final long serialVersionUID = 1L;
}
public class ReplyClientBody extends ReplyBody {
private String clientInfo;
public ReplyClientBody(String clientInfo) {
this.clientInfo = clientInfo;
}
public String getClientInfo() {
return clientInfo;
}
public void setClientInfo(String clientInfo) {
this.clientInfo = clientInfo;
}
}
public class ReplyServerBody extends ReplyBody {
private String serverInfo;
public ReplyServerBody(String serverInfo) {
this.serverInfo = serverInfo;
}
public String getServerInfo() {
return serverInfo;
}
public void setServerInfo(String serverInfo) {
this.serverInfo = serverInfo;
}
}
2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.
Map:
public class NettyChannelMap {
private static Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>();
public static void add(String clientId,SocketChannel socketChannel){
map.put(clientId,socketChannel);
}
public static Channel get(String clientId){
return map.get(clientId);
}
public static void remove(SocketChannel socketChannel){
for (Map.Entry entry:map.entrySet()){
if (entry.getValue()==socketChannel){
map.remove(entry.getKey());
}
}
}
}
Handler
public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//channel失效,从Map中移除
NettyChannelMap.remove((SocketChannel)ctx.channel());
}
@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
if(MsgType.LOGIN.equals(baseMsg.getType())){
LoginMsg loginMsg=(LoginMsg)baseMsg;
if("robin".equals(loginMsg.getUserName())&&"yao".equals(loginMsg.getPassword())){
//登录成功,把channel存到服务端的map中
NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());
System.out.println("client"+loginMsg.getClientId()+" 登录成功");
}
}else{
if(NettyChannelMap.get(baseMsg.getClientId())==null){
//说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录
LoginMsg loginMsg=new LoginMsg();
channelHandlerContext.channel().writeAndFlush(loginMsg);
}
}
switch (baseMsg.getType()){
case PING:{
PingMsg pingMsg=(PingMsg)baseMsg;
PingMsg replyPing=new PingMsg();
NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);
}break;
case ASK:{
//收到客户端的请求
AskMsg askMsg=(AskMsg)baseMsg;
if("authToken".equals(askMsg.getParams().getAuth())){
ReplyServerBody replyBody=new ReplyServerBody("server info $$$$ !!!");
ReplyMsg replyMsg=new ReplyMsg();
replyMsg.setBody(replyBody);
NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);
}
}break;
case REPLY:{
//收到客户端
ReplyMsg replyMsg=(ReplyMsg)baseMsg;
ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();
System.out.println("receive client msg: "+clientBody.getClientInfo());
}break;
default:break;
}
ReferenceCountUtil.release(baseMsg);
}
}
ServerBootstrap:
public class NettyServerBootstrap {
private int port;
private SocketChannel socketChannel;
public NettyServerBootstrap(int port) throws InterruptedException {
this.port = port;
bind();
}
private void bind() throws InterruptedException {
EventLoopGroup boss=new NioEventLoopGroup();
EventLoopGroup worker=new NioEventLoopGroup();
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(boss,worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 128);
//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
bootstrap.option(ChannelOption.TCP_NODELAY, true);
//保持长连接状态
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new ObjectEncoder());
p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
p.addLast(new NettyServerHandler());
}
});
ChannelFuture f= bootstrap.bind(port).sync();
if(f.isSuccess()){
System.out.println("server start");
}
}
public static void main(String []args) throws InterruptedException {
NettyServerBootstrap bootstrap=new NettyServerBootstrap(9);
while (true){
SocketChannel channel=(SocketChannel)NettyChannelMap.get("001");
if(channel!=null){
AskMsg askMsg=new AskMsg();
channel.writeAndFlush(askMsg);
}
TimeUnit.SECONDS.sleep(5);
}
}
}
3 Client端:包含发起登录,发送心跳,及对应消息处理
handler
public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> {
//利用写空闲发送心跳检测消息
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case WRITER_IDLE:
PingMsg pingMsg=new PingMsg();
ctx.writeAndFlush(pingMsg);
System.out.println("send ping to server-");
break;
default:
break;
}
}
}
@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
MsgType msgType=baseMsg.getType();
switch (msgType){
case LOGIN:{
//向服务器发起登录
LoginMsg loginMsg=new LoginMsg();
loginMsg.setPassword("yao");
loginMsg.setUserName("robin");
channelHandlerContext.writeAndFlush(loginMsg);
}break;
case PING:{
System.out.println("receive ping from server-");
}break;
case ASK:{
ReplyClientBody replyClientBody=new ReplyClientBody("client info **** !!!");
ReplyMsg replyMsg=new ReplyMsg();
replyMsg.setBody(replyClientBody);
channelHandlerContext.writeAndFlush(replyMsg);
}break;
case REPLY:{
ReplyMsg replyMsg=(ReplyMsg)baseMsg;
ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();
System.out.println("receive client msg: "+replyServerBody.getServerInfo());
}
default:break;
}
ReferenceCountUtil.release(msgType);
}
}
bootstrap
public class NettyClientBootstrap {
private int port;
private String host;
private SocketChannel socketChannel;
private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);
public NettyClientBootstrap(int port, String host) throws InterruptedException {
this.port = port;
this.host = host;
start();
}
private void start() throws InterruptedException {
EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
Bootstrap bootstrap=new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
bootstrap.group(eventLoopGroup);
bootstrap.remoteAddress(host,port);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));
socketChannel.pipeline().addLast(new ObjectEncoder());
socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture future =bootstrap.connect(host,port).sync();
if (future.isSuccess()) {
socketChannel = (SocketChannel)future.channel();
System.out.println("connect server 成功");
}
}
public static void main(String[]args) throws InterruptedException {
Constants.setClientId("001");
NettyClientBootstrap bootstrap=new NettyClientBootstrap(9,"localhost");
LoginMsg loginMsg=new LoginMsg();
loginMsg.setPassword("yao");
loginMsg.setUserName("robin");
bootstrap.socketChannel.writeAndFlush(loginMsg);
while (true){
TimeUnit.SECONDS.sleep(3);
AskMsg askMsg=new AskMsg();
AskParams askParams=new AskParams();
askParams.setAuth("authToken");
askMsg.setParams(askParams);
bootstrap.socketChannel.writeAndFlush(askMsg);
}
}
}
分享到:
相关推荐
综上所述,"java应用netty服务端和客户端"的示例涵盖了Netty框架的基本使用,包括服务器和客户端的启动、连接、数据传输以及模型对象的一致性。理解并掌握这些知识点对于构建基于Java的高性能网络应用至关重要。在...
在Netty4中,通过Channel和ChannelHandlerContext实现长连接,它们是Netty的核心组件,Channel代表网络连接,而ChannelHandlerContext则用于处理与该连接相关的事件。 断开重连机制是确保网络通信稳定的关键。在...
客户端的Pipeline配置可以与服务端类似,但通常会缺少服务器的接受新连接的部分。 心跳机制在长连接中非常重要,它可以确保连接的活跃性。在Netty中,我们可以自定义HeartbeatHandler,定期向对端发送心跳消息,并...
综上所述,通过 Netty 实现 WebSocket 服务,并结合心跳检测和断线重连机制,我们可以构建出健壮、可靠的实时通信系统。在实际项目中,还需要根据业务需求进行调整和优化,确保系统的稳定性和用户体验。
2. **心跳机制**:为了检测连接状态和防止长时间无数据交换导致的连接超时,客户端通常会定期发送心跳包,服务端回应确认,从而维持连接的活跃状态。 3. **数据同步与一致性**:客户端在接收到服务端数据后,可能...
本文将深入探讨“断网断电心跳检测”这一技术主题,以及它与Netty心跳检测的关联。 WebSocket协议允许服务器和客户端之间建立持久连接,允许双向数据传输,极大地简化了实时应用的开发。然而,当网络中断或设备断电...
1. **创建Bootstrap**: 客户端也需要创建一个Bootstrap实例,但配置信息与服务端略有不同,例如,可能需要指定连接的远程地址。 2. **配置ChannelInitializer**: 客户端也需要定义一个ChannelInitializer,用于配置...
5. **心跳机制**:WebSocket连接是持久的,为了检测连接是否仍然活跃,通常需要实现心跳机制。在Netty中,可以通过添加自定义的ChannelInboundHandler来定期发送和检查心跳消息。 以下是一个简单的示例代码片段,...
浏览器与服务器保持长连接,定时心跳检测; 服务器 服务器端使用Netty作为通信框架,支持客户端通过WebSocket通信。服务器会检测链路是否处于空闲,如果60秒内没有收到客户端的任何消息,那么服务器会主动关闭该...
- 心跳机制:为了检测连接是否存活,可以设置心跳机制,定期发送Ping/Pong帧。 - 安全性:确保使用安全的TLS/SSL连接,并对传输的数据进行加密,防止中间人攻击。 在实际项目中,我们还可以使用Netty的...
1、客户端与服务端基于单一长连接进行通信,客户端一条连接被多个线程使用。 2、实现私有协议 请求协议 协议头:4字节的请求序列号 2字节的请求类型 2字节数据包长度 数据部分: 服务调用:2字节请求服务名...
6. **心跳检测**:定期发送PING/PONG报文以检测客户端连接状态,及时关闭无效连接。 通过以上步骤,我们可以将这个简单的MQTT服务端Demo升级为一个功能完备、健壮的服务,能够满足实际项目的需求。
在本文中,我们将深入探讨如何利用 Netty 实现Socket同步数据获取,同时涵盖心跳检测、客户端连接控制以及客户端登录等功能。 首先,让我们理解同步数据获取的基本概念。在传统的基于线程的模型中,同步意味着发送...
本文将深入探讨如何使用SpringBoot整合Netty4实现即时通讯,并结合Zookeeper进行服务发现与管理,同时包含心跳检测和下线重连机制。 **一、SpringBoot与Netty集成** SpringBoot以其简化Spring应用初始搭建以及开发...
- Netty服务端会创建一个ServerBootstrap,配置好处理器链,用于接收客户端连接并处理数据。服务器端需要实现一个自定义的ChannelInboundHandlerAdapter,处理接收到的POJO对象,将其序列化后通过socket发送到...
本项目包括服务端和客户端两部分,为理解 Netty 在即时通讯中的应用提供了实践基础。 1. **Netty 框架介绍** - Netty 是一个非阻塞的 I/O 框架,它基于 Java NIO(非阻塞 I/O)实现,提供了高性能、低延迟的网络...
2. 创建客户端心跳机制:通过使用 IdleStateHandler 可以实现客户端的心跳机制,当客户端空闲时间太长时会自动发送心跳信息以维持连接。 3. 实现服务端心跳机制:服务端也可以实现心跳机制,以便维持与客户端的连接...
1. **服务端与客户端的启动机制**:了解如何初始化Netty服务端和客户端,以及它们的工作原理。 2. **数据载体ByteBuf的使用**:掌握ByteBuf的用法,它是Netty中用于处理二进制数据的核心类。 3. **自定义协议设计**...
连接断开时,客户端的离线状态处理; 心跳超时处理; 消息发布/订阅处理; 4.HttpServerHandler类实现对http消息的自定义处理。该handle类包含以下内容: 对一次完整的http请求进行解码然后处理该请求。
Netty 入门与实战:仿写微信 IM 即时通讯系统,掘金小册子,netty教程。章节齐全无缺失,排版非常不错。 1.仿微信IM系统简介 1 2.Netty是什么? 2 3.服务端启动流程 8 4.客户端启动流程 11 ...16.心跳与空闲检测 25