Nepxion-Thunder(QQ 群 471164539)发布在https://github.com/Nepxion/
1. JMS消息队列模型
主要是用于ActiveMQ & Tibco等基于JMS标准的MQ。结构图如下:
点击查看大图
-
工作原理
1)Spring扫描线程扫描到一个Service节点后,就会去新建一个MQContext对象,放入ServiceContextMap缓存中(如果存在则不新建,去缓存中拿),进而创建queueAsyncResponseDestination,queueSyncResponseDestination,topicAsyncResponseDestination,放入相关DestinationMap缓存中(如果存在则不新建,去缓存中拿),然后根据这些Destination建立MessagerListener消息监听对象,实现对Response Queue/Topic的监听。如果有消息被监听到,从消息里获取ProtocolRequest对象,执行ServerHandlerAdapter.handle(request, response)。机制跟Netty等一样,不阐述了。
2)Spring扫描线程扫描到一个Reference节点后,就会去新建一个MQContext对象,放入ReferenceContextMap缓存中(如果存在则不新建,去缓存中拿),进而创建 queueAsyncRequestDestination,queueSyncRequestDestination,放入相关DestinationMap缓存中(如果存在则不新建,去缓存中拿),然后根据这些Destination建立MessagerListener消息监听对象,实现对Request Queue的监听。如果有消息被监听到,从消息里获取ProtocolResponse对象,执行ClientHandlerAdapter.handle(response)。机制跟Netty等一样,不阐述了。
3)当调用端通过Spring Aop进行同步/异步远程调用时,先从缓存获取相关的DestinationMap中获取Destination对象,把ProtocolRequest请求通过MQProducer.produce发送到MQ服务器的Response Queue里,服务端监听到消息后处理,把结果封装到ProtocolResponse通过MQProducer.produce发送到Request Queue,调用端监听到消息后处理,如果是异步调用Callback方式完成调用,如果是同步通过CyclicBarrier的线程等待返回值,最后完成调用
4)当调用端通过Spring Aop进行广播远程调用时,把ProtocolRequest发布到Response Topic,服务端订阅监听该ProtocolRequest后,进行处理,不返回结果
5)Queue和Topic Destination名称是 destinationType + "-" + group + "-" + application + "-" + interfaze,防止不同类型的应用,重名接口接入到同一个MQ服务器
例如:request-queue-async-MY_GROUP-APP_IOS-com.nepxion.thunder.test.service.UserService
6)为服务/调用的接口指定不同的MQ服务器,前提是配置文件中必须出现两个以上的MQ服务器配置,如果只有一个,可以不指定,默认取配置的那个MQ服务器
7)为MQ指定Connection或Session的缓存方式(SingleConnectionFactory,CachingConnectionFactory,PooledConnectionFactory),这个必须在配置文件中配置
8)为MQ指定两种不同的初始化方式(JNDI和非JNDI)
点击查看大图
-
类结构
1)MQTemplate.java - 消息发送模板类,继承JmsTemplate.java,覆盖doSend方法,把JmsMessage转换为ProtocolMessage,设定DeliveryMode,TimeToLive等属性值
2)MQProducer.java - 消息生产者类,通过线程池发布消息到MQ服务器指定Queue或者Topic,同时指定消息消费者消费完消息后,发送响应消息的指定Queue。调用端发送请求消息到Response Queue/Topic,实现对Request Queue监听,服务端发送响应消息到Request Queue,实现对Response Queue/Topic的监听
3)MQConsumer - 消息消费类,实现SessionAwareMessageListener.java
4)MQHierachy.java - MQ层次类,实现创建不同类型的ConnectionFactory,MQTemplate,MQProducer等核心对象,以及DefaultMessageListenerContainer的优化等,包括消息过滤,消费并发数,接收超时等
5)MQConnectionHierachy.java - 继承MQHierachy.java,实现非JNDI连接方式的初始化MQ连接
6)MQJndiHierachy.java - 继承MQHierachy.java,实现JNDI连接方式的初始化MQ连接
7)MQExecutorDelegate.java - MQExecutor的代理接口
8)MQServerExecutor.java - 继承AbstractServerExecutor.java,实现MQExecutorDelegate.java,初始化MQContext,初始化跟Response Queue/Topic相关的事情
9)MQServerHandler.java - 继承MQConsumer.java的onMessage方法,实现服务端对消息监听
10)MQClientExecutor.java - 继承AbstractClientExecutor.java,实现MQExecutorDelegate.java,初始化MQContext,初始化跟Request Queue和Response Queue/Topic相关的事情
11)MQClientHandler.java - 继承MQConsumer.java的onMessage方法,实现调用端对消息监听
12)MQClientInterceptor.java - 继承AbstractClientInterceptor.java,实现如下调用方式:
异步调用:从缓存获取要调用的queueResponseDestination和queueRequestDestination,通过MQProducer.produce方法发送异步请求ProtocolRequest。通过监听获取异步返回
同步调用:同异步调用,采用返回值返回
广播调用:从缓存获取要调用的topicResponseDestination和topicRequestDestination,通过MQProducer.produce方法发送广播请求ProtocolRequest。不返回
重复调用:不支持
13)MQContext.java - 实现初始化连接(MQHierachy),初始化消息队列(Queue/Topic),初始化消息监听(MQServerHandler/MQClientHandler),初始化重连机制
14)MQCacheContainer.java - 缓存容器
15)MQQueueDestinationContainer.java - Queue Destination缓存容器
16)MQTopicDestinationContainer.java - Topic Destination缓存容器
17)MQDestinationUtil.java - Destination工具类
18)MQSelectorUtil.java - Selector工具类
19)MQBytesMessageConverter - 二进制和Java序列化对象互转的适配类
2. 非JMS消息队列模型
主要是用于Kafka。结构图如下:
点击查看大图
使用Kafka作为点对点通信,有响应返回的场景(同步返回值,异步带Callback返回),特别要注意,所对应的Topic下分区数一定要大于等于调用端数目
-
工作原理
1)Spring扫描线程扫描到一个Service节点后,就会去新建一个KafkaMQContext对象,放入ServiceContextMap缓存中(如果存在则不新建,去缓存中拿),进而创建 responseQueueDestinationEntity,responseTopicDestinationEntity,requestQueueDestinationEntity,然后根据这些DestinationEntity建立KafkaMQConsumer消息监听对象,实现对Response Queue/Topic(名称)的监听。如果有消息被监听到,从消息里获取ProtocolRequest对象,执行ServerHandlerAdapter.handle(request, response)。机制跟Netty等一样,不阐述了。
2)Spring扫描线程扫描到一个Reference节点后,就会去新建一个KafkaMQContext对象,放入ReferenceContextMap缓存中(如果存在则不新建,去缓存中拿),进而创建 requestQueueDestinationEntity,然后根据这些DestinationEntity建立KafkaMQConsumer消息监听对象,实现对Request Queue(名称)的监听。如果有消息被监听到,从消息里获取ProtocolResponse对象,执行ClientHandlerAdapter.handle(response)。机制跟Netty等一样,不阐述了。
3)当调用端通过Spring Aop进行同步/异步远程调用时,先通过相关参数获得Topic名称,把ProtocolRequest请求通过KafkaMQProducer.produce发送到Kafka服务器的Response Queue里,服务端监听到消息后处理,把结果封装到ProtocolResponse通过KafkaMQProducer.produce发送到Request Queue,调用端监听到消息后处理,如果是异步调用Callback方式完成调用,如果是同步通过CyclicBarrier的线程等待返回值,最后完成调用
4)当调用端通过Spring Aop进行广播远程调用时,把ProtocolRequest发布到Response Topic,服务端订阅监听该ProtocolRequest后,进行处理,不返回结果
5)Queue和Topic Destination名称是 destinationType + "-" + group + "-" + application + "-" + interfaze,防止不同类型的应用,重名接口接入到同一个MQ服务器
例如:request-queue-async-MY_GROUP-APP_IOS-com.nepxion.thunder.test.service.UserService
6)为服务/调用的接口指定不同的Kafka服务器,前提是配置文件中必须出现两个以上的Kafka服务器配置,如果只有一个,可以不指定,默认取配置的那个Kafka服务器
7)同步/异步调用和广播调用的区别,是前者消费者必须在相同Group,后者消费者必须在不同Group
8)框架使用Kafka的High Level API,对于消息在分区上的分配,采用DefaultPartitioner的分区策略,见官方解释:
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
对于Response Queue和Topic(名称),Partition的分区策略通过DefaultPartitioner第三种策略,即不指定Partition和Key,让Kafka做权重轮询放入不同的分区
对于Request Queue(名称),Partition的分区策略通过DefaultPartitioner第二种策略,即不指定Partition,但指定Key(IP:Port),通过Key的Hash值来决定放入哪个分区。这就意味着,一个服务调用端即享用一个独立的分区,这就为端到端的实现创造了条件
9)框架使用Google Guava的EventBus,使框架内部事件异步发送到外部。无论调用端和提供端,在每次Produce和Consume成功或者失败,都会发送相关的Event。如果是失败事件,为Retry创造了条件。当然可以通过配置文件来关闭事件发送
点击查看大图
-
类结构
1)KafkaMQProducer.java - 消息生产者类,通过线程池发布消息到Kafka服务器指定Queue或者Topic,同时指定消息消费者消费完消息后,发送响应消息的指定Queue。调用端发送请求消息到Response Queue/Topic,实现对Request Queue监听,服务端发送响应消息到Request Queue,实现对Response Queue/Topic的监听
2)KafkaMQConsumer.java - 消息消费类
3)KafkaMQExecutorDelegate.java - KafkaExecutor的代理接口
4)KafkaMQServerExecutor.java - 继承AbstractServerExecutor.java,实现KafkaMQExecutorDelegate.java,初始化KafkaContext,初始化跟Request Queue和Response Queue/Topic相关的事情
5)KafkaMQServerHandler.java - 继承KafkaMQConsumer.java,实现服务端对消息监听(Poll)
6)KafkaMQClientExecutor.java - 继承AbstractClientExecutor.java,实现KafkaMQExecutorDelegate.java,初始化KafkaContext,初始化跟Response Queue相关的事情
7)KafkaMQClientHandler.java - 继承KafkaMQConsumer.java,实现客户端端对消息监听(Poll)
8)KafkaMQClientInterceptor.java - 继承AbstractClientInterceptor.java,实现如下调用方式:
异步调用:通过KafkaMQProducer.produce方法发送异步请求ProtocolRequest。通过监听获取异步返回
同步调用:同异步调用,采用返回值返回
广播调用:通过KafkaMQProducer.produce方法发送广播请求ProtocolRequest。不返回
重复调用:支持,通过Google Guava的EventBus实现
9)KafkaMQContext.java - 实现初始化消息队列(Queue/Topic),初始化消息监听(KafkaMQServerHandler/KafkaMQClientHandler)
10)KafkaMQCacheContainer.java - 缓存容器
11)KafkaMQDestinationUtil.java - Destination工具类
相关推荐
Nepxion Thunder是一款基于Netty + Hessian + Kafka + ActiveMQ + Tibco + Zookeeper(Curator Framework) + Redis + FST + Spring + Spring Web MVC + Spring Boot + Docker分布式RPC调用框架。架构思想主要是来自...
企业oa管理系统,系统包含两种角色:管理员、用户,主要功能如下。 管理员: 1. 个人中心:管理管理员个人信息和进行相关操作。 2. 用户管理:管理用户的基本信息和权限设置。 3. 公告信息管理:发布和管理企业内部公告信息。 4. 客户关系管理:管理客户信息、跟进记录和销售机会等。 5. 通讯录管理:管理企业内部员工通讯录信息。 6. 日程安排管理:管理个人和团队的日程安排和会议安排。 7. 车辆信息管理:管理企业车辆的基本信息和使用情况。 8. 文件信息管理:管理企业内部文件的上传、下载和共享。 9. 工作日志管理:记录和管理员工的工作日志和任务完成情况。 10. 上班考勤管理:管理员工的上班打卡和考勤记录。 11. 工资信息管理:管理员工的工资信息和薪资发放。 用户: 1. 个人中心:管理个人信息和进行相关操作。 2. 公告信息管理:查看和阅读企业内部公告信息。 3. 客户关系管理:查看和管理与自己相关的客户信息和销售机会。 4. 通讯录管理:查找和查看企业内部员工通讯录信息。 5. 日程安排管理:查看和管理个人的日程安排和会议安排。 6. 车辆信息管理:查看和申请企业车辆的使
AutoX.js 是一个基于 JavaScript 的自动化工具,主要用于安卓设备的自动化操作。它是 Auto.js 的一个分支,提供了更多的功能和更高的兼容性。以下是关于 AutoX.js 的一些基本介绍和使用方法。
1.开篇:用正确的方式学习 TypeScript 2.工欲善其事:打造最舒适的 TypeScript 开发环境 3.进入类型的世界:理解原始类型与对象类型 4.掌握字面量类型与枚举,让你的类型再精确一些 5.函数与 Class 中的类型:详解函数重载与面向对象 6.探秘内置类型:any、unknown、never 与类型断言 7.类型编程好帮手:TypeScript 类型工具(上) 8.类型编程好帮手:TypeScript 类型工具(下) 9.类型编程基石:TypeScript 中无处不在的泛型 10.结构化类型系统:类型兼容性判断的幕后 11.类型系统层级:从 Top Type 到 Bottom Type 12.类型里的逻辑运算:条件类型与 infer 13.内置工具类型基础:别再妖魔化工具类型了! 14.反方向类型推导:用好上下文相关类型 15.数类型:协变与逆变的比较 16.了解类型编程与类型体操的意义,找到平衡点 17.内置工具类型进阶:类型编程进阶 18.基础类型新成员:模板字符串类型入门 19.类型编程新范式:模板字符串工具类型进阶 20.工程层面的类型能力:类型声明、类型
本文档是3GPP组织发布的关于5G系统中统一数据仓库服务(UDR)的技术规范,旨在为第三阶段(Relase 19)提供协议和数据模型的详细定义。该规范主要针对Nudr接口,规定了通过此接口NF服务消费 限时福利!送硅基流动100w deepseek token,支持api调用:https://cloud.siliconflow.cn/i/VkH0G5VX
西门子G120 Vf和矢量控制的区别及相关参数设置
【宝城期货-2025研报】短期降息预期落空,国债期货震荡筑底.pdf
基于yolov5识别算法实现的DNF自动脚本源码.zip
HR人力资源管理系统PersonManage
【宝城期货-2025研报】有色日报:有色午后走强,铜增仓上行.pdf
商城程序电脑端手机端微信端三合一板
个人理财系统,系统包含两种角色:管理员、用户,系统分为前台和后台两大模块,主要功能如下。 【管理员】: 个人中心:管理员可以在个人中心查看和修改自己的个人信息。 用户管理:管理员可以对用户进行管理,包括添加新用户、编辑用户信息、删除用户以及查看用户列表。 账单类型管理:管理员可以管理账单类型,包括添加新的账单类型、编辑账单类型信息、删除账单类型以及查看账单类型列表。 【用户】: 个人中心:用户可以在个人中心查看和修改自己的个人信息。 收入账单管理:用户可以添加和管理收入账单,包括记录收入的金额、时间、来源等信息,查看、编辑和删除已记录的收入账单。 支出账单管理:用户可以添加和管理支出账单,包括记录支出的金额、时间、用途等信息,查看、编辑和删除已记录的支出账单。 理财计划管理:用户可以制定和管理理财计划,包括设定目标、计划时间、计划金额等信息,查看、编辑和删除已设定的理财计划。 统计分析管理:用户可以查看自己的收入和支出情况的统计分析结果。 备忘录管理:用户可以添加、编辑和删除备忘录。 二、项目技术 编程语言:Java 数据库:MySQL 项目管理工具:Maven 前端技术Vue
面试题关注有更多资源,私信免费的
【宝城期货-2025研报】宝城期货橡胶早报.pdf
以面向对象思想编写单片机程序
## 介绍 平均风速是描述一个地区风力强弱的重要气象指标,是指空间某一点,在给定的时段内各次观测的风速之和除以观测次数,其广泛应用于气候研究、农业、风能开发等领域 本分享数据包含中国各区县的平均风速数据,涵盖了1951年至2024年之间共21w+条数据,数据按年度进行整理 ## 一、中国各区县平均风速数据的介绍 数据年份:1951-2024年 数据范围:区县 数据格式:面板数据 样本数量:21万+ ## 二、数据指标
UPX_4.2.2(压缩exe文件).rar
基于Python Tkinter的智能书库管理系统代码
Tutorial Design of Smart Sensors-ISSCC2010.rar
steam_appid