我们在对rabbimq进行性能测试时发现在单个队列的情况下,无论怎么压qps都上不去,而此时服务器的cpu、网络、磁盘都没什么压力,但通过web管理界面看到发送通道不断的处于流控状态,显然流控机制被启动是性能不佳的最表象原因,那这背后到底是什么原因使得rabbimq启动流控机制呢?
首先总结下前篇博客讲述的rabbitmq流控机制原理---实质上就是通过监控没各进程的mailbox,当某个进程负载过高来不及接收消息时,这个进程的mailbox就会开始堆积消息,当堆积到一定量时,就会阻塞住上游进程让其不得接收新消息,从而慢慢上游进程的mailbox也会开始积压消息,到了一定的量也会阻塞上游的上游的进程接收消息,最后就会使得负责网络数据包接收的进程阻塞掉,暂停接收数据。这就有点像一个多级的水库,当下游水库压力过大时,上游水库就得关闭闸门,使得自己的压力也越来越大这就需要关闭更上游的水库闸门直到关闭最最上游的闸门。
从这套流控机制可以看出,对于处于整个流控链中的任意进程,只要该进程被阻塞,上游进程必定全部被阻塞,若某个进程是性能瓶颈,必然会导致起上游所有进程被阻塞。所以我们可以利用流控机制的这个特点找出瓶颈所在的进程,即通过trace找出流控链中被阻塞进程中最下游的那个进程,这个进程的下游进程就是瓶颈所在。例如有四个进程A、B、C、D,消息在进程间传递的顺序是A->B->C->D, 若此时观察到 A,B不断的被阻塞,则瓶颈一定在C ,因为若A是瓶颈就不会有很多的消息往后面传,若B是瓶颈,则A一定被阻塞,下游的消息就会变少,B就不会成为瓶颈,若D是瓶颈则 C也会被阻塞。
首先找出消息传递链条的所有进程以及顺序关系,rabbimq的流控机制实现在模块credit_flow中,当处于流控机制的进程往下游发前会调用credit_flow:send/1在接收消息时会调用crediat_flow:ack/1参数都为pid,在send中该pid为下游进程pid,ack为上游进程pid,为此我们可以trace这两个函数找出所有处于流控机制中的进程以及顺序关系,在通过i(Pid)提供的进程信息结合代码分析各进程扮演的角色。
{ok, Log} = file:open("flow_trace", [write, append]),
Tfun = fun(Msg, _) ->
io:format(Log,"~p ~n", [Msg])
end,
dbg:tracer(process, {Tfun, null}),
dbg:tp({credit_flow, send, '_'}, []),
dbg:tp({credit_flow, ack, '_'}, []),
dbg:p(all, c).
最后分析出这些进程的关系如下:
rabbit_reader:负责接收网络数据包,解析数据。
rabbit_channel:负责处理amqp协议的各种方法,进行路由解析等。
rabbit_amqqueue_process:负责实现queue的所有逻辑。
rabbit_msg_store:负责实现消息的持久化。
有了以上信息后,我们就可以开始给rabbitmq加上load,查看哪些进程被阻塞。由于阻塞这个动作是在credit_flow:send/1这个函数内完成的,并不是每次调用都为阻塞绝大多数时候不会,而且阻塞的状态信息保存在进程字典中,所有直接trace阻塞这个动作不太方便。但由于阻塞之后必然有反阻塞,而这个动作是调用credit_flow:unblock/1完成的,所以我们可以trace这个函数来查看哪些进程被阻塞。
{ok, Log} = file:open("flow_trace", [write, append]),
Tfun = fun(Msg, _) ->
io:format(Log,"~p ~n", [Msg])
end,
dbg:tracer(process, {Tfun, null}),
dbg:tpl({credit_flow, unblock, '_'}, []),
dbg:p(all, c).
通过trace结果发现,瓶颈在rabbit_amqueue_process这个进程。为此eprof看下该进程有没有热点可以优化,profile的结果如下(部分截图)
从上可以看出并没有热点。
对于rabbit_amqqueue_process进程是每个队列都会对应一个这样的进程,那多申明几个队列,多几个这样的进程性能是否能提升呢?通过实验发现答案是肯定的,增加队列能够显著提升qps,但并不是越多越好,太多反而性能会下降,在增加队列数量时发现慢慢瓶颈会转移到rabbit_channel进程上,当然channel也可以声明多个解决该问题,但无论怎么设置多少channel多少queue,瓶颈始终都不会出现在msg_store_persistent进程。
- 大小: 7.7 KB
- 大小: 69.6 KB
分享到:
相关推荐
在本文中,我们将深入探讨RabbitMQ的流控机制,以及如何通过性能测试和追踪技术来识别性能瓶颈。 流控机制的核心是监控每个进程的邮箱(Mailbox),这是一个用于存储待处理消息的地方。当一个进程的邮箱堆积过多...
性能测试主要关注RabbitMQ的消息发送和接收能力,特别关注了消息的应答机制和持久化功能,以保证消息的可靠传输。测试涵盖了各种生产者与消费者数量的组合场景,以模拟实际应用中的复杂情况。 2.2.1 **单机模式测试...
"RabbitMQ流量控制机制分析" RabbitMQ 的流量控制机制是指 RabbitMQ 为了避免接收消息的速率过快,导致消息队列过长,影响系统性能的机制。该机制主要通过三部分来实现:开关闸门、关闭闸门和开启闸门。 一、开关...
消息队列:RabbitMQ:RabbitMQ消息持久化策略.docx
通过设置消息和交换机为持久化,即使RabbitMQ重启,消息也不会丢失。 3. **备用交换机(Backup Exchange)**:在主交换机故障时,备用交换机可以接管消息路由,提供高可用性。这可以通过配置镜像队列或者使用...
比RabbitMQ性能更好的消息队列RocketMQ RabbitMQ 由于持久化场景下的吞吐量只有2.6万 经过 RabbitMQ,Kafka 和 RocketMQ( ActiveMQ 性能较差,暂不考虑)的调研和分析后,我们发现 RocketMQ 比较适合
这意味着它可以模拟真实环境中的高并发场景,帮助开发者了解RabbitMQ在特定配置下的性能瓶颈和处理能力。 在测试过程中,RabbitMQTest提供了配置连接数和通道数的功能。连接数指的是客户端与RabbitMQ服务器建立的...
第11周1-第05章节-Python3.5-RabbitMQ消息持久化.mp4
RabbitMQ 的消息持久化是指在 RabbitMQ Server 中保留消息的机制,以便在 Server 崩溃或重启后可以恢复消息。消息持久化是通过在交换器、队列和消息三个方面实现的。 第一步,交换器的持久化。交换器是 RabbitMQ 中...
"RabbitMQ Mirror机制分析" RabbitMQ Mirror机制是RabbitMQ中的一种高可用性机制,旨在...RabbitMQ Mirror机制可以提供高可用性和持久化的消息队列解决方案,通过镜像队列的同步机制,确保消息的可靠传输和持久化。
【RabbitMQ进程结构分析】 RabbitMQ是一个基于Erlang构建的开源消息队列系统,它遵循AMQP(高级消息...通过对RabbitMQ的进程结构分析和性能调优,可以更好地利用其特性,解决实际工作中的挑战,提高系统的整体效率。
- "RabbitMQ初步优化分析报告.docx"、"提升RabbitMQ单队列QPS的简单方案.docx" 和 "通过流控机制看rabbimq的性能.docx":这些资料探讨了RabbitMQ性能优化的各种策略,如设置合理的队列长度、调整流控机制以避免过载...
通过这个压缩包提供的代码,你可以深入理解RabbitMQ的各种工作模式和消息持久化的实现,对于开发分布式系统和微服务架构有着重要的实践价值。如果你在学习或使用过程中遇到任何问题,可以通过讨论或者直接@我来获取...
3. **日志分析**:MQGhost可以解析和分析RabbitMQ的日志文件,帮助定位错误和性能瓶颈。 4. **队列管理**:用户可以使用MQGhost方便地创建、删除和管理RabbitMQ的队列,查看队列的详细信息,如长度、消息持久化状态...
标题中的“持久化性能测试1”指的是针对消息队列(Message Queue,MQ)系统的持久化功能进行的一次性能评估。这种测试通常是为了了解在不同配置和工作负载下,MQ系统如何处理消息的存储和检索,特别是在系统重启或...
通过理解RabbitMQ的工作原理并进行适当的性能测试和调优,我们可以确保RabbitMQ在实际应用中发挥出最佳性能,满足系统需求。在进行调优时,应结合具体业务场景进行,兼顾性能、稳定性和可扩展性。
消息队列:RabbitMQ:RabbitMQ性能调优与监控.docx
在Java等语言中,我们可以使用JSON、XML或自定义序列化格式将对象转换为字符串,然后通过RabbitMQ发送。这样做允许我们在不同的应用程序之间传递复杂的数据结构,如数据库记录、业务对象或其他类型的数据。 在实际...
RabbitMQ之所以受欢迎,主要因为它具有异步、削峰、负载均衡等高级功能,并且还提供了消息持久化机制,确保了即使在系统故障的情况下,消息也不会丢失。此外,RabbitMQ还实现了生产者和消费者之间的解耦,提高了系统...