Broker启动的时候需要加载一系列的配置,启动一系列的任务,主要分布在BrokerController 的initialize()和start()方法中
1. 加载topic配置
2. 加载消费进度consumer offset
3. 加载消费者订阅关系consumer subscription
4. 加载本地消息messageStore.load()
a) Load 定时进度
b) Load commit log
commitLog其实调用存储消费队列mapedFileQueue.load()方法来加载的。
遍历出${user.home} \store\${commitlog}目录下所有commitLog文件,按文件名(文件名就是文件的初始偏移量)升序排一下, 每个文件构建一个MapedFile对象, 在MapedFileQueue中用集合list把这些MapedFile文件组成一个逻辑上连续的队列
c) Load consume Queue
遍历${user.home} \store\consumequeue下的所有文件夹(每个topic就是一个文件夹)
遍历${user.home} \store\consumequeue\${topic}下的所有文件夹(每个queueId
就是一个文件夹)
遍历${user.home} \store\consumequeue\${topic}\${queueId}下所有文件,根据topic, queueId, 文件来构建ConsueQueue对象
DefaultMessageStore中存储结构Map<topic,Map<queueId, CosnueQueue>>
每个Consumequeue利用MapedFileQueue把mapedFile组成一个逻辑上连续的队列
d) 加载事物模块
e) 加载存储检查点
加载${user.home} \store\checkpoint 这个文件存储了3个long类型的值来记录存储模型最终一致的时间点,这个3个long的值为
physicMsgTimestamp为commitLog最后刷盘的时间
logicMsgTimestamp为consumeQueue最终刷盘的时间
indexMsgTimestamp为索引最终刷盘时间
checkpoint作用是当异常恢复时需要根据checkpoint点来恢复消息
f) 加载索引服务indexService
g) recover尝试数据恢复
判断是否是正常恢复,系统启动的启动存储服务(DefaultMessageStore)的时候会创建一个临时文件abort, 当系统正常关闭的时候会把这个文件删掉,这个类似在linux下打开vi编辑器生成那个临时文件,所有当这个abort文件存在,系统认为是异常恢复
1) 先按照正常流程恢复ConsumeQueue
为什么说先正常恢复,那么异常恢复在哪呢?当broker是异常启动时候,在异常恢复commitLog时会重新构建请到DispatchMessageService服务,来重新生成ConsumeQueue数据,索引以及事物消息的redolog
什么是恢复ConsumeQueue, 前面不是有步骤load了ConsumeQueue吗,为什么还要恢复?
前面load步骤创建了MapedFile对象建立了文件的内存映射,但是数据是否正确,现在文件写到哪了(wrotePosition), Flush到了什么位置(committedPosition)?恢复数据来帮我解决这些问题。
每个ConsumeQueue的mapedFiles集合中,从倒数第三个文件开始恢复(为什么只恢复倒数三个文件,也许只是个经验值吧),因为consumequeue的存储单元是20字节的定长数据,所以是依次分别取了
Offset long类型存储了commitLog的数据偏移量
Size int类型存储了在commitLog上消息大小
tagcode tag的哈希值
目前rocketmq判断存储的consumequeue数据是否有效的方式为判断offset>= 0 && size > 0
如果数据有效读取下20个字节判断是否有效
如果数据无效跳出循环,记录此时有效数据的偏移量processOffset
如果读到文件尾,读取下一个文件
proccessOffset是有效数据的偏移量,获取这个值的作用什么?
(1) proccessOffset后面的数据属于脏数据,后面的文件要删除掉
(2) 设置proccessOffset所在文件MapedFile的wrotePosition和commitedPosition值,值为 proccessOffset%mapedFileSize
2) 正常恢复commitLog文件
步骤跟流程恢复Consume Queue
判断消息有效, 根据消息的存储格式读取消息到DispatchRequest对象,获取消息大小值msgSize
大于 0 正常数据
等于-1 文件读取错误 恢复结束
等于0 读到文件末尾
3) 异常数据恢复,OSCRASH或者JVM CRASH或者机器掉电
当${user.home}\store\abort文件存在,代表异常恢复
读取${user.home} \store\checkpoint获取最终一致的时间点
判断最终一致的点所在的文件是哪个
从最新的mapedFile开始,获取存储的一条消息在broker的生成时间,大于checkpoint时间点的放弃找前一个文件,小于等于checkpoint时间点的说明checkpoint在此mapedfile文件中
从checkpoint所在mapedFile开始恢复数据,它的整体过程跟正常恢复commitlog类似,最重要的区别在于
(1)读取消息后派送到分发消息服务DispatchMessageService中,来重建ConsumeQueue以及索引
(2)根据恢复的物理offset,清除ConsumeQueue多余的数据
4) 恢复TopicQueueTable=Map<topic-queueid,offset>
(1) 恢复写入消息时,消费记录队列的offset
(2) 恢复每个队列的最小offset
5. 初始化通信层
6. 初始化线程池
7. 注册broker端处理器用来接收client请求后选择处理器处理
8. 启动每天凌晨00:00:00统计消费量任务
9. 启动定时刷消费进度任务
10. 启动扫描数据被删除了的topic,offset记录也对应删除任务
11. 如果namesrv地址不是指定的,而是从静态服务器取的,启动定时向静态服务器获取namesrv地址的任务
12. 如果broker是master,启动任务打印slave落后master没有同步的bytes
如果broker是slave,启动任务定时到mastser同步配置信息
http://blog.csdn.net/lovesomnus/article/details/51769977
相关推荐
RocketMQ-Console是RocketMQ项目的扩展插件,是一个图形化管理控制台,提供Broker集群状态查看,Topic管理,Producer、Consumer状态展示,消息查询等常用功能,这个功能在安装好RocketMQ后需要额外单独安装、运行。...
RocketMQ是一款开源的消息中间件,由阿里巴巴开发并贡献给Apache软件基金会,被广泛应用于大数据、实时计算、微服务等场景。RocketMQ的核心功能包括消息发送与接收、消息队列、消息回溯、高可用保障、分布式事务等。...
1. **集群管理**:可以查看RocketMQ集群的节点状态,包括NameServer和Broker的信息。 2. **主题与队列管理**:创建、删除和修改RocketMQ的主题(Topic)以及每个主题的队列(Queue)设置。 3. **消费组管理**:监控...
"rocketmq-externals-master.zip"是一个包含RocketMQ源码的压缩包,对于深入理解RocketMQ的工作原理、性能优化以及进行二次开发非常有帮助。下面将详细阐述RocketMQ的核心概念、架构、工作流程以及源码解析的关键点...
vim /opt/rocketmq-4.7.1/conf/broker.conf - 启动(顺序) systemctl start mqnamesrv systemctl start mqbroker --查看状态 systemctl status mqnamesrv systemctl status mqbroker --停止(顺序) systemctl ...
以下将详细讲解RocketMQ的主要组成部分、工作原理以及如何利用这些文件进行安装和操作。 1. **主要组件**: - **NameServer**:作为服务注册与发现的中心节点,不存储具体消息,只记录Topic与Broker的映射关系,...
RocketMQ,作为阿里巴巴开源的一款高性能、高可用、稳定可靠的消息中间件,已经在众多企业和项目中得到广泛应用。RocketMQ的核心设计理念是提供低延迟、高吞吐量的分布式消息传输服务,支持大规模分布式系统的实时...
RocketMQ,作为阿里巴巴开源的一款高性能、高可用、稳定可靠的消息中间件,已经在多个大型企业和项目中得到广泛应用。5.1.3版本的RocketMQ在前代基础上进行了诸多优化和改进,旨在提供更强大的消息处理能力和服务...
3. **低延迟**:RocketMQ设计了高效的内存管理机制和磁盘存储结构,如Message Store和CommitLog,能够在大量消息处理时保持低延迟。 4. **事务消息**:RocketMQ支持分布式事务消息,保证了事务的一致性。Producer...
2. Broker:Broker 是 RocketMQ 的核心组件,负责接收 Producer 发送的消息,存储并转发给 Consumer。每个 Broker 可以配置为 Master 或 Slave,Master 用于处理写请求,Slave 用于备份数据并处理读请求,实现数据的...
RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大规模分布式系统中的消息传递。4.8.0版本是其最新的稳定版本,提供了许多增强特性和优化。以下是对RocketMQ 4.8.0版本的一些关键知识点的详细说明: 1. ...
3. 可靠性:RocketMQ支持消息持久化,即使在服务宕机的情况下,也能保证消息不丢失。其Message ID机制确保了消息的唯一性,而Exactly-Once语义则确保消息仅被消费一次。 4. 易扩展:RocketMQ设计时考虑到了水平扩展...
3. **Producer**:生产者客户端用于发送消息到RocketMQ服务。在应用中,你需要实现`DefaultMQProducer`类,并设置Producer实例的Group ID,然后调用`send`方法发送消息。 4. **Consumer**:消费者客户端用于接收和...
RocketMQ Console NG 1.0.1 是一个针对Apache RocketMQ的高级可视化管理控制台,主要目的是为了方便用户更直观、高效地管理和监控RocketMQ消息中间件的运行状态。RocketMQ是一个高性能、分布式的消息队列服务,广泛...
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息...
3. **文件内容解析**: - **DISCLAIMER**:通常包含软件的法律免责声明,提醒用户自行承担使用风险。 - **LICENSE**:Apache License 2.0,表明RocketMQ遵循的开源许可协议,允许自由使用、修改和分发。 - **...