在分布式系统中,为了保证数据一致性是必须使用分布式事务。分布式事务实现方式就很多种,今天主要介绍一下使用 RocketMQ 事务消息,实现分布事务。
文末有彩蛋,看完再走
为什么需要事务消息?
很多同学可能不知道事务消息是什么,没关系,举一个真实业务场景,先来带你了解一下普通的消息存在问题。
上面业务场景中,当用户支付成功,将会更新支付订单,然后发送 MQ 消息。手续费系统将会通过拉取消息,计算手续费然后保存到另外一个手续费数据库中。
由于计算手续费这个步骤可以离线计算,所以这里采用 MQ 解耦支付与计算手续费的流程。
流程主要涉及三个步骤:
- 更新订单数据
- 发送消息给 MQ
- 手续费系统拉取消息
上面提到的步骤,任何一个都会失败,如果我们没有处理,就会使两边数据不一致,将会造成下面两种情况:
- 订单数据更新了,手续费数据没有生成
- 手续费数据生成,订单数据却没有更新
这可是涉及到真正的钱,一旦少计算,就会造成资损,真的赔不起!
对于最后一步来讲,比较简单。如果消费消息失败,只要没有提交消息确认,MQ 服务端将会自动重试。
最大的问题在于我们无法保证更新操作与发送消息一致性。无论我们采用先更新订单数据,再发送消息,还是先发送消息,再更新订单数据,都在存在一个成功,一个失败的可能。
如下所示,采用先发送消息,然后再更新数据库的方式。
上面流程消息发送成功之后,再进行本地事务的提交。这个流程看起来很完美,但是想象一下,如果在提交事务时数据库执行失败,导致事务回滚了。
然而此时消息已经发送出去,无法撤回。这就导致手续费系统紧接会消费消息,计算手续费并更新到数据库中。这就造成支付数据未更新,手续费系统却生成的不一致的情况。
那如果我们流程反一下,是不是就好了呢?
我们使用下面的伪码表示:
// 开始事务
try {
// 1.执行数据库操作
// 2.提交事务
}catch (Exception e){
// 3.回滚事务
}
// 4.发送 mq 消息
这里如果事务提交成功,但是 mq 消息发送失败,就会导致支付数据更新但是手续费数据未生成的的不一致情况。
这里有的同学可能会想到,将发送 mq 消息步骤移动到事务中,消息发送失败,回滚事务,不就完美了吗?
伪码如下:
// 开始事务
try {
// 1.执行数据库操作
// 2.发送 mq 消息
// 3.提交事务
}catch (Exception e){
// 4.回滚事务
}
上面代码看起来确实没什么问题,消息发送失败,回滚事务。
但是实际上第二步有可能存在消息已经发送到 MQ 服务端,但是由于网络问题未及时收到 MQ 的响应消息,从而导致消息发送端认为消息消息发送失败。
这就会导致订单事务回滚了,但是手续费系统却能消费消息,两边数据库又不一致了。
熟悉 MQ 的同学,可能会想到,消息发送失败,可以重试啊。
是的,我们可以增加重试次数,重新发送消息。但是这里我们需要注意,由于消息发送耦合在事务中,过多的重试会拉长数据库事务执行时间,事务处理时间过长,导致事务中锁的持有时间变长,影响整体的数据库吞吐量。
实际业务中,不太建议将消息发送耦合在数据库事务中。
事务消息
事务消息是 RocketMQ 提供的事务功能,可以实现分布式事务,从而保证上面事务操作与消息发送要么都成功,要么都失败。
使用事务消息,整体流程如下:
首先我们将会发送一个半(half) 消息到 MQ 中,通知其开启一个事务。这里半消息并不是说消息内容不完整,实际上它包含所有完整的消息内容。
这个半消息与普通的消息唯一的区别在于,在事物提交之前,这个消息对消费者来说是不可见的,消费者不会消费这个消息。
一旦半消息发送成功,我们就可以执行数据库事务。然后根据事务的执行结果再决定提交或回滚事务消息。
如果事务提交成功,将会发送确认消息至 MQ,手续费系统就可以成功消费到这条消息。
如果事务被回滚,将会发送回滚通知至 MQ,然后 MQ 将会删除这条消息。对于手续费系统来说,都不会知道这条消息的存在。
这就解决了要么都成功,要么都失败的一致性要求。
实际上面的流程还是存在问题,如果我们提交/回滚事务消息失败怎么办?
对于这个问题,RocketMQ 给出一种事务反查的机制。我们需要需要注册一个回调接口,用于反查本地事务状态。
RocketMQ 若未收到提交或回滚的请求,将会定期去反查回调接口,然后可以根据反查结果决定回滚还是提交事务。
RocketMQ 事务消息流程整体如下:
事务消息示例代码如下:
public class TransactionMQProducerExample {
public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
TransactionMQProducer producer = new TransactionMQProducer("test_transaction_producer");
// 不定义将会使用默认的
ExecutorService executorService =
new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
TransactionListener transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
// 改成自己的地址
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Order order = new Order("66666", "books");
Message msg =
new Message("transaction_tp",
JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送半消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println(sendResult.getSendStatus());
producer.shutdown();
}
public static class TransactionListenerImpl implements TransactionListener {
/**
* 半消息发送成功将会自动执行该逻辑
*
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
Order order = null;
try {
order = JSON.parseObject(new String(msg.getBody(),
RemotingHelper.DEFAULT_CHARSET), Order.class);
boolean isSuccess = updateOrder(order);
if (isSuccess) {
// 本地事务执行成功,提交半消息
System.out.println("本地事务执行成功,提交事务事务消息");
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事务执行成功,回滚半消息
System.out.println("本地事务执行失败,回滚事务消息");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
System.out.println("本地事务执行异常");
}
// 异常情况返回未知状态
return LocalTransactionState.UNKNOW;
}
/**
* 更新订单
* 这里模拟数据库更新,返回事务执行成功
*
* @param order
* @return
*/
private boolean updateOrder(Order order) throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
return true;
}
/***
* 若提交/回滚事务消息失败,rocketmq 自动反查事务状态
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
try {
Order order = JSON.parseObject(new String(msg.getBody(),
RemotingHelper.DEFAULT_CHARSET), Order.class);
boolean isSuccess = queryOrder(order.getOrderId());
if (isSuccess) {
// 本地事务执行成功,提交半消息
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事务执行成功,回滚半消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
System.out.println("查询失败");
}
// 异常情况返回未知状态
return LocalTransactionState.UNKNOW;
}
/**
* 查询订单状态
* 模拟返回查询成功
*
* @param orderId
* @return
*/
private boolean queryOrder(String orderId) throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
return true;
}
}
@Data
public static class Order {
private String orderId;
private String goods;
public Order(String orderId, String goods) {
this.orderId = orderId;
this.goods = goods;
}
}
}
上面代码中:
- 我们需要为生产者指定一个唯一的
ProducerGroup
- 需要继承
TransactionListener
注解回调接口,其中executeLocalTransaction
方法执行本地事务,checkLocalTranscation
用来执行检查本地事务。 - 返回事务状态有三种:
- LocalTransactionState.UNKNOW 中间状态,RocketMQ 将会反查
- LocalTransactionState.COMMIT_MESSAGE 提交事务,消息这后续将会消费这条消息
- LocalTransactionState.ROLLBACK_MESSAGE,回滚事务,RocketMQ 将会删除这条消息
事务消息使用注意点
事务消息最大反查次数
由于单个消息反查次数过多,将会导致半消息队列堆积,影响性能。 RocketMQ 默认将单个消息的检查次数限制为 15 次。
我们可以通过修改 broker
配置文件,增加如下配置:
# N 为最大检查次数
transactionCheckMax=N
当检查次数超过最大次数后,RocketMQ 将会丢弃消息并且打印错误日志。
若想自定义丢弃消息行为,需要修改 RocketMQ broker 端代码,继承 AbstractTransactionalMessageCheckListener
重写 resolveDiscardMsg
方法,加入自定义逻辑。
同步的双重写入机制
为了确保事务消息不丢失,并且保证事务完整性,需要将事务消息复制到集群其他节点,建议使用同步双重写入机制。
事务反查时间设置
我们可以设置以下参数,设置 MQ 服务端多久之后开始反查事务消息(自事务消息保存成功之后开始计算)。
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "10");
或者我们可以在 broker.conf
设置以下参数:
# 单位为 ms,默认为 6 s
transactionTimeout=60000
发送端主动设置配置参数优先级大于 broker
端配置。
另外 RocketMQ 还有一个配置用于控制事务性消息检查间隔:
## 默认为 60s
transactionCheckInterval=5000
如果自定义配置如上,事务消息检查间隔为 5 秒,事务消息设置检查时间为 60 s。
这就代表 broker 每隔 5s 检查一次事务消息,如果此时事务消息到 MQ 服务端时间还未超过 60s,此时将不会反查,直到时间大于 60s。
彩蛋
查找事务消息资料的时候,发现 RocketMQ 文档存在相关错误。
文档地址:https://github.com/9526xu/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
如上两处实际是错误的,应该修改为:AbstractTransactionalMessageCheckListener
与 transactionTimeout
。
顺手修改了一下,提交 PR 。哈哈,也为开源项目贡献了一份力量。
Reference
- https://github.com/apache/rocketmq/issues/481
- https://github.com/9526xu/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
- 极客时间-消息队列高手课
最后说一句(求关注)
以前总以为参加开源项目很难,直到最近接连参与两次开源项目修改,才发现其实并没有想象中那么难。由于版本变更,开源项目文档有些是存在错误的,如果我们看到了,顺手修复一下,这也是为开源项目贡献一份力。
才疏学浅,难免会有纰漏,如果你发现了错误的地方,还请你留言给我指出来,我对其加以修改。
再次感谢您的阅读,我是楼下小黑哥,一位还未秃头的工具猿,下篇文章我们再见~
相关推荐
**什么是RocketMQ事务消息** RocketMQ的事务消息主要目的是为了实现分布式事务的一致性。事务消息不同于普通的消息发送,它在发送消息的同时,还会执行一个本地事务。如果本地事务成功,消息将会被提交;反之,如果...
在本文中,我们将深入探讨如何将SpringBoot框架与Apache RocketMQ集成,特别是在处理事务消息方面。RocketMQ是一款高性能、分布式的消息中间件,广泛应用于大型互联网公司,尤其在处理大规模并发和高可靠性的消息...
RocketMq事务消息发送代码流程详解 RocketMq是一种流行的开源消息队列系统,事务消息是其核心功能之一。事务消息发送代码流程详解是指在RocketMq中实现事务消息发送的整个过程。下面我们将详细介绍RocketMq事务消息...
RocketMQ 消息丢失解决方案:事务消息 在本文中,我们将讨论如何解决 RocketMQ 中的消息丢失问题,特别是在生产者发送消息到 MQ 过程中的消息丢失问题。我们将介绍 RocketMQ 的事务消息机制,如何使用事务消息来...
事务消息是RocketMQ的一大特色,它提供了分布式事务的一致性保证。在传统的消息系统中,消息的发送和业务操作通常是顺序执行的,但在分布式环境下,为了保证事务的ACID特性,RocketMQ引入了半消息(Half Message)和...
此外,RocketMQ提供了事务消息和延迟消息功能。事务消息用于保证消息的事务一致性,使得消息发送与业务操作可以一起构成一个事务。延迟消息则允许设置消息在特定时间后才被消费,适用于定时任务或延时触发的场景。 ...
- **事务消息**:RocketMQ支持事务消息,确保消息发送与本地业务操作的原子性。 - **延迟消息**:可以设定消息在未来某个时间点才被消费,用于定时任务或延后处理。 - **消息重试与死信队列**:RocketMQ提供消息...
五、RocketMQ事务消息 RocketMQ提供了一种半事务消息的机制,可以在分布式环境中实现类似本地事务的一致性。生产者发送半事务消息,如果本地事务执行成功,再发送确认消息;反之,发送回滚消息。 六、RocketMQ监控...
1. **RocketMQ事务消息流程概要**:概述了RocketMQ事务消息的处理流程,确保消息的事务性。 2. **RocketMQ事务消息设计**:深入讨论了如何设计和实现事务消息,保证消息的一致性。 **消息查询和事务** 1. **消息...
此处应有RocketMQ事务消息的序列图,描述了生产者、消费者和RocketMQ Broker之间的交互流程。 2.2.3 客户端事务消息发送序列图 这里应有客户端(生产者)发送事务消息的详细步骤图,展示了从启动本地事务到发送确认...
该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。
5. **RocketMQ事务消息的优势**:RocketMQ的事务消息设计实现了对外部组件的零依赖,减少了系统复杂性,提高了事务处理的效率和可靠性。此外,由于RocketMQ作为消息中间件的角色,它可以很好地缓冲服务之间的通信...
事务消息:RocketMQ提供类似X/Open XA的分布事务功能,通过消息队列RocketMQ的事务消息能达到分布式事务的最终一致。 顺序消息:RocketMQ提供的一种按照顺序进行发布和消费的消息类型,分为全局顺序消息和分区顺序...
- **事务消息**:提供分布式事务解决方案,确保事务的一致性。 - **消息回溯**:允许消费者回溯到历史消息,方便问题排查。 4. **RocketMQ 实战** - **快速入门**:安装部署 RocketMQ,编写 Producer 和 ...
当`rejectTransactionMessage`设置为`false`时,RocketMQ 允许发送事务消息。这种特性使得生产者可以发送半消息,然后根据业务执行情况决定是否提交或回滚消息。如果业务处理成功,生产者会发送一个确认消息,否则...
4. 分布式事务:RocketMQ具有分布式事务机制,能够确保消息的原子性和一致性。 RocketMQ的应用场景包括: 1. 电商领域:RocketMQ广泛应用于电商领域,能够满足高并发、低延迟的需求。 2. 金融领域:RocketMQ应用于...
RocketMQ提供了分布式事务消息支持,允许在分布式环境中实现两阶段提交。在3.2.6版本中,事务消息的处理可能更加高效和可靠。 8. **顺序消息** 对于需要保证消息顺序的场景,RocketMQ提供顺序消息功能。同一队列...
RocketMQ 的事务消息特性就是基于这种理念设计的。 1. **RocketMQ 事务消息概述**: RocketMQ 提供了半事务消息和全局事务的消息模式,来实现分布式事务。半事务消息用于生产者发送一条消息并执行本地操作,如果...
RocketMQ是一款高性能、分布式的消息中间件,主要特点是它能确保消息的严格顺序、提供多种消息拉取模式、支持大规模订阅者的水平扩展以及实时的消息订阅机制。它最初是为电商平台设计的,因此特别适合处理高并发和大...
* **事务消息**:确保消息发送与业务操作的原子性,支持分布式事务。 **4. 角色介绍** * **NameServer**:负责服务发现和路由管理,保持轻量级且无状态。 * **Broker**:存储和转发消息,分为Master和Slave。 * **...