方案1–业务方自己实现
假设消息中间件没有提供“事务消息”功能,比如你用的是Kafka。那如何解决这个问题呢?
解决方案如下:
(1)Producer端准备1张消息表,把update DB和insert message这2个操作,放在一个DB事务里面。
(2)准备一个后台程序,源源不断的把消息表中的message传送给消息中间件。失败了,不断重试重传。允许消息重复,但消息不会丢,顺序也不会打乱。
(3)Consumer端准备一个判重表。处理过的消息,记在判重表里面。实现业务的幂等。但这里又涉及一个原子性问题:如果保证消息消费 + insert message到判重表这2个操作的原子性?
消费成功,但insert判重表失败,怎么办?关于这个,在Kafka的源码分析系列,第1篇, exactly once问题的时候,有过讨论。
通过上面3步,我们基本就解决了这里update db和发送网络消息这2个操作的原子性问题。
但这个方案的一个缺点就是:需要设计DB消息表,同时还需要一个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合额外增加业务方的负担。
方案2 – RocketMQ 事务消息
为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。
具体来说,就是把消息的发送分成了2个阶段:Prepare阶段和确认阶段。
具体来说,上面的2个步骤,被分解成3个步骤:
(1) 发送Prepared消息
(2) update DB
(3) 根据update DB结果成功或失败,Confirm或者取消Prepared消息。
可能有人会问了,前2步执行成功了,最后1步失败了怎么办?这里就涉及到了RocketMQ的关键点:RocketMQ会定期(默认是1分钟)扫描所有的Prepared消息,询问发送方,到底是要确认这条消息发出去?还是取消此条消息?
具体代码实现如下:
也就是定义了一个checkListener,RocketMQ会回调此Listener,从而实现上面所说的方案。
// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数
Message msg = new Message(......);
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();
public TransactionSendResult sendMessageInTransaction(.....) {
// 逻辑代码,非实际代码
// 1.发送消息
sendResult = this.send(msg);
// sendResult.getSendStatus() == SEND_OK
// 2.如果消息发送成功,处理与消息关联的本地事务单元
LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
// 3.结束事务
this.endTransaction(sendResult, localTransactionState, localException);
}
总结:对比方案2和方案1,RocketMQ最大的改变,其实就是把“扫描消息表”这个事情,不让业务方做,而是消息中间件帮着做了。
至于消息表,其实还是没有省掉。因为消息中间件要询问发送方,事物是否执行成功,还是需要一个“变相的本地消息表”,记录事物执行状态。
人工介入
可能有人又要说了,无论方案1,还是方案2,发送端把消息成功放入了队列,但消费端消费失败怎么办?
消费失败了,重试,还一直失败怎么办?是不是要自动回滚整个流程?
答案是人工介入。从工程实践角度讲,这种整个流程自动回滚的代价是非常巨大的,不但实现复杂,还会引入新的问题。比如自动回滚失败,又怎么处理?
对应这种极低概率的case,采取人工处理,会比实现一个高复杂的自动化回滚系统,更加可靠,也更加简单。
相关推荐
本主题聚焦于“中间件事务处理”,这是一个至关重要的概念,特别是在分布式系统和企业级应用中。事务处理是确保数据一致性、可靠性和完整性的重要机制。 事务是数据库操作的基本单位,它包含了一组逻辑上的数据库...
1. 面向消息的中间件(Message-Oriented Middleware, MOM):如IBM的WebSphere MQ、Apache ActiveMQ等,主要关注消息的可靠传输和事务处理。 2. 目录服务中间件:如LDAP(Lightweight Directory Access Protocol),...
3. **消息传递机制**:论文可能会详细讨论消息的创建、发送、接收和删除过程,包括消息的编码和解码,以及保证消息顺序和可靠性的机制,如事务处理、确认机制、重试策略等。 4. **并发与性能**:消息中间件通常需要...
分布式消息中间件是现代软件架构中的重要组成部分,特别是在微服务和大数据处理场景中,它们扮演着数据通信的关键角色。本书《分布式消息中间件实践_倪炜(著)》深入探讨了四种主流的消息队列(Message Queue,MQ)...
2. 消费者:消费者从消息中间件接收并处理消息。 3. 队列与主题:在P2P模型中,消息被发送到队列;在Pub/Sub模型中,消息被发布到主题。 4. 消息存储与传递:消息中间件负责存储未被消费的消息,直到它们被正确处理...
为了充分利用消息中间件和JMS,开发者需要理解如何配置和管理消息代理,创建生产者和消费者,以及处理消息的生命周期。此外,理解消息的事务管理和一致性策略也是至关重要的,因为这直接影响到系统的可靠性和性能。 ...
本书《分布式消息中间件实践》可能会涵盖如何选择合适的消息中间件、如何设计消息模型、如何保证消息的可靠传输、如何处理消息积压和消费延迟等问题,还会涉及到性能调优、监控和故障排查等实战经验。对于想要深入...
中间件是一种处于系统软件(操作系统和网络软件)与应用软件之间的软件,它能使应用软件之间进行跨网络的透明访问和协同工作.ppt介绍了远程过程调用中间件、分布式对象中间件、事务处理中间件 以及消息中间件的基本...
- **事务支持**:一些消息中间件支持事务性消息,保证了消息的一致性。 在实际应用中,消息中间件常用于以下场景: - **订单处理**:在电商系统中,订单创建后,通过消息队列将订单信息发送给库存、支付等多个系统...
2. **创建会话**:在连接上创建会话,会话是处理消息的基本单元,可以设置为事务性或非事务性。 3. **创建生产者和消费者**:会话上可以创建多个生产者用于发送消息,创建消费者用于接收消息。生产者可以指定目标...
5. **事务支持**:高级的消息中间件还提供事务机制,保证消息的可靠投递。 在“消息中间件总结1.0版本”中,可能包含以下内容: 1. **基础理论**:对消息中间件的基本原理、工作模式、以及与常见分布式架构的适配...
- **事务支持**:确保消息处理的一致性和可靠性。 ### 二、主流消息中间件介绍 目前市面上较为流行的消息中间件主要包括: #### 2.1 RabbitMQ RabbitMQ 是一款开源的消息中间件,基于 AMQP 协议实现,具有以下...
阿里分布式消息中间件RocketMQ深入解析 RocketMQ是阿里巴巴自研...RocketMQ是阿里巴巴自研的第三代分布式消息中间件,具有高性能、低延迟、可靠重试、分布式事务等特性,广泛应用于电商、金融、大数据、物联网等领域。
- **事务机制**:保证消息处理的原子性、一致性、隔离性和持久性。 3. **订阅与发布** - **发布**:消息发布者将消息发送到特定的通道或主题上。 - **订阅**:消息订阅者可以根据自己的需求订阅感兴趣的通道或...
6. **Error Handling**: 为了处理消息传递中的异常,可能会有错误处理策略,例如死信队列,将无法正确处理的消息移到特殊队列,以便后续分析和处理。 7. **指数退避重试策略**: 为了防止消息处理失败时的资源浪费,...
### RabbitMQ消息中间件知识点详解 #### 一、RabbitMQ简介 RabbitMQ是一款开源的消息中间件,基于AMQP(Advanced Message Queuing Protocol)协议实现。它支持多种消息发布订阅模式,包括简单模式(Simple)、工作...
1. **事务支持**:RabbitMQ支持事务操作,可以在一个事务中完成消息的发送和确认,确保数据的一致性。 2. **消息TTL**:可以为消息设置过期时间,超过该时间的消息将被自动删除。 3. **死信队列**:当消息无法被正常...
RocketMQ 是阿里巴巴开源的一款分布式消息中间件,它在大规模分布式系统中扮演着关键角色,用于解耦应用、异步处理以及提高系统的响应速度和吞吐量。本压缩包中的资源可能是一个逐步学习 RocketMQ 的教程,从基础...
Apache ActiveMQ是一个开源的消息中间件,它属于Apache软件基金会。ActiveMQ旨在提供一种可靠的消息传递机制,以支持应用程序之间的异步通信。作为面向消息的中间件(MOM),ActiveMQ实现了JMS(Java Message ...
一致性是消息中间件设计中的一个关键因素,尤其是在处理分布式系统中的事务时更为重要。为了确保事务的一致性,通常会采用以下几种方式: 1. **本地事务**:如示例代码所示,通过在同一个事务中同时保存订单数据并...