`
greemranqq
  • 浏览: 975482 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

spring+activemq(4)-异步消息限流

    博客分类:
  • JMS
阅读更多

一、序言

       有时候我们追求最快的方式发送消息,我们就采用的异步方式,并且不持久化。但是这样带来的问题有这样几个:

       1.如果消费者的消费能力低于生产者,那么消息就会积压在broker, 从而导致broker 可能挂掉。

       

       2.我们知道存放内存的模式,只要出现宕机或者其他问题,容易丢消息,因此得看情况而定

          对于问题1,activemq 采用了限流 内存溢出提醒的方式进行处理,下面是一些实例过程。

          官方介绍可以参考:http://activemq.apache.org/producer-flow-control.html

 

二、操作过程:

       1.我用的activemq.5.11 版本,下载下来打开conf/activemq.xml默认配置改为异步persistent=false

       

 <broker xmlns="http://activemq.apache.org/schema/core" persistent="false"  brokerName="localhost" dataDirectory="${activemq.data}">

       同时开启限流模式,未了测试方便,我们限流5M

 

      

<policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb" />
# 经过测试 producerFlowControl 是默认开启的

    这里有原文信息:

 

    

Note that, since the introduction of the new file cursor in ActiveMQ 5.x, non-persisted messages are shunted into the temporary file store to reduce the amount of memory used for non-persistent messaging. As a result, you may find that a queue's memoryLimit is never reached, as the cursor doesn't use very much memory. 
英文不好,大概意思是:
activemq 5.0 以后引入了new file cursor ,非持久化消息会分流到temporary file 存储,用来减少实际内存使用。结果你会发现queue 的内存限制,无法超标,cursor 不会消耗很多内存。
这里有个疑问,反正temporary file 我是没看到- -,不知道为什么
注意:这里是每个queue 限制5M,不是总的

   

 

    2.测试方式:

       打开borker,然后发送10W消息到queue,你可以看到到了一定条数,producer 就不动了,这时候你再打开customer ,才会正常消费。   

 

     3.当内存达到limit 限制的时候,我们可能需要做其他处理,方便我们得知,比如:让生产者异常

     

<systemUsage>
 <systemUsage sendFailIfNoSpace="true">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
   </memoryUsage>
 </systemUsage>
</systemUsage>

    然后生产者,或者说客户端就可以捕获:javax.jms.ResourceAllocationException

 

 当然这个会直接抛出异常,后面了个属性,表示5秒后才抛出这个异常。

  

 <systemUsage sendFailIfNoSpaceAfterTimeout="5000">

 

 

    我们捕获异常之后就可以持续发送消息之后,就可以做其他处理了,broker 也不会爆掉了。

 

    3.Disabling Flow Control 不控制流量的做法

       其实这是为了使用整个服务器的配置资源,也就是说上面是单个控制,这里是整体控制

       

# 默认的系统配置,分别代表
#memoryUsage 内存
#storeUsage 持久化
#tempUsage 临时数据
#当整体数据 操作下面的时候,才爆异常,就不用单个控制了
# 比如用异步 非持久化 测试,Memory percent used 到100 就不动了
# 这部分的使用可以参考:
# http://akuntamukkala.blogspot.com/2014/01/understanding-memory-usage-in-activemq.html

<systemUsage>
  <systemUsage>
    <memoryUsage>
      <memoryUsage limit="64 mb" />
    </memoryUsage>
    <storeUsage>
      <storeUsage limit="100 gb" />
    </storeUsage>
    <tempUsage>
      <tempUsage limit="10 gb" />
    </tempUsage>
  </systemUsage>
</systemUsage>

 

 

三、基础伪代码

     

    @Autowired
    private Destination destination;
    @Autowired
    JmsTemplate jmsTemplate;

    @Test
    public void Sender(){
        for(int i=0;i<50000;i++){
            sendMsg(i);
        }
    }
    // 消息发送
    private void sendMsg(int i){
        try {
            jmsTemplate.convertAndSend(destination,""+i);
        }catch (Exception e){
            sendMsg(i);
        }
    }

   

   

<bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate">
        <!-- 链接工长 -->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <!-- 1:非持久化,2:持久化 -->
        <property name="deliveryMode" value="1" />
        <!-- 消息转换器 -->
        <property name="messageConverter" ref="msgConvert"/>
    </bean>
<!-- 队列的目的地描述 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 通过 构造 设定 队列的名字 -->
        <constructor-arg index="0" value="orderQueue"/>
    </bean>

 

四、测试步骤:

       1.开启broker,开启限流5M

       2.发送queue 10W消息,如果没配置异常,则会一直阻塞,配置了则会抛异常

       3.阻塞之后开启消费者,消息能正常收到。

 

小结:

     1.这里将官网的东西拿来做了个简单的说明和解释,如果需要这里的详细实习,得去看ActiveMQMessageProducer 类

    2.我这里用的spring +mq 的形式,如果用原生的东西,有朋友没测试成功,测试的时候尽量剔除其他东西,比如事务之类的。。

    3.有朋友问自己发送了1W消息,为什么控制台只有几千条,因为你发送1W消息其实达到limit 限制之后,后面的消息没有进入quque,因此你在控制台看到的消息是没有那么多的,假设是8K条,而且你的1W消息 只是根据发送点打印的,没进入queue.那么那些消息去哪儿了呢?源码还没具体分析,但是肯定是在内存中,没进入队列。

      这种场景我们很好思考:我限制房间只有100个人,但是来的人很快,一次来10-20个,那么满了的时候就会剩下一些在门口进不去,而客户端觉得我已经发送了120个人过去了。。。,实际我房间只有100个,实际多的20个还在客户端的发送队列里面,如果这时候客户端挂了,那么消费者只能收到100个~。~

       4.这种场景是允许的,如果保证消息不丢失,那么就要持久化的,一般高效的东西都会容忍这类事件的。

       5.有错误的请指点,不胜感谢~。~

 

 

 

 

0
0
分享到:
评论
2 楼 greemranqq 2015-03-17  
liubey 写道
你们用activemq来做什么业务?还是说就是用来学习

简单的订单推送,顺便多学习一些
1 楼 liubey 2015-03-17  
你们用activemq来做什么业务?还是说就是用来学习

相关推荐

    apache-activemq-5.16.0-bin.tar.gz 下载(5积分)

    Apache ActiveMQ是业界广泛使用的开源消息中间件,它基于Java Message Service (JMS) 规范,提供高效、可靠的异步通信解决方案。标题中的"apache-activemq-5.16.0-bin.tar.gz"指的是Apache ActiveMQ的5.16.0版本的二...

    ActiveMQ-讲义.pdf

    通过消息队列,用户的请求可以先写入队列中,服务器根据队列长度决定是否丢弃请求或进行限流,这有助于缓解服务器的压力,保证系统的稳定性。 在实际应用中,有多种消息中间件产品可供选择,例如ActiveMQ、RabbitMQ...

    【BAT必备】activeMQ面试题

    4. **持久化与非持久化消息**:持久化消息即使在ActiveMQ重启后也能保留,而非持久化消息只在内存中存储,如果服务器故障,可能会丢失。 5. **事务支持**:ActiveMQ支持本地JMS事务和XA事务,确保消息的准确投递。 ...

    Java 分布式应用程序设计

    Java 分布式应用程序设计是构建大型、可扩展和高...以上只是Java分布式应用程序设计的一些核心知识点,实际应用中还会涉及到缓存管理、数据库分库分表、安全认证、限流熔断等多个方面,需要不断学习和实践来提升技能。

    ActiveMQInAction

    **ActiveMQ in Action** 是一本专门探讨Apache ActiveMQ的书籍,该书全面深入地介绍了这个流行的开源消息中间件。ActiveMQ是Apache软件基金会的一个项目,它遵循Java消息服务(JMS)规范,提供了高可靠性和可伸缩性...

    2019年一线互联网公司Java高级面试题总结

    - **限流**: - 使用令牌桶算法或漏桶算法限制请求速率。 - **缓存使用**: - 二级缓存、缓存穿透等问题。 - **熔断**: - 当某一微服务发生故障时,通过熔断机制避免连锁反应。 以上知识点涵盖了Java高级面试题的...

    Artemis 高性能的MQ

    Apache Artemis是一个高性能的消息中间件,是ActiveMQ项目的子项目。它采用了可插拔的协议、灵活的地址模型、可配置的传输、持久化存储等多种先进特性。在高性能的消息系统领域,Artemis已经成为了业界的标准之一。 ...

    分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等 针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数

    1. **限流削峰**:当系统面临超量请求时,MQ能够暂时存储这些请求,防止系统过载,确保服务的稳定性和持久性。 2. **异步解耦**:通过引入MQ,上游系统可以将调用下游系统的操作改为异步,提高系统的并发处理能力和...

    java商城秒杀资料

    - 限流策略:例如令牌桶算法、漏桶算法,限制秒杀流量,防止雪崩效应。 - 监控与报警:实时监控系统性能指标,一旦出现问题能及时报警并进行修复。 通过学习和实践这些知识点,开发者能够设计出一个稳定、高效的...

    Java高级架构必备知识点

    服务限流是指对服务请求进行限制,以避免服务过载。通常包括请求频率限制(如每秒请求次数)和并发连接数限制等策略。 **1.5 服务降级(自动优雅降级)** 当服务遇到不可抗力的故障时,可以选择暂时牺牲部分功能来...

    JAVA面试题(Zookeeper、消息队列、分布式等最新的也有)

    8. **微服务架构**:服务拆分、服务治理、API Gateway、熔断和限流策略。 9. **持续集成/持续部署(CI/CD)**:Jenkins、GitLab CI/CD等工具的使用和实践。 10. **容器化与虚拟化**:Docker容器化的优势,...

    java中高级面试必备技术

    Dubbo是一个高性能、轻量级的开源服务框架,它提供了一整套解决方案,包括服务发布、发现、调用、负载均衡、容错、限流等。Dubbo支持多种服务注册与发现方式,其中Zookeeper是最常用的一种。 **1.2 Dubbo工作原理**...

    Java工程师面试宝典

    - **应用场景**:异步处理、消息解耦。 - **数据提交不成功处理**:重试机制或回滚操作。 #### 电商项目面试问题 - **项目背景介绍**:简述项目的初衷、目标用户及解决的问题。 - **项目架构**:描述使用的架构...

    大型网站系统与Java中间件实践+曾宪杰

    7. **分布式服务框架**:如Dubbo、Spring Cloud,提供服务注册、发现、调用、限流、熔断等功能,支持微服务架构,实现系统的模块化和独立部署。 8. **负载均衡**:如Nginx、HAProxy,可以将流量分发到多个服务器,...

    面试专题课堂笔记.zip

    笔记可能涵盖微服务的概念、Spring Boot与Spring Cloud的使用,以及服务发现、熔断、限流和降级等微服务治理策略。 8. **09MySQL数据库篇.pdf** - MySQL是常用的数据库系统,这部分可能涉及SQL基础、事务处理、索引...

    rpc的简单实现.docx

    总结,本示例展示了RPC的基本原理和实现思路,但实际的RPC框架如Dubbo、gRPC等会包含更多的高级特性,如负载均衡、熔断、限流、服务降级等,以提升系统的稳定性和性能。在设计和实现RPC框架时,需要充分考虑这些因素...

    2021年JAVA核心知识点整理.rar

    2. **API Gateway**:作为系统的统一入口,处理路由、认证、限流等功能。 3. **服务间通信**:RESTful API、gRPC、Dubbo、Netty等。 4. **服务治理**:熔断、降级、负载均衡、健康检查等。 **Netty与RPC** Netty...

    Java实时回单同步的代码

    5. **并发控制**:Java并发库提供了丰富的工具,如`Semaphore`用于限流,`ReentrantLock`和`synchronized`关键字用于线程安全,`ConcurrentHashMap`等并发容器用于高效的数据共享。 6. **错误处理与重试机制**:在...

    大型网站系统与Java中间件实践

    5. 微服务治理:Spring Cloud Netflix提供了完整的微服务治理解决方案,包括服务注册、发现、熔断、限流和降级策略。 总的来说,《大型网站系统与Java中间件实践》涵盖了从系统设计、中间件选择到具体实践的全过程...

Global site tag (gtag.js) - Google Analytics