`
nepxion
  • 浏览: 38139 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

(七) Nepxion-Thunder分布式RPC集成框架 - 消息队列模型

阅读更多

Nepxion-Thunder(QQ 群 471164539)发布在https://github.com/Nepxion/

 

 

 1. JMS消息队列模型

主要是用于ActiveMQ & Tibco等基于JMS标准的MQ。结构图如下:


 点击查看大图

  • 工作原理
     1)Spring扫描线程扫描到一个Service节点后,就会去新建一个MQContext对象,放入ServiceContextMap缓存中(如果存在则不新建,去缓存中拿),进而创建queueAsyncResponseDestination,queueSyncResponseDestination,topicAsyncResponseDestination,放入相关DestinationMap缓存中(如果存在则不新建,去缓存中拿),然后根据这些Destination建立MessagerListener消息监听对象,实现对Response Queue/Topic的监听。如果有消息被监听到,从消息里获取ProtocolRequest对象,执行ServerHandlerAdapter.handle(request, response)。机制跟Netty等一样,不阐述了。
     2)Spring扫描线程扫描到一个Reference节点后,就会去新建一个MQContext对象,放入ReferenceContextMap缓存中(如果存在则不新建,去缓存中拿),进而创建 queueAsyncRequestDestination,queueSyncRequestDestination,放入相关DestinationMap缓存中(如果存在则不新建,去缓存中拿),然后根据这些Destination建立MessagerListener消息监听对象,实现对Request Queue的监听。如果有消息被监听到,从消息里获取ProtocolResponse对象,执行ClientHandlerAdapter.handle(response)。机制跟Netty等一样,不阐述了。
     3)当调用端通过Spring Aop进行同步/异步远程调用时,先从缓存获取相关的DestinationMap中获取Destination对象,把
    ProtocolRequest请求通过MQProducer.produce发送到MQ服务器的Response Queue里,服务端监听到消息后处理,把结果封装到ProtocolResponse通过MQProducer.produce发送到Request Queue,调用端监听到消息后处理,如果是异步调用Callback方式完成调用,如果是同步通过CyclicBarrier的线程等待返回值,最后完成调用
     4)当调用端通过Spring Aop进行广播远程调用时,把ProtocolRequest发布到Response Topic,服务端订阅监听该ProtocolRequest后,进行处理,不返回结果
     5)Queue和Topic
    Destination名称是 destinationType + "-" + group + "-" + application + "-" + interfaze,防止不同类型的应用,重名接口接入到同一个MQ服务器
          例如:request-queue-async-MY_GROUP-APP_IOS-com.nepxion.thunder.test.service.UserService
     6)
    为服务/调用的接口指定不同的MQ服务器,前提是配置文件中必须出现两个以上的MQ服务器配置,如果只有一个,可以不指定,默认取配置的那个MQ服务器
     7)为MQ指定
    Connection或Session的缓存方式(SingleConnectionFactory,CachingConnectionFactory,PooledConnectionFactory),这个必须在配置文件中配置
     8)为MQ指定
    两种不同的初始化方式(JNDI和非JNDI)

     点击查看大图
  • 类结构

    1)MQTemplate.java - 消息发送模板类,继承JmsTemplate.java,覆盖doSend方法,把JmsMessage转换为ProtocolMessage,设定DeliveryMode,TimeToLive等属性值
    2)MQProducer.java - 消息生产者类,通过线程池发布消息到MQ服务器指定Queue或者Topic,同时指定消息消费者消费完消息后,发送响应消息的指定Queue。调用端发送请求消息到Response Queue/Topic,实现对Request Queue监听,服务端发送响应消息到Request Queue,实现对Response Queue/Topic的监听
    3)MQConsumer - 消息消费类,
    实现SessionAwareMessageListener.java
    4)MQHierachy.java - MQ层次类,实现创建不同类型的ConnectionFactory,MQTemplate,MQProducer等核心对象,以及DefaultMessageListenerContainer的优化等,包括消息过滤,消费并发数,接收超时等
    5)MQConnectionHierachy.java - 继承
    MQHierachy.java,实现非JNDI连接方式的初始化MQ连接
    6)MQJndiHierachy.java -
    继承MQHierachy.java,实现JNDI连接方式的初始化MQ连接
    7)MQExecutorDelegate.java - MQExecutor的代理接口
    8)MQServerExecutor.java - 继承AbstractServerExecutor.java,实现
    MQExecutorDelegate.java,初始化MQContext,初始化跟Response Queue/Topic相关的事情
    9)MQServerHandler.java - 继承
    MQConsumer.java的onMessage方法,实现服务端对消息监听
    10)MQClientExecutor.java -
    继承AbstractClientExecutor.java,实现MQExecutorDelegate.java,初始化MQContext,初始化跟Request Queue和Response Queue/Topic相关的事情
    11)
    MQClientHandler.java - 继承MQConsumer.java的onMessage方法,实现调用端对消息监听
    12)MQClientInterceptor.java - 继承AbstractClientInterceptor.java,实现如下调用方式:
          异步调用:从缓存获取要调用的queueResponseDestination和queueRequestDestination,通过MQProducer.produce方法发送异步请求ProtocolRequest。通过监听获取异步返回
          同步调用:同异步调用,采用返回值返回
          广播调用:
    从缓存获取要调用的topicResponseDestination和topicRequestDestination,通过MQProducer.produce方法发送广播请求ProtocolRequest。不返回
          重复调用:不支持

    13)MQContext.java - 实现初始化连接(
    MQHierachy),初始化消息队列(Queue/Topic),初始化消息监听(MQServerHandler/MQClientHandler),初始化重连机制 
    14)MQCacheContainer.java - 缓存容器
    15)MQQueueDestinationContainer.java - Queue Destination缓存容器
    16)MQTopicDestinationContainer.java - Topic Destination缓存容器
    17)MQDestinationUtil.java - Destination工具类
    18)MQSelectorUtil.java - Selector工具类
    19)MQBytesMessageConverter - 二进制和Java序列化对象互转的适配类

2. 非JMS消息队列模型

主要是用于Kafka。结构图如下:


 点击查看大图

使用Kafka作为点对点通信,有响应返回的场景(同步返回值,异步带Callback返回),特别要注意,所对应的Topic下分区数一定要大于等于调用端数目

 

  • 工作原理
     1)Spring扫描线程扫描到一个Service节点后,就会去新建一个KafkaMQContext对象,放入ServiceContextMap缓存中(如果存在则不新建,去缓存中拿),进而创建 responseQueueDestinationEntity,responseTopicDestinationEntity,requestQueueDestinationEntity,然后根据这些DestinationEntity建立KafkaMQConsumer消息监听对象,实现对Response Queue/Topic(名称)的监听。如果有消息被监听到,从消息里获取ProtocolRequest对象,执行ServerHandlerAdapter.handle(request, response)。机制跟Netty等一样,不阐述了。
     2)Spring扫描线程扫描到一个Reference节点后,就会去新建一个KafkaMQContext对象,放入ReferenceContextMap缓存中(如果存在则不新建,去缓存中拿),进而创建 requestQueueDestinationEntity然后根据这些DestinationEntity建立KafkaMQConsumer消息监听对象,实现对Request Queue(名称)的监听。如果有消息被监听到,从消息里获取ProtocolResponse对象,执行ClientHandlerAdapter.handle(response)。机制跟Netty等一样,不阐述了。
     3)当调用端通过Spring Aop进行同步/异步远程调用时,先通过相关参数获得Topic名称,把
    ProtocolRequest请求通过KafkaMQProducer.produce发送到Kafka服务器的Response Queue里,服务端监听到消息后处理,把结果封装到ProtocolResponse通过KafkaMQProducer.produce发送到Request Queue,调用端监听到消息后处理,如果是异步调用Callback方式完成调用,如果是同步通过CyclicBarrier的线程等待返回值,最后完成调用
     4)当调用端通过Spring Aop进行广播远程调用时,把ProtocolRequest发布到Response Topic,服务端订阅监听该ProtocolRequest后,进行处理,不返回结果
     5)Queue和Topic Destination名称是 destinationType + "-" + group + "-" + application + "-" + interfaze,防止不同类型的应用,重名接口接入到同一个MQ服务器
          例如:request-queue-async-MY_GROUP-APP_IOS-com.nepxion.thunder.test.service.UserService

     6)
    为服务/调用的接口指定不同的Kafka服务器,前提是配置文件中必须出现两个以上的Kafka服务器配置,如果只有一个,可以不指定,默认取配置的那个Kafka服务器
     7)同步/异步调用和广播调用的区别,是前者消费者必须在相同Group,后者消费者必须在不同Group
     8)框架使用Kafka的High Level API,对于消息在分区上的分配,采用
    DefaultPartitioner的分区策略,见官方解释:
           /**
            * The default partitioning strategy:
            * <ul>
            * <li>If a partition is specified in the record, use it
            * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
            * <li>If no partition or key is present choose a partition in a round-robin fashion
            */
           对于Response Queue和Topic(名称),Partition的分区策略通过DefaultPartitioner第三种策略,即不指定
    Partition和Key,让Kafka做权重轮询放入不同的分区
           对于Request Queue(名称),
    Partition的分区策略通过DefaultPartitioner第二种策略,即不指定Partition,但指定Key(IP:Port),通过Key的Hash值来决定放入哪个分区。这就意味着,一个服务调用端即享用一个独立的分区,这就为端到端的实现创造了条件
    9)框架使用Google Guava的EventBus,使框架内部事件异步发送到外部。无论调用端和提供端,在每次Produce和Consume成功或者失败,都会发送相关的Event。如果是失败事件,为Retry创造了条件。当然可以通过配置文件来关闭事件发送


     点击查看大图
  • 类结构

    1)KafkaMQProducer.java - 消息生产者类,通过线程池发布消息到Kafka服务器指定Queue或者Topic,同时指定消息消费者消费完消息后,发送响应消息的指定Queue。调用端发送请求消息到Response Queue/Topic,实现对Request Queue监听,服务端发送响应消息到Request Queue,实现对Response Queue/Topic的监听
    2)KafkaMQConsumer.java - 消息消费类
    3)KafkaMQExecutorDelegate.java - KafkaExecutor的代理接口
    4)KafkaMQServerExecutor.java - 继承AbstractServerExecutor.java,实现
    KafkaMQExecutorDelegate.java,初始化KafkaContext,初始化跟Request Queue和Response Queue/Topic相关的事情
    5)KafkaMQServerHandler.java - 继承
    KafkaMQConsumer.java,实现服务端对消息监听(Poll)
    6)KafkaMQClientExecutor.java -
    继承AbstractClientExecutor.java,实现KafkaMQExecutorDelegate.java,初始化KafkaContext,初始化跟Response Queue相关的事情
    7)
    KafkaMQClientHandler.java - 继承KafkaMQConsumer.java,实现客户端端对消息监听(Poll)
    8)KafkaMQClientInterceptor.java - 继承AbstractClientInterceptor.java,实现如下调用方式:
          异步调用:通过KafkaMQProducer.produce方法发送异步请求ProtocolRequest。通过监听获取异步返回
          同步调用:同异步调用,采用返回值返回
          广播调用:
    通过KafkaMQProducer.produce方法发送广播请求ProtocolRequest。不返回
          重复调用:支持,通过
    Google Guava的EventBus实现
    9)KafkaMQContext.java - 实现
    初始化消息队列(Queue/Topic),初始化消息监听(KafkaMQServerHandler/KafkaMQClientHandler)
    10)KafkaMQCacheContainer.java - 缓存容器
    11)KafkaMQDestinationUtil.java - Destination工具类

  • 大小: 19.6 KB
  • 大小: 10.7 KB
  • 大小: 142 KB
  • 大小: 311.3 KB
  • 大小: 143.4 KB
  • 大小: 274.8 KB
  • 大小: 154.3 KB
  • 大小: 183.9 KB
  • 大小: 115.8 KB
  • 大小: 326.7 KB
分享到:
评论

相关推荐

    基于Java的Thunder分布式RPC框架设计源码

    Nepxion Thunder是一个基于Java的分布式RPC框架,集成了Netty、Hessian、Kafka、ActiveMQ、Tibco、Zookeeper、Redis、Spring Web MVC、Spring Boot和Docker等技术。它支持多协议、多组件和多序列化,为开发者提供了...

    协程式驱动框架Nepxion-Coroutine.zip

    Coroutine是基于Kilim/Promise JDeferred的协程式驱动框架,基于Apache Zookeeper的分布式规则存储和动态规则变更通知。 主要特性: 1. 基于微服务框架理念设计 2. 支持同步/异步调用 3. 支持串行/并行调用 4....

    Thunder::high_voltage: Nepxion Thunder is a distribution RPC framework based on Netty + Hessian + Kafka + ActiveMQ + Tibco + Zookeeper + Redis + Spring Web MVC + Spring Boot + Docker 多协议、多组件、多序列化的分布式RPC调用框架

    Nepxion Thunder是一款基于Netty + Hessian + Kafka + ActiveMQ + Tibco + Zookeeper(Curator Framework) + Redis + FST + Spring + Spring Web MVC + Spring Boot + Docker分布式RPC调用框架。架构思想主要是来自...

    yinheli/docker-thunder-xware:latest 镜像打包下载

    yinheli/docker-thunder-xware:latest 镜像打包下载 群晖 NAS DSM 系统,只要三步使用 Docker 安装迅雷远程下载

    Go-Thunder⚡️一个Go框架用于快速构建强大的graphql服务

    在压缩包"thunder-master"中,包含了Thunder框架的源码和其他相关资源。开发者可以通过查看源码,了解其内部实现原理,也可以直接使用它来快速搭建自己的GraphQL服务。在实际开发过程中,结合Go语言的标准库和第三方...

    基于Kilim、Promise JDeferred、Zookeeper和Spring Boot的协程分布式调用聚合框架设计源码

    该项目是一款基于Kilim、Promise JDeferred、Zookeeper和Spring Boot技术的协程驱动分布式...该框架支持Nepxion Thunder、Dubbo和Motan等RPC调用的集成,并通过规则配置实现调用聚合,适用于构建高性能的分布式系统。

    wine-thunder_0.6-2_all.deb

    wine-thunder_0.6-2_all.deb用于在linux系统下,使用wine直接按装的迅雷软件,实现高速下载,在ubunut,fedora等linux版本中,实现直接点击安装

    开源项目-omeid-thunder.zip

    "thunder-master"这个压缩包子文件名可能代表项目的主分支或主代码库,这在Git等版本控制系统中很常见,"master"通常指的是默认分支,存放着项目的最新稳定版本。解压后,用户可以访问到项目的源代码、文档、构建...

    Go-Thunder是BoltDB的交互式Shell

    Go-Thunder是一个基于BoltDB数据库的交互式Shell工具,主要设计用于方便地与BoltDB数据库进行交互,提供了一种命令行界面来操作和管理数据。BoltDB本身是Go语言实现的一个轻量级、文件存储的键值对数据库,它以其...

    A10-Thunder_1030S方案白皮书.pdf

    A10-Thunder_1030S方案白皮书.pdf

    home-work-thunder

    【压缩包子文件的文件名称列表】"thunder-oms" 这个文件名可能代表“Thunder Operation Management System”(迅雷运营管理系统),或者是一种特定的模块或服务。它可能包含了项目的源代码、配置文件、测试脚本等,...

    系统工具-文件下载-thunder_3.4.0.4338.zip

    标题中的“系统工具-文件下载-thunder_3.4.0.4338.zip”表明这是一款系统工具,具体来说是与文件下载相关的。这里的“thunder”很可能指的是迅雷,一个在中国广为人知的下载管理软件。版本号“3.4.0.4338”指示这是...

    开源项目-muesli-thunder.zip

    在使用Thunder时,用户应首先下载并解压“muesli-thunder.zip”文件,得到“thunder-master”目录。然后按照项目提供的安装指南编译并安装Thunder,最后通过命令行启动Thunder,开始探索和操作BoltDB数据库。对于...

    A10-Thunder_6430S方案白皮书.pdf

    A10-Thunder_6430S方案白皮书.pdf

    3D-Thunder-Lightning.zip

    3D-Thunder-Lightning.zip,受航母指令启发的开源未来动作飞行模拟器游戏,3D建模使用专门的软件来创建物理对象的数字模型。它是3D计算机图形的一个方面,用于视频游戏,3D打印和VR,以及其他应用程序。

    A10-Thunder_930方案白皮书.pdf

    A10 Thunder 930方案白皮书 A10 Thunder 930是A10 Networks公司推出的统一应用服务网关(UASG),采用64位系统、1U硬件,提供了极具性价比的解决方案。该设备基于A10极具扩展性的灵活高级核心操作系统(ACOS)架构...

    系统工具-文件下载-Thunderbird91.0b4.zip

    Thunderbird是一款由Mozilla基金会开发的开源邮件客户端,它集成了电子邮件、新闻组、RSS阅读器和日历功能,为用户提供了一站式的通信解决方案。Thunderbird91.0b4是该软件的一个版本,其中“91.0b4”表示的是版本号...

    A10-Thunder-5430S方案白皮书

    《A10 Thunder 5430S方案白皮书》是针对A10 Networks公司推出的Thunder 5430S高性能负载均衡解决方案的一份详细技术文档。在IT行业中,负载均衡是一项至关重要的技术,它确保了网络服务的高可用性和性能优化。A10 ...

    Android代码-Thunder

    Thunder Android OkHttp util package let response callback at MainThread(UIThread), also it‘s lifecycle safety. ⚠️ Thunder‘s code is based on SugarTask(Very nice code

    live-transit-thunder-bay:Thunder Bay 实时 GTFS 数据馈送的 JSON 代理

    实时过境雷湾 ... 运行npm install live-transit-thunder-bay --save 使用 API: var liveTransit = require ( 'live-transit-thunder-bay' ) ; liveTransit . start ( ) ; 运行node index.js 。 Expres

Global site tag (gtag.js) - Google Analytics