`
dengkane
  • 浏览: 42363 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

记一次消息总线的打造

阅读更多

虽说消息队列的用法很简单:PUB/SUB, PRODUCER/CONSUMER,不过真做起来还真麻烦。

先说下原始需求:

  • Web前端发送命令消息,后端Consumer处理,然后前端得到结果
  • 需要支持Windows服务

很快,下图就出来了:

先来分析分析:

    • 前端怎么知道后端已经处理完成?
    • 前端如何在处理完后的第一时间被触发去执行某些callback呢?
    • Web前端很可能会通过ajax来定时查看某消息的处理状态

  第一反应是增加应答队列,此时:

    • 前端能够很及时的被通知到(后端处理完触发),来执行callback
    • 但是
      • ajax类型的定时查看怎么做?在ResponseQueue中查?显然不行(队列中数据越多,性能越差)
      • 如果前端关闭一段时间,消息会积压下来,性能越变越差

  因此决定增加一个DB来解决这些消息的保存以及后续的ajax类型的多次查询,如下图:

 再来分析分析,此时

    • 前端ajax类型的不定时、多次的查询某消息处理状态是解决了
    • 如果消息量很大,也可以将RabbitMQ以及DB分别做集群以及切片
    • 但,似乎还是得增加应答队列进去,因为现在CONSUMER处理完成后,针对前端的通知很麻烦,理由如下
      • 基于DB行记录的通知效率低
    • 但,即便增加了这个应答队列,也会出现如下问题
      • 如果前端崩掉后有段时间未on service,此时应答队列就会积压消息...性能会变差

  此时该咋办?

  答:用PUB/SUB机制来做这个应答队列,此时如果前端崩掉,就不会SUB了,只要online时才会有消息被通知到

  因此,继续出一张图

    

  图中的Notifier, NotifierPublisher是前端和后端的BROKER,考虑到有些线程需要主动监听,因此画在了上面。

 

  再来谈谈后端,由于没有特别高要求,对后端的要求也就是这么几点:

    • 在业务逻辑角度,消息只能被无错处理一次
    • 如果出现了Exception, 则需要后续人工介入,消息不能丢失,但也不应该造成无限循环的报Exception
    • 对于报Exception的消息,人工处理要方便

  分别分析    

    • 在业务逻辑角度,消息只能被无错处理一次
      • 在业务处理没有报错的情况下,将RabbitMQ消息的Ack动作与DB的消息状态回写做成一个TRANSACTION,如下
      • 这样就能保证此消息“从RABBITMQ Server中remove、写入查询DB”同时确保
      • 但是,如果存在下述情况时,会出现即便业务逻辑没有报错情况下多次执行
        • 那就是:如果业务逻辑执行完毕,没有报错,此时,即将触发上述代码,却还没有触发的时刻,服务crash了...
        • 解决方法
          • 在业务逻辑代码中加入幂等性
          • 在业务逻辑代码中加入检查性质代码
          • 告诉我下其他简便的方法吧(记录本地文件日志能解决,就是比较复杂)
    • 如果出现了Exception, 则需要后续人工介入,消息不能丢失,但也不应该造成无限循环的报Exception
      • 增加相应的Exception队列,实际中是增加了2个:如:
        • 比如目前有队列messages.CommandA,则异常队列有:
          • messages.CommandA.exceptions1
          • messages.CommandA.exceptions2
          • 为啥是2个?看后续
    • 对于报Exception的消息,人工处理要方便

      • 在配置文件中增加一个参数,表示运行级别:普通、异常1、异常2
      • 如果是普通级别,则CONSUMER会从messages.CommandA中获取消息进行处理,报错后会将消息move到exception1中
      • 如果是异常1级别,则CONSUMER会从messages.CommandA.exception1中获取消息进行处理,报错后会将消息move到exception2中
      • 如果是异常2级别,则CONSUMER会从messages.CommandA.exception2中获取消息进行处理,报错后将消息move到exception1中
      • 这里还有个问题,就是上面的这些RABBITMQ级别的消息从exception1移动到exception2中,都是分成PUBLISH和BASICACK两个CHANNEL上的动作完成的,不能套RABBITMQ的TX,也就是存在一致性问题
        • 解决起来同HandleSuccessfulMessage类似,都是通过本地db事务来做,都是借助了BASE思想来实现的

 

剩下的一个问题,RABBITMQ有优先级队列特性吗?答案是有:

 

注意:RABBITMQ默认是不支持客户端消息的优先级的,默认只支持Consumer的优先级,那如何实现客户端消息的优先级呢?

    1. 发送消息时,设置properties属性,记得设置Priority属性值
    2. 安装插件rabbitmq_priority_queue
    3. DeclareQueue时,加入x-max-priority特性值

 

DONE.

 

[转载自:http://www.cnblogs.com/aarond/p/MessageBus.html]

分享到:
评论

相关推荐

    消息总线java实现

    在软件开发中,消息总线是一种重要的设计模式,它允许组件之间通过解耦的方式进行通信。消息总线的实现通常涉及消息的生产、传输和消费,以及相应的消息队列和调度策略。本作业将深入探讨如何使用Java语言来实现一个...

    消息总线文档管理

    消息总线文档管理是IT行业中一个重要的概念,它涉及到系统间的通信、数据交换以及软件架构的设计。消息总线,也称为消息中间件,是一种软件组件,用于在分布式系统中的不同应用之间传递信息,实现了应用程序之间的...

    基于层次消息总线的体系结构

    在这样的背景下,一种新的软件体系风格——基于层次消息总线(Hierarchical Message Bus, HMB)的体系结构应运而生。这种架构不仅能够有效地解决传统架构中遇到的问题,还能更好地支持构建的分布和并发。 #### 二、...

    企业消息总线(ESB)源代码

    企业消息总线(ESB),全称为Enterprise Service Bus,是企业级软件系统中的一种关键架构组件,用于实现不同系统间的松耦合通信。它通过提供一个中间层来处理消息传递,使得应用程序可以发送和接收消息,而不必直接...

    消息系统总线需求模板.docx

    消息系统总线作为一种高效的消息传递机制,在现代软件架构中扮演着至关重要的角色。它不仅能够实现不同组件之间的解耦,还能确保消息的可靠传输。本文档将详细介绍消息系统总线的需求模板,包括其总体需求、功能性...

    基于消息中心的对象总线

    【基于消息中心的对象总线】是一种先进的软件设计模式,它允许不同组件或对象在分布式环境中进行通信和协作,而无需直接引用或了解彼此的具体实现。这种模式的核心是“消息中心”,它扮演着中介的角色,接收来自各个...

    软件体系结构 消息总线风格 文档管理

    消息总线风格是软件体系结构中的一个重要模式,尤其在现代分布式系统和企业级应用中广泛应用。文档管理则是任何组织中不可或缺的一部分,涉及到信息的有效存储、检索和共享。 消息总线风格的软件体系结构是一种设计...

    DeFiBus=RPC+MQ,是基于开源消息中间件打造的安全可控的分布式金融级消息总线

    DeFiBus=RPC+MQ,是基于开源消息中间件打造的安全可控的分布式金融级消息总线。DeFiBus不仅提供了RPC同步调用,还提供了MQ的异步事件通知、事件组播和广播等常用服务调用和消息模式,同时增加了应用多中心多活、服务...

    service-bus, 支持CQRS的PHP轻量级消息总线.zip

    service-bus, 支持CQRS的PHP轻量级消息总线 PSB - ProophServiceBus支持CQRS和微服务的PHP 7.1 轻量级消息总线 消息 APIprooph/服务总线是一个轻量级的消息传递 facade 。 它允许你用消息来定义你的模型的API 。

    swift-`RxEventHub`是一个基于`RxSwift`的消息总线

    `RxEventHub` 是一个基于 `RxSwift` 的消息总线(类似于 NSNotificationCenter), 它使得广播事件变得简单、类型安全、尽可能少出错,很多情况下可以用于代替 `NSNotificationCenter`。

    33-Spring Cloud消息队列&消息总线1

    消息总线是Spring Cloud提供的一个特性,主要用于服务间的事件传播。它基于消息队列,可以用来广播事件,比如配置的更新。在Spring Cloud Config的场景下,当对配置服务器的配置进行修改后,可以通过消息总线将刷新...

    Android语音控制消息总线

    【Android语音控制消息总线】是一种在Android平台上实现的消息通信机制,它允许应用程序通过语音指令进行交互,提高用户体验,尤其适用于驾驶、健身等手不便操作的场景。这一技术结合了语音识别技术和消息传递框架,...

    一个消息总线模块:当收到等待的消息时,调用回调函数;当在规定时间内没收到时,调用超时响应函数

    消息总线模块是一种软件设计模式,它用于在应用程序的不同组件之间传递信息,简化了组件间的通信,提高了系统的可扩展性和灵活性。在这个特定的模块中,它包含两个关键功能:一个是当接收到预期消息时调用回调函数,...

    Mule是一个企业服务总线(ESB)消息框架

    Mule是一个企业服务总线(ESB)消息框架.它的主要特性包括: 1.基于J2EE1.4的企业消息总线(ESB)和消息代理(broker). 2.可插入的连接性:比如Jms,jdbc,tcp,udp,multicast,http,servlet,smtp,pop3, file,xmpp等. 3.支持...

    ActiveMQ是Apache出品的开源消息总线.rar

    ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。 ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS规范出台已经是很久的事情了,但是 JMS 在当今的 J2EE 应用中间...

    微型计算机总线简介 微型计算机总线简介

    VESA总线,即视频电子标准协会总线,是一种局部总线,设计用于提高CPU与外设之间的数据传输速度,尤其适合高性能图形应用。PCI总线是Intel公司推出的高速局部总线,提供32位或64位数据通道,支持突发传输,速度高达...

    系统总线和具有基本输入输出功能的总线接口实验

    系统总线和具有基本输入输出功能的总线接口实验 实验目的:理解总线的概念及其特性,掌握控制总线的功能和应用。 实验设备: 1. TD-CMA 教学实验系统 1 台 2. PC 微机 1 台 实验原理: 1. 外设需要外部总线提供...

Global site tag (gtag.js) - Google Analytics