`
m635674608
  • 浏览: 5041859 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

RocketMQ——Consumer篇:向Broker同步消费进度的定时任务

    博客分类:
  • MQ
 
阅读更多

每隔5秒调用一次MQClientInstance.persistAllConsumerOffset()方法将消费进度向Broker同步。遍历MQClientInstance.consumerTable: ConcurrentHashMap<String/*group */, MQConsumerInner>变量。对于PushConsumer端和PullConsumer端,处理逻辑是一样的,以DefaultMQPushConsumerImpl为例,调用DefaultMQPushConsumerImpl.persistConsumerOffset()方法。

1、获取DefaultMQPushConsumerImpl.rebalanceImpl变量的processQueueTable:ConcurrentHashMap<MessageQueue, ProcessQueue>变量值,取该变量的key值集合,即MessageQueue集合;以该集合为参数调用OffsetStore.persistAll(Set<MessageQueue> mqs)方法;

2、若消息模式是广播(BROADCASTING),即DefaultMQPushConsumerImpl.offsetStore变量初始化为LocalFileOffsetStore对象,在此调用LocalFileOffsetStore.persistAll(Set<MessageQueue> mqs)方法,在此方法中遍历LocalFileOffsetStore.offsetTable:ConcurrentHashMap<MessageQueue,AtomicLong>变量,将包含在第1步的MessageQueue集合中的MessageQueue对象的消费进度持久化到consumerOffset.json物理文件中;

3、若消息模式为集群模式,即DefaultMQPushConsumerImpl.offsetStore变量初始化为RemoteBrokerOffsetStore对象,在此调用RemoteBrokerOffsetStore.persistAll(Set<MessageQueue> mqs)方法,在此方法中遍历RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap <MessageQueue,AtomicLong>变量;对于包含在第1步的MessageQueue集合中的MessageQueue对象,调用updateConsumeOffsetToBroker(MessageQueuemq, long offset)方法向Broker发送UPDATE_CONSUMER_OFFSET请求码的消费进度信息;

 

消费进度(offset)

消费进度是指,当一个consumer group里的consumer在消费某个queue里的消息时,equeue是通过记录消费位置(offset)来知道当前消费到哪里了。以便该consumer重启后继续从该位置开始消费。比如一个topic有4个queue,一个consumer group有4个consumer,则每个consumer分配到一个queue,然后每个consumer分别消费自己的queue里的消息。equeue会分别记录每个consumer对其queue的消费进度,从而保证每个consumer重启后知道下次从哪里开始继续消费。实际上,也许下次重启后不是由该consumer消费该queue了,而是由group里的其他consumer消费了,这样也没关系,因为我们已经记录了这个queue的消费位置了。所以可以看出,消费位置和consumer其实无关,消费位置完全是queue的一个属性,用来记录当前被消费到哪里了。另外一点很重要的是,一个topic可以被多个consumer group里的consumer订阅。不同consumer group里的consumer即便是消费同一个topic下的同一个queue,那消费进度也是分开存储的。也就是说,不同的consumer group内的consumer的消费完全隔离,彼此不受影响。还有一点就是,对于集群消费和广播消费,消费进度持久化的地方是不同的,集群消费的消费进度是放在broker,也就是消息队列服务器上的,而广播消费的消费进度是存储在consumer本地磁盘上的。之所以这样设计是因为,对于集群消费,由于一个queue的消费者可能会更换,因为consumer group下的consumer数量可能会增加或减少,然后就会重新计算每个consumer该消费的queue是哪些,这个能理解的把?所以,当出现一个queue的consumer变动的时候,新的consumer如何知道该从哪里开始消费这个queue呢?如果这个queue的消费进度是存储在前一个consumer服务器上的,那就很难拿到这个消费进度了,因为有可能那个服务器已经挂了,或者下架了,都有可能。而因为broker对于所有的consumer总是在服务的,所以,在集群消费的情况下,被订阅的topic的queue的消费位置是存储在broker上的,存储的时候按照不同的consumer group做隔离,以确保不同的consumer group下的consumer的消费进度互补影响。然后,对于广播消费,由于不会出现一个queue的consumer会变动的情况,所以我们没必要让broker来保存消费位置,所以是保存在consumer自己的服务器上。

 

http://blog.csdn.net/meilong_whpu/article/details/77065587

https://www.cc362.com/content/Npz3glwzPQ.html

分享到:
评论

相关推荐

    RocketMQ消息丢失解决方案:同步刷盘+手动提交.docx

    例如,Consumer可能在处理消息之前就向Broker确认消息已被处理,但在后续处理过程中遇到故障,这将导致消息事实上未被正确处理而被认为是已处理的状态。 - **解决方案**:为了避免这类问题的发生,可以将Consumer的...

    搭建rocketmq集群遇到的坑点及解决办法

    在搭建rocketmq集群过程中遇到的问题,记录下了,以免后来人浪费时间

    rocketmq安装包,rocketmq-all-5.1.3-bin-release

    5. **Consumer**:Consumer是消息的接收方,它可以订阅一个或多个Topic,按照Pull或Push方式消费消息。Pull模式下,Consumer主动从Broker拉取消息;Push模式下,Broker会根据Consumer的消费能力推送消息。 6. **...

    rocketmq-all-4.9.3-source

    - 消息回溯:Consumer可以设置消费位点,实现消息的回溯,方便排查问题。 - 容错机制:RocketMQ提供了消息重试、死信队列等功能,应对各种异常情况。 - 动态扩展:可以根据业务需求动态添加或减少Broker节点,实现...

    RocketMQ概念 producer:生产者,消息发送者

    2. **NameServer**: 是RocketMQ的路由注册中心,负责维护Broker的地址信息,便于生产者和消费者查找合适的Broker进行通信。NameServer每隔10秒检查Broker是否在120秒内报告了心跳,而Broker则每30秒向NameServer上报...

    RocketMQ高级原理:深入剖析消息系统的核心机制

    RocketMQ基于Producer-Broker-Consumer模型,其中Producer负责创建和发送消息,Broker作为消息的存储和转发中心,Consumer则负责消费这些消息。每个Broker可以承载多个Topic,每个Topic的消息又可以分片存储在不同的...

    rocketmq安装包.rar

    RocketMQ是一款开源的消息中间件,由阿里巴巴开发并贡献给Apache软件基金会,主要用于处理高并发、低延迟、高可用的消息传输任务。它在分布式系统中扮演着重要的角色,为应用程序提供可靠的消息传递服务,支持发布/...

    RocketMQ群问题整理

    RocketMQ 是一款高性能、分布式的消息中间件,常用于大型分布式系统中的消息传递。以下是对RocketMQ常见问题的详细解析: 1. **API 文档**:RocketMQ 暂时未提供官方的 API 文档,但用户可以参考源码中的实例代码来...

    rocketmq可视化控制台最新版 rocketmq-console-ng-2.x

    3. **消息发送与消费监控**:监控生产者和消费者的行为,查看消息发送速率、消费进度、堆积情况等。 4. **消息追踪**:追踪消息在整个生命周期中的流转,定位消息丢失或延迟问题。 5. **实例监控**:展示各个Broker...

    RocketMQ高级原理:深入剖析消息系统的核心机制(超详细整理讲解,值得收藏)

    4. Pull模式:Consumer主动从Broker拉取消息,适用于消息量不定且需要自定义消费策略的场景。 三、消息队列与分片 RocketMQ将每个主题(Topic)划分为多个队列(Queue),每个队列又由多条消息组成。消息分片...

    RocketMQ部署

    a、Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’ ,启动BROKER。成功后会弹出提示框,此框勿关闭。 b、假如弹出提示框提示‘错误: 找...

    ActiveMQ 集群——JDBC Master Slave + Broker Cluster

    ActiveMQ 集群——JDBC Master Slave + Broker Cluster ActiveMQ 集群是指将多个 ActiveMQ 服务器组合在一起,以提高系统的可扩展性和可靠性。在这个集群中,我们可以使用 JDBC Master Slave 模式和 Broker Cluster...

    RocketMQ技术内幕.rar

    4. 消息回溯:Consumer可以通过设置消费位点回溯,重新消费历史消息。 5. 消费策略:包括顺序消费、广播消费和集群消费等,满足不同业务场景需求。 四、RocketMQ的高可用性 1. Master-Slave复制:通过主从复制确保...

    rocketmq可视化界面,rocketmq-console

    6. **NameServer**:RocketMQ中的服务注册与发现组件,生产者和消费者需要通过NameServer找到相应的Broker进行通信。 7. **Broker**:实际存储消息的服务器,负责消息的接收、存储和分发。 RocketMQConsole的特性...

    rocketmq_broker.conf

    RocketMQ 配置文件:(下面是默认配置) brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH

    rocketMQ(原版操作手册)

    7. **延迟消息**:RocketMQ允许设置消息的延迟时间,让消息在指定的时间后才被消费,适用于定时任务或触发器场景。 8. **消息回溯**:在某些场景下,如错误处理或数据分析,需要重新消费历史消息,RocketMQ的回溯...

    JAVA-ACE-架构师系列视频教程在线观看地址- RocketMQ(订单实战上下全集)

    7007_RocketMQ_Broker配置文件详解 8008_RocketMQ_helloworld示例讲解 9009_RocketMQ_整体架构概述详解 10010_RocketMQ_Producer_API详解 11011_RocketMQ_Producer_顺序消费机制详解 12012_RocketMQ_Producer_事务...

    基于Java与多语言兼容的RocketMQ协议处理插件集成Pulsar Broker设计源码

    该项目为基于Java核心的RocketMQ协议处理插件与Pulsar Broker的集成设计方案,源码包含219个文件,涵盖170个Java源文件、12个Go语言文件、10个XML配置文件、6个Markdown文件、6个YAML配置文件、3个Shell脚本、2个...

    阿里RocketMQ_用户指南_V3.2.4 & RocketMQ-原理简介

    2. **架构设计**:RocketMQ的架构主要由NameServer、Producer、Consumer和Broker四个关键组件组成。NameServer负责路由管理,Producer负责生产消息,Consumer负责消费消息,Broker则作为存储节点,承载着消息的存储...

Global site tag (gtag.js) - Google Analytics