RabbitMQ流控机制的核心是一个称为{InitialCredit, MoreCreditAfter}的元组,默认情况下值为{200, 50}。假如消息发送者进程A要给接收者进程B发消息,每发一条消息,Credit数量减1,直到为0,A被block住,对于接收者B,每接收MoreCreditAfter条消息,会向A发送一条消息,给予A MoreCreditAfter个Credit,当A的Credit>0时,A可以继续向B发送消息。
每个流控进程的进程字典中都维护着4类键:{credit_from, From}, {credit_to, To}, credit_blocked和credit_deferred,这些键值通过UPDATE宏来更新:
-define(UPDATE(Key, Default, Var, Expr),
begin
case get(Key) of
undefined -> Var = Default;
Var -> ok
end,
put(Key, Expr)
end).
其中,Key为键名,Default为初始值,Var为当前值,Expr为更新当前值的表达式。
{credit_from, From}的值表示还能向消息接收进程From发送多少条消息,当前进程会向多少个进程发送消息,进程字典中就有多少个{credit_from, From}键;{credit_to, To}的值表示当前进程再接收多少条消息,就要向消息发送进程增加Credit数量;credit_blocked的值表示当前进程被哪些进程block了,也就是禁止当前进程向其发送消息的进程列表,比如A向B发送消息,A的进程字典中{credit_from, B}值为0,那么,A的credit_blocked值为[B];credit_deferred缓存着消息接收进程向消息发送进程增加Credit的消息列表,它在当前进程被消息接收进程block时使用,当进程被unblock后(credit_blocked值为空),credit_deferred列表中的消息会依次发送。
RabbitMQ的credit_flow模块封装着流控相关的函数。
credit_flow:send/1和credit_flow:send/2函数在消息发送前调用,每发一条消息,更新{credit_from, From},使对应的Credit减1,当Credit数量为0时,调用block/1函数。
send(From) -> send(From, ?DEFAULT_CREDIT).
send(From, {InitialCredit, _MoreCreditAfter}) ->
?UPDATE({credit_from, From}, InitialCredit, C,
if C == 1 -> block(From),
0;
true -> C - 1
end).
credit_flow:block/1函数会更新credit_blocked值,把From对应的进程加到当前block进程列表中,表示当前进程不能再向block进程列表中的进程发送消息。
block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
credit_flow:ack/2函数用于更新{credit_to, To}的值,每收到一条消息,消息接收进程ack一次,将{credit_to, To}值减1,当值减到1时,调用grant/2函数,给予消息发送进程MoreCreditAfter个Credit,使其可以继续发送消息。
ack(To) -> ack(To, ?DEFAULT_CREDIT).
ack(To, {_InitialCredit, MoreCreditAfter}) ->
?UPDATE({credit_to, To}, MoreCreditAfter, C,
if C == 1 -> grant(To, MoreCreditAfter),
MoreCreditAfter;
true -> C - 1
end).
credit_flow:grant/2函数可以给予消息发送进程指定数量的Credit,它会向目标进程发送一条{bump_credit, {self(), Quantity}}的普通消息,根据当前进程是否被block,grant/2函数会决定将bump_credit消息立即发送还是放入credit_deferred进程字典中缓存起来,等待时机合适再发送。
grant(To, Quantity) ->
Msg = {bump_credit, {self(), Quantity}},
case blocked() of
false -> To ! Msg;
true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
end.
credit_flow:blocked/0函数用于判断当前进程是否被block,其判断依据是该进程进程字典中credit_blocked值是否为空,空表示未block,不为空表示其被某些进程block了。
blocked() -> case get(credit_blocked) of
undefined -> false;
[] -> false;
_ -> true
end.
credit_flow:grant/2函数会向消息发送进程发送一条bump_credit消息,那么消息发送进程收到这条消息后,会调用credit_flow:handle_bump_msg/1函数进行处理,更新相应的{credit_from, From}值,增加Credit数量。
handle_bump_msg({From, MoreCredit}) ->
?UPDATE({credit_from, From}, 0, C,
if C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
C + MoreCredit;
true -> C + MoreCredit
end).
如果当前的Credit值小于0,并且加上收到的MoreCredit个Credit后值大于0,这种情况表示当前进程可以继续向From进程发送消息,进行一次unblock/1操作。
credit_flow:unblock/1函数用于将消息接收进程从当前进程的credit_blocked列表中删除。
unblock(From) ->
?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
case blocked() of
false -> case erase(credit_deferred) of
undefined -> ok;
Credits -> [To ! Msg || {To, Msg} <- Credits]
end;
true -> ok
end.
如果unblock/1操作后当前进程的block列表为空,那么需要取出由credit_deferred缓存的消息列表,依次发送。
另外,为了避免消息接收进程或者消息发送进程挂掉引起其他进程永远被block,当检测到有进程挂掉后需要调用peer_down/1进行相关记录清理。
peer_down(Peer) ->
unblock(Peer),
erase({credit_from, Peer}),
erase({credit_to, Peer}),
ok.
RabbitMQ中,一条消息从接收到入队,在进程间的传递链路为rabbit_reader -> rabbit_channel -> rabbit_amqqueue_process -> rabbit_msg_store。进程之间构成一个有向无环图:
messages
最后分析出这些进程的关系如下:
rabbit_reader:负责接收网络数据包,解析数据。
rabbit_channel:负责处理amqp协议的各种方法,进行路由解析等。
rabbit_amqqueue_process:负责实现queue的所有逻辑。
rabbit_msg_store:负责实现消息的持久化。
这里以rabbit_reader -> rabbit_channel之间的消息流控为例说明。
rabbit_reader:process_frame/3函数解析帧,当该帧为一个包含内容的帧时,调用rabbit_channel:do_flow/3函数:
process_frame(Frame, Channel, State) ->
......
case rabbit_command_assembler:process(Frame, AState) of
......
{ok, Method, Content, NewAState} ->
rabbit_channel:do_flow(ChPid, Method, Content),
put(ChKey, {ChPid, NewAState}),
post_process_frame(Frame, ChPid, control_throttle(State));
......
end.
rabbit_channel:do_flow/3函数做的事情是调用credit_flow:send/1更新自己的{credit_from, Pid}值,然后将包含消息的帧发送给rabbit_channel进程(由参数Pid指定):
do_flow(Pid, Method, Content) ->
credit_flow:send(Pid),
gen_server2:cast(Pid, {method, Method, Content, flow}).
注意do_flow/3虽然属于rabbit_channel模块,但是它是在rabbit_reader:process_frame/3函数中被调用的,credit_flow:send/1修改的是rabbit_reader进程的进程字典,只有执行完gen_server2:cast/2函数后,消息才进入到rabbit_channel进程空间,rabbit_channel模块对此异步消息的处理函数为:
handle_cast({method, Method, Content, Flow},
State = #ch{reader_pid = Reader}) ->
case Flow of
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
......
rabbit_channel进程每收到一条消息,就调用一次credit_flow:ack/1函数,当进程字典中的{credit_to, Reader}值为0时,向rabbit_reader进程发送bump_credit消息,此消息由rabbit_reader进程的handle_other({bump_credit, Msg}, State)函数进行处理。
handle_other({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
control_throttle(State);
流控机制的基本原理就是这样,但是讲到现在,都还没有讲到RabbitMQ是怎么阻塞客户端的数据请求,现在以一个具体场景为例说明:
在上面的消息传递链中,假设rabbit_msg_store进程(Pid为StorePid)来不及处理rabbit_amqqueue_process进程(Pid为AMQPid)过来的消息,根据流控机制,慢慢地rabbit_amqqueue_process进程字典中的{credit_from, StorePid}值变为0,credit_blocked值变为[StorePid],此时如果有消息从rabbit_channel发送到rabbit_amqqueue_process进程,rabbit_amqqueue_process进程还是会将消息源源不断地发送到rabbit_msg_store进程。咋看之下,流控并没有起作用,实际上,此时rabbit_channel进程字典中的{credit_from, AMQPid}值也会慢慢减少,并且,rabbit_channel进程不会再收到来自rabbit_amqqueue_process进程的bump_credit消息,因为rabbit_amqqueue_process的credit_blocked列表不为空,它会将待发的{bump_credit, {self(), Quantity}}消息缓存在credit_deferred列表中!逐渐,rabbit_channel进程字典中的credit_blocked列表变为[AMQPid],rabbit_channel进程无法向其上游的rabbit_reader进程发送bump_credit消息(全都缓存在自己进程字典的credit_deferred里)。到最后,最上游的rabbit_reader进程字典中credit_blocked列表不为空,在此进程中调用credit_flow:blocked/0返回true。回到rabbit_reader进程,每处理完一条包含内容的帧时(rabbit_channel:do_flow/3),都会调用control_throttle/1函数,如果credit_flow:blocked/0返回true,会将当前的连接状态置为blocking:{running, true} -> State#v1{connection_state = blocking},随即,连接状态会转变为blocked。这时,当rabbit_reader继续接受客户端数据时,就会进入recvloop/2函数子句:
recvloop(Deb, State = #v1{connection_state = blocked}) ->
mainloop(Deb, State);
最终进程阻塞在mainloop/2的rabbit_net:recv/1函数上。rabbit_net:recv/1函数会阻塞的原因是RabbitMQ采用了gen_tcp的半阻塞模型,也就是说每次接受一个tcp消息之后,必须显式调用inet:setopts(Sock, [{active, once}])来激活一下,否则,进程会一直阻塞在receive语句上。
以上就是假设rabbit_msg_store处理速度跟不上,最终导致rabbit_reader进程停止接收客户端数据的流控机制作用的过程。根据实现原理,消息链进程所构成的有向无环图中,任何一条边触发流控机制,最终都会导致这条连接停止接收客户端数据。
注:RabbitMQ源码为3.1.5。
Reference:
[1]. Alvaro Videla – RABBITMQ INTERNALS – CREDIT FLOW FOR ERLANG PROCESSES.
[2]. RabbitMQ流量控制机制分析.docx
相关推荐
"RabbitMQ流量控制机制分析" RabbitMQ 的流量控制机制是指 RabbitMQ 为了避免接收消息的速率过快,导致消息队列过长,影响系统性能的机制。该机制主要通过三部分来实现:开关闸门、关闭闸门和开启闸门。 一、开关...
在本文中,我们将深入探讨RabbitMQ的流控机制,以及如何通过性能测试和追踪技术来识别性能瓶颈。 流控机制的核心是监控每个进程的邮箱(Mailbox),这是一个用于存储待处理消息的地方。当一个进程的邮箱堆积过多...
- "RabbitMQ初步优化分析报告.docx"、"提升RabbitMQ单队列QPS的简单方案.docx" 和 "通过流控机制看rabbimq的性能.docx":这些资料探讨了RabbitMQ性能优化的各种策略,如设置合理的队列长度、调整流控机制以避免过载...
标题中的“Rabbitmq堆积消息后...总的来说,理解RabbitMQ的工作机制和调整策略对于优化系统性能和防止生产者被阻塞至关重要。正确配置和使用这些措施可以帮助我们更好地管理消息队列,避免生产速率因消息堆积而下降。
3. **RabbitMQ流控与性能优化** - **内存和磁盘阈值**:当内存或磁盘使用量达到预设阈值时,RabbitMQ会阻止生产者发送新消息,防止资源耗尽。 - **基于信用证的流控**:为了避免消费者消费速度慢导致的内存压力,...
此外,RabbitMQ还可用于实现最终一致性、广播和错峰流控等场景。 RabbitMQ拥有众多特性,包括: 1. 可靠性:通过持久化、传输确认和发布确认等机制保证消息的可靠性。 2. 灵活的路由:使用Exchange进行消息路由,...
- **流量控制(Flow Control)**:RabbitMQ 提供服务端和消费端的流控机制,防止消息积压和系统过载。 - **监控和报警**:实施监控系统,及时发现并处理性能瓶颈和异常情况。 - **合理设计消息模型**:根据业务...
应用解耦、异步、流量削锋、数据分发、错峰流控、日志收集等等...MQ衡量标准服务性能、数据存储、集群架构主流竞品分析当前市面上mq的产品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐献给Apache的...
- 当流量过大、内存超出限制或磁盘空间不足时,RabbitMQ会启动流控机制,暂时停止接收新的消息。 - 这有助于防止因资源耗尽而导致的服务不可用情况。 #### 七、总结 通过上述分析可以看出,消息队列不仅能够提高...
4. **错峰流控**:通过消息队列可以平滑高峰期的流量,避免下游系统因短时间内处理大量请求而过载。 5. **强一致性与最终一致性**:对于需要强一致性的场景,分布式事务是一种解决方案,但它往往较为复杂且成本较高...
- **过载保护**:具备内存占用和磁盘空间监控机制,当资源使用超过阈值时会触发流控措施。 - **消息控制**:支持细粒度的消息生产和消费控制及状态管理。 - **预取机制**:允许一次性获取多条消息,提高消费效率。 -...
- **数据库持久化**:如RabbitMQ和ActiveMQ使用关系型数据库或NoSQL数据库。 - **键值存储系统**:如使用LevelDB或Redis等。 - **文件记录**:如Kafka和RocketMQ将消息存储在磁盘文件中。 - **内存镜像**:类似Redis...
在mt-shop项目中,SpringBoot作为基础框架,负责启动和管理应用,而SpringCloudAlibaba则作为微服务治理工具,用于实现服务发现、负载均衡、熔断机制等功能。具体来说: 1. **服务注册与发现**:Nacos作为服务注册...
持久化队列如Redis、RabbitMQ等,会将数据存储在硬盘上,即使服务重启也能保证日志不丢失。 在“源码”这个标签下,我们可以推测这篇博客可能会深入到具体的代码实现,例如使用Java的BlockingQueue接口或者开源库如...
适配器模块的主要目标是简化 Sentinel 在不同框架中的接入流程,使得开发者能够快速利用 Sentinel 的保护机制,如流控规则、熔断策略和系统保护规则。目前,Sentinel Go 适配器支持的集成主要包含以下几个方面: 1....
4. **异步处理**:为了提高响应速度,秒杀请求可以采用消息队列(如RabbitMQ或Kafka)进行异步处理,将订单创建、库存扣减等操作放入队列,后台服务再按顺序处理,避免前端阻塞,提升用户体验。 5. **缓存策略**:...
在众多的消息队列产品中,如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等,以及部分数据库如Redis、MySQL和phxsql,都提供了消息队列的功能,以满足不同场景的需求。 1. **消息队列的核心功能** - **低...