`
liyixing1
  • 浏览: 959012 次
  • 性别: Icon_minigender_1
  • 来自: 江西上饶
社区版块
存档分类
最新评论

activemq 消息存储

    博客分类:
  • jms
阅读更多
JMS规范支持两种方式分发消息:持久化和非持久化。ActiveMQ同时支持上面两种。ActiveMQ支持一种可插拔式的消息存储,并且提供三种方式消息存储:存储到内存,
存储到文件,以及存储到相关的数据库.

消息队列是属于先进先出的规则,消息被确定收到后,会从代理的消息存储中删除。
订阅模式,代理器只会存储一个消息,但是会为每个订阅者创建指针,只想下一条,但消息所对应的指针数量0时,被删除。

KahaDB消息存储
ctiveMQ从5.3版本以后,推荐使用KahaDB作为通用的消息存储方式.KahaDB是一种基于文件的存储,具有卓越的性能和扩展性,具有事务日志功能,数据日志,消息ID索引,基于内容的消息缓存等特点。能够可靠的消息的存储和恢复.
为了在ActiveMQ启用KahaDB存储,你需要在配置文件activemq.xml中配置<persistenceAdapter>
元素.下面是启用KahaDB存储的一个最小化配置例子:
<broker brokerName="broker" persistent="true" useShutdownHook="false">
  ...
  <persistenceAdapter>
    <kahaDB directory="activemq-data" journalMaxFileLength="16mb"/>
  </persistenceAdapter>
  ...
</broker>


如果是通过代码启动内嵌代理,那么可以通过

public class EmbeddedBrokerUsingAMQStoreExample {
BrokerService createEmbeddedBroker() throws Exception {
BrokerService broker = new BrokerService();
File dataFileDir = new File("target/amq-in-action/kahadb");
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(dataFileDir);
// Using a bigger journal file
kaha.setJournalMaxFileLength(1024*100);
// small batch means more frequent and smaller writes
kaha.setIndexWriteBatchSize(100);
// do the index write in a separate thread
kaha.setEnableIndexWriteAsync(true);
broker.setPersistenceAdapter(kaha);
//create a transport connector
broker.addConnector("tcp://localhost:61616");
//start the broker
broker.start();
return broker;
}
}


KahaDB为数据日志和索引使用不同的磁盘文件

db log 文件 --KahaDB将消息存储到数据日志文件中,该文件命名方式为db-<编号>.log,文件大小事先已经定义好.
当数据日志文件达到预定的大小,一个新的文件会自动创建,同时文件名中的编号自动加1.如果没有任何引用指向
数据日志文件中的消息,那么该日志文件将被删除或者归档.

归档目录 -- 归档目录仅在启用归档时才会出现.归档文件用于存储那些不再被KahaDB使用的数据日志文件.使用归档文件可以保存消息以便以后重新检视消息变为可能.如果没有启用归档功能(默认不启用归档),不在使用的数据日志文件会从文件系统中删除.

db.data -- 该文件保存了数据日志文件中消息的持久化BTree索引.

db.redo -- 这是重做日志文件,用于KahaDB从一次非正常关闭后重启时恢复BTree索引.

配置属性
属性名                        默认值                     描述
directory                 activemq-data           KahaDB使用的目录
indexWriteBatchSize           1000        一次写入磁盘的属性页数量
indexCacheSize                10000       内存中缓存的所引用数量
enableIndexWriteAsync         false       如果设置为true,索引将以异步方式写入
journalMaxFileLength          32mb        设置每个消息数据日志文件的最大尺寸
enableJournalDiskSyncs        true        保证每个非事务日志写操作后进行磁盘同步(JMS持久化要求)       
cleanupInterval               30000       检查 丢弃/移动数据日志文件中是不不再使用的消息 间隔时间(单位 毫秒)
checkpointInterval            5000        执行checkpoint的时间间隔
ignoreMissingJournalfiles     false       如果启用,则忽视丢失消息日志(译注:在消息丢失时不记录日志)
checkForCorruptJournalFiles   false       如果启用,在启动是会检查消息数据日志文件是否被占用
checksumJournalFiles          false       如果启用,将为每一个消息数据日志问价开启检查消息总数服务
archiveDataLogs               false       如果启用,不用的消息数据文件会被移动到归档目录 而不是删除掉
directoryArchive              null        存放规定文件的目录
databaseLockedWaitDelay       10000       尝试获取数据库锁的等待时间(用于共享主/从数据库)
maxAsyncJobs                  10000       排队等待存储的异步消息的最大数量(应该和并发的消息生产者数量相同)
concurrentStoreAndDispatchTransactions  true      允许在存储事务时分发消息到对其感兴趣的客户端
concurrentStoreAndDispatchTopics        true      允许在存储消息时,分发主题中的消息给对其感兴趣的客户端
concurrentStoreAndDispatchQueues        true      允许在存储消息是,分发队列中的消息给对其感兴趣的客户端

ActiveMQ为消息存储提供了一种可插拔式的API.ActiveMQ版本中包含一下三种KahaDB消息存储实现:
AMQ 消息存储 -- 基于文件的高性能消息存储
JDBC 消息存储 -- 基于JDBC的消息存储
Memory 消息存储 -- 基于内存的消息存储



AMQ消息存储
每个代理中会产生大量的消息队列是不应该使用AMQ消息存储

和KahaDB类似,具有保障可靠持久化的事务日志以及搞性能索引,因而当消息吞吐量是主要需求时,应用程序使用这种消息存储是最好的选择.但是因为AMQ消息存储中每个索引都使用连个文件,而且为每一个消息目的地都单独使用索引,所以当每个代理中会产生大量的消息队列是不应该使用AMQ消息存储.同样,使用这种消息存储代理从非正常关闭中恢复会比较慢.原因是代理恢复时,所有的的索引需要重建,这就要求代理遍历它的数据日志文件以便正确的重建索引.

AMQ消息存储包含一下三个独立部分:
数据日志文件 -- 该文件保存消息引用,这些消息已根据消息ID建立了索引
缓存 -- 在消息被写入数据日志文件后,缓存是在内存中保存这些消息以便快速读取这些消息
引用存储 -- 引用存储是掉数据日志中消息的引用,这些消息已经根据消息ID建立了索引


<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <broker xmlns="http://activemq.apache.org/schema/core">
    <persistenceAdapter>
      <amqPersistenceAdapter
      directory="target/Broker2-data/activemq-data"
      syncOnWrite="true"
      indexPageSize="16kb"
      indexMaxBinSize="100"
      maxFileLength="10mb" />
    </persistenceAdapter>
  </broker>
</beans>



JDBC
使用一个共享数据库,可以制作出一种具有主从关系的代理网络,多个网络之间会获取数据库的锁,先获取的具有从的特点。其余的会以从属存在。当连接主代理时失败时,那么客户端需要等待,另外一个主获得锁。

至少需要三个表,结构大概是
ACTIVEMQ_MSGS,存储消息



ACTIVEMQ_ACKS 存储长期订阅者



ACTIVEMQ_LOCK 用来确保只有一个代理能获得锁





JDBC配置例子
<beans>
<broker brokerName="test-broker"
persistent="true"
xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="activemq-data"/>
</persistenceAdapter>
</broker>
</beans>


MYSQL的例子
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker brokerName="test-broker"
persistent="true"
xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
</broker>
<bean id="mysql-ds"
class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url"
value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>


内存消息存储
小数据或者测试情况使用
配置内存消息存储很简单.配置内存消息存储是只需要将broker的persistent属性值设置为false

<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">
    <transportConnectors>
      <transportConnector uri="tcp://localhost:61635"/>
    </transportConnectors>
  </broker>
</beans>


代码启动代理的时候

import org.apache.activemq.broker.BrokerService;
public void createEmbeddedBroker() throws Exception 
{
  BrokerService broker = new BrokerService();
  //configure the broker to use the Memory Store
  broker.setPersistent(false);
  //Add a transport connector
  broker.addConnector("tcp://localhost:61616");
  //now start the broker
  broker.start();
}


缓存
有些情况,如消息持久化到数据库中太慢,消息的有用失效较短等等原因,ActiveMQ通过在代理中使用一种称为订阅恢复策略,为使用这种类型消息的系统缓存消息提供支持。
只支持主题模式,不支持队列模式(除了临时主题和ActiveMQ建议(advisory)主题)。

代理中缓存的消息只能发送到具有消息追溯能力的消费者(译注:打开retroactive开关的消费者),在创建主题消费者时,可以通过设置目的地属性,将主题消费者设置为可追溯的(retroactive).


Topic topic = session.createTopic("TEST.TOPIC?consumer.retroactive=true");
如上就开始了该属性。

在代理端,消息缓存通过一个名称为subscriptionRecoveryPolicy(订阅恢复策略)的目的地策略来控制.默认的
订阅恢复策略是FixedSizeSubscriptionRecoveryPolicy.

固定尺寸订阅恢复策略
这种策略根据使用的内存大小对每个主题的可缓存消息数量做了限制.这也是ActiveMQ默认的订阅恢复策略.你可以为所有主题设置一种缓存策略,或者给每一个主题单独设置


固定尺寸订阅恢复策略可配置属性
maximumSize         6553600         缓存可用的内存大小(单位:byte)
useSharedBuffer     true            如果设置为true,maximumSize设置的内存大小为所有主题的缓存总额(而不是每个主题缓存都可以达到maximumSize设置的大小)

固定数量策略
该策略根据主题中消息的数量来限制消息缓存.只有一个属性可设置,
maximumSize         100               每个主题中可缓存的消息数量最大值

基于查询的策略             
该策略根据应用到每个消息的JMS属性选择器来限制缓存消息数量.只有一个可设置参数
query               null                当消息符合设置选择器时,缓存消息.

时间策略
该策略根据消息的过期时间来缓存主题中的消息.注意,消息的过期时间是独立的,该过期时间由消息生产者设置消息的timeToLive参数决定.
recoverDuration     60000                 消息的缓存时间(毫秒)


最终映像策略
该策略仅缓存发送到主题的最后一个消息.在实时的价格信息中,每个主题就是一个价格信息,这个策略很有用,
因为你可能仅仅关注最后一个发送给主题的价格信息.


无缓存策略
该策略禁止缓存主题消息.该策略也没有配置属性.

在ActiveMQ代理的配置中,可以为每个独立的主题配置subscriptionRecoveryPolicy,或者也可以使用通配符
为多个主题配置策略.下面是配置代码样例:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <broker brokerName="test-broker" persistent="true" useShutdownHook="false"
  deleteAllMessagesOnStartup="true"
  xmlns="http://activemq.apache.org/schema/core">
    <transportConnectors>
      <transportConnector uri="tcp://localhost:61635"/>
    </transportConnectors>
    
    <destinationPolicy>
      <policyMap>
        <policyEntries>
        
          <policyEntry topic="Topic.FixedSizedSubs.>">
            <subscriptionRecoveryPolicy>
              <fixedSizeSubscriptionRecoveryPolicy maximumSize="2000000" useSharedBuffer="false"/>
            </subscriptionRecoveryPolicy>
          </policyEntry>
          
          <policyEntry topic="Topic.LastImageSubs.>">
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>
          
          <policyEntry topic="Topic.NoSubs.>">
            <subscriptionRecoveryPolicy>
              <noSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>
        
          <policyEntry topic="Topic.TimedSubs.>">
            <subscriptionRecoveryPolicy>
              <timedSubscriptionRecoveryPolicy recoverDuration="25000"/>
            </subscriptionRecoveryPolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>
  </broker>
</beans>
  • 大小: 66.6 KB
  • 大小: 69 KB
  • 大小: 21.6 KB
分享到:
评论

相关推荐

    activemq实战

    - **存储机制**:介绍ActiveMQ中消息存储的各种策略,如基于内存的存储、基于文件系统的持久化存储等。 - **性能优化**:探讨不同存储策略对系统性能的影响,以及如何根据业务需求选择合适的存储方案。 - **安全...

    ActiveMQ消息过期时间设置和自动清除解决方案

    死信队列(DLQ)是用于存储那些无法正常投递或处理的消息。这些消息可能因为各种原因无法被正确处理,最终被发送到DLQ中。在ActiveMQ中,有多种策略可供选择,以决定如何处理这些死信消息。 ##### 直接抛弃死信队列 ...

    7道消息队列ActiveMQ面试题!

    ActiveMQ的存储机制包括非持久化消息和持久化消息两种方式。非持久化消息存储在内存中,而持久化消息则存储在磁盘文件中。当内存中的非持久化消息过多时,ActiveMQ会将这些消息写入临时文件来腾出内存。如果磁盘空间...

    ActiveMQ消息中间件面试专题1

    通过理解服务器宕机时的数据存储策略,处理丢消息的策略,优化持久化消息的发送,调整prefetch机制以确保消息公平分配,以及利用死信队列来处理异常情况,我们可以更好地利用ActiveMQ构建健壮的分布式系统。

    activemq in action

    - ActiveMQ消息存储 - ActiveMQ的安全性配置 4. **ActiveMQ在Action中的示例应用** #### 消息中间件与Java消息服务(JMS) **消息中间件**是一种软件系统,用于实现分布式应用程序之间的通信。它通过在不同系统...

    ActiveMQ消息服务器 v6.0.1.zip

    2. 高性能和可扩展性:ActiveMQ采用了高效的存储机制和网络协议,能够处理大量并发连接和高频率的消息交换。通过集群和负载均衡,可以轻松扩展以满足不断增长的需求。 3. 路由和过滤:ActiveMQ提供丰富的消息路由和...

    activemq消息持久化所需Jar包

    3. **数据库连接驱动**:ActiveMQ支持多种持久化策略,其中一种是使用关系型数据库(如MySQL、PostgreSQL)来存储消息。因此,根据你选择的数据库,需要引入相应的数据库驱动Jar包,例如`mysql-connector-java.jar`...

    Manning.ActiveMQ.in.Action.Mar.2011

    本章详细介绍了ActiveMQ的消息存储机制。它涵盖了不同的存储选项(如基于内存的存储、持久化存储等),以及如何根据不同的性能和可靠性需求来选择合适的存储策略。 - **第6章:保护ActiveMQ的安全性** 安全性是...

    ActiveMQ消息总线介绍

    ### ActiveMQ消息总线介绍 #### 一、消息中间件(Message-Oriented Middleware, MOM)概述 消息中间件是一种软件技术,它通过在不同系统之间传输和分发消息来连接网络中的独立系统。这种技术的核心是围绕一个队列...

    ActiveMQ消息中间件面试题.pdf

    根据消息是否需要持久化,ActiveMQ将消息分别存储在内存和文件中: - **非持久化消息**:默认情况下存储在内存中,当内存不足时会溢出到临时文件中。 - **持久化消息**:始终存储在磁盘文件中,重启后可以从磁盘恢复...

    activemq-store-mongodb.zip

    activemq-store-mongodb 是一个使用mongodb实现的activemq 消息存储。 为了在配置文件支持mongodb存储,需要稍微改一下activemq的XSD和serviceloader配置文件,参见此处。 标签:activemq

    ActiveMQ 消息队列

    6. **高效的消息持久化机制**:ActiveMQ提供了两种持久化选项—JDBC和Journal存储,以满足不同应用的性能和可靠性需求。 7. **灵活的集群配置**:ActiveMQ的设计考虑到了高可用性和负载均衡的需求,支持客户端-...

    ActiveMQ In Action

    - **配置ActiveMQ消息存储**:可以配置使用不同类型的存储方案,如AMQ消息存储等。 - **AMQ消息存储的工作原理**:该存储机制如何确保消息的一致性和可靠性。 - **安全配置**: - **保护ActiveMQ**:通过配置...

    ActiveMQ消息中间件面试专题.pdf

    Apache ActiveMQ是一个开源的消息中间件,它属于Apache软件基金会。ActiveMQ旨在提供一种可靠的消息传递机制,以支持应用程序之间的异步通信。作为面向消息的中间件(MOM),ActiveMQ实现了JMS(Java Message ...

    ActiveMQ消息服务器 v5.18.3.zip

    1. **核心消息引擎**:这是ActiveMQ的核心部分,负责接收、存储、路由和传递消息。它使用高效的内存存储和可选的持久化机制来确保高可用性和数据安全性。 2. **多种协议支持**:ActiveMQ支持多种消息协议,如JMS、...

    ActiveMQ消息服务配置

    3. **data**:数据与日志目录,存储消息队列的数据和日志文件。 #### 三、安装与启动ActiveMQ 在正式环境中推荐将ActiveMQ安装为系统服务。安装过程如下: 1. **安装服务**:在`bin\win32`(64位系统下为`bin\win...

    activeMq消息队列demo

    - **broker**:ActiveMQ服务器,负责存储、路由和管理消息。 2. **安装与配置ActiveMQ** - 下载ActiveMQ的二进制包并解压。 - 运行`bin/activemq start`启动服务。 - 访问`http://localhost:8161/admin`以查看...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    ActiveMQ可以存储和转发各种消息类型,包括文本、对象、文件以及流数据。在SpringBoot中集成ActiveMQ,可以方便地处理消息的生产和消费。 MQTT协议是为设备到设备的通信设计的,特别适合物联网(IoT)场景。它的主要...

    spring + activemq 消息sample

    7. **消息持久化**:ActiveMQ支持消息的持久化存储,即使在服务器重启后,未被消费的消息也不会丢失。 8. **监控与管理**:ActiveMQ提供了一个Web控制台,可以实时查看消息队列的状态、监控性能等。 在实际项目中...

Global site tag (gtag.js) - Google Analytics