为什么消息要具备事务能力
还是比较清晰的。简单的说 就是在你业务逻辑过程中,需要发送一条消息给订阅消息的人,但是期望是 此逻辑过程完全成功完成之后才能使订阅者收到消息。
业务逻辑过程 假设是这样的:
逻辑部分a-->发消息给MQ-->逻辑部分b
假设我们在发送消息给MQ之后执行逻辑部分b时产生了异常,那如果MQ不具备事务消息能力时,订阅者也收到了消息。这是我们不希望见到的。
分布式事务基础概念
- 关于分布式事务、两阶段提交协议、三阶提交协议
- 理解分布式事务的两阶段提交2pc
- 分布式事务(一)两阶段提交及JTA
- 分布式系统常用思想和技术总结
- 【整理】脑裂问题
- 分布式系统的事务处理
- 多版本并发控制(MVCC)在分布式系统中的应用
- 戏说PAXOS
- 阿里云消息队列 MQ关于事务消息的文档
rocketmq具备事务能力的demo
参见TransactionProducerDemo.java
向producer注册的TransactionCheckListener实现并没有用,因为返回LocalTransactionState.UNKNOW状态时,在3.2.6版本中,是不支持此状态下回调TransactionCheckListener的,具体参见以下两个issue。
事务消息 LocalTransactionState.UNKNOW 状态下不回查 #221
开源版本支持事务消息吗 #364
测试过程中发现返回UNKNOW状态是不能正确达到期望的,但是返回ROLLBACK_MESSAGE状态还是能达到期望的。
实现分析入口
这个实现的入口还是比较容易找的,只要搜寻ROLLBACK_MESSAGE这个变量的引用即可发现。顺着搜索查看,其实很容易发现,客户端在收到业务逻辑返回的事务状态时会发送一条结束事务的指令给broker。
// com.alibaba.rocketmq.client.impl.MQClientAPIImpl.endTransactionOneway(String, EndTransactionRequestHeader, String, long) 871行 RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
按broker对外部指令的常规做法,一般会有一个Processor与之对应。是EndTransactionProcessor,看BrokerController374行其注册的地方,没错。
EndTransactionProcessor分析(broker侧)
如果LocalTransactionExecuter.executeLocalTransactionBranch返回LocalTransactionState.ROLLBACK_MESSAGE时,EndTransactionProcessor会清空message的body的置成null,queueOffset也不会更新,那么consumer就收不到消息了。
//--EndTransactionProcessor.processRequest 200行-- if (MessageSysFlag.TransactionRollbackType == requestHeader.getCommitOrRollback()) { msgInner.setBody(null); }
如果LocalTransactionExecuter.executeLocalTransactionBranch返回LocalTransactionState.COMMIT_MESSAGE,那么EndTransactionProcessor则会照常put message。
事务消息分为两个阶段,prepare阶段与commit阶段。prepare阶段的消息会写入store,只是写完之后的queueOffset(逻辑位置)为0(commit阶段写完消息后的queueOffset就不是0了。);
// -- com.alibaba.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend(long, ByteBuffer, int, Object) 1002行 -- final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the // consumer queue case MessageSysFlag.TransactionPreparedType: case MessageSysFlag.TransactionRollbackType: queueOffset = 0L; break; case MessageSysFlag.TransactionNotType: case MessageSysFlag.TransactionCommitType: default: break;
待分析问题列表:
1. prepare阶段已经将消息发了过去,commit的时候是否还会再发送一次消息?
2. rollback的时候是否会将prepare的消息删除?
http://www.cnblogs.com/simoncook/p/6478196.html
分布式事物是基于二阶段提交的
1) 一阶段,向broker发送一条prepared的消息,返回消息的offset即消息地址commitLog中消息偏移量。Prepared状态消息不被消费
发送消息ok,执行本地事物分支, 本地事物方法需要实现rocketmq的回调接口2)2)2) LocalTransactionExecuter,处理本地事物逻辑返回处理的事物状态LocalTransactionState
3) 二阶段,处理完本地事物中业务得到事物状态, 根据offset查找到commitLog中的prepared消息,设置消息状态commitType或者rollbackType, 让后将信息添加到commitLog中, 其实二阶段生成了两条消息
事物消息发送
相关推荐
RocketMQ-Console是RocketMQ项目的扩展插件,是一个图形化管理控制台,提供Broker集群状态查看,Topic管理,Producer、Consumer状态展示,消息查询等常用功能,这个功能在安装好RocketMQ后需要额外单独安装、运行。...
- **生产者(Producer)**:负责发送消息到RocketMQ服务器。 - **消费者(Consumer)**:负责接收并处理来自RocketMQ服务器的消息。 - **NameServer**:服务发现和路由中心,提供主题和队列的注册与查询功能,不...
Apache RocketMQ是一款高性能、分布式的消息中间件,广泛应用于大数据实时处理、在线业务交易和消息订阅等场景。RocketMQ提供可靠的消息传递、高效的批量发送、丰富的消息模型以及大规模集群的扩展性,确保服务的...
- **Producer**:生产者,负责创建和发送消息到RocketMQ系统。 - **Consumer**:消费者,从RocketMQ系统中拉取消息并进行处理。 - **Message**:RocketMQ中的基本数据单元,包含主题(Topic)、标签(Tag)和消息...
4. **事务消息**:RocketMQ支持分布式事务消息,保证了事务的一致性。Producer可以发送半消息,待业务操作成功后,再进行消息的提交或回滚。 5. **消息回溯**:RocketMQ提供了消息回溯功能,允许消费者在一段时间内...
3. Producer:Producer 负责发送消息到 Broker,可以是同步、异步或单向发送。同步发送确保消息被成功投递,异步发送则提高了性能,单向发送则不关心消息是否被成功接收。 4. Consumer:Consumer 从 Broker 拉取...
- Producer:消息生产者,负责生成并发送消息到RocketMQ集群。 - Consumer:消息消费者,订阅并消费来自RocketMQ的消息。 3. **消息模型**: - Push模式:消费者主动从Broker拉取消息。 - Pull模式:Broker推送...
9. 事务消息:RocketMQ提供事务消息功能,实现了分布式事务的一致性,使得业务处理与消息发送成为原子操作。 10. 分布式调度:RocketMQ的NameServer是轻量级的注册中心,负责路由信息的管理和更新,避免了单点故障...
3. **Producer**:生产者客户端用于发送消息到RocketMQ服务。在应用中,你需要实现`DefaultMQProducer`类,并设置Producer实例的Group ID,然后调用`send`方法发送消息。 4. **Consumer**:消费者客户端用于接收和...
- **Producer**:生产者客户端,负责发送消息到RocketMQ集群。 - **Consumer**:消费者客户端,可以是Push Consumer(消息推模式)或Pull Consumer(消息拉模式),用于接收和处理消息。 - **Broker**:消息...
RocketMQ是一个高性能、分布式的消息队列服务,广泛应用于微服务架构中的异步处理、解耦以及容错等场景。Console NG 1.0.1 版本引入了用户登录功能,增强了系统的安全性与管理能力。 在使用rocketmq-console-ng-...
4. **Producer**:Producer是消息的生产者,通过RocketMQ的Java API(标签中提到的“java”),可以创建Producer实例,向指定的Topic发布消息。 5. **Consumer**:Consumer是消息的消费者,同样通过Java API,可以...
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息...
NameServer负责路由信息管理,Broker处理消息存储和分发,Producer发送消息,Consumer消费消息。 3. **新特性**:在4.9.1版本中,可能包含了对旧版本的bug修复、性能提升、新功能添加等。具体的新特性可以通过查看...
RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大规模分布式系统中的消息传递。这个"rocketmq-all-4.5.2-bin-release.zip"压缩包包含了RocketMQ 4.5.2版本的预编译二进制文件,允许用户在本地环境快速...
- **Producer**: 生产者,负责发送消息到RocketMQ服务器。 - **Consumer**: 消费者,订阅主题并接收消息。 - **Topic**: 主题,消息的分类,生产者和消费者都基于主题进行交互。 - **Queue**: 队列,消息实际...