`
fantaxy025025
  • 浏览: 1279146 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类

消息队列_折腾ActiveMQ时遇到的问题和解决方法

 
阅读更多

 

折腾ActiveMQ时遇到的问题和解决方法:

 

1.先讲严重的:服务挂掉。

这得从ActiveMQ的储存机制说起。在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的,它们的最大限制在配置文件的<systemUsage>节点中配置。但是,在非持久化消息堆积到一定程度,内存告急的时候,ActiveMQ会将内存中的非持久化消息写入临时文件中,以腾出内存。虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除。

那如果文件增大到达了配置中的最大限制的时候会发生什么?我做了以下实验:

设置2G左右的持久化文件限制,大量生产持久化消息直到文件达到最大限制,此时生产者阻塞,但消费者可正常连接并消费消息,等消息消费掉一部分,文件删除又腾出空间之后,生产者又可继续发送消息,服务自动恢复正常。

设置2G左右的临时文件限制,大量生产非持久化消息并写入临时文件,在达到最大限制时,生产者阻塞,消费者可正常连接但不能消费消息,或者原本慢速消费的消费者,消费突然停止。整个系统可连接,但是无法提供服务,就这样挂了。

具体原因不详,解决方案:尽量不要用非持久化消息,非要用的话,将临时文件限制尽可能的调大。

详细配置信息见文档:http://activemq.apache.org/producer-flow-control.html

 

2.丢消息。

这得从java的java.net.SocketException异常说起。简单点说就是当网络发送方发送一堆数据,然后调用close关闭连接之后。这些发送的数据都在接收者的缓存里,接收者如果调用read方法仍旧能从缓存中读取这些数据,尽管对方已经关闭了连接。但是当接收者尝试发送数据时,由于此时连接已关闭,所以会发生异常,这个很好理解。不过需要注意的是,当发生SocketException后,原本缓存区中数据也作废了,此时接收者再次调用read方法去读取缓存中的数据,就会报Software caused connection abort: recv failed错误。

通过抓包得知,ActiveMQ会每隔10秒发送一个心跳包,这个心跳包是服务器发送给客户端的,用来判断客户端死没死。如果你看过上面第一条,就会知道非持久化消息堆积到一定程度会写到文件里,这个写的过程会阻塞所有动作,而且会持续20到30秒,并且随着内存的增大而增大。当客户端发完消息调用connection.close()时,会期待服务器对于关闭连接的回答,如果超过15秒没回答就直接调用socket层的close关闭tcp连接了。这时客户端发出的消息其实还在服务器的缓存里等待处理,不过由于服务器心跳包的设置,导致发生了java.net.SocketException异常,把缓存里的数据作废了,没处理的消息全部丢失。

解决方案:用持久化消息,或者非持久化消息及时处理不要堆积,或者启动事务,启动事务后,commit()方法会负责任的等待服务器的返回,也就不会关闭连接导致消息丢失了。

关于java.net.SocketException请看我的详细研究:http://blog.163.com/_kid/blog/static/3040547620160231534692/

 

3.持久化消息非常慢。

默认的情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的,遇到慢一点的硬盘,发送消息的速度是无法忍受的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升。所以在发送持久化消息时,请务必开启事务模式。其实发送非持久化消息时也建议开启事务,因为根本不会影响性能。

 

4.消息的不均匀消费。

有时在发送一些消息之后,开启2个消费者去处理消息。会发现一个消费者处理了所有的消息,另一个消费者根本没收到消息。原因在于ActiveMQ的prefetch机制。当消费者去获取消息时,不会一条一条去获取,而是一次性获取一批,默认是1000条。这些预获取的消息,在还没确认消费之前,在管理控制台还是可以看见这些消息的,但是不会再分配给其他消费者,此时这些消息的状态应该算作“已分配未消费”,如果消息最后被消费,则会在服务器端被删除,如果消费者崩溃,则这些消息会被重新分配给新的消费者。但是如果消费者既不消费确认,又不崩溃,那这些消息就永远躺在消费者的缓存区里无法处理。更通常的情况是,消费这些消息非常耗时,你开了10个消费者去处理,结果发现只有一台机器吭哧吭哧处理,另外9台啥事不干。

解决方案:将prefetch设为1,每次处理1条消息,处理完再去取,这样也慢不了多少。

详细文档:http://activemq.apache.org/what-is-the-prefetch-limit-for.html

 

5.死信队列。

如果你想在消息处理失败后,不被服务器删除,还能被其他消费者处理或重试,可以关闭AUTO_ACKNOWLEDGE,将ack交由程序自己处理。那如果使用了AUTO_ACKNOWLEDGE,消息是什么时候被确认的,还有没有阻止消息确认的方法?有!

消费消息有2种方法,一种是调用consumer.receive()方法,该方法将阻塞直到获得并返回一条消息。这种情况下,消息返回给方法调用者之后就自动被确认了。另一种方法是采用listener回调函数,在有消息到达时,会调用listener接口的onMessage方法。在这种情况下,在onMessage方法执行完毕后,消息才会被确认,此时只要在方法中抛出异常,该消息就不会被确认。那么问题来了,如果一条消息不能被处理,会被退回服务器重新分配,如果只有一个消费者,该消息又会重新被获取,重新抛异常。就算有多个消费者,往往在一个服务器上不能处理的消息,在另外的服务器上依然不能被处理。难道就这么退回--获取--报错死循环了吗?

在重试6次后,ActiveMQ认为这条消息是“有毒”的,将会把消息丢到死信队列里。如果你的消息不见了,去ActiveMQ.DLQ里找找,说不定就躺在那里。

详细文档:http://activemq.apache.org/redelivery-policy.html

http://activemq.apache.org/message-redelivery-and-dlq-handling.html

 

 

=

=

=

1

1

1

 

分享到:
评论

相关推荐

    ActiveMQ队列消息过期时间设置和自动清除解决方案.docx

    在使用 ActiveMQ 时,消息过期时间设置和自动清除是一个非常重要的问题。本文将介绍 ActiveMQ 队列消息过期时间设置和自动清除的解决方案。 一、消息过期时间设置 在 ActiveMQ 中,可以通过设置时间戳插件来实现...

    msmq.rar_java msmq_java 消息队列_java消息队列_msmq_消息队列

    6. **启动和停止消息队列**:在需要时启动和关闭消息队列服务。在Windows中,这可以通过服务管理器完成。 7. **测试和调试**:编写测试用例,确保消息能够正确发送和接收。使用日志记录和错误处理机制进行调试。 ...

    Spring-ActiveMQ.rar_Spring Activemq_activemq_activemq spring

    而Spring框架,作为一个Java平台的全功能模块化解决方案,提供了与ActiveMQ集成的能力,让开发者能够轻松地在Spring应用中使用消息队列。本篇将深入探讨Spring与ActiveMQ的集成及其配置过程。 首先,理解Spring与...

    zis.rar_active MQ_activemq_java activeMQ_java 转发

    在这个"zis.rar_active MQ_activemq_java _activeMQ_java 转发"的压缩包中,我们可以推测其主要内容可能涉及如何使用ActiveMQ在Java环境中实现消息的转发功能。 首先,我们需要理解ActiveMQ的基本概念。ActiveMQ...

    消息队列_消息队列_

    2. **异步处理**:消息不是立即处理,而是存入队列,待消费者有空时处理,提高了系统的响应速度。 3. **负载均衡**:多个消费者可以从队列中获取并处理消息,实现任务的负载均衡。 4. **容错性**:如果消费者故障,...

    7道消息队列ActiveMQ面试题!

    ActiveMQ是一款非常流行的开源消息队列中间件,它实现了JMS(Java Message Service,Java消息服务)1.1规范,面向消息的中间件(Message Oriented Middleware,MOM)是指利用高效可靠的消息传递机制进行与平台无关的...

    activemq-cpp-library-3.6.0-src.tar.gz_C# ActiveMQ_activemq_activ

    消息分为多种类型,包括文本、对象、流和文件等,它们可以通过生产者发送到队列或主题,也可以由消费者接收。 3. **异步消息处理**:ActiveMQ-CPP库支持异步消息处理,这意味着消息的发送和接收可以在不同的线程中...

    ActiveMQ学习笔记之九--发送消息到队列中

    在IT行业中,Apache ActiveMQ是一个广泛使用的开源消息代理和队列服务器,它是Java Message Service (JMS) 的实现,能够处理各种消息传递模式,包括点对点和发布/订阅。这篇"ActiveMQ学习笔记之九--发送消息到队列中...

    apache-activemq-5.0.0-src.zip_ActiveMQ 源代码_activemq_activemq.src

    通过深入学习和研究ActiveMQ的源代码,开发者不仅可以掌握消息中间件的基本原理,还能学习到高级特性如事务处理、消息优先级、延迟消息、死信队列等。同时,对于Java NIO、多线程编程、网络编程等领域也有很好的实践...

    消息队列activeMQ

    - **监控与报警**:定期检查ActiveMQ的日志和性能指标,设置合理的报警阈值,及时发现和解决问题。 通过"消息队列activemq运用的dome",你可以深入理解ActiveMQ的工作原理和实际应用,进一步提升你在分布式系统中的...

    spring 整合activemq实现自定义动态消息队列

    本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和获取,但是没有自定义监听(当前项目不需要),本文档只有功能实现类 即业务层。若要调用和推送 则需要自己根据需求编写。...

    ActiveMQ消息过期时间设置和自动清除解决方案

    ### ActiveMQ 消息过期时间设置与自动清除解决方案 #### 概述 在消息队列的场景下,为了防止消息长时间滞留在队列中占用资源或者为了满足业务上对消息时效性的需求,通常需要对消息设定过期时间。本文档详细介绍了...

    bank.rar_ATM模拟器_消息队列 _读写队列

    在本文中,我们将深入探讨如何使用消息队列和读写队列来构建一个小型的ATM(自动取款机)模拟器。这个名为"bank.rar"的压缩包包含了一个ATM模拟器的实现,它利用了消息队列来处理客户端与服务器间的通信,并通过文件...

    JMS.rar_activemq_jms_jms activemq

    此外,ActiveMQ还支持各种高级特性,如持久化消息、优先级、消息分页、消息组、事务等,以及通过JMX进行管理和监控。对于大型分布式系统,这些特性有助于提高系统的可靠性和性能。 总的来说,本示例旨在演示如何...

    activeMQ_spring简单案例(含XML配置)

    在实际应用中,`activeMQ_p2s`和`activeMQ_spring`这两个文件可能包含了示例代码,分别展示了生产者(Producer)和消费者(Consumer)的实现。生产者通常会使用`JmsTemplate`发送消息,而消费者则通过实现`Message...

    oss上传以及消息队列相关知识activemq

    此外,ActiveMQ还提供了一些高级功能,如消息优先级、消息过滤(通过订阅模式)、消息重试和死信队列等。 总结来说,OSS上传涉及到云存储服务的使用,包括身份验证、文件上传和管理,而ActiveMQ则是实现系统间异步...

    test_jms.zip_activemq_activemq案例_jms_jms test

    标题"test_jms.zip_activemq_activemq案例_jms_jms test"中,我们可以看出这是关于一个与JMS(Java Message Service)相关的项目,使用了ActiveMQ作为消息中间件,并且包含了一些测试内容。"activemq案例"暗示这是一...

    ActiveMQ消息队列主题订阅Spring整合

    可以使用ActiveMQ的Web控制台监控消息队列的状态,查看消息的发送和接收情况。 通过以上步骤,你可以成功地将ActiveMQ消息队列与Spring框架整合,实现基于主题订阅的消息传递。这种整合有助于解耦系统组件,提高...

    queue_消息队列_

    综上所述,消息队列是解决大文件传输问题的有效工具,通过它可以在进程间实现高效、可靠的通信,同时提供了良好的扩展性和容错性。在实际应用中,选择适合的中间件和优化策略对于提升系统性能至关重要。

    7道消息队列ActiveMQ面试题!.zip

    消息队列ActiveMQ是Java开发中的重要组件,尤其在分布式系统和高并发场景下,它扮演着关键角色。本文将围绕ActiveMQ展开,基于提供的标题和描述,详细讲解与ActiveMQ相关的七个面试知识点。 1. **什么是消息队列...

Global site tag (gtag.js) - Google Analytics