`

ActiveMQ Producer Flow Control(生产者流量控制)

阅读更多

转载自:http://windows9834.blog.163.com/blog/static/2734500420131192144239/

对官方文档:http://activemq.apache.org/producer-flow-control.html  进行翻译。

在ActiveMQ4.x版本中,使用了流量控制措施,流量控制是用TCP流量控制实现的。因生产者受流量限制使消费者的底层网络连接将被挂起(等待消息),以强制进行流量控制限制。这个策略非常高效,但是如果有多个生产者和消费者共享同一个连接的时候,可能会导致死锁。

在ActiveMQ5.0版本中,我们可以分别对一个共享连接上的各个生产者进行流量控制,而不需要挂起整个连接。“流量控制”意味着当代理 (broker)检测到目标(destination)的内存容量,或temp-或file-store超过了限制,消息流的速度可能被减慢。生产者将会被阻塞直至资源可用,或者收到一个JMSException异常:这种行为是可配置的,下面的<systemUsage>章节会描述到。

 

值得注意的是,当memoryLimit或<systemUsage>限制达到的时候,<systemUsage>默认的设置会引起生产者阻塞:这种阻塞行为有时会被误解为“挂起的生产者”,而事实是生产者只是被挂起,还是活动的,只是一直在等待,直到有可用空间。

.同步发送的消息将会自动对每一个生产者使用流量控制;这一般只针对于持久性消息同步发送,除非您启用useAsyncSend的标志。

.当生产者使用异步发送消息时,一般来说,就是发送非持久化消息的生产者,不需要等候来自代理broker的任何确认回复消息;所以,如果内存限制被超过了,将不会被通知。如果你真的想什么时候都能知道代理broker的限制被超过了,则需要配置ProducerWindowSize这一连接选项,这样就算是异步消息也会对每一个生产者进行流量控制。

ActiveMQConnectionFactory connctionFactory =...
connctionFactory.setProducerWindowSize(1024000);

ProducerWindowSize是一个生产者在等到确认消息之前,可以发送给代理broker的最大字节数据量,这个确认消息用来告诉生产者,代理broker已经收到先前发送的消息了。

也就是说,
ProducerWindowSize是指在收到broker确认应答之前,生产者能够传送消息给broker的最大信息量。
即使是异步发送消息,生产者也是在收到broker确认应答之后才把下一条消息传给broker。当使用异步传送的时候,可以设置jms.producerWindowSize(单位为字节)的属性,当生产者中等待发送的信息量到达设置的值时,即使没有收到broker的应答消息,生产者同样会把这些消息发给broker。

另外,如果你要发送非持久化的消息(该消息默认是异步发送的),并且想要知道queue或者topic的内存使用是否达到限制,你那你可以简单的将连接工厂配置为“alwaysSyncSend”。虽然这样会变得稍微慢一点,但是这将保证当出现内存问题时,你的消息生产者能够及时得到通知。


如果你愿意,你可以通过在代理broker的配置中,对特定JMS的queue和toipc禁止流量控制,在目的地(destination)的策略(policy)中的producerFlowControl标志设置为false,使代理broker上特定的JMS queue和topic无效,例如:

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntrytopic="FOO.>" producerFlowControl="false"/>
</policyEntries>
</policyMap>
</destinationPolicy>

查看 Broker Configuration.
需要注意的是,自从ActiveMQ 5.x中引入新的文件游标之后,非持久化消息被分流到了临时文件存储中,以此来减少非持久化消息传送使用的内存总量。结果就是,你可能会发现一个队列的内存限制永远达不到,因为游标不需要使用太多的内存。如果你真的想把所有的非持久化消息存放在内存中,并在达到内存限制的时候停掉生产者,你需要配置<vmQueueCursor>。

<policyEntryqueue=">" producerFlowControl="true" memoryLimit="1mb">    
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
</policyEntry>

上面的片段可以保证所有的非持久化队列消息都保存在内存中,每一个队列的内存限制为1Mb。

How Producer Flow Control works(生产者流量控制是如何工作的)

如果你发送一条持久化消息(这样就会有一个对OpenWire响应消息的期望),代理broker将会给生产者发送一个ProducerAck 应答消息。该消息会通知生产者其先前的发送窗口已经被处理了,所以它现在可以发送另外的窗口了。这有点像消费者应答,不过是反向的。

Advantage 优势

所以,一个很好的生产者再发送更多的数据之前,会等待生产者应答,以此来避免对代理broker的冲击(并且如果出现了一个比较慢的消费者,强制代理阻塞整个连接)。如果你想知道这部分的源代码是怎么实现的,可以看一下ActiveMQMessageProducer的代码。

 

虽然一个客户端可以完全忽略生产者的所有应答消息,并且处理慢消费者的时候,代理可以在需要的时候拖延传送;虽然这意味着它将拖延整个连接。

Configure Client-Side Exceptions(配置客户端的异常)

应对当代理broker空间不足,而导致send()方法的无限期阻塞操作的一种替代方案,就是将其配置成客户端抛出的一个异常。通过将sendFailIfNoSpace属性设置为true,代理broker将会引起send()方法失败,并抛出javax.jms.ResourceAllocationException异常,并将其传递给客户端。下面是一个这种配置的示例:

<systemUsage>
<systemUsagesendFailIfNoSpace="true">
<memoryUsage>
<memoryUsagelimit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>

设置这个属性的好处是,客户端可以捕获javax.jms.ResourceAllocationException异常,过后,并重试send()操作,而不是无限期地傻等下去。

从5.3.1版本之后,sendFailIfNoSpaceAfterTimeout属性被加了进来。这个属性同样导致send()方法失败,并在客户端抛出异常,但仅当等待了指定时间之后才触发。如果在配置的等待时间过去之后,代理broker上的空间仍然没有被释放,仅当这个时候send()方法才会失败,并且在客户端抛出异常。下面是一个示例:

<systemUsage>
<systemUsagesendFailIfNoSpaceAfterTimeout="3000">
<memoryUsage>
<memoryUsagelimit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>

定义超时时间的单位是毫秒,所以上面的例子将会在当send()方法失败并对客户端抛出异常之前,等待三秒。这个属性的优点是,它仅仅阻塞配置指定的时间,而不是立即抛出发送失败的异常,或者是无限期阻塞。这个属性不仅在代理broker端提供了一个改进,还对客户端提供了一个改进,使得客户端能捕获异常,稍后并重试send() 操作。

Disabling Flow Control(禁用流量控制)

一个常见的要求是禁用流量控制,使得消息分发能够持续进行,直到所有可用的磁盘空间被等待发送(pending)的消息占用耗尽(无论是持久化的还是配置了非持久化的)。要实现这个要求,你可以使用消息游标(Message Cursors)。

 

System usage(系统资源占用)

你还可以通过<systemUsage>元素的一些属性来减慢生产者。来看一眼下面的例子:

<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsagelimit="64 mb"/>
</memoryUsage>
<storeUsage>
<storeUsagelimit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsagelimit="10 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>

你可以为非持久化的消息(NON_PERSISTENT messages)设置内存限制,为持久化消息(PERSISTENT messages)设置磁盘空间,以及为临时消息设置总的空间,代理broker将在减慢生产者发送速度之前使用这些空间。使用上述的默认设置,当资源使用完时,代理broker将会一直阻塞send()方法的调用,直至一些消息被消费,并且代理有了可用空间。默认值如上例所述,你可能需要根据你的环境增加这些值。

分享到:
评论

相关推荐

    springboot整合activemq 生产者 一对一,一对多

    springboot整合 activeMq 生产者 发送消息 包含队列模式点对点发送消息 以及 主题模式一对多发送消息 这是生产者的demo producer; 需要配合消费者的demo consumer 使用

    ActiveMQ生产者

    **ActiveMQ生产者详解** ActiveMQ是Apache组织开发的一个开源消息中间件,它遵循Java Message Service(JMS)规范,提供了高效、可靠的异步通信能力。在分布式系统中,ActiveMQ作为消息代理,允许应用程序之间通过...

    activemq生产者和消费者案例代码.zip

    3. **生产者(Producer)** 生产者是创建和发送消息到消息队列的角色。在Java中,这通常涉及创建一个`ConnectionFactory`,然后通过这个工厂创建一个`Connection`对象,接着创建一个`Session`,在会话中创建`...

    基于SpringBoot的ActiveMQ生产者/消费者

    在本项目中,我们探讨的是如何使用SpringBoot集成Apache ActiveMQ来构建一个生产者和消费者的应用。SpringBoot以其简洁的配置和快速启动特性,成为现代Java应用开发的首选框架之一,而ActiveMQ则是流行的消息中间件...

    自己实现的 ActiveMQ 多线程客户端 包含生产消息客户端和消费者消息客户端

    - **Amq_Producer.cpp**:这是单线程消息生产者的实现,可能包含创建连接、创建生产者对象、构建消息和发送消息的代码。 - **Amq_Producer_mt.cpp**:扩展了 Amq_Producer.cpp,增加了多线程支持,每个线程独立...

    activeMQ生产者和消费者代码

    在分布式系统中,ActiveMQ作为消息代理,负责接收、存储和转发消息,从而实现生产者与消费者之间的解耦。 生产者和消费者是JMS中的核心概念。生产者是发送消息的应用,而消费者则是接收这些消息的应用。在ActiveMQ...

    spring 整合 activemq 生产者和消费者 案例源码

    Spring整合ActiveMQ是Java消息服务(JMS)在Spring框架中的应用,用于实现生产者与消费者的解耦。在这个案例中,我们将深入探讨如何配置和使用这两个组件,以便于理解它们的工作原理。 首先,ActiveMQ是Apache软件...

    ActiveMQProducer

    **ActiveMQProducer** 是一个基于Java开发的组件,主要用于与Apache ActiveMQ消息中间件进行交互,实现生产者角色的功能。ActiveMQ是Apache软件基金会的一个开源项目,它是一个强大的消息代理,支持多种消息协议,如...

    spring-boot-activemq-producer

    spring-boot-activemq-producer 源码

    ActiveMQ集群及生产者和消费者Java代码.zip

    在这个“ActiveMQ集群及生产者和消费者Java代码”压缩包中,我们可以探讨以下几个关键知识点: 1. **ActiveMQ集群**:ActiveMQ的集群能力允许多个服务器形成一个逻辑单元,提供高可用性和负载均衡。当一个消息代理...

    activeMQ 详细教程与源码(包含消费者与生产者)

    **消息生产者**(Producer)是发送消息的组件,通常在业务处理完成后,将结果或者事件封装为消息发送到消息队列。在 Spring 中,可以通过 `JmsTemplate` 或者 `MessageProducer` 接口来实现。`springMQProducer.rar`...

    Spring平台整合消息队列ActiveMQ实现发布订阅、生产者消费者模型(适合新手或者开发人员了解学习ActiveMQ机制)

    2.在项目中,我们为消息的生产者和发布者分别注册了两个消费者和订阅者,当有消息到达activeMQ时,消费者和订阅者会自动获取对应的消息,其中两个消费者会轮流消费消息,而两个订阅者会同时订阅所有消息;...

    activemq实战项目,同ssh框架整合(生产者+消费者)

    - **生产者**:生产者负责创建和发送消息到ActiveMQ。在SSH项目中,生产者可能是一个服务或控制器,它将业务数据包装成消息并发送到队列或主题。 - **消费者**:消费者订阅队列或主题,接收并处理消息。消费者可能...

    activeMQ Demo

    - **生产者(Producer)**: 负责创建并发送消息到消息队列。 - **消费者(Consumer)**: 从消息队列中接收并处理消息。 - **主题(Topic)**: 支持发布/订阅模式,一个主题可以有多个订阅者,当生产者发送一条...

    activeMQ简单入门案例

    - 实现生产者代码,创建一个连接工厂,连接到ActiveMQ服务器,并创建一个Producer发送消息: ```java import org.apache.activemq.ActiveMQConnectionFactory; // 创建连接工厂 ActiveMQConnectionFactory ...

    activemq-consumer-producer-jee:针对 ActiveMQ 代理的 JEE 生产者和消费者

    本项目 "activemq-consumer-producer-jee" 专注于展示如何在Java Enterprise Edition (JEE) 环境下,利用ActiveMQ创建生产者和消费者。 1. **Java Message Service (JMS)**:JMS 是一个标准接口,它定义了应用程序...

    activemq 入门示例代码

    生产者(Producer) 生产者代码通常会创建一个 `ConnectionFactory`,通过这个工厂创建 `Connection`,然后创建 `Session`。在 `Session` 上创建一个 `Destination`(Queue 或 Topic),接着创建 `MessageProducer...

    springboot整合activemq 消费者 ACK手动确认 &消息重发

    springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。...配合 producer 生产者demo使用。

    用C#实现的ActiveMQ发布/订阅消息传送

    在这种模式下,生产者(publisher)发布消息到一个主题(topic),而消费者(subscriber)订阅该主题以接收这些消息。与点对点模型不同,发布/订阅模式中的消费者可以是多个,每个订阅者都能接收到所有发布的消息。 ...

    SpringBoot整合ActiveMQ案列

    这个依赖包含了Spring对ActiveMQ的支持,包括生产者和消费者的配置。 接着,我们需要配置ActiveMQ服务器。在SpringBoot的配置文件`application.properties`或`application.yml`中,添加以下内容: ```properties #...

Global site tag (gtag.js) - Google Analytics