示例工程代码
可从附件下载
具体的说明和用法在后面介绍
需求与目的
一个游戏服务端需要处理各种业务逻辑,每一种业务逻辑都对应着一个请求消息和一个响应消息。那么服务端需要把这些不同的消息自动分发到对应的业务逻辑中处理。
最简单的处理方式就是根据请求消息中的type字段,使用switch case来进行分别处理,但这种方式随着消息的增多,显现了一些坏味道:长长的一大坨不太好看;如果要添加新的消息、新的逻辑,或者去掉新的消息、新的逻辑,在代码上不但要修改这些消息和逻辑,还不得不修改这长长的一坨swtich case,这样的修改显得很多余。
所以我们的目的就是把消息分发这块的代码自动化,在增加、修改、删除消息和逻辑的时候不需要再对消息分发的代码再做修改,从而使得修改的代码最小化。
实现原理
在实现中,使用了注解(annotation)
package com.company.game.dispatcher.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 修饰消息类和业务逻辑执行类 * msgType指定对应的类型,从1开始计数 * @author xingchencheng * */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface UserMsgAndExecAnnotation { short msgType(); }
唯一的字段msgType代表了消息类型,这是客户端与服务端的约定,这里我们从1开始计数。
当我们要增加一个加法消息时,就使用这个注解来修饰我们的请求消息类:
package com.company.game.dispatcher.msg; import com.company.game.dispatcher.annotation.UserMsgAndExecAnnotation; /** * 加法请求消息类 * * @author xingchencheng * */ @UserMsgAndExecAnnotation(msgType = MsgType.ADD) public class UserAddRequest extends RequestMsgBase { private double leftNumber; private double RightNumber; public UserAddRequest() { super(MsgType.ADD); } public double getLeftNumber() { return leftNumber; } public void setLeftNumber(double leftNumber) { this.leftNumber = leftNumber; } public double getRightNumber() { return RightNumber; } public void setRightNumber(double rightNumber) { RightNumber = rightNumber; } }
为什么要这样修饰呢?先从服务端的解码(decode)说起,实例代码中,一个请求消息是这样规定的:
0-1字节表示整个消息的长度(单位:字节)
2-3字节代表消息类型,对应annotation的msgType
余下的是消息的json字符串(UTF-8编码)
我们需要根据2-3字节表示的msgType得到对应请求消息类的class对象,用这个class对象来序列化json字符串,得到具体的请求对象。那么怎么根据msgType得到class对象呢?这就是为什么要使用annotation的原因。
在服务端程序启动前,会执行下面的处理:
// msgType->请求、响应类的class对象 private static Map<Short, Class<?>> typeToMsgClassMap; // 根据类型得到对应的消息类的class对象 public static Class<?> getMsgClassByType(short type) { return typeToMsgClassMap.get(type); } /** * 初始化typeToMsgClassMap * 遍历包com.company.game.dispatcher.msg * 取得消息类的class文件 * * @throws ClassNotFoundException * @throws IOException */ public static void initTypeToMsgClassMap() throws ClassNotFoundException, IOException { Map<Short, Class<?>> tmpMap = new HashMap<Short, Class<?>>(); Set<Class<?>> classSet = getClasses("com.company.game.dispatcher.msg"); if (classSet != null) { for (Class<?> clazz : classSet) { if (clazz.isAnnotationPresent(UserMsgAndExecAnnotation.class)) { UserMsgAndExecAnnotation annotation = clazz.getAnnotation(UserMsgAndExecAnnotation.class); tmpMap.put(annotation.msgType(), clazz); } } } typeToMsgClassMap = Collections.unmodifiableMap(tmpMap); }
程序初始化了一个映射,在指定的包找到请求的消息类的class,读取class上的annotation,保存到一个Map中,这样在后续就可以根据这个Map来根据msgType得到class对象了。
再给出解码器的实现:
package com.company.game.dispatcher.codec; import java.util.List; import com.company.game.dispatcher.util.ClassUtil; import com.company.game.dispatcher.util.GsonUtil; import com.google.gson.Gson; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; /** * 解码器 * 客户端和服务端均有使用 * 0-1字节表示整个消息的长度(单位:字节) * 2-3字节代表消息类型,对应annotation * 余下的是消息的json字符串(UTF-8编码) * * @author xingchencheng * */ public class MsgDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> list) throws Exception { if (buf.readableBytes() < 2) { return; } Gson gson = GsonUtil.getGson(); short jsonBytesLength = (short) (buf.readShort() - 2); short type = buf.readShort(); byte[] tmp = new byte[jsonBytesLength]; buf.readBytes(tmp); String json = new String(tmp, "UTF-8"); Class<?> clazz = ClassUtil.getMsgClassByType(type); Object msgObj = gson.fromJson(json, clazz); list.add(msgObj); } }
解码完成后,程序进入到服务端的handler中:
@Override protected void channelRead0(ChannelHandlerContext ctx, Object msgObject) throws Exception { // 分发消息给对应的消息处理器 Dispatcher.submit(ctx.channel(), msgObject); }
Dispatcher代码如下:
package com.company.game.dispatcher; import io.netty.channel.Channel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.company.game.dispatcher.exec.BusinessLogicExecutorBase; import com.company.game.dispatcher.msg.RequestMsgBase; import com.company.game.dispatcher.util.ClassUtil; /** * 抽象了分发器 * 多线程执行 * 某个消息对象msgObject指定某个业务逻辑对象executor * submit到线程池中 * @author xingchencheng * */ public class Dispatcher { private static final int MAX_THREAD_NUM = 50; private static ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD_NUM); public static void submit(Channel channel, Object msgObject) throws InstantiationException, IllegalAccessException { RequestMsgBase msg = (RequestMsgBase) msgObject; Class<?> executorClass = ClassUtil.getExecutorClassByType(msg.getType()); BusinessLogicExecutorBase executor = (BusinessLogicExecutorBase) executorClass.newInstance(); executor.setChannel(channel); executor.setMsgObject(msgObject); executorService.submit(executor); } }
我们看到,在代码中也是根据msgType取得了对应的一个class对象,并new了一个对象出来,交给了线程池进行并发执行,这个对象就是业务逻辑处理器对象,它实现了Runnable接口,进行一些业务逻辑上的处理。根据msgType取得class对象的映射过程跟前面提到的映射原理是相同的,可以参见代码。贴出业务逻辑处理器对象的代码:
package com.company.game.dispatcher.exec; import com.company.game.dispatcher.annotation.UserMsgAndExecAnnotation; import com.company.game.dispatcher.msg.MsgType; import com.company.game.dispatcher.msg.UserAddRequest; import com.company.game.dispatcher.msg.UserAddResponse; /** * 具体的业务逻辑 * 实现加法 * * @author xingchencheng * */ @UserMsgAndExecAnnotation(msgType = MsgType.ADD) public class UserAddExecutor extends BusinessLogicExecutorBase { public void run() { UserAddResponse response = new UserAddResponse(); if (this.msgObject instanceof UserAddRequest) { UserAddRequest request = (UserAddRequest) this.msgObject; double result = request.getLeftNumber() + request.getRightNumber(); response.setResult(result); response.setSuccess(true); } else { response.setSuccess(false); } System.out.println("服务端处理结果:" + response.getResult()); channel.writeAndFlush(response); } }
注意,它也得用annotation来修饰。
思路大致就是如此,如果要增加一个请求,在示例代码中,需要做3件事情:
- 在MsgType添加一个类型
- 添加请求相应消息类
- 添加业务逻辑处理器类
而不需要修改消息分发的代码。
示例项目的说明和使用
- 工程可在文章开头的github中或附件得到
- 项目使用Maven3构建,构建的结果是一个jar,可通过命令行分别运行服务端和客户端
- 仅仅是个示例,并没有过多的考虑异常处理,性能等方面
- 没有单元测试和其他测试
提供了命令行工具,帮助信息如下:
服务端启动命令:
客户端启动命令:
结语
本文的描述未必清晰,更好的方法是直接看代码。
关于消息分发想必还有更好的方法,这里只是抛砖引玉,希望路过的各位能提供更好的方法一起参考。
相关推荐
使用 Netty 4 实现多线程的消息分发,可以有效地应对高并发场景,提高服务的响应速度和吞吐量。通过对 EventLoopGroup、EventLoop、ChannelHandler 和 ChannelPipeline 的理解和运用,我们可以构建出强大而灵活的...
标题中的“NETTY+ACTIVITYMQ实现高用户并发”表明我们正在探讨如何使用Netty和ActiveMQ结合来处理大量并发用户请求。Netty是一个高性能、异步事件驱动的网络应用程序框架,而ActiveMQ是Apache出品的一款开源消息...
在实际开发中,你可以根据项目需求进一步优化和扩展,比如添加身份验证、消息分发策略等。 总结来说,Spring、Netty和WebSocket的结合为构建实时、双向通信的应用提供了强大的基础。通过深入理解这三个组件的工作...
项目包括: 1.基于netty绑定端口监听,对于mqtt消息和http请求消息分别绑定不同的...4.HttpServerHandler类实现对http消息的自定义处理。该handle类包含以下内容: 对一次完整的http请求进行解码然后处理该请求。
同时,Netty的NioEventLoopGroup负责线程管理和事件分发,确保了服务端的高效运行。 "DataClient"则是客户端组件,负责向数据源发起请求并收集信息。Netty的ClientBootstrap类提供了创建客户端连接的便利,配合...
mqttserver,基于netty 4.1.1,可解码http、mqtt协议请求。 项目包括: 1.基于netty绑定端口监听,对于mqtt消息和http请求消息分别绑定不同的...4.HttpServerHandler类实现对http消息的自定义处理。该handle类包含以下
Netty 结合RabbitMQ进行消息扩散,实现集群消息分发,Netty 结合Zookeeper实现分布式锁控制同一节点资源的并发读写 2、Netty 可以按需单机启动或者多节点集群化启动,集群节点结合Zookeeper实现注册与发现,根据...
4. **消息分发**:接收到的消息需要被正确地分发到应用的业务逻辑中。在Netty中,这通常通过`ChannelHandlerContext`的`channelRead`方法实现,将接收到的数据转发给相应的处理器。 5. **安全性**:在Android上进行...
12. **WebSocket支持**:Netty提供了WebSocket协议的完整实现,包括握手和消息编码解码,使开发WebSocket应用变得简单。 《Netty权威指南(第二版)》这本书将详细介绍这些知识点,并通过实例讲解如何在实践中应用...
7. **Netty源码分析**:通过阅读源码,我们可以深入理解Netty如何实现高效的事件分发、I/O操作、内存管理等核心功能。例如,了解如何通过Selector选择就绪的事件,以及如何在ChannelPipeline中传播事件。 8. **...
Netty推送系统是一种基于Java开发的高性能网络通信框架——Netty实现的实时消息推送平台。在点对点的通信场景中,它能够高效地将数据从服务器推送到客户端,广泛应用于实时聊天、通知推送、游戏同步等场景。下面将...
文件"JDianer.zip"和"IMJavaSparrow.zip"可能是这个项目的组成部分,其中"JDianer"可能是一个具体的业务模块或服务,而"IMJavaSparrow"可能是一个即时通讯(IM)相关的组件,利用Netty实现高效的消息传递。...
Netty 使用了基于事件驱动的Reactor模式,通过EventLoopGroup分发事件到各个EventLoop,每个EventLoop负责处理自己的任务,避免了线程上下文切换的开销。事件模型包括连接建立、数据读取、数据写入、连接关闭等。 ...
遗嘱消息, 保留消息及消息分发重试 心跳机制 MQTT连接认证(可选择是否开启) SSL方式连接(可选择是否开启) 主题过滤(支持单主题订阅如 test_topic /mqtt/test --不能以/结尾, 通配符订阅 # /mqtt/# --以#结尾) ...
在使用Netty实现RPC的过程中,有以下几个关键知识点: 1. **Netty的基础架构**:Netty的核心是它的`EventLoopGroup`和`Channel`概念。`EventLoopGroup`是一组线程,负责处理I/O事件,`Channel`是连接的抽象,用于...
1. **Netty基本架构**: - Netty的核心是其`Reactor`模型,它基于非阻塞I/O(NIO)来提高性能。Reactor设计模式包含多个线程,负责处理连接、读写事件,确保并发处理能力。 - `Channel`和`EventLoop`是Netty中的...
《Netty权威指南 第2版》不仅讲解了Netty的基本概念和使用方法,还深入剖析了Netty的内部原理,是Java开发者学习和掌握Netty不可或缺的参考资料。通过阅读此书,你可以全面了解并熟练运用Netty框架,提升你的网络...
它通过一个或多个线程来处理I/O事件,将事件分发到对应的处理器,从而实现非阻塞I/O,提高了系统效率。在淘宝的业务场景中,面对海量用户请求,Netty的这种特性显得尤为重要。 Netty的ChannelHandler接口是其核心...
4. **协议(Protocol)**:Netty RPC 需要定义一套通信协议,包括请求消息和响应消息的格式,通常包括请求ID、方法名、参数等信息。协议的设计应考虑到高效性和扩展性。 5. **服务注册与发现(Service Registry & ...
4. 消息推送系统:在推送服务中,Netty能够快速、稳定地将消息推送给大量客户端。 5. 其他应用场景还包括流媒体服务、游戏服务器、物联网(IoT)设备通信等。 Netty的核心组件包括: 1. Channel:Netty中的Channel...