RocketMQ的并发读写能力扛住了2016年双十一,每秒17.5万笔订单的创建(单笔订单衍生出N条消息,实际tps是17.5*n 万),下面对其高并发读写原理进行探讨。主要体现在两方面:客户端收发消息,服务器接收消息并持久化(重点)。
客户端(RocketMQ-client)
1,客户端发送消息有负载均衡,客户端内存中保存着当前所有的服务器列表,每次发送都切换一台服务器发送消息,使得每台服务器接收的消息量尽量均衡,避免热点问题。
2,发送代码为线程安全,当Producer实例就绪之后,完全可以死循环发送消息。一般业务方都会有N个数据源实例,所以从数据源方面就保证高并发写能力。
3,消费者端负载均衡集群消费模式下,同一个ID的所有消费者实例平均消费该Topic的所有队列。
服务器端(Broker)
服务端的高并发读写主要利用Linux操作系统的PageCache特性,通过Java的MappedByteBuffer直接操作PageCache。MappedByteBuffer能直接将文件直接映射到内存,其实就是Map把文件的内容被映像到计算机虚拟内存的一块区域,这样就可以直接操作内存当中的数据而无需操作的时候每次都通过I/O去物理硬盘写文件的。
这里先介绍RocketMQ的消息存储结构:由commitLog和consume queue 两部分组成。
commitLog
1,commitLog是保存消息元数据的地方,所有消息到达Broker后都会保存到commitLog文件。
这里需要强调的是所有topic的消息都会统一保存在commitLog中,举个例子:当前集群有TopicA, TopicB,这两个Toipc的消息会按照消息到达的先后顺序保存到同一个commitLog中,而不是每个Topic有自己独立的commitLog。
2,每个commitLog大小上限为1G,满1G之后会自动新建CommitLog文件做保存数据用。
3,CommitLog的清理机制:
- 按时间清理,rocketmq默认会清理3天前的commitLog文件;
- 按磁盘水位清理:当磁盘使用量到达磁盘容量75%,开始清理最老的commitLog文件。
4,文件地址:${user.home}/store/${commitlog}/${fileName}
ConsumerQueue:
1,ConsumerQueue相当于CommitLog的索引文件,消费者消费时会先从ConsumerQueue中查找消息的在commitLog中的offset,再去CommitLog中找元数据。如果某个消息只在CommitLog中有数据,没在ConsumerQueue中, 则消费者无法消费,Rocktet的事务消息就是这个原理。
2,consumequeue的数据结构包含3部分:
- 消息在commitLog文件实际偏移量(commitLogOffset)
- 消息大小
- 消息tag的哈希值
3,文件地址:${user.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
得益于以上的数据结构,MQ在写数据过程是顺序写盘,读数据过程是跳跃读盘(尽量命中PageCache)。
消息顺序写
在单台服务器上,MQ元数据都落在单个文件上(即commitLog),大量数据IO都在顺序写同一个commitLog,满1G了再写新的,真正意义上的顺序写盘,再加上MQ默认是累计4K才强制从PageCache中刷到磁盘(缓存),所以高并发写性能突出。
消息跳跃读
MQ读取消息依赖系统PageCache,PageCache命中率越高,读性能越高,Linux平时也会尽量预读数据,使得应用直接访问磁盘的概率降低。
当客户端向Broker拉取消息时,Broker上系统读文件过程如下:
1,检查要读的数据是否在上次预读的cache中;
2,若不在cache,操作系统从磁盘中读取对应的数据页,并且系统还会将该数据页之后的连续几页(一般三页)也一并读入到cache中,再将应用需要的数据返回给应用。此情况操作系统认为是跳跃读取,属于同步预读。
3,若命中cache,相当于上次缓存的内容有效,操作系统认为顺序读盘,则继续扩大缓存的数据范围,将之前缓存的数据页往后的N页数据再读取到cache中,属于异步预读。
系统给cache的定义了一个数据结构,命名为window,window由 当前要读取的内容 + 预读取的内容(group)组成。
下面结合下图举例说明:
- a状态:操作系统等待应用读请求时的缓存状态。
- b状态:客户端发起读操作,broker发现所读数据不在Cache中,即不在前次预读的group中,则表明文件访问不是顺序访问(场景有可能是不消费中间的某部分消息,直接消费最新的消息),系统采用同步预读,直接从磁盘中读取页面+缓存页到内存。
- c状态:客户端继续发起读操作,系统发现所读数据在Cache中,则表明前次预读命中,操作系统把预读group扩大一倍,并让底层文件系统读入group中剩下尚不在Cache中的文件数据块,异步预读。
所以Broker的机器需要大内存,尽量缓存足够多的commitLog,让Broker读写消息基本在PageCache中操作。在运行时,如果数据量非常大,可以看到broker的进程占用内存比较多,其实大部分是被缓存住的commitlog。
缓存清理机制(PageCache)
Linux会缓存尽量多的消息数据到内存中,提高读数据缓冲命中率。当内存不够时,还是要清理没用的数据,将清理的空间用以缓存新的内容,这整个过程,Linux用一个双向链表来管理,如下图:
inactive_list代表访问冷数据,active_list代表访问热数据,新分配的数据页先链入到inactive_list头部,当其被引用时再将其移到active_list的头部。
当内存不足时,系统首先从尾部开始反向扫描 active_list并将状态不是referenced的项链入到inactive_list的头部,然后系统反向扫描inactive_list,如果所扫描的项的处于合适的状态就回收该项,直到回收了足够数目的Cache项,这就是系统回收内存的过程。
这里需要注意一点,如果内存回收速度比应用写缓存的速度慢,会导致写缓存的线程一直等待,体现到RocketMQ上就是写消息RT很高,这就是 “毛刺问题”。这时就需要结合GC参数和系统内核参数进行调整,此处不对此展开说明了。
demo演示:
git clone https://github.com/javahongxi/incubator-rocketmq.git
创建配置文件conf.properties
rocketmqHome=D:\\github\\incubator-rocketmq\\distribution
namesrvAddr=127.0.0.1:9876
mapedFileSizeCommitLog=52428800
mapedFileSizeConsumeQueue=30000
-c conf.properties
依次启动NamesrvStartup,BrokerStartup,Consumer,Producer
rocketmq扩展:https://github.com/javahongxi/incubator-rocketmq-externals.git
相关推荐
《基于SpringBoot的商城秒杀系统:Redis与RocketMQ在高并发下的应用》 商城秒杀项目是一项技术挑战性极高的任务,它需要系统具备处理大量并发请求的能力,以确保在短时间内顺利完成用户的秒杀操作。在这个项目中,...
- **Message Store模块**:深入研究RocketMQ的数据存储结构,如CommitLog、ConsumeQueue等,以及如何实现高可用和高并发读写。 - **Remoting模块**:了解RocketMQ的网络通信机制,包括协议设计、RPC实现等。 5. *...
### Java秒杀系统方案优化与高性能高并发实战 在当今互联网时代,秒杀活动作为一种吸引用户的营销手段,被广泛应用于各类电商网站。然而,对于技术团队来说,如何设计和实现一个稳定、高效的秒杀系统,成为了巨大的...
1、讲解commitlog、consumequeue、index、transaction文件等数据结构、数据读写、HA高可用等功能; 2、讲解NameServer的启动、注册Broker、客户端查询Topic的路由信息等功能; 3、讲解Broker的启动、注册、处理...
例如,在高并发场景下,通过监控发送和消费速率,可以及时调整Topic和Queue的数量,避免因资源不足导致的服务中断。在故障排查时,消息查询和跟踪功能能快速定位问题所在,减少问题解决的时间。 总的来说,RocketMQ...
- **高并发**:设计优化支持大规模并发读写,满足大数据场景需求。 - **消息顺序**:提供严格的消息顺序保证,适用于金融交易等业务。 - **延迟与定时消息**:支持消息的定时和延时发送,满足不同业务需求。 - *...
RocketMQ是由阿里巴巴开源的消息中间件,具有高性能、高可靠性和高伸缩性的特点,适用于高并发的场景。它支持发布/订阅模型、消息优先级、顺序消息、消息过滤、消息持久化、消息可靠性、低延迟消息、消息重试、定时...
RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大数据、实时计算、微服务等领域,具有高可用、高并发、低延迟等特性。RocketMQ管控台是其配套的管理界面,用于监控、管理和操作RocketMQ集群,提供了一种...
4. **高性能**:设计为高吞吐量和低延迟,适合大量并发读操作。 RocketMQ,由阿里巴巴开源并贡献给Apache,是一个基于发布/订阅模式的消息中间件,尤其适合大规模分布式系统的实时消息处理。RocketMQ 3.2版本具有...
- 支持在单机上创建一万以上的虚拟化队列,保证了高并发的处理能力。 - 提供两种刷盘策略:异步刷盘和同步刷盘,以平衡消息的处理速度和数据的可靠性。 - 支持消息查询,可以通过MessageId或MessageKey进行精确...
- **高性能:** 支持高并发消息发送和接收。 - **可扩展性:** 支持水平扩展,可以通过增加Broker节点来提升系统性能。 - **可靠性:** 保证消息的可靠传输,支持消息重试机制。 - **消息过滤:** 消费者可以根据条件...
3. **网络通信**:RocketMQ使用Netty作为网络通信框架,提供了高效的异步非阻塞I/O模型,确保了大规模并发连接下的性能。 4. **消息存储与检索**:RocketMQ将消息存储在硬盘上,通过Message Store模块实现。消息...
本系统采用了基于RocketMQ的消息队列技术,通过预生成秒杀令牌、异步处理订单等方式,实现高并发下的秒杀操作,避免数据库瞬间压力过大导致服务崩溃。 4. **普通商品购买** 系统实现了常规的商品购买流程,包括...
- 单机支持1万以上持久化队列:在高并发场景下,能够支撑大规模的消息堆积。 - 刷盘策略:包括异步刷盘和同步刷盘,可满足不同场景下对消息持久性的要求。 - 消息查询:可按照MessageId和MessageKey进行消息查询。 -...
RocketMQ,作为一款开源的消息中间件,源自阿里巴巴,并在2016年...总之,RocketMQ以其强大的功能和优异的性能,广泛应用于互联网、金融、零售等多个行业的大型分布式系统中,是构建高并发、高可用系统的重要基础设施。
RocketMQ是一款由阿里巴巴开源的分布式消息中间件,它在处理大规模并发消息传递、高可用性和高可扩展性方面表现出色。RocketMQ的可视化界面包,正如其标题所言,是为了解决RocketMQ管理和监控的需求,提供了一个直观...
- **单机支持1万以上持久化队列**:单台Broker可以支持大量的队列,满足高并发的需求。 - **刷盘策略**: - **异步刷盘**:提高写入速度的同时牺牲了一定的数据安全性。 - **同步刷盘**:确保数据的持久性,但会...
1. **高吞吐量和低延迟**:RocketMQ 设计用于处理大规模并发消息,能够在高负载下保持低延迟,满足实时业务需求。 2. **高可用性**:通过主备切换和分布式部署,RocketMQ 可以保证服务的高可用性,避免单点故障。 3....
Netty的异步特性使得RocketMQ能处理高并发场景,而其高效的内存管理机制降低了系统的资源消耗。 在实际学习过程中,可以先从阅读Netty的官方文档开始,理解基本概念和API。然后,通过分析RocketMQ的源码,观察它是...