`
ybbct
  • 浏览: 30975 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

RabbitMQ流量控制机制简单分析

阅读更多

 

在RabbitMQ中,消息可能被存储在多个不同的队列,消息越早被消费,那么消息经过的队列层次越少,则平均每个消息处理的开销就越小。但若接收消息的速率过快,MQ来不及处理,这些消息就可能进入很深层次的队列,大大增加平均每个消息的处理开销,进一步使得处理新消息和发送旧消息的能力减弱,更多的消息会进入很深的队列,循环往复,整个系统的性能就会极大的降低。另外若接收消息的速率过快还会实现某些进程的mailbox过大,可能会产生很严重的后果。为此RabbitMQ设计了一套流控机制,本文从以下三个方面去阐述该流控机制是如何工作的。

1.如何开关闸门

RabbitMQ使用TCP长连接进行通讯,接收数据的起点进程为rabbit_reader。首先分析它的接收loop

 

recvloop(Deb, State = #v1{connection_state = blocked}) ->
    mainloop(Deb, State); 
recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
  when BufLen < RecvLen ->
    ok = rabbit_net:setopts(Sock, [{active, once}]), 
    mainloop(Deb, State#v1{pending_recv = true});
recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
    {Data, Rest} = split_binary(case Buf of
                                    [B] -> B;
                                    _   -> list_to_binary(lists:reverse(Buf))
                                end, RecvLen),
    recvloop(Deb, handle_input(State#v1.callback, Data,
                               State#v1{buf = [Rest],
                                        buf_len = BufLen - RecvLen})).
 

从上面代码可以看出,rabbit_reader每接收到一个包,就设置套接字属性为{active, onece},若当前连接被blocked时则不设置{active,once},这个接收进程就阻塞在receive方法上。通过这种方式来实现闸门的开关。

2.何时关闭闸门

RabbitMQ是用erlang/OTP开发的,一个消息从被接收到被发送给订阅者,必然要在多个进程间的转发,从接收到被消费,一个消息所走过的所有进程自然形成一条消息链,RabbitMQ通过监控这条链上每个节点“mailbox”中未被接收的消息数量,决定何时关闭闸门。实现机制如下所述:


如图所示,进程A、B、C连成一条消息链,每个进程字典中有一对关于收发消息的credit值,以进程B为例,{{credit_from, C}, Value},表示能发多少条消息给C,每发一条消息该值减1,当为0时,本进程阻塞住不再往下游进程发消息也不再接收上游的消息;{{credit_to, A}, Value}表示再接收多少个消息就向上游进程发增加credit值的消息{bump_credit, { self(), Quantity}},在上游进程接收到该消息后,就增加{credit_from, pid}值,这样上游进程就能持续发消息。但当上游发送速率高于下游接收速率,credit值会逐渐被耗光这时进程就会被阻塞,阻塞的情况会一直传递到最上游

Rabbit_reader,这时rabbit_reader就关闭闸门。

3.何时开启闸门

当上游进程收到来自下游进程的bump_credit消息时,若此时上游进程处于block状态则解除block状态,开始接收更上游进程的消息,一个个的传导最终能够解除rabbit_reader的block状态。


 

  • 大小: 29 KB
分享到:
评论

相关推荐

    淘宝RabbitMQ系统的实验测试资料.zip

    - "RabbitMQ_Backing_Queue结构.docx" 和 "RabbitMQ流量控制机制分析.docx":这部分内容深入解析了RabbitMQ的后端队列结构,以及如何利用流量控制机制(如QoS)来防止消息过快生产和堆积,维持系统的平衡。...

    RabbitMQ消息中间件技术精讲.txt

    5. **Virtual Host(虚拟主机)**:虚拟主机是RabbitMQ中的逻辑隔离单元,可以看作是独立的RabbitMQ服务器,主要用于权限控制和资源隔离。 #### 三、RabbitMQ的工作原理 1. **消息发布**:生产者将消息发送给交换...

    rabbitmq介绍

    无论是简单的消息传递还是复杂的业务流程协调,RabbitMQ都能够提供稳定可靠的支持。随着云原生架构的发展,RabbitMQ在微服务架构中扮演着越来越重要的角色,成为构建高效、可靠分布式系统不可或缺的一部分。

    一个简单的基于Springboot实现商品秒杀系统

    3. **限流与降级**:为了防止系统在秒杀开始时因大量请求导致崩溃,我们需要进行流量控制。可以使用Guava的RateLimiter或Spring Cloud Gateway的限流策略。另外,当系统压力过大时,可以采取服务降级策略,优先保证...

    JeecgCloud简介.pptx

    Spring Cloud Gateway是Spring Cloud官方推出的第二代网关框架,它提供了强大的路由转发能力和灵活的过滤器机制,使得微服务间的路由变得更加简单且高效。在JeecgCloud中,Spring Cloud Gateway充当了API网关的角色...

    Kafka设计解析

    这种设计增加了系统的灵活性和峰值处理能力,让系统能够更好地应对突发流量,而无需为可能很少出现的流量高峰投入大量资源。消息系统还能够减少系统组件间的耦合度,提高了系统的可恢复性和消息处理的顺序性。 消息...

    大型网站架构不得不考虑的10个问题

    - **解决方案**:实施乐观锁或悲观锁机制来控制并发访问;采用事务管理确保数据的一致性;优化缓存策略,比如使用LRU算法管理缓存,避免不必要的更新冲突;使用读写分离技术减轻主数据库的压力。 #### 3. 文件存贮...

    java+redis+秒杀项目实战.zip

    6. **限流与降级策略**:为了保护系统稳定,项目可能会采用滑动窗口算法、漏桶算法或令牌桶算法进行流量控制。同时,对于超出系统处理能力的请求,可能采取降级策略,如返回友好的错误页面,防止系统崩溃。 7. **...

    SpringCloud.zip

    2. **Zuul**或**Spring Cloud Gateway**:作为边缘服务,它们提供动态路由、流量控制、安全过滤等功能,是微服务的入口。 3. **Ribbon**:这是一个客户端负载均衡器,它与Eureka结合,可以在客户端进行服务的选择。...

    jd_seckill-master.zip

    3. **队列服务**:为了防止系统过载,秒杀请求可能会先放入消息队列,如RabbitMQ、Kafka等,然后由后台服务按顺序处理,实现流量削峰填谷。 4. **数据库优化**:秒杀商品的库存信息需要频繁更新,数据库层面可能...

    springcloud代码.zip

    9. **Spring Cloud Sleuth**:分布式追踪 - 结合 Zipkin 或 ELK Stack(Elasticsearch, Logstash, Kibana),可以进行分布式系统中请求的跟踪,帮助分析和优化微服务间的调用链路。 在 "02_代码" 中,我们可能看到...

    PHP秒杀系统 高并发高性能的极致挑战-

    这两种算法可以帮助我们有效地控制秒杀活动中的流量,防止系统因过载而崩溃。 #### 五、实战案例分析 在具体实现过程中,可以通过以下几个步骤来搭建一个简单的秒杀系统: 1. **环境准备**:搭建PHP运行环境,...

    Java高性能系统常见设计与优化

    6. **内存分析**:使用内存分析工具(如VisualVM或JProfiler)找出内存泄漏。 通过理解这些关键概念和实践,开发者可以构建出性能卓越的Java应用。同时,不断学习和适应新的技术和工具也是保持系统高性能的重要途径...

    SpringBoot实现Java高并发秒杀系统.zip

    6. **限流和熔断机制**:使用Hystrix或Sentinel等组件,对秒杀接口进行限流,防止流量洪峰导致系统崩溃。同时,熔断机制可以在服务不可用时,快速返回错误,保护整个系统稳定。 7. **缓存预热**:在秒杀开始前,将...

    报警接口说明

    4. **网络安全**:利用入侵检测系统(IDS)监测网络流量,发现异常访问模式后立即启动防御机制。 ### 六、报警接口的测试与维护 为了保证报警接口的正常运行,还需要进行定期的测试与维护工作: 1. **功能测试**:...

    (完整word版)互联网高并发架构设计.doc

    3. **并发控制**:防止逻辑错误,如用户签到多次发放积分,可通过锁机制或乐观锁等手段控制。 4. **服务化**:随着业务增长,可以将不同功能拆分成独立服务,如用户服务、订单服务,各自拥有独立的并发架构。 对于...

    SpringCloud 22道面试题和答案.docx

    Spring Cloud 是一套全面的微服务解决方案,它基于 Spring Boot 的便利性,为开发分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态)...

    Play Framework Cookbook(PlayFramework )(September 4, 2011)

    - **实现方法**:使用Play Framework 的路由机制和控制器来设计RESTful API,并通过JSON/XML等格式提供数据。 #### 七、利用验证能力 - **知识点概述**:Play Framework 集成了强大的数据验证功能,帮助开发者确保...

Global site tag (gtag.js) - Google Analytics