`
m635674608
  • 浏览: 5031283 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

RocketMQ原理解析-broker 3.load&recover

    博客分类:
  • MQ
 
阅读更多

Broker启动的时候需要加载一系列的配置,启动一系列的任务,主要分布在BrokerController 的initialize()和start()方法中

1.      加载topic配置

2.      加载消费进度consumer offset

3.      加载消费者订阅关系consumer subscription

4.      加载本地消息messageStore.load()

a)        Load 定时进度

b)        Load commit log

commitLog其实调用存储消费队列mapedFileQueue.load()方法来加载的。

遍历出${user.home} \store\${commitlog}目录下所有commitLog文件,按文件名(文件名就是文件的初始偏移量)升序排一下, 每个文件构建一个MapedFile对象, 在MapedFileQueue中用集合list把这些MapedFile文件组成一个逻辑上连续的队列

c)        Load consume Queue

遍历${user.home} \store\consumequeue下的所有文件夹(每个topic就是一个文件夹)

遍历${user.home} \store\consumequeue\${topic}下的所有文件夹(每个queueId

就是一个文件夹)

遍历${user.home} \store\consumequeue\${topic}\${queueId}下所有文件,根据topic, queueId, 文件来构建ConsueQueue对象

DefaultMessageStore中存储结构Map<topic,Map<queueId, CosnueQueue>>

 

每个Consumequeue利用MapedFileQueue把mapedFile组成一个逻辑上连续的队列

d)        加载事物模块

 

e)        加载存储检查点

加载${user.home} \store\checkpoint 这个文件存储了3个long类型的值来记录存储模型最终一致的时间点,这个3个long的值为

physicMsgTimestamp为commitLog最后刷盘的时间

logicMsgTimestamp为consumeQueue最终刷盘的时间

indexMsgTimestamp为索引最终刷盘时间

 

checkpoint作用是当异常恢复时需要根据checkpoint点来恢复消息

 

f)         加载索引服务indexService

 

g)        recover尝试数据恢复

判断是否是正常恢复,系统启动的启动存储服务(DefaultMessageStore)的时候会创建一个临时文件abort, 当系统正常关闭的时候会把这个文件删掉,这个类似在linux下打开vi编辑器生成那个临时文件,所有当这个abort文件存在,系统认为是异常恢复

 

 

1)  先按照正常流程恢复ConsumeQueue

为什么说先正常恢复,那么异常恢复在哪呢?当broker是异常启动时候,在异常恢复commitLog时会重新构建请到DispatchMessageService服务,来重新生成ConsumeQueue数据,索引以及事物消息的redolog

 

什么是恢复ConsumeQueue, 前面不是有步骤load了ConsumeQueue吗,为什么还要恢复?

前面load步骤创建了MapedFile对象建立了文件的内存映射,但是数据是否正确,现在文件写到哪了(wrotePosition), Flush到了什么位置(committedPosition)?恢复数据来帮我解决这些问题。

 

每个ConsumeQueue的mapedFiles集合中,从倒数第三个文件开始恢复(为什么只恢复倒数三个文件,也许只是个经验值吧),因为consumequeue的存储单元是20字节的定长数据,所以是依次分别取了

Offset long类型存储了commitLog的数据偏移量

Size int类型存储了在commitLog上消息大小

tagcode tag的哈希值

目前rocketmq判断存储的consumequeue数据是否有效的方式为判断offset>= 0 && size > 0

                            如果数据有效读取下20个字节判断是否有效

                            如果数据无效跳出循环,记录此时有效数据的偏移量processOffset

                            如果读到文件尾,读取下一个文件

                  

                            proccessOffset是有效数据的偏移量,获取这个值的作用什么?

(1)    proccessOffset后面的数据属于脏数据,后面的文件要删除掉

(2)    设置proccessOffset所在文件MapedFile的wrotePosition和commitedPosition值,值为 proccessOffset%mapedFileSize

 

2)  正常恢复commitLog文件

步骤跟流程恢复Consume Queue

判断消息有效, 根据消息的存储格式读取消息到DispatchRequest对象,获取消息大小值msgSize

           大于 0  正常数据

           等于-1      文件读取错误 恢复结束

           等于0  读到文件末尾

 

3)  异常数据恢复,OSCRASH或者JVM CRASH或者机器掉电

${user.home}\store\abort文件存在,代表异常恢复

读取${user.home} \store\checkpoint获取最终一致的时间点

判断最终一致的点所在的文件是哪个

从最新的mapedFile开始,获取存储的一条消息在broker的生成时间,大于checkpoint时间点的放弃找前一个文件,小于等于checkpoint时间点的说明checkpoint在此mapedfile文件中

从checkpoint所在mapedFile开始恢复数据,它的整体过程跟正常恢复commitlog类似,最重要的区别在于

(1)读取消息后派送到分发消息服务DispatchMessageService中,来重建ConsumeQueue以及索引

(2)根据恢复的物理offset,清除ConsumeQueue多余的数据

 

 

4)  恢复TopicQueueTable=Map<topic-queueid,offset>

(1)          恢复写入消息时,消费记录队列的offset

(2)          恢复每个队列的最小offset

 

 

 

5.      初始化通信层

6.      初始化线程池

7.      注册broker端处理器用来接收client请求后选择处理器处理

8.      启动每天凌晨00:00:00统计消费量任务

9.      启动定时刷消费进度任务

10.  启动扫描数据被删除了的topic,offset记录也对应删除任务

11.  如果namesrv地址不是指定的,而是从静态服务器取的,启动定时向静态服务器获取namesrv地址的任务

12.  如果broker是master,启动任务打印slave落后master没有同步的bytes

如果broker是slave,启动任务定时到mastser同步配置信息

http://blog.csdn.net/lovesomnus/article/details/51769977

分享到:
评论

相关推荐

    rocketmq-console-ng-1.0.1.jar

    RocketMQ-Console是RocketMQ项目的扩展插件,是一个图形化管理控制台,提供Broker集群状态查看,Topic管理,Producer、Consumer状态展示,消息查询等常用功能,这个功能在安装好RocketMQ后需要额外单独安装、运行。...

    rocketmq可视化控制台最新版 rocketmq-console-ng-2.x

    RocketMQ是一款开源的消息中间件,由阿里巴巴开发并贡献给Apache软件基金会,被广泛应用于大数据、实时计算、微服务等场景。RocketMQ的核心功能包括消息发送与接收、消息队列、消息回溯、高可用保障、分布式事务等。...

    rocketmq-console-ng-1.0.0.zip

    1. **集群管理**:可以查看RocketMQ集群的节点状态,包括NameServer和Broker的信息。 2. **主题与队列管理**:创建、删除和修改RocketMQ的主题(Topic)以及每个主题的队列(Queue)设置。 3. **消费组管理**:监控...

    rocketmq-externals-master.zip

    "rocketmq-externals-master.zip"是一个包含RocketMQ源码的压缩包,对于深入理解RocketMQ的工作原理、性能优化以及进行二次开发非常有帮助。下面将详细阐述RocketMQ的核心概念、架构、工作流程以及源码解析的关键点...

    rocketmq-4.7.1-.el7.x86_64.rpm

    vim /opt/rocketmq-4.7.1/conf/broker.conf - 启动(顺序) systemctl start mqnamesrv systemctl start mqbroker --查看状态 systemctl status mqnamesrv systemctl status mqbroker --停止(顺序) systemctl ...

    rocketmq-all-5.1.1-bin-release.zip

    以下将详细讲解RocketMQ的主要组成部分、工作原理以及如何利用这些文件进行安装和操作。 1. **主要组件**: - **NameServer**:作为服务注册与发现的中心节点,不存储具体消息,只记录Topic与Broker的映射关系,...

    rocketmq-all-5.0.0-bin-release.zip

    RocketMQ,作为阿里巴巴开源的一款高性能、高可用、稳定可靠的消息中间件,已经在众多企业和项目中得到广泛应用。RocketMQ的核心设计理念是提供低延迟、高吞吐量的分布式消息传输服务,支持大规模分布式系统的实时...

    rocketmq-all-5.1.3-bin-release.zip

    RocketMQ,作为阿里巴巴开源的一款高性能、高可用、稳定可靠的消息中间件,已经在多个大型企业和项目中得到广泛应用。5.1.3版本的RocketMQ在前代基础上进行了诸多优化和改进,旨在提供更强大的消息处理能力和服务...

    rocketmq-all-5.1.4-bin-release.zip

    3. **低延迟**:RocketMQ设计了高效的内存管理机制和磁盘存储结构,如Message Store和CommitLog,能够在大量消息处理时保持低延迟。 4. **事务消息**:RocketMQ支持分布式事务消息,保证了事务的一致性。Producer...

    rocketmq-all-4.6.0-bin-release.zip

    2. Broker:Broker 是 RocketMQ 的核心组件,负责接收 Producer 发送的消息,存储并转发给 Consumer。每个 Broker 可以配置为 Master 或 Slave,Master 用于处理写请求,Slave 用于备份数据并处理读请求,实现数据的...

    最新版 rocketmq-all-4.8.0-bin-release.zip

    RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大规模分布式系统中的消息传递。4.8.0版本是其最新的稳定版本,提供了许多增强特性和优化。以下是对RocketMQ 4.8.0版本的一些关键知识点的详细说明: 1. ...

    rocketmq-all-5.2.0-bin-release.7z

    3. 可靠性:RocketMQ支持消息持久化,即使在服务宕机的情况下,也能保证消息不丢失。其Message ID机制确保了消息的唯一性,而Exactly-Once语义则确保消息仅被消费一次。 4. 易扩展:RocketMQ设计时考虑到了水平扩展...

    rocketmq-all-4.9.4-bin-release

    3. **Producer**:生产者客户端用于发送消息到RocketMQ服务。在应用中,你需要实现`DefaultMQProducer`类,并设置Producer实例的Group ID,然后调用`send`方法发送消息。 4. **Consumer**:消费者客户端用于接收和...

    rocketmq-console-ng-1.0.1.zip

    RocketMQ Console NG 1.0.1 是一个针对Apache RocketMQ的高级可视化管理控制台,主要目的是为了方便用户更直观、高效地管理和监控RocketMQ消息中间件的运行状态。RocketMQ是一个高性能、分布式的消息队列服务,广泛...

    rocketmq-all-4.7.1-bin-release.zip

    RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息...

    apache-rocketmq-all.tar.gz

    3. **文件内容解析**: - **DISCLAIMER**:通常包含软件的法律免责声明,提醒用户自行承担使用风险。 - **LICENSE**:Apache License 2.0,表明RocketMQ遵循的开源许可协议,允许自由使用、修改和分发。 - **...

Global site tag (gtag.js) - Google Analytics