5.6 Caching messages in the broker for consumers
5.6 为消息消费者缓存消息
Although one of the most important aspects of message persistence is that the messages
will survive in long-term storage, there are a number of cases where messages are
required to be available for consumers that were disconnected from the broker, but
persisting the messages in a database is too slow. Real-time data delivery of pricing
information for a trading platform is a good example. But typically real-time data
applications use messages that are only valid for a finite amount of time, often less
than a minute. So it’s pointless to persist them to survive a system outage because new
messages will arrive soon.
虽然消息持久化的一个重要方面是消息能够从长时间存储中恢复,但是在更多情况下要求消息在消息
消费者与代理之间的连接断开后仍然可用.而将消息持久化到数据库中太慢了.交易平台中价格信息的实时分发
就是一个很好的例子.通常,实时数据应用程序使用的消息仅仅在有限的时间里有效,这个有效时间通常小于
一分钟.所以,没必要为保证系统正常运行而持久化这些消息,因为新的消息很快会到来.
ActiveMQ supports the caching of messages for these types of systems using message
caching in the broker by using something called a subscription recovery policy. This
configurable policy is used for deciding which types of messages should be cached,
how many, and for how long. In the rest of this section we’ll explain how message
caching works in ActiveMQ and how to configure the different types of subscription
recovery policies that are available.
ActiveMQ通过在代理中使用一种称为订阅恢复策略,为使用这种类型消息的系统缓存消息提供支持.
这种策略支持配置哪种类型的消息应该被缓存,以及缓存多少,缓存多久.本节的剩余部分,我们将讨论
ActiveMQ如何缓存消息以及如何配置不同的订阅恢复策略.
5.6.1 How message caching for consumers works
5.6.1 如何为消息消费者缓存消息
The ActiveMQ message broker caches messages in memory for every topic that’s used.
The only types of topics that aren’t supported are temporary topics and ActiveMQ
advisory topics. Caching of messages in this way isn’t handled for queues, as the normal
operation of a queue is to hold every message sent to it.
除了临时主题和ActiveMQ建议(advisory)主题,ActiveMQ消息代理在内存中缓存所有的正在使用消息主题中的消息.
ActiveMQ不会通过上述方式为消息队列缓存消息,因为通常情况下队列会保存所有发送给它的消息.
Messages that are cached by the broker are only dispatched to a topic consumer if
the consumer is retroactive, and never to durable topic subscribers.
Topic consumers are marked as being retroactive by a property set on the destination
when the topic consumer is created. Here’s an example:
代理中缓存的消息只能发送到具有消息追溯能力的消费者(译注:打开retroactive开关的消费者),
不会发送到配置成持久化订阅的主题订阅者.
在创建主题消费者时,可以通过设置目的地属性,将主题消费者设置为可追溯的(retroactive).
下面是一个示例代码:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
public void createRetroactiveConsumer() throws JMSException
{
ConnectionFactory fac = new ActiveMQConnectionFactory();
Connection connection = fac.createConnection();
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("TEST.TOPIC?consumer.retroactive=true");
MessageConsumer consumer = session.createConsumer(topic);
}
On the broker side, the message caching is controlled by a destination policy called a
subscriptionRecoveryPolicy. The default subscription recovery policy used in the
broker is a FixedSizeSubscriptionRecoveryPolicy. Let’s walk through the different
subscription recovery policies that are available.
在代理端,消息缓存通过一个名称为subscriptionRecoveryPolicy(订阅恢复策略)的目的地策略来控制.默认的
订阅恢复策略是FixedSizeSubscriptionRecoveryPolicy.下面让我们逐一了解可用的各种不同的
订阅恢复策略.
5.6.2 The ActiveMQ subscription recovery policies
5.6.2 ActiveMQ的订阅恢复策略
There are a number of different policies that allow for fine-tuning the duration and
type of messages that are cached for nondurable topic consumers. Each policy type is
explained here.
ActiveMQ提供多种策略用来调优持久化消息以及缓存非持久化主题消费者的消息.下面分别说明这些
策略.
THE ACTIVEMQ FIXED SIZE SUBSCRIPTION RECOVERY POLICY
ACTIVEMQ固定尺寸订阅恢复策略
This policy limits the number of messages cached for the topic based on the amount
of memory they use. This is the default subscription recovery policy in ActiveMQ. You
can choose to have the cache limit applied to all topics, or on a topic-by-topic basis.
The properties available are shown in table 5.6.
这种策略根据使用的内存大小对每个主题的可缓存消息数量做了限制.这也是ActiveMQ默认的订阅恢复策略.
你可以为所有主题设置一种缓存策略,或者给每一个主题单独设置.表5.6是配置该策略时可以使用的参数.
Table 5.6 Configuration properties for a fixed size subscription recovery policy
表5.6 固定尺寸订阅恢复策略可配置属性
Property name Default value Description
属性名称 默认值 描述
maximumSize 6553600 The memory size in bytes for this cache
maximumSize 6553600 缓存可用的内存大小(单位:byte)
useSharedBuffer true If true, the amount of memory allocated will be used across all topics
useSharedBuffer true 如果设置为true,maximumSize设置的内存大小为所有主题的缓存总额
(而不是每个主题缓存都可以达到maximumSize设置的大小)
THE ACTIVEMQ FIXED COUNT SUBSCRIPTION RECOVERY POLICY
固定数量策略
This policy limits the number of messages cached by the topic based on a static count.
Only one property is available, as listed in table 5.7.
该策略根据主题中消息的数量来限制消息缓存.只有一个属性可设置,如表5.7所示
Table 5.7 Configuration properties for a fixed count subscription recovery policy
固定数量策略配置参数列表
Property name Default value Description
属性名称 默认值 描述
maximumSize 100 The number of messages allowed in the topics cache
maximumSize 100 每个主题中可缓存的消息数量最大值
THE ACTIVEMQ QUERY-BASED SUBSCRIPTION RECOVERY POLICY
基于查询的策略
This policy limits the number of messages cached based on a JMS property selector that’s
applied to each message. Only one property is available, as shown in table 5.8.
该策略根据应用到每个消息的JMS属性选择器来限制缓存消息数量.只有一个可设置参数,如表5.8所示:
Table 5.8 Configuration properties for a query-based subscription recovery policy
表5.8 基于查询的策略配置参数列表
Property name Default value Description
属性名称 默认值 描述
query null Caches only messages that match the query
query null 当消息符合设置选择器时,缓存消息.
THE ACTIVEMQ TIMED SUBSCRIPTION RECOVERY POLICY
时间策略
This policy limits the number of messages cached by the topic based on an expiration
time that’s applied to each message. Note that the expiration time on a message is
independent from the timeToLive that’s set by the MessageProducer. The configuration
properties for a timed subscription policy are shown in table 5.9.
该策略根据消息的过期时间来缓存主题中的消息.注意,消息的过期时间是独立的,该过期时间由消息
生产者设置消息的timeToLive参数决定.可配置属性如表5.9所示.
Table 5.9 Configuration properties for a timed subscription recovery policy
表5.9时间策略配置参数列表
Property name Default value Description
属性名称 默认值 描述
recoverDuration 60000 The time in milliseconds to keep messages in the cache
recoverDuration 60000 消息的缓存时间(毫秒)
THE ACTIVEMQ LAST IMAGE SUBSCRIPTION RECOVERY POLICY
最终映像策略
This policy holds only the last message sent to a topic. It can be useful for real-time
pricing information—where a price per topic is used, you might only want the last
price that’s sent to that topic. There are no configuration properties for this policy.
该策略仅缓存发送到主题的最后一个消息.在实时的价格信息中,每个主题就是一个价格信息,这个策略很有用,
因为你可能仅仅关注最后一个发送给主题的价格信息.该策略没有配置属性.
THE ACTIVEMQ NO SUBSCRIPTION RECOVERY POLICY
无缓存策略
This policy disables message caching for topics. There are no properties to configure
for this policy.
该策略禁止缓存主题消息.该策略也没有配置属性.
5.6.3 Configuring the subscription recovery policy
5.6.3 配置订阅恢复策略
You can configure the subscriptionRecoveryPolicy for either individual topics, or
you can use wildcards, in the ActiveMQ broker configuration. An example configuration
is shown here:
在ActiveMQ代理的配置中,可以为每个独立的主题配置subscriptionRecoveryPolicy,或者也可以使用通配符
为多个主题配置策略.下面是配置代码样例:
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker brokerName="test-broker" persistent="true" useShutdownHook="false"
deleteAllMessagesOnStartup="true"
xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
<transportConnector uri="tcp://localhost:61635"/>
</transportConnectors>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="Topic.FixedSizedSubs.>">
<subscriptionRecoveryPolicy>
<fixedSizeSubscriptionRecoveryPolicy maximumSize="2000000" useSharedBuffer="false"/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="Topic.LastImageSubs.>">
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="Topic.NoSubs.>">
<subscriptionRecoveryPolicy>
<noSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="Topic.TimedSubs.>">
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="25000"/>
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>
相关推荐
例如,在创建消费者时,可以设置过滤属性以选择性地接收消息: ``` MessageConsumer consumer = session.createConsumer(destinationQuery, "actionType='REQUEST'"); ``` 在发送信息时,也可以设置订阅属性以选择...
Fountain是监查、捕捉MySQL数据库的增量变化,分发数据变化给消费者处理的一套解决方案。 Fountain,英[ˈfaʊntən],是”源泉“的意思,MySQL数据库源源不断的下发增量,因此而得名。 任何需要快速、准确...
5.6 为计算结果建立高效、可伸缩的高速缓存 第2部分 构建并发应用程序 第6章 任务执行 6.1 在线程中执行任务 6.2 executor 框架 6.3 寻找可强化的并行性 第7章 取消和关闭 7.1 任务取消 7.2 停止基于线程的服务 7.3 ...
5.6 为计算结果建立高效、可伸缩的高速缓存 第2部分 构建并发应用程序 第6章 任务执行 6.1 在线程中执行任务 6.2 executor 框架 6.3 寻找可强化的并行性 第7章 取消和关闭 7.1 任务取消 7.2 停止基于线程的服务 7.3 ...
5.6 为计算结果建立高效、可伸缩的高速缓存 第2部分 构建并发应用程序 第6章 任务执行 6.1 在线程中执行任务 6.2 executor 框架 6.3 寻找可强化的并行性 第7章 取消和关闭 7.1 任务取消 7.2 停止基于线程的服务 7.3 ...
4.2.2定义切点和通知者 4.2.3使用proxyfactorybean 4.3自动代理 4.3.1为spring切面创建自动代理 4.3.2自动代理@aspectj切面 4.4定义纯粹的pojo切面 4.5注入aspectj切面 4.6小结 第二部分企业spring 第5章...
4.2.2 定义切点和通知者 4.2.3 使用ProxyFactoryBean 4.3 自动代理 4.3.1 为Spring切面创建自动代理 4.3.2 自动代理@AspectJ切面 4.4 定义纯粹的POJO切面 4.5 注入AspectJ切面 4.6 小结 第二部分 企业...
4.2.2 定义切点和通知者 4.2.3 使用ProxyFactoryBean 4.3 自动代理 4.3.1 为Spring切面创建自动代理 4.3.2 自动代理@AspectJ切面 4.4 定义纯粹的POJO切面 4.5 注入AspectJ切面 4.6 小结 第二部分 企业...
5.6 为计算结果建立高效、可伸缩的高速缓存 第2部分 构建并发应用程序 第6章 任务执行 6.1 在线程中执行任务 6.2 Executor 框架 6.3 寻找可强化的并行性 第7章 取消和关闭 7.1 任务取消 7.2 停止基于线程的服务 7.3 ...
- **B2B2C模式**:一种运营商管理多个大型商家同时为消费者提供服务的模式,类似天猫。 #### 四、产品体系架构 - **技术架构**:基于J2EE的企业级架构,分为前端技术和后端技术体系,支持大型分布式应用。 - **跨...
1. **准备MySQL环境**:确保MySQL版本为5.1.x, 5.5.x, 5.6.x, 5.7.x 或 8.0.x,并开启binlog功能,设置binlog格式为ROW模式,配置server_id以避免冲突。 2. **创建Canal用户并授权**:创建名为'canal'的用户,并赋予...
#### 5.6 Observer **结构:** - **观察者接口**:定义更新自己的方法。 - **主题接口**:定义注册、注销和通知观察者的方法。 - **具体观察者类**:实现观察者接口。 - **具体主题类**:实现主题接口,维护观察者...
这些芯片组不仅具备良好的处理能力和多媒体性能,还集成了多种连接性和扩展功能,满足了消费者对移动设备日益增长的需求。随着技术的不断进步和发展,联发科技将继续为市场带来更加先进和高效的芯片解决方案。
3.6.5 流水线模型或生产者-消费者模型 3.6.6 混合模型 3.7 书目评注 习题 第4章 基本通信操作 4.1 一对多广播以及多对一归约 4.1.1 环或线性阵列 4.1.2 格网 4.1.3 超立方体 4.1.4 平衡二叉树 4.1.5 算法...
分析了阻塞队列的工作原理及其在生产者-消费者模式中的应用。 - **5.4 阻塞和可中断方法** 讨论了如何优雅地处理阻塞操作和中断请求。 - **5.5 同步器** 探讨了如CountDownLatch、CyclicBarrier等高级同步工具...
在IT领域,第三方支付系统指的是独立于商家和银行的中介平台,它连接商家的支付接口与消费者的支付工具,如支付宝、微信支付等。第四方支付系统则在此基础上,进一步聚合了多家第三方支付渠道,为商家提供一站式接入...
5.3 阻塞队列和生产者-消费者模式 5.3.1 示例:桌面搜索 5.3.2 串行线程封闭 5.3.3 双端队列与工作密取 5.4 阻塞方法与中断方法 5.5 同步工具类 5.5.1 闭锁 5.5.2 FutureTask 5.5.3 信号量 5.5.4 栅栏 5.6...
5.3 阻塞队列和生产者-消费者模式 5.3.1 示例:桌面搜索 5.3.2 串行线程封闭 5.3.3 双端队列与工作密取 5.4 阻塞方法与中断方法 5.5 同步工具类 5.5.1 闭锁 5.5.2 FutureTask 5.5.3 信号量 5.5.4 栅栏 5.6...
### NH056DN04显示控制块说明书知识点解析 #### 一、NH056DN04显示控制模块的特点 1. **安装便捷性**: - 尺寸为144mm×104mm;...同时,该模块的详细规格和技术参数也为设计者提供了充分的设计依据和支持。
- **5.3 阻塞队列与生产者消费者模式**:阐述了阻塞队列的概念及其在实现生产者消费者模式中的作用。 - **5.4 阻塞和可中断方法**:讲解了如何处理阻塞方法和如何使这些方法支持中断。 - **5.5 同步器**:介绍了一些...