`
y806839048
  • 浏览: 1125964 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

mq预取机制

阅读更多

 

总括:

 

预取时生产者向消费者推预取的数量,消费完后再给,不消费玩不给(计划经济)(批量给相应的消费者)

 

预取为0时消费者主动获取

 

 

 ActiveMQ的一个主要的设计目标是:提供一个高性能的消息中间件。它使用了SEDA(Staged Event Driven Architecture)架构及异步传输。为了提供更高的性能,很重要的一点是 尽快地将消息传送给消费者,这样消费者利用消息缓冲区等待处理,而不是等待消息。
  然后,这样也有很大风险:不断地向 消费者 传送消息可能使得其消息缓冲溢出,因为传送的速度比消费者真正“消费”消息更快,这是很可能。

  因此,ActiveMQ使用了 消息”预取限制“(prefetch limit):表示在某个时间段内,可能向消费者传输的最大消息量,如果达到该上限,那么停止发送,直到ActiveMQ收到消费者的acknowledgements(确认,表示已经处理了该消息)。prefetch limit可以针对每个不同的consumer来设置。
  为了获取更高的性能,prefetch limit当然是越大越好,只要consumer有足够大的消息缓冲区(messagevolume)。如果消息的总量非常少,而且每个消息的处理时间非常的长,那么,可以将prefetch设置为1,这样,每次向consumer发送一个消息,等其确认已经处理完毕后,再发送第二个。
  特别地,如果prefetch设置为0,表示consumer每次 主动向activeMQ要求传输最大的数据量,而不是被动地接收消息。

如何指定prefetech的值:
  通过指定ActiveMQConnection或ActiveMQConnectionFactory的
ActiveMQPretchPolicy来设置所有的pretch值:各种不同的消息服务都可以指定不同的值,如下:
   

  • persistent queues (default value: 1000)
  • non-persistent queues (default value: 1000)
  • persistent topics (default value: 100)
  • non-persistent topics (default value: Short.MAX_VALUE -1)

To change the prefetch size for all consumer types you would use a connection URI similar to:

tcp://localhost:61616?jms.prefetchPolicy.all=50

To change the prefetch size for just queue consumer types you would use a connection URI similar to:

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

It can also be configured on a per consumer basis using Destination Options.

  1.  
    queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
  2.  
    consumer = session.createConsumer(queue);

P

 

PPooled Connections and prefetch

当连接来自一个连接池中,消费消息可能出现一些由于“prefetch”而产生的问题:预取的消息(还未被处理)当连接关闭时会被释放(即,可以在activeMQ中再次读取到该消息)。而连接池中的连接只有在连接池关闭后才真正的销毁。这使得 预取的消息直到连接被重用时才会被处理(或者等连接池关闭,可再次从activeMQ中读取)。这样导致了消息可能丢失,或者当连接池中有多个连接时,消息乱序(out-of-sequence)!


一个处理方法是:将生产者放入连接池,消费者则不放入连接池!当然,这样在消费者的性能上,会受到影响(当有多个线程快速的消费消息时,对象被不断的创建销毁)。

另外一种方法:将消费者的连接池中数量设为1。

 

第三种方法:在使用连接池的情况下,将prefetch设为1或者0。当使用Spring JMS和MessageDrivenPojo时,只能将prefetch设为1,而不能为0;

分享到:
评论

相关推荐

    MQ 消息机制

    9. **性能优化**:ActiveMQ有多种性能优化策略,如预取(Prefetch)机制,允许消费者一次性获取多个消息,减少网络往返次数,提高效率。 10. **监控与管理**:ActiveMQ提供了一个Web控制台,便于监控队列状态、查看...

    MQ java 编程指南

    此外,还会讨论如何进行性能调优,包括批量发送、预取策略以及线程池管理等。 对于分布式系统的设计,书中有专门的章节讨论如何利用MQ实现微服务间的通信,包括事件驱动架构、发布/订阅模式以及点对点模型。这部分...

    MQClient写的成品逻辑

    7. **错误处理和重试机制**:确保消息在失败时能够被正确处理,可能需要实现重试策略和死信队列。 8. **性能优化**:考虑消息批量发送、预取消息和缓冲策略来提高效率。 9. **安全性**:设置权限控制和身份验证,...

    MQ考试教程(英文)

    6. **安全性**:MQ提供了安全机制,包括用户身份验证、权限控制和加密。这确保了只有授权的用户和应用程序可以访问和处理消息。 7. **高可用性与故障恢复**:MQ可以通过镜像队列、集群和备份策略实现高可用性,确保...

    带你去MQ的世界旅行

    - **预取机制**:允许一次性获取多条消息,提高消费效率。 - **生态完善**:拥有成熟的社区支持和工具集,方便管理和维护。 **劣势**: - **伪分布式设计**:尽管支持集群部署,但并不支持数据分片,限制了其扩展...

    04-消息中间件MQ面试题(最新版).zip

    - **预取机制**:消费者提前从队列中拉取一定数量的消息,减少延迟。 - **调整队列和消费者数量**:根据系统负载动态调整,平衡性能和资源消耗。 10. **如何处理MQ的故障恢复?** - **冗余部署**:MQ集群化,...

    MQ可靠生产与可靠消费

    RabbitMQ提供了一种称为“发布者确认”的机制,确保生产者发送的消息已被MQ服务器接收。当启用此功能时,RabbitMQ会向生产者发送一个确认信号,表明消息已经安全存储。如果生产者没有收到确认,可以重试发送,从而...

    全网最新,MQ面试题(持续更新)

    - **内存消耗**:预取和缓存机制可能导致较高的内存占用。 - **复杂性**:丰富的特性和配置选项可能增加学习和运维成本。 **应用场景**:RabbitMQ适用于对协议标准、消息路由灵活性和可靠性要求较高的场景,尤其...

    Rabbimq几个关键参数对性能的影响1

    同样,从测试#1和#7中,我们发现开启消费者与MQ之间的Ack2(Acknowledge)也会影响QPS,表明了确认机制对性能的负面影响。 再者,QoS设置,特别是prefetch_count(预取计数),对性能的影响体现在测试#1和#8中。...

    通过消息传递数据1.rar_消息传递_消息机制

    另外,许多中间件和框架也支持消息传递,如IBM的MQ系列、RabbitMQ、Apache Kafka等,它们提供了更高级别的抽象和更强大的功能,如负载均衡、消息持久化和高可用性。 “通过消息传递数据”这个主题通常涵盖以下几个...

    activeMQ:1.消息队列MQ的项目实战代码

    消息队列MQ的项目实战代码”中,我们将探讨如何在实际项目中使用ActiveMQ,以及如何通过提供的代码进行学习和实践。 首先,ActiveMQ作为消息队列,其主要功能是提供消息的发送、存储和接收。在分布式环境中,它可以...

    ActiveMQ高并发处理方案

    ActiveMQ使用了一种称为“预取策略”的机制来决定向消费者发送多少条消息。默认情况下,每个消费者的预取数量为1000条,这意味着在没有特别配置的情况下,消费者将预取最多1000条消息到本地缓冲区中。当一个消费者...

    3.6:消费端如何做限流量1

    2. **消息预取策略**:RabbitMQ的QoS还支持预取(prefetch)设置,即设置从broker到消费者的消息批量大小。通过`prefetchSize`和`prefetchCount`参数来调整。`prefetchSize`通常设为0,表示不限制消息大小,而`...

    轻量级java消息中间件源码.zip

    7. 性能优化措施,如批量发送、预取消息等。 通过深入理解这些关键技术,开发者不仅能掌握Uncode MQ的工作原理,还能为构建自己的消息中间件或优化现有系统打下坚实基础。在实际应用中,可以根据业务需求选择适合的...

    queue-demo-wwtbnbw.zip

    此外,通过批量消费和预取机制,消费者可以提高处理效率。消息队列还支持消息确认机制,确保消息被正确处理,即使在系统故障时也能保证数据的完整性。 为了实现高可用性,消息队列通常会采用集群部署,确保即使某个...

    ActiveMQ使用Ajax实现多人聊天室

    8. **性能优化**:根据实际需求,可能需要调整ActiveMQ的配置,如预取策略(Prefetch Policy)以平衡消息传递速度和内存使用,以及队列的持久化策略等。 总结起来,通过集成ActiveMQ和Ajax,我们可以构建一个高效、...

    ActiveMQ消息中间件面试专题.pdf

    在ActiveMQ中,默认情况下,消费者会一次性预取一批消息(默认为1000条),这个机制被称为prefetch。这可能导致消息的不均匀消费,因为所有消息会优先分配给少数消费者,而其他消费者则没有分配到消息。要解决这个...

    7道消息队列ActiveMQ面试题!

    这主要是因为ActiveMQ的prefetch机制,它允许消费者一次预取一批消息(默认是1000条)。如果消费者未能及时确认消息,这些消息将不会被分配给其他消费者。为了实现更均匀的消息消费,可以将prefetch的值设置为1,即...

    MQserier入门,金融解决方案

    4. 性能优化:MQSeries支持批量发送和预取消息,减少网络延迟,提高处理速度。 5. 易于管理:提供丰富的管理工具,方便监控、配置和故障排查。 在MQSeries的入门学习中,我们需要了解以下几个关键概念: - 队列...

    基于Cerebot MX4开发板的家用智能监控系统整体方案设计.doc

    4. **烟雾检测**:MQ-2传感器对可燃气体和烟雾有高灵敏度,当检测到异常时,输出电压变化触发报警机制。无线收发模块将报警信息实时传递给Cerebot MX4,由GSM模块发送短信通知用户。 5. **附加功能**:系统具备真人...

Global site tag (gtag.js) - Google Analytics