`
邢邢色色
  • 浏览: 229929 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

使用Netty4实现基本的消息分发

阅读更多

示例工程代码

 

可从附件下载

 

具体的说明和用法在后面介绍

 

需求与目的

 

一个游戏服务端需要处理各种业务逻辑,每一种业务逻辑都对应着一个请求消息和一个响应消息。那么服务端需要把这些不同的消息自动分发到对应的业务逻辑中处理。

 

最简单的处理方式就是根据请求消息中的type字段,使用switch case来进行分别处理,但这种方式随着消息的增多,显现了一些坏味道:长长的一大坨不太好看;如果要添加新的消息、新的逻辑,或者去掉新的消息、新的逻辑,在代码上不但要修改这些消息和逻辑,还不得不修改这长长的一坨swtich case,这样的修改显得很多余。

 

所以我们的目的就是把消息分发这块的代码自动化,在增加、修改、删除消息和逻辑的时候不需要再对消息分发的代码再做修改,从而使得修改的代码最小化。

 

实现原理

 

在实现中,使用了注解(annotation)

package com.company.game.dispatcher.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 修饰消息类和业务逻辑执行类
 * msgType指定对应的类型,从1开始计数
 * @author xingchencheng
 *
 */

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface UserMsgAndExecAnnotation {
	short msgType();
}

 

唯一的字段msgType代表了消息类型,这是客户端与服务端的约定,这里我们从1开始计数。

 

当我们要增加一个加法消息时,就使用这个注解来修饰我们的请求消息类:

package com.company.game.dispatcher.msg;

import com.company.game.dispatcher.annotation.UserMsgAndExecAnnotation;

/**
 * 加法请求消息类
 * 
 * @author xingchencheng
 *
 */

@UserMsgAndExecAnnotation(msgType = MsgType.ADD)
public class UserAddRequest extends RequestMsgBase {

	private double leftNumber;
	private double RightNumber;
	
	public UserAddRequest() {
		super(MsgType.ADD);
	}

	public double getLeftNumber() {
		return leftNumber;
	}

	public void setLeftNumber(double leftNumber) {
		this.leftNumber = leftNumber;
	}

	public double getRightNumber() {
		return RightNumber;
	}

	public void setRightNumber(double rightNumber) {
		RightNumber = rightNumber;
	}

}

 

为什么要这样修饰呢?先从服务端的解码(decode)说起,实例代码中,一个请求消息是这样规定的:

 

 0-1字节表示整个消息的长度(单位:字节)

 2-3字节代表消息类型,对应annotation的msgType

 余下的是消息的json字符串(UTF-8编码)

 

我们需要根据2-3字节表示的msgType得到对应请求消息类的class对象,用这个class对象来序列化json字符串,得到具体的请求对象。那么怎么根据msgType得到class对象呢?这就是为什么要使用annotation的原因。

 

在服务端程序启动前,会执行下面的处理:

 

 // msgType->请求、响应类的class对象
	private static Map<Short, Class<?>> typeToMsgClassMap;
	
	// 根据类型得到对应的消息类的class对象
	public static Class<?> getMsgClassByType(short type) {
		return typeToMsgClassMap.get(type);
	}
	
	/**
	 * 初始化typeToMsgClassMap
	 * 遍历包com.company.game.dispatcher.msg
	 * 取得消息类的class文件
	 * 
	 * @throws ClassNotFoundException
	 * @throws IOException
	 */
	public static void initTypeToMsgClassMap() 
			throws ClassNotFoundException, IOException {
		
		Map<Short, Class<?>> tmpMap = new HashMap<Short, Class<?>>();
		
		Set<Class<?>> classSet = getClasses("com.company.game.dispatcher.msg");
		if (classSet != null) {
			for (Class<?> clazz : classSet) {
				if (clazz.isAnnotationPresent(UserMsgAndExecAnnotation.class)) {
					UserMsgAndExecAnnotation annotation = clazz.getAnnotation(UserMsgAndExecAnnotation.class);
					tmpMap.put(annotation.msgType(), clazz);
				}
			}
		}
		
		typeToMsgClassMap = Collections.unmodifiableMap(tmpMap);
	}

 

程序初始化了一个映射,在指定的包找到请求的消息类的class,读取class上的annotation,保存到一个Map中,这样在后续就可以根据这个Map来根据msgType得到class对象了。

 

再给出解码器的实现:

 

package com.company.game.dispatcher.codec;

import java.util.List;

import com.company.game.dispatcher.util.ClassUtil;
import com.company.game.dispatcher.util.GsonUtil;
import com.google.gson.Gson;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
 * 解码器
 * 客户端和服务端均有使用
 * 0-1字节表示整个消息的长度(单位:字节)
 * 2-3字节代表消息类型,对应annotation
 * 余下的是消息的json字符串(UTF-8编码)
 * 
 * @author xingchencheng
 *
 */

public class MsgDecoder extends ByteToMessageDecoder {

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf buf,
			List<Object> list) throws Exception {
		
		if (buf.readableBytes() < 2) {
			return;
		}
		
		Gson gson = GsonUtil.getGson();
		short jsonBytesLength = (short) (buf.readShort() - 2);
		short type = buf.readShort();
		
		byte[] tmp = new byte[jsonBytesLength];
		buf.readBytes(tmp);
		String json = new String(tmp, "UTF-8");
		
		Class<?> clazz = ClassUtil.getMsgClassByType(type);
		Object msgObj = gson.fromJson(json, clazz);
		list.add(msgObj);
	}
}

 

解码完成后,程序进入到服务端的handler中:

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object msgObject)
			throws Exception {
		
		// 分发消息给对应的消息处理器
		Dispatcher.submit(ctx.channel(), msgObject);
	}

 

Dispatcher代码如下:

package com.company.game.dispatcher;

import io.netty.channel.Channel;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.company.game.dispatcher.exec.BusinessLogicExecutorBase;
import com.company.game.dispatcher.msg.RequestMsgBase;
import com.company.game.dispatcher.util.ClassUtil;

/**
 * 抽象了分发器
 * 多线程执行
 * 某个消息对象msgObject指定某个业务逻辑对象executor
 * submit到线程池中
 * @author xingchencheng
 *
 */

public class Dispatcher {
	
	private static final int MAX_THREAD_NUM = 50;
	
	private static ExecutorService executorService =
			Executors.newFixedThreadPool(MAX_THREAD_NUM);
	
	public static void submit(Channel channel, Object msgObject) 
			throws InstantiationException, IllegalAccessException {
		
		RequestMsgBase msg = (RequestMsgBase) msgObject;
		Class<?> executorClass = ClassUtil.getExecutorClassByType(msg.getType());
		BusinessLogicExecutorBase executor = 
				(BusinessLogicExecutorBase) executorClass.newInstance();
		executor.setChannel(channel);
		executor.setMsgObject(msgObject);
		
		executorService.submit(executor);
	}
}

 

我们看到,在代码中也是根据msgType取得了对应的一个class对象,并new了一个对象出来,交给了线程池进行并发执行,这个对象就是业务逻辑处理器对象,它实现了Runnable接口,进行一些业务逻辑上的处理。根据msgType取得class对象的映射过程跟前面提到的映射原理是相同的,可以参见代码。贴出业务逻辑处理器对象的代码:

package com.company.game.dispatcher.exec;

import com.company.game.dispatcher.annotation.UserMsgAndExecAnnotation;
import com.company.game.dispatcher.msg.MsgType;
import com.company.game.dispatcher.msg.UserAddRequest;
import com.company.game.dispatcher.msg.UserAddResponse;

/**
 * 具体的业务逻辑
 * 实现加法
 * 
 * @author xingchencheng
 *
 */

@UserMsgAndExecAnnotation(msgType = MsgType.ADD)
public class UserAddExecutor extends BusinessLogicExecutorBase {

	public void run() {
		UserAddResponse response = new UserAddResponse();
		
		if (this.msgObject instanceof UserAddRequest) {
			UserAddRequest request = (UserAddRequest) this.msgObject;
			double result = request.getLeftNumber() + request.getRightNumber();
			response.setResult(result);
			response.setSuccess(true);
		} else {
			response.setSuccess(false);
		}
		
		System.out.println("服务端处理结果:" + response.getResult());
		channel.writeAndFlush(response);
	}

}

注意,它也得用annotation来修饰。

 

思路大致就是如此,如果要增加一个请求,在示例代码中,需要做3件事情:

  • 在MsgType添加一个类型
  • 添加请求相应消息类
  • 添加业务逻辑处理器类

而不需要修改消息分发的代码。

 

示例项目的说明和使用

 

  • 工程可在文章开头的github中或附件得到
  • 项目使用Maven3构建,构建的结果是一个jar,可通过命令行分别运行服务端和客户端
  • 仅仅是个示例,并没有过多的考虑异常处理,性能等方面
  • 没有单元测试和其他测试

提供了命令行工具,帮助信息如下:


 

服务端启动命令:


 

客户端启动命令:


 

结语

 

本文的描述未必清晰,更好的方法是直接看代码。

关于消息分发想必还有更好的方法,这里只是抛砖引玉,希望路过的各位能提供更好的方法一起参考。

  • 大小: 16.7 KB
  • 大小: 34.8 KB
  • 大小: 27.5 KB
分享到:
评论

相关推荐

    使用Netty4实现多线程的消息分发

    使用 Netty 4 实现多线程的消息分发,可以有效地应对高并发场景,提高服务的响应速度和吞吐量。通过对 EventLoopGroup、EventLoop、ChannelHandler 和 ChannelPipeline 的理解和运用,我们可以构建出强大而灵活的...

    Android使用Netty网络框架实践(客户端、服务端)

    首先,我们需要理解Netty的基本概念。Netty的核心是其EventLoopGroup(事件循环组),它负责处理I/O事件,并将它们分发到相应的ChannelHandlerContext(通道上下文)中。Channel(通道)是连接到特定I/O资源的对象,...

    基于Netty5实现的弱消息系统消息中间件-MessageCacheCenter.zip

    在这个“基于Netty5实现的弱消息系统消息中间件-MessageCacheCenter”项目中,开发者构建了一个利用Netty5进行通信的消息缓存中心。下面将详细介绍这个中间件的关键知识点。 1. **Netty5核心概念** - **NIO(非...

    NETTY+ACTIVITYMQ实现高用户并发

    标题中的“NETTY+ACTIVITYMQ实现高用户并发”表明我们正在探讨如何使用Netty和ActiveMQ结合来处理大量并发用户请求。Netty是一个高性能、异步事件驱动的网络应用程序框架,而ActiveMQ是Apache出品的一款开源消息...

    springboot+netty 实现简单的一对一聊天

    在本文中,我们将深入探讨如何使用Spring Boot和Netty实现一个简单的一对一聊天应用程序。Spring Boot是Java领域中广泛使用的微服务框架,它简化了配置并提供了快速启动的应用程序开发体验。Netty则是一个高性能、...

    Java + Netty 实现的高并发高可用MQTT服务broker,轻松支持10万并发(有群友实现了130万在线).zip

    遗嘱消息, 保留消息及消息分发重试 心跳机制 MQTT连接认证(可选择是否开启) SSL方式连接(可选择是否开启) 主题过滤(支持单主题订阅如 test_topic /mqtt/test --不能以/结尾, 通配符订阅 # /mqtt/# --以#结尾) ...

    基于netty实现mqtt协议 服务器端开发,可解码http、mqtt协议请求

    项目包括: 1.基于netty绑定端口监听,对于mqtt消息和http请求消息分别绑定不同的...4.HttpServerHandler类实现对http消息的自定义处理。该handle类包含以下内容: 对一次完整的http请求进行解码然后处理该请求。

    netty-netty-4.1.19.Final.zip_netty_netty学习_rocketmq

    Producer通过Netty发送消息到Broker,Broker再将消息分发到相应的Consumer。Netty的异步特性使得RocketMQ能处理高并发场景,而其高效的内存管理机制降低了系统的资源消耗。 在实际学习过程中,可以先从阅读Netty的...

    netty应用于物联网或者即时通讯项目

    Netty 的可扩展性允许我们添加更多的处理器,或者通过负载均衡策略分发连接。同时,利用 SpringBoot 的 Actuator 模块可以对服务器性能进行监控和管理。 **8. 异常处理与日志记录** 良好的异常处理和日志记录对于...

    netty实现微信聊天.zip

    在本文中,我们将深入探讨Netty的基本原理和如何利用它来实现微信聊天功能。 首先,理解Netty的核心概念至关重要。Netty基于Java NIO(非阻塞I/O)构建,提供了一种更高级别的API,简化了网络编程。它的关键组件...

    基于Netty的Java数据采集软件

    同时,Netty的NioEventLoopGroup负责线程管理和事件分发,确保了服务端的高效运行。 "DataClient"则是客户端组件,负责向数据源发起请求并收集信息。Netty的ClientBootstrap类提供了创建客户端连接的便利,配合...

    Spring+Netty+WebSocket实例

    在实际开发中,你可以根据项目需求进一步优化和扩展,比如添加身份验证、消息分发策略等。 总结来说,Spring、Netty和WebSocket的结合为构建实时、双向通信的应用提供了强大的基础。通过深入理解这三个组件的工作...

    使用netty+spring搭建游戏框架

    - **EventLoop**:Netty的核心组件,负责处理I/O事件,将它们分发到相应的ChannelHandler。 - **ChannelHandler**:处理网络事件,如连接建立、数据读写等。 - **ByteBuf**:Netty自定义的缓冲区,比Java的...

    Netty使用心得,Netty实战

    5. **线程模型**:Netty 使用 NIO 的 EventLoopGroup 和 ChannelHandler 模型,将事件分发到不同的线程,确保了线程安全和高效执行。 6. **易于使用和扩展**:Netty 的 API 设计简洁,方便开发者理解和使用,同时...

    通过Netty实现MQTT服务端Demo

    "通过Netty实现MQTT服务端Demo" 这个标题指出我们要讨论的是一个使用Netty框架构建的MQTT服务器的示例应用。Netty是Java领域的一个高性能、异步事件驱动的网络应用程序框架,常用于创建服务器和客户端的网络通信。而...

    mqttserver:基于netty实现mqtt协议 服务器端开发

    mqttserver,基于netty 4.1.1,可解码http、mqtt协议请求。 项目包括: 1.基于netty绑定端口监听,对于mqtt消息和http请求消息分别绑定不同的...4.HttpServerHandler类实现对http消息的自定义处理。该handle类包含以下

    springboot+netty+websocket+redis

    为了确保系统的高可用性,可以考虑使用负载均衡器将请求分发到多个WebSocket服务器节点。同时,通过水平扩展Redis实例,可以处理更多的并发连接和数据量。另外,通过微服务架构,将聊天功能拆分为独立的服务模块,...

    Netty 入门与实战

    文件"16实战:群聊的发起与通知.html"可能讲述了如何在Netty架构下创建群聊并通知所有成员,涉及到的消息分发和通知机制。 10. **客户端互聊原理与实现** 文件"15实战:客户端互聊原理与实现.html"可能涵盖了...

    Netty-入门Netty编码

    Reactor 模式允许 Netty 高效地处理大量并发连接,通过非阻塞 I/O 和事件分发机制来实现。在 Netty 中,我们定义 ChannelHandler 来处理网络事件,如连接建立、数据读取和写入等。ChannelPipeline 是 ChannelHandler...

    译文-Netty教程

    5. 异步和事件驱动:Netty是一个以异步操作为基础的框架,事件驱动的设计使得它可以在多个线程之间灵活地分发事件,从而实现高吞吐量和低延迟。 Netty能够处理包括TCP/IP和UDP/IP在内的多种传输协议。它不仅支持...

Global site tag (gtag.js) - Google Analytics