一、序言
有时候我们追求最快的方式发送消息,我们就采用的异步方式,并且不持久化。但是这样带来的问题有这样几个:
1.如果消费者的消费能力低于生产者,那么消息就会积压在broker, 从而导致broker 可能挂掉。
2.我们知道存放内存的模式,只要出现宕机或者其他问题,容易丢消息,因此得看情况而定
对于问题1,activemq 采用了限流 内存溢出提醒的方式进行处理,下面是一些实例过程。
官方介绍可以参考:http://activemq.apache.org/producer-flow-control.html
二、操作过程:
1.我用的activemq.5.11 版本,下载下来打开conf/activemq.xml默认配置改为异步persistent=false
<broker xmlns="http://activemq.apache.org/schema/core" persistent="false" brokerName="localhost" dataDirectory="${activemq.data}">
同时开启限流模式,未了测试方便,我们限流5M
<policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb" /> # 经过测试 producerFlowControl 是默认开启的
这里有原文信息:
Note that, since the introduction of the new file cursor in ActiveMQ 5.x, non-persisted messages are shunted into the temporary file store to reduce the amount of memory used for non-persistent messaging. As a result, you may find that a queue's memoryLimit is never reached, as the cursor doesn't use very much memory. 英文不好,大概意思是: activemq 5.0 以后引入了new file cursor ,非持久化消息会分流到temporary file 存储,用来减少实际内存使用。结果你会发现queue 的内存限制,无法超标,cursor 不会消耗很多内存。 这里有个疑问,反正temporary file 我是没看到- -,不知道为什么 注意:这里是每个queue 限制5M,不是总的
2.测试方式:
打开borker,然后发送10W消息到queue,你可以看到到了一定条数,producer 就不动了,这时候你再打开customer ,才会正常消费。
3.当内存达到limit 限制的时候,我们可能需要做其他处理,方便我们得知,比如:让生产者异常
<systemUsage> <systemUsage sendFailIfNoSpace="true"> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> </systemUsage> </systemUsage>
然后生产者,或者说客户端就可以捕获:javax.jms.ResourceAllocationException
当然这个会直接抛出异常,后面了个属性,表示5秒后才抛出这个异常。
<systemUsage sendFailIfNoSpaceAfterTimeout="5000">
我们捕获异常之后就可以持续发送消息之后,就可以做其他处理了,broker 也不会爆掉了。
3.Disabling Flow Control 不控制流量的做法
其实这是为了使用整个服务器的配置资源,也就是说上面是单个控制,这里是整体控制
# 默认的系统配置,分别代表
#memoryUsage 内存
#storeUsage 持久化
#tempUsage 临时数据
#当整体数据 操作下面的时候,才爆异常,就不用单个控制了
# 比如用异步 非持久化 测试,Memory percent used 到100 就不动了
# 这部分的使用可以参考:
# http://akuntamukkala.blogspot.com/2014/01/understanding-memory-usage-in-activemq.html
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb" />
</storeUsage>
<tempUsage>
<tempUsage limit="10 gb" />
</tempUsage>
</systemUsage>
</systemUsage>
三、基础伪代码
@Autowired private Destination destination; @Autowired JmsTemplate jmsTemplate; @Test public void Sender(){ for(int i=0;i<50000;i++){ sendMsg(i); } } // 消息发送 private void sendMsg(int i){ try { jmsTemplate.convertAndSend(destination,""+i); }catch (Exception e){ sendMsg(i); } }
<bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate"> <!-- 链接工长 --> <property name="connectionFactory" ref="cachingConnectionFactory"/> <!-- 1:非持久化,2:持久化 --> <property name="deliveryMode" value="1" /> <!-- 消息转换器 --> <property name="messageConverter" ref="msgConvert"/> </bean> <!-- 队列的目的地描述 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 通过 构造 设定 队列的名字 --> <constructor-arg index="0" value="orderQueue"/> </bean>
四、测试步骤:
1.开启broker,开启限流5M
2.发送queue 10W消息,如果没配置异常,则会一直阻塞,配置了则会抛异常
3.阻塞之后开启消费者,消息能正常收到。
小结:
1.这里将官网的东西拿来做了个简单的说明和解释,如果需要这里的详细实习,得去看ActiveMQMessageProducer 类
2.我这里用的spring +mq 的形式,如果用原生的东西,有朋友没测试成功,测试的时候尽量剔除其他东西,比如事务之类的。。
3.有朋友问自己发送了1W消息,为什么控制台只有几千条,因为你发送1W消息其实达到limit 限制之后,后面的消息没有进入quque,因此你在控制台看到的消息是没有那么多的,假设是8K条,而且你的1W消息 只是根据发送点打印的,没进入queue.那么那些消息去哪儿了呢?源码还没具体分析,但是肯定是在内存中,没进入队列。
这种场景我们很好思考:我限制房间只有100个人,但是来的人很快,一次来10-20个,那么满了的时候就会剩下一些在门口进不去,而客户端觉得我已经发送了120个人过去了。。。,实际我房间只有100个,实际多的20个还在客户端的发送队列里面,如果这时候客户端挂了,那么消费者只能收到100个~。~
4.这种场景是允许的,如果保证消息不丢失,那么就要持久化的,一般高效的东西都会容忍这类事件的。
5.有错误的请指点,不胜感谢~。~
相关推荐
Apache ActiveMQ是业界广泛使用的开源消息中间件,它基于Java Message Service (JMS) 规范,提供高效、可靠的异步通信解决方案。标题中的"apache-activemq-5.16.0-bin.tar.gz"指的是Apache ActiveMQ的5.16.0版本的二...
通过消息队列,用户的请求可以先写入队列中,服务器根据队列长度决定是否丢弃请求或进行限流,这有助于缓解服务器的压力,保证系统的稳定性。 在实际应用中,有多种消息中间件产品可供选择,例如ActiveMQ、RabbitMQ...
4. **持久化与非持久化消息**:持久化消息即使在ActiveMQ重启后也能保留,而非持久化消息只在内存中存储,如果服务器故障,可能会丢失。 5. **事务支持**:ActiveMQ支持本地JMS事务和XA事务,确保消息的准确投递。 ...
Java 分布式应用程序设计是构建大型、可扩展和高...以上只是Java分布式应用程序设计的一些核心知识点,实际应用中还会涉及到缓存管理、数据库分库分表、安全认证、限流熔断等多个方面,需要不断学习和实践来提升技能。
**ActiveMQ in Action** 是一本专门探讨Apache ActiveMQ的书籍,该书全面深入地介绍了这个流行的开源消息中间件。ActiveMQ是Apache软件基金会的一个项目,它遵循Java消息服务(JMS)规范,提供了高可靠性和可伸缩性...
- **限流**: - 使用令牌桶算法或漏桶算法限制请求速率。 - **缓存使用**: - 二级缓存、缓存穿透等问题。 - **熔断**: - 当某一微服务发生故障时,通过熔断机制避免连锁反应。 以上知识点涵盖了Java高级面试题的...
Apache Artemis是一个高性能的消息中间件,是ActiveMQ项目的子项目。它采用了可插拔的协议、灵活的地址模型、可配置的传输、持久化存储等多种先进特性。在高性能的消息系统领域,Artemis已经成为了业界的标准之一。 ...
1. **限流削峰**:当系统面临超量请求时,MQ能够暂时存储这些请求,防止系统过载,确保服务的稳定性和持久性。 2. **异步解耦**:通过引入MQ,上游系统可以将调用下游系统的操作改为异步,提高系统的并发处理能力和...
- 限流策略:例如令牌桶算法、漏桶算法,限制秒杀流量,防止雪崩效应。 - 监控与报警:实时监控系统性能指标,一旦出现问题能及时报警并进行修复。 通过学习和实践这些知识点,开发者能够设计出一个稳定、高效的...
服务限流是指对服务请求进行限制,以避免服务过载。通常包括请求频率限制(如每秒请求次数)和并发连接数限制等策略。 **1.5 服务降级(自动优雅降级)** 当服务遇到不可抗力的故障时,可以选择暂时牺牲部分功能来...
8. **微服务架构**:服务拆分、服务治理、API Gateway、熔断和限流策略。 9. **持续集成/持续部署(CI/CD)**:Jenkins、GitLab CI/CD等工具的使用和实践。 10. **容器化与虚拟化**:Docker容器化的优势,...
Dubbo是一个高性能、轻量级的开源服务框架,它提供了一整套解决方案,包括服务发布、发现、调用、负载均衡、容错、限流等。Dubbo支持多种服务注册与发现方式,其中Zookeeper是最常用的一种。 **1.2 Dubbo工作原理**...
- **应用场景**:异步处理、消息解耦。 - **数据提交不成功处理**:重试机制或回滚操作。 #### 电商项目面试问题 - **项目背景介绍**:简述项目的初衷、目标用户及解决的问题。 - **项目架构**:描述使用的架构...
7. **分布式服务框架**:如Dubbo、Spring Cloud,提供服务注册、发现、调用、限流、熔断等功能,支持微服务架构,实现系统的模块化和独立部署。 8. **负载均衡**:如Nginx、HAProxy,可以将流量分发到多个服务器,...
笔记可能涵盖微服务的概念、Spring Boot与Spring Cloud的使用,以及服务发现、熔断、限流和降级等微服务治理策略。 8. **09MySQL数据库篇.pdf** - MySQL是常用的数据库系统,这部分可能涉及SQL基础、事务处理、索引...
总结,本示例展示了RPC的基本原理和实现思路,但实际的RPC框架如Dubbo、gRPC等会包含更多的高级特性,如负载均衡、熔断、限流、服务降级等,以提升系统的稳定性和性能。在设计和实现RPC框架时,需要充分考虑这些因素...
2. **API Gateway**:作为系统的统一入口,处理路由、认证、限流等功能。 3. **服务间通信**:RESTful API、gRPC、Dubbo、Netty等。 4. **服务治理**:熔断、降级、负载均衡、健康检查等。 **Netty与RPC** Netty...
5. **并发控制**:Java并发库提供了丰富的工具,如`Semaphore`用于限流,`ReentrantLock`和`synchronized`关键字用于线程安全,`ConcurrentHashMap`等并发容器用于高效的数据共享。 6. **错误处理与重试机制**:在...
5. 微服务治理:Spring Cloud Netflix提供了完整的微服务治理解决方案,包括服务注册、发现、熔断、限流和降级策略。 总的来说,《大型网站系统与Java中间件实践》涵盖了从系统设计、中间件选择到具体实践的全过程...