每隔5秒调用一次MQClientInstance.persistAllConsumerOffset()方法将消费进度向Broker同步。遍历MQClientInstance.consumerTable: ConcurrentHashMap<String/*group */, MQConsumerInner>变量。对于PushConsumer端和PullConsumer端,处理逻辑是一样的,以DefaultMQPushConsumerImpl为例,调用DefaultMQPushConsumerImpl.persistConsumerOffset()方法。
1、获取DefaultMQPushConsumerImpl.rebalanceImpl变量的processQueueTable:ConcurrentHashMap<MessageQueue, ProcessQueue>变量值,取该变量的key值集合,即MessageQueue集合;以该集合为参数调用OffsetStore.persistAll(Set<MessageQueue> mqs)方法;
2、若消息模式是广播(BROADCASTING),即DefaultMQPushConsumerImpl.offsetStore变量初始化为LocalFileOffsetStore对象,在此调用LocalFileOffsetStore.persistAll(Set<MessageQueue> mqs)方法,在此方法中遍历LocalFileOffsetStore.offsetTable:ConcurrentHashMap<MessageQueue,AtomicLong>变量,将包含在第1步的MessageQueue集合中的MessageQueue对象的消费进度持久化到consumerOffset.json物理文件中;
3、若消息模式为集群模式,即DefaultMQPushConsumerImpl.offsetStore变量初始化为RemoteBrokerOffsetStore对象,在此调用RemoteBrokerOffsetStore.persistAll(Set<MessageQueue> mqs)方法,在此方法中遍历RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap <MessageQueue,AtomicLong>变量;对于包含在第1步的MessageQueue集合中的MessageQueue对象,调用updateConsumeOffsetToBroker(MessageQueuemq, long offset)方法向Broker发送UPDATE_CONSUMER_OFFSET请求码的消费进度信息;
消费进度(offset)
消费进度是指,当一个consumer group里的consumer在消费某个queue里的消息时,equeue是通过记录消费位置(offset)来知道当前消费到哪里了。以便该consumer重启后继续从该位置开始消费。比如一个topic有4个queue,一个consumer group有4个consumer,则每个consumer分配到一个queue,然后每个consumer分别消费自己的queue里的消息。equeue会分别记录每个consumer对其queue的消费进度,从而保证每个consumer重启后知道下次从哪里开始继续消费。实际上,也许下次重启后不是由该consumer消费该queue了,而是由group里的其他consumer消费了,这样也没关系,因为我们已经记录了这个queue的消费位置了。所以可以看出,消费位置和consumer其实无关,消费位置完全是queue的一个属性,用来记录当前被消费到哪里了。另外一点很重要的是,一个topic可以被多个consumer group里的consumer订阅。不同consumer group里的consumer即便是消费同一个topic下的同一个queue,那消费进度也是分开存储的。也就是说,不同的consumer group内的consumer的消费完全隔离,彼此不受影响。还有一点就是,对于集群消费和广播消费,消费进度持久化的地方是不同的,集群消费的消费进度是放在broker,也就是消息队列服务器上的,而广播消费的消费进度是存储在consumer本地磁盘上的。之所以这样设计是因为,对于集群消费,由于一个queue的消费者可能会更换,因为consumer group下的consumer数量可能会增加或减少,然后就会重新计算每个consumer该消费的queue是哪些,这个能理解的把?所以,当出现一个queue的consumer变动的时候,新的consumer如何知道该从哪里开始消费这个queue呢?如果这个queue的消费进度是存储在前一个consumer服务器上的,那就很难拿到这个消费进度了,因为有可能那个服务器已经挂了,或者下架了,都有可能。而因为broker对于所有的consumer总是在服务的,所以,在集群消费的情况下,被订阅的topic的queue的消费位置是存储在broker上的,存储的时候按照不同的consumer group做隔离,以确保不同的consumer group下的consumer的消费进度互补影响。然后,对于广播消费,由于不会出现一个queue的consumer会变动的情况,所以我们没必要让broker来保存消费位置,所以是保存在consumer自己的服务器上。
http://blog.csdn.net/meilong_whpu/article/details/77065587
https://www.cc362.com/content/Npz3glwzPQ.html
相关推荐
例如,Consumer可能在处理消息之前就向Broker确认消息已被处理,但在后续处理过程中遇到故障,这将导致消息事实上未被正确处理而被认为是已处理的状态。 - **解决方案**:为了避免这类问题的发生,可以将Consumer的...
在搭建rocketmq集群过程中遇到的问题,记录下了,以免后来人浪费时间
- 消息回溯:Consumer可以设置消费位点,实现消息的回溯,方便排查问题。 - 容错机制:RocketMQ提供了消息重试、死信队列等功能,应对各种异常情况。 - 动态扩展:可以根据业务需求动态添加或减少Broker节点,实现...
2. **NameServer**: 是RocketMQ的路由注册中心,负责维护Broker的地址信息,便于生产者和消费者查找合适的Broker进行通信。NameServer每隔10秒检查Broker是否在120秒内报告了心跳,而Broker则每30秒向NameServer上报...
RocketMQ基于Producer-Broker-Consumer模型,其中Producer负责创建和发送消息,Broker作为消息的存储和转发中心,Consumer则负责消费这些消息。每个Broker可以承载多个Topic,每个Topic的消息又可以分片存储在不同的...
RocketMQ是一款开源的消息中间件,由阿里巴巴开发并贡献给Apache软件基金会,主要用于处理高并发、低延迟、高可用的消息传输任务。它在分布式系统中扮演着重要的角色,为应用程序提供可靠的消息传递服务,支持发布/...
5. **Consumer**:Consumer是消息的接收方,它可以订阅一个或多个Topic,按照Pull或Push方式消费消息。Pull模式下,Consumer主动从Broker拉取消息;Push模式下,Broker会根据Consumer的消费能力推送消息。 6. **...
RocketMQ 是一款高性能、分布式的消息中间件,常用于大型分布式系统中的消息传递。以下是对RocketMQ常见问题的详细解析: 1. **API 文档**:RocketMQ 暂时未提供官方的 API 文档,但用户可以参考源码中的实例代码来...
4. Pull模式:Consumer主动从Broker拉取消息,适用于消息量不定且需要自定义消费策略的场景。 三、消息队列与分片 RocketMQ将每个主题(Topic)划分为多个队列(Queue),每个队列又由多条消息组成。消息分片...
a、Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’ ,启动BROKER。成功后会弹出提示框,此框勿关闭。 b、假如弹出提示框提示‘错误: 找...
4. 消息回溯:Consumer可以通过设置消费位点回溯,重新消费历史消息。 5. 消费策略:包括顺序消费、广播消费和集群消费等,满足不同业务场景需求。 四、RocketMQ的高可用性 1. Master-Slave复制:通过主从复制确保...
ActiveMQ 集群——JDBC Master Slave + Broker Cluster ActiveMQ 集群是指将多个 ActiveMQ 服务器组合在一起,以提高系统的可扩展性和可靠性。在这个集群中,我们可以使用 JDBC Master Slave 模式和 Broker Cluster...
6. **NameServer**:RocketMQ中的服务注册与发现组件,生产者和消费者需要通过NameServer找到相应的Broker进行通信。 7. **Broker**:实际存储消息的服务器,负责消息的接收、存储和分发。 RocketMQConsole的特性...
RocketMQ 配置文件:(下面是默认配置) brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH
7. **延迟消息**:RocketMQ允许设置消息的延迟时间,让消息在指定的时间后才被消费,适用于定时任务或触发器场景。 8. **消息回溯**:在某些场景下,如错误处理或数据分析,需要重新消费历史消息,RocketMQ的回溯...
7007_RocketMQ_Broker配置文件详解 8008_RocketMQ_helloworld示例讲解 9009_RocketMQ_整体架构概述详解 10010_RocketMQ_Producer_API详解 11011_RocketMQ_Producer_顺序消费机制详解 12012_RocketMQ_Producer_事务...
该项目为基于Java核心的RocketMQ协议处理插件与Pulsar Broker的集成设计方案,源码包含219个文件,涵盖170个Java源文件、12个Go语言文件、10个XML配置文件、6个Markdown文件、6个YAML配置文件、3个Shell脚本、2个...
2. **架构设计**:RocketMQ的架构主要由NameServer、Producer、Consumer和Broker四个关键组件组成。NameServer负责路由管理,Producer负责生产消息,Consumer负责消费消息,Broker则作为存储节点,承载着消息的存储...
Consumer可以选择从Master或Slave订阅消息,如果Master宕机,Consumer会在30s后转向Slave消费,可能会有少量消息丢失,但在Master恢复后,未同步的消息会被消费。 三、RocketMQ的最佳实践 1. 集群部署:为了高可用...