`

5.6 为消息消费者缓存消息

 
阅读更多

5.6 Caching messages in the broker for consumers

5.6 为消息消费者缓存消息

 

Although one of the most important aspects of message persistence is that the messages

will survive in long-term storage, there are a number of cases where messages are

required to be available for consumers that were disconnected from the broker, but

persisting the messages in a database is too slow. Real-time data delivery of pricing

information for a trading platform is a good example. But typically real-time data

applications use messages that are only valid for a finite amount of time, often less

than a minute. So it’s pointless to persist them to survive a system outage because new

messages will arrive soon.

 

虽然消息持久化的一个重要方面是消息能够从长时间存储中恢复,但是在更多情况下要求消息在消息

消费者与代理之间的连接断开后仍然可用.而将消息持久化到数据库中太慢了.交易平台中价格信息的实时分发

就是一个很好的例子.通常,实时数据应用程序使用的消息仅仅在有限的时间里有效,这个有效时间通常小于

一分钟.所以,没必要为保证系统正常运行而持久化这些消息,因为新的消息很快会到来.

 

ActiveMQ supports the caching of messages for these types of systems using message

caching in the broker by using something called a subscription recovery policy. This

configurable policy is used for deciding which types of messages should be cached,

how many, and for how long. In the rest of this section we’ll explain how message

caching works in ActiveMQ and how to configure the different types of subscription

recovery policies that are available.

ActiveMQ通过在代理中使用一种称为订阅恢复策略,为使用这种类型消息的系统缓存消息提供支持.

这种策略支持配置哪种类型的消息应该被缓存,以及缓存多少,缓存多久.本节的剩余部分,我们将讨论

ActiveMQ如何缓存消息以及如何配置不同的订阅恢复策略.

 

5.6.1 How message caching for consumers works

5.6.1 如何为消息消费者缓存消息

 

The ActiveMQ message broker caches messages in memory for every topic that’s used.

The only types of topics that aren’t supported are temporary topics and ActiveMQ

advisory topics. Caching of messages in this way isn’t handled for queues, as the normal

operation of a queue is to hold every message sent to it.

 

除了临时主题和ActiveMQ建议(advisory)主题,ActiveMQ消息代理在内存中缓存所有的正在使用消息主题中的消息.

ActiveMQ不会通过上述方式为消息队列缓存消息,因为通常情况下队列会保存所有发送给它的消息.

 

Messages that are cached by the broker are only dispatched to a topic consumer if

the consumer is retroactive, and never to durable topic subscribers.

Topic consumers are marked as being retroactive by a property set on the destination

when the topic consumer is created. Here’s an example:

 

代理中缓存的消息只能发送到具有消息追溯能力的消费者(译注:打开retroactive开关的消费者),

不会发送到配置成持久化订阅的主题订阅者.

在创建主题消费者时,可以通过设置目的地属性,将主题消费者设置为可追溯的(retroactive).

下面是一个示例代码:

 

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.MessageConsumer;

import javax.jms.Session;

import javax.jms.Topic;

 

public void createRetroactiveConsumer() throws JMSException

{

  ConnectionFactory fac = new ActiveMQConnectionFactory();

  Connection connection = fac.createConnection();

  connection.start();

  Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

  Topic topic = session.createTopic("TEST.TOPIC?consumer.retroactive=true");

  MessageConsumer consumer = session.createConsumer(topic);

}

 

On the broker side, the message caching is controlled by a destination policy called a

subscriptionRecoveryPolicy. The default subscription recovery policy used in the

broker is a FixedSizeSubscriptionRecoveryPolicy. Let’s walk through the different

subscription recovery policies that are available.

 

在代理端,消息缓存通过一个名称为subscriptionRecoveryPolicy(订阅恢复策略)的目的地策略来控制.默认的

订阅恢复策略是FixedSizeSubscriptionRecoveryPolicy.下面让我们逐一了解可用的各种不同的

订阅恢复策略.

 

5.6.2 The ActiveMQ subscription recovery policies

5.6.2 ActiveMQ的订阅恢复策略

 

There are a number of different policies that allow for fine-tuning the duration and

type of messages that are cached for nondurable topic consumers. Each policy type is

explained here.

 

ActiveMQ提供多种策略用来调优持久化消息以及缓存非持久化主题消费者的消息.下面分别说明这些

策略.

 

THE ACTIVEMQ FIXED SIZE SUBSCRIPTION RECOVERY POLICY

ACTIVEMQ固定尺寸订阅恢复策略

 

This policy limits the number of messages cached for the topic based on the amount

of memory they use. This is the default subscription recovery policy in ActiveMQ. You

can choose to have the cache limit applied to all topics, or on a topic-by-topic basis.

The properties available are shown in table 5.6.

 

这种策略根据使用的内存大小对每个主题的可缓存消息数量做了限制.这也是ActiveMQ默认的订阅恢复策略.

你可以为所有主题设置一种缓存策略,或者给每一个主题单独设置.表5.6是配置该策略时可以使用的参数.

 

Table 5.6 Configuration properties for a fixed size subscription recovery policy

表5.6 固定尺寸订阅恢复策略可配置属性

 

Property name       Default value       Description

属性名称            默认值               描述

 

maximumSize         6553600         The memory size in bytes for this cache

maximumSize         6553600         缓存可用的内存大小(单位:byte) 

 

useSharedBuffer     true            If true, the amount of memory allocated will be used across all topics

useSharedBuffer     true            如果设置为true,maximumSize设置的内存大小为所有主题的缓存总额

                                    (而不是每个主题缓存都可以达到maximumSize设置的大小)                                            

 

THE ACTIVEMQ FIXED COUNT SUBSCRIPTION RECOVERY POLICY

固定数量策略

 

This policy limits the number of messages cached by the topic based on a static count.

Only one property is available, as listed in table 5.7.

该策略根据主题中消息的数量来限制消息缓存.只有一个属性可设置,如表5.7所示

 

Table 5.7 Configuration properties for a fixed count subscription recovery policy

固定数量策略配置参数列表

 

Property name       Default value       Description

属性名称            默认值               描述

maximumSize         100               The number of messages allowed in the topics cache

maximumSize         100               每个主题中可缓存的消息数量最大值

 

THE ACTIVEMQ QUERY-BASED SUBSCRIPTION RECOVERY POLICY

基于查询的策略

 

This policy limits the number of messages cached based on a JMS property selector that’s

applied to each message. Only one property is available, as shown in table 5.8.

该策略根据应用到每个消息的JMS属性选择器来限制缓存消息数量.只有一个可设置参数,如表5.8所示:

 

Table 5.8 Configuration properties for a query-based subscription recovery policy

表5.8 基于查询的策略配置参数列表

 

Property name       Default value       Description

属性名称            默认值               描述

query               null                Caches only messages that match the query

query               null                当消息符合设置选择器时,缓存消息.

 

THE ACTIVEMQ TIMED SUBSCRIPTION RECOVERY POLICY

时间策略

 

This policy limits the number of messages cached by the topic based on an expiration

time that’s applied to each message. Note that the expiration time on a message is

independent from the timeToLive that’s set by the MessageProducer. The configuration

properties for a timed subscription policy are shown in table 5.9.

该策略根据消息的过期时间来缓存主题中的消息.注意,消息的过期时间是独立的,该过期时间由消息

生产者设置消息的timeToLive参数决定.可配置属性如表5.9所示.

 

Table 5.9 Configuration properties for a timed subscription recovery policy

表5.9时间策略配置参数列表

 

Property name       Default value       Description

属性名称            默认值               描述

recoverDuration     60000                 The time in milliseconds to keep messages in the cache

recoverDuration     60000                 消息的缓存时间(毫秒)

 

THE ACTIVEMQ LAST IMAGE SUBSCRIPTION RECOVERY POLICY

最终映像策略

 

This policy holds only the last message sent to a topic. It can be useful for real-time

pricing information—where a price per topic is used, you might only want the last

price that’s sent to that topic. There are no configuration properties for this policy.

该策略仅缓存发送到主题的最后一个消息.在实时的价格信息中,每个主题就是一个价格信息,这个策略很有用,

因为你可能仅仅关注最后一个发送给主题的价格信息.该策略没有配置属性.

 

THE ACTIVEMQ NO SUBSCRIPTION RECOVERY POLICY

无缓存策略

This policy disables message caching for topics. There are no properties to configure

for this policy.

该策略禁止缓存主题消息.该策略也没有配置属性.

 

5.6.3 Configuring the subscription recovery policy

5.6.3 配置订阅恢复策略

 

You can configure the subscriptionRecoveryPolicy for either individual topics, or

you can use wildcards, in the ActiveMQ broker configuration. An example configuration

is shown here:

在ActiveMQ代理的配置中,可以为每个独立的主题配置subscriptionRecoveryPolicy,或者也可以使用通配符

为多个主题配置策略.下面是配置代码样例:

 

<?xml version="1.0" encoding="UTF-8"?>

<beans>

  <broker brokerName="test-broker" persistent="true" useShutdownHook="false"

  deleteAllMessagesOnStartup="true"

  xmlns="http://activemq.apache.org/schema/core">

    <transportConnectors>

      <transportConnector uri="tcp://localhost:61635"/>

    </transportConnectors>

    

    <destinationPolicy>

      <policyMap>

        <policyEntries>

        

          <policyEntry topic="Topic.FixedSizedSubs.>">

            <subscriptionRecoveryPolicy>

              <fixedSizeSubscriptionRecoveryPolicy maximumSize="2000000" useSharedBuffer="false"/>

            </subscriptionRecoveryPolicy>

          </policyEntry>

          

          <policyEntry topic="Topic.LastImageSubs.>">

            <subscriptionRecoveryPolicy>

              <lastImageSubscriptionRecoveryPolicy/>

            </subscriptionRecoveryPolicy>

          </policyEntry>

          

          <policyEntry topic="Topic.NoSubs.>">

            <subscriptionRecoveryPolicy>

              <noSubscriptionRecoveryPolicy/>

            </subscriptionRecoveryPolicy>

          </policyEntry>

        

          <policyEntry topic="Topic.TimedSubs.>">

            <subscriptionRecoveryPolicy>

              <timedSubscriptionRecoveryPolicy recoverDuration="25000"/>

            </subscriptionRecoveryPolicy>

          </policyEntry>

        </policyEntries>

      </policyMap>

    </destinationPolicy>

  </broker>

</beans>

分享到:
评论
5 楼 jackyin5918 2014-04-24  
wuzhaoxue 写道
Messages that are cached by the broker are only dispatched to a topic consumer if
the consumer is retroactive.  这句话翻译得不太恰当。


你好,感谢指正.

已修改成:

代理中缓存的消息只能发送到具有消息追溯能力的消费者(译注:打开retroactive开关的消费者),不会发送到配置成持久化订阅的主题订阅者.

如有问题,欢迎继续留言给我.

关于消息追溯,请参考:http://jackyin5918.iteye.com/blog/1990533
4 楼 wuzhaoxue 2014-04-23  
Messages that are cached by the broker are only dispatched to a topic consumer if
the consumer is retroactive.  这句话翻译得不太恰当。
3 楼 sdfasdfgfh 2014-03-06  
jackyin5918 写道
sdfasdfgfh 写道
请问用activemq怎么收到最后一条信息,就是无论什么情况下都能收到指定topic下的最后一条信息?这样能做到吗?


不知道 持久化订阅这个能否满足需求.
假如发送消息到主题时,主题消费者不在线,此时消息会持久化起来,在消费者
重新上线时,仍然会收到消息.

参考:http://jackyin5918.iteye.com/blog/2005513


就是持久化不能解决这个问题,项目需求是:股票的价格实时变动,但不需要持久化,当今天收盘后价格不会变动,明天开盘前这个价格应该还是和昨天收盘时的价格一样,最后一条消息应该能够保存才能够得到解决.不知道博主有没有好的解决方法?
2 楼 jackyin5918 2014-03-05  
sdfasdfgfh 写道
请问用activemq怎么收到最后一条信息,就是无论什么情况下都能收到指定topic下的最后一条信息?这样能做到吗?


不知道 持久化订阅这个能否满足需求.
假如发送消息到主题时,主题消费者不在线,此时消息会持久化起来,在消费者
重新上线时,仍然会收到消息.

参考:http://jackyin5918.iteye.com/blog/2005513
1 楼 sdfasdfgfh 2014-03-05  
请问用activemq怎么收到最后一条信息,就是无论什么情况下都能收到指定topic下的最后一条信息?这样能做到吗?

相关推荐

    ActiveMQ Failover Transport Options

    例如,在创建消费者时,可以设置过滤属性以选择性地接收消息: ``` MessageConsumer consumer = session.createConsumer(destinationQuery, "actionType='REQUEST'"); ``` 在发送信息时,也可以设置订阅属性以选择...

    同步MySQL数据库增量变化fountain.zip

    Fountain是监查、捕捉MySQL数据库的增量变化,分发数据变化给消费者处理的一套解决方案。  Fountain,英[ˈfaʊntən],是”源泉“的意思,MySQL数据库源源不断的下发增量,因此而得名。  任何需要快速、准确...

    JAVA并发编程实践_中文版(1-16章全)_1/4

    5.6 为计算结果建立高效、可伸缩的高速缓存 第2部分 构建并发应用程序 第6章 任务执行 6.1 在线程中执行任务 6.2 executor 框架 6.3 寻找可强化的并行性 第7章 取消和关闭 7.1 任务取消 7.2 停止基于线程的服务 7.3 ...

    Java并发编程part2

    5.6 为计算结果建立高效、可伸缩的高速缓存 第2部分 构建并发应用程序 第6章 任务执行 6.1 在线程中执行任务 6.2 executor 框架 6.3 寻找可强化的并行性 第7章 取消和关闭 7.1 任务取消 7.2 停止基于线程的服务 7.3 ...

    Java并发编程实践part1

    5.6 为计算结果建立高效、可伸缩的高速缓存 第2部分 构建并发应用程序 第6章 任务执行 6.1 在线程中执行任务 6.2 executor 框架 6.3 寻找可强化的并行性 第7章 取消和关闭 7.1 任务取消 7.2 停止基于线程的服务 7.3 ...

    Spring in Action(第2版)中文版

    4.2.2定义切点和通知者 4.2.3使用proxyfactorybean 4.3自动代理 4.3.1为spring切面创建自动代理 4.3.2自动代理@aspectj切面 4.4定义纯粹的pojo切面 4.5注入aspectj切面 4.6小结 第二部分企业spring 第5章...

    Spring in Action(第二版 中文高清版).part2

    4.2.2 定义切点和通知者 4.2.3 使用ProxyFactoryBean 4.3 自动代理 4.3.1 为Spring切面创建自动代理 4.3.2 自动代理@AspectJ切面 4.4 定义纯粹的POJO切面 4.5 注入AspectJ切面 4.6 小结 第二部分 企业...

    Spring in Action(第二版 中文高清版).part1

    4.2.2 定义切点和通知者 4.2.3 使用ProxyFactoryBean 4.3 自动代理 4.3.1 为Spring切面创建自动代理 4.3.2 自动代理@AspectJ切面 4.4 定义纯粹的POJO切面 4.5 注入AspectJ切面 4.6 小结 第二部分 企业...

    Java并发编程实践 PDF 高清版

    5.6 为计算结果建立高效、可伸缩的高速缓存 第2部分 构建并发应用程序 第6章 任务执行 6.1 在线程中执行任务 6.2 Executor 框架 6.3 寻找可强化的并行性 第7章 取消和关闭 7.1 任务取消 7.2 停止基于线程的服务 7.3 ...

    LegendShop 技术介绍

    - **B2B2C模式**:一种运营商管理多个大型商家同时为消费者提供服务的模式,类似天猫。 #### 四、产品体系架构 - **技术架构**:基于J2EE的企业级架构,分为前端技术和后端技术体系,支持大型分布式应用。 - **跨...

    canal简介.ppt

    1. **准备MySQL环境**:确保MySQL版本为5.1.x, 5.5.x, 5.6.x, 5.7.x 或 8.0.x,并开启binlog功能,设置binlog格式为ROW模式,配置server_id以避免冲突。 2. **创建Canal用户并授权**:创建名为'canal'的用户,并赋予...

    Pattens In Java.pdf

    #### 5.6 Observer **结构:** - **观察者接口**:定义更新自己的方法。 - **主题接口**:定义注册、注销和通知观察者的方法。 - **具体观察者类**:实现观察者接口。 - **具体主题类**:实现主题接口,维护观察者...

    MTK MT8377

    这些芯片组不仅具备良好的处理能力和多媒体性能,还集成了多种连接性和扩展功能,满足了消费者对移动设备日益增长的需求。随着技术的不断进步和发展,联发科技将继续为市场带来更加先进和高效的芯片解决方案。

    并行计算导论(原书第2版).[美]Ananth Grama(带详细书签).pdf

    3.6.5 流水线模型或生产者-消费者模型 3.6.6 混合模型 3.7 书目评注 习题 第4章 基本通信操作 4.1 一对多广播以及多对一归约 4.1.1 环或线性阵列 4.1.2 格网 4.1.3 超立方体 4.1.4 平衡二叉树 4.1.5 算法...

    java_concurrency_in_practice

    分析了阻塞队列的工作原理及其在生产者-消费者模式中的应用。 - **5.4 阻塞和可中断方法** 讨论了如何优雅地处理阻塞操作和中断请求。 - **5.5 同步器** 探讨了如CountDownLatch、CyclicBarrier等高级同步工具...

    白云易支付源码/第三方第四方支付系统源码 含有搭建说明

    在IT领域,第三方支付系统指的是独立于商家和银行的中介平台,它连接商家的支付接口与消费者的支付工具,如支付宝、微信支付等。第四方支付系统则在此基础上,进一步聚合了多家第三方支付渠道,为商家提供一站式接入...

    Java并发编程实战

    5.3 阻塞队列和生产者-消费者模式 5.3.1 示例:桌面搜索 5.3.2 串行线程封闭 5.3.3 双端队列与工作密取 5.4 阻塞方法与中断方法 5.5 同步工具类 5.5.1 闭锁 5.5.2 FutureTask 5.5.3 信号量 5.5.4 栅栏 5.6...

    Java 并发编程实战

    5.3 阻塞队列和生产者-消费者模式 5.3.1 示例:桌面搜索 5.3.2 串行线程封闭 5.3.3 双端队列与工作密取 5.4 阻塞方法与中断方法 5.5 同步工具类 5.5.1 闭锁 5.5.2 FutureTask 5.5.3 信号量 5.5.4 栅栏 5.6...

    NH056DN04显示控制块说明书

    ### NH056DN04显示控制块说明书知识点解析 #### 一、NH056DN04显示控制模块的特点 1. **安装便捷性**: - 尺寸为144mm×104mm;...同时,该模块的详细规格和技术参数也为设计者提供了充分的设计依据和支持。

    Java.Concurrency.in.Practice.pdf

    - **5.3 阻塞队列与生产者消费者模式**:阐述了阻塞队列的概念及其在实现生产者消费者模式中的作用。 - **5.4 阻塞和可中断方法**:讲解了如何处理阻塞方法和如何使这些方法支持中断。 - **5.5 同步器**:介绍了一些...

Global site tag (gtag.js) - Google Analytics