MapedFileQueue
应用层访问commitlog和consumequeue文件是通过MappFileQueue来操作MapedFile类,从而间接操作磁盘上面的文件;MappFileQueue是由多个MapedFile队列组成的,该类的结果如下图所示。
功能清单如下:
1. 获取在某时间点之后更新的文件(getMapedFileByTime)
方法getMapedFileByTime(final long timestamp),遍历MapedFile列表,若遇到文件的更新时间戳大于某时间点timestamp则返回该MapedFile对象,遍历完之后仍然没有找到则返回列表的最后一个MapedFile对象。
清理指定偏移量所在文件之后的文件(truncateDirtyFiles)
方法truncateDirtyFiles(long offset):遍历MapedFile列表,每个MapedFile对象对应着一个固定大小的文件,若当文件的起始偏移量fileFromOffset<=offset<=fileFromOffset+fileSize,则表示指定的位置偏移量offset落在的该文件上,则将对应的MapedFile对象的wrotepostion和commitPosition设置为offset%fileSize,若文件的起始偏移量fileFromOffset>offset,即是命中的文件之后的文件,则将这些文件删除并且从MappFileQueue的MapedFile列表中清除掉。
获取或创建最后一个文件(getLastMapedFile)
从MapedFile列表中获取最后一个MapedFile对象,若列表为空或者最后一个对象对应的文件已经写满,则创建一个新的文件(即新的MapedFile对象);若存在最后一个文件(对应最后一个MapedFile对象)并且未写满,则直接返回最后一个MapedFile对象。
若列表为空,则创建的新文件的文件名(即fileFromOffset值)为0;若最后一个文件写满,则新文件的文件名等于最后一个文件的fileFromOffset+fileSize;
若在Broker启动初始化的时候会创建了后台服务线程(AllocateMapedFileService服务),则调用AllocateMapedFileService.putRequestAndReturnMapedFile方法,在该方法中用下一个文件的文件路径、下下一个文件的路径、文件大小为参数初始化AllocateRequest对象,并放入该服务线程的requestQueue:PriorityBlockingQueue变量中,由该线程在后台监听requestQueue队列,若该队列中存在AllocateRequest对象,则利用该对象的变量值创建MapedFile对象(即在磁盘中生成了对应的物理文件),并存入AllocateRequest对象的MapedFile变量中,并且在下一个新文件之后继续将下下一个新文件也创建了。若是在当前线程中直接创建MappeFile对象,则只创建一个新的文件。
最后将创建或返回的MapedFile对象存入MapedFileQueue的MapedFile列表中,并返回该MapedFile对象给调用者。
获取列表中的最后一个文件(getLastMapedFile2)
1
2
|
<code>从MapedFile列表中获取最后一个MapFile对象;若列表为空则返回 null 。
</code> |
统计内存的数据还有多少未持久化(howMuchFallBehind)
调用getLastMapedFile方法获取Mapped队列中最后一个MapedFile对象,计算得出未刷盘的消息大小,计算公式为:最后一个MapedFile对象fileFromOffset+写入位置wrotepostion-commitedWhere(上次刷盘的位置)。
获取MapedFile队列中最小Offset值(getMinOffset)
先获取MapedFile队列中第一个MapedFile对象,再取该对象的fileFromOffset值(即文件的名字值),该值即为最小offset值;若MapedFile列表为空,则直接返回-1;
获取MapedFile队列中最大Offset值(getMaxOffset)
获取Mapped队列中最后一个MapedFile对象,将最后一个MapedFile对象fileFromOffset加上写入位置wrotepostion值即为最大offset值。
删除某类文件中的最后一个文件(deleteLastMaped)
例如commit类型的文件下面有多个固定大小(1G)的文件,即对应在MapedFile列表中有多个MapedFile对象,若要删除最后一个文件,首先从磁盘中删除物理文件,然后从列表中删除最后一个MapedFile对象。
根据指定的offset找到所在文件(findMapedFileByOffset)
方法findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound):首先找到MapedFile队列中的第一个MapedFile对象,取该对象的fileFromOffset值;然后指定的offset减去fileFromOffset值再除以fileSize得到文件在MapedFile队列中的序列号index,最后根据index值从列表中获取MapedFile对象。若index值获取对象时出错误(index<0或者大于列表的总数),则根据returnFirstOnNotFound参数决定是返回null或者第一个MapedFile对象。
MapedFile队列中的消息刷盘(commit)
首先根据commitWhere值(上次刷盘的位置)在队列中找到所处的文件对象MapedFile,即调用findMapedFileByOffset方法;然后调用MapedFile对象的commit方法完成消息刷盘操作;最后利用MapedFile对象的fileFromOffset值加上这次刷盘的消息大小得到的总和更新commitWhere值,作为下次刷盘的开始位置,同时更新MapedFileQueue对象的存储时间戳(storeTimestamp)。
https://www.2cto.com/kf/201708/666767.html
https://www.cnblogs.com/guazi/p/6836112.html
相关推荐
而RocketMQ的消息存储在同一个物理文件中,Topic和分区的增加对性能影响较小。 基于这些结果,可以得出结论:在需要处理大量Topic和多消费端的业务场景下,RocketMQ展现出更好的性能稳定性。相比之下,Kafka更适合...
cd /opt/rocketmq-all-4.3.0-bin-release # nohup sh bin/mqnamesrv & #启动每个服务器的nameserver # tail -f nohup.out The Name Server boot success #输出此类信息,说明启动成功 启动broker 服务器Namserver1...
基于spring-cloud-alibaba套件的微服务架构的商场停车场实战案例 关键技术点应用: ... 服务发现——nacos ...异步消息处理——rocketmq 分布式缓存——redis 客户端负载均衡——openfeign RPC调用——dubbo
### 万亿级数据洪峰下的消息引擎——Apache RocketMQ #### 阿里消息中间件的演变历史 自2007年起,阿里巴巴集团在消息中间件领域不断探索与实践,经历了从Notify到MetaQ再到Apache RocketMQ的发展历程。以下是这一...
4. **启动Broker**:Broker是RocketMQ的消息存储和传输节点,运行`bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true`启动。 接下来,我们关注 RocketMQ 控制台 `rocketmq-dashboard-1.0.1-SNAPSHOT.jar`...
5. **消息模型**:RocketMQ支持两种主要的消息模型——发布/订阅(Publish/Subscribe)和点对点(Point-to-Point)。理解这两种模型的区别和应用场景是使用RocketMQ的关键。 6. **事务消息**:RocketMQ的事务消息...
6. **Message模型**:RocketMQ支持两种消息模型——发布/订阅模型和点对点模型。发布/订阅模型下,多Consumer可以订阅同一个Topic,消息会被广播给所有订阅者;点对点模型下,消息只会被单个Consumer消费,且消费后...
技术文档分享,免费获取请私信博主。
RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大数据、实时计算、微服务等领域。RocketMQ Console是RocketMQ的管理控制台,它提供了一个图形化的界面,方便用户监控和管理RocketMQ集群的状态,进行消息...
在这个"rocketmq-externals-master.zip"压缩包中,主要包含的是RocketMQ的监控端——RocketMQ-Console,因为从GitHub下载可能速度较慢,所以通过CDN提供了快速访问。 RocketMQ-Console是RocketMQ的Web管理控制台,...
1. **消息队列(Message Queue)**:RocketMQ的核心组件,它是一个存储和转发消息的中间层,负责消息的有序性和可靠性传输。 2. **生产者(Producer)**:负责发送消息到RocketMQ的消息队列。 3. **消费者...
通过对 RocketMQ 源码的深入学习,我们可以了解其内部的调度策略、网络通信机制、数据存储结构等细节,从而更好地定制、优化和调试 RocketMQ,以满足特定业务场景的需求。通过阅读 `rocketmq-all-5.0.0-source-...
RocketMQ存储篇中会详细介绍消息的存储结构,包括CommitLog和ConsumeQueue两种文件类型的设计和作用。在RocketMQ中,消息的存储和管理是非常核心的部分,它涉及到了消息的顺序写入、随机读取、文件刷盘、文件的预...
RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-...
RocketMQ源码分析,分为存储篇、NameServer篇、Broker篇、Producer篇、Consumer篇五大部分进行源码级的讲解。大致如下: 1、讲解commitlog、consumequeue、index、transaction文件等数据结构、数据读写、HA高可用等...
RocketMQ的技术架构主要包括Java语言、Netty网络框架、LevelDB存储引擎等技术。这些技术对RocketMQ的性能和稳定性有很大的影响。 十七、RocketMQ的应用场景 RocketMQ的应用场景主要包括订单系统、支付系统、物流...
RocketMQ管理工具,原名为`rocketmq-console`,在发展过程中独立成为一个单独的项目,并更名为`rocketmq-dashboard`。这个工具是由阿里巴巴开源出来的一个强大且直观的管理界面,旨在简化RocketMQ消息中间件的监控、...
RocketMQ Dashboard 是一个基于Web的监控和管理工具,主要用于阿里巴巴开源的分布式消息中间件RocketMQ。RocketMQ是一款高效、稳定、可伸缩的分布式消息系统,广泛应用于大数据、实时计算、微服务等领域。RocketMQ ...
RocketMQ是一款开源的消息中间件,由阿里巴巴开发并贡献给Apache软件基金会,被广泛应用于大数据、实时计算、微服务等场景。RocketMQ的核心功能包括消息发送与接收、消息队列、消息回溯、高可用保障、分布式事务等。...
6. **Broker**:RocketMQ服务器,存储消息并提供服务。 **四、高级特性** 1. **发布/订阅模型**:支持点对点模型和发布/订阅模型,满足不同场景需求。 2. **顺序消息**:确保消息按照特定顺序到达消费者。 3. **...