`

第六章 ActiveMQ 消息存储

 
阅读更多

                                                         ActiveMQ 信息存储

章节导读
  •    消息在Topic和QUEUE中是怎么存储的
  •    ActiveMQ提供的四种存储消息的方式
  •    ActiveMQ怎么为消费者缓存消息
  •    使用订阅恢复撤销控制信息缓存

                Jms应用只有持久化和非持久化两种消息传输方式.用持久化的方式传输消息必须被记录到可靠的存储器中,非持久化的传输方式则不需要.

               Activemq对消息的存储策略是可配置的,支持存储在内存,文件系统,以及关系型数据库中.

 1.1 消息在Topic和QUEUE中是怎么存储的

               在Topic和Queue中存储消息的方式是不同的,因为有许多可以用于优化存储的配置条件对Topic有用,然而却对Queue没有任何影响.

                 在队列中存储消息的非常简单的--先进先出(FIFO),当消息被确认消费之后才会从队列中删除.

                 对于持久的Topic订阅,每个消费者都得到消息的一个副本,为了节省存储空间,只有其中之一的副本存储在Broker中,一个持久的订阅者对象包括指向下一个待获取的消息并拷贝它的副本给消费者.

因为每个消费者的消费速度不同,每个消息也有很多的消费者,直到这个消息被所有订阅消费者消费它才会被删除.

                

 

1.2  KahaDb消息存储

            KahaDb存储是ActiveMq5.3之后推荐的一种存储通用消息的消息存储方式,这是一个基于文件的消息存储相结合的事务日志.可靠的消息存储和恢复,并且有很好的性能和扩展性.

            KahaDb消息存储对它的索引使用事务日志并且对所有的Destination只使用一个文件索引.它已经被用于10000个活跃的连接,每个连接都有单独的队列的生成环境.它的可配置性可以让它通过调优适用于高吞吐的应用.(例如:交易平台)或者存储大量的消息(例如:GPS追踪).

           要在ActiveMq中启用KahaDb,需要在activemq.xml中加入<persistenceAdapter>节点.

         

<broker brokerName="broker" persistent="true" useShutdownHook="false">
      <persistenceAdapter>
    <kahaDB directory="activemq-data" journalMaxFileLength="16mb"/>
   </persistenceAdapter>
...
</broker>

        如果你要使用内嵌式的代理,它也可以实现,

	public class EmbeddedBrokerUsingAMQStoreExample {
		BrokerService createEmbeddedBroker() throws Exception {
			BrokerService broker = new BrokerService();
			File dataFileDir = new File("target/amq-in-action/kahadb");
			KahaDBStore kaha = new KahaDBStore();
			kaha.setDirectory(dataFileDir);
			// Using a bigger journal file
			kaha.setJournalMaxFileLength(1024 * 100);
			// small batch means more frequent and smaller writes
			kaha.setIndexWriteBatchSize(100);
			// do the index write in a separate thread
			kaha.setEnableIndexWriteAsync(true);
			broker.setPersistenceAdapter(kaha);
			// create a transport connector
			broker.addConnector("tcp://localhost:61616");
			// start the broker
			broker.start();
			return broker;
		}
	}

 

      1.2.1 KahaDb消息存储的内部结构

           kahadb是最快的一种消息存储方式,它的速度是基于和由数据日志文件组成的事务日志组合的结果,高度优化的消息Id索引.下图是它的主要组成部分.
                                                   

  • data logs作为消息日志,由滚动的日志消息和命令(事务边界,消息删除),存储在数据文件的长度组成.,当使用的数据文件已经到了上线,一个新的数据文件就会被创建.所有在数据文件中的消息被计算在内,一旦这个消息不在被需要,数据文件可以被删除或归档.在日志文件中,消息总是追加在当前文件的最后,所以存储数据很快.
  • cache会为了有效的消费者临时的保留消息.如果消息被发送的同时也在存储计划中,如果消息及时被确认接收,它们就不需要被写入磁盘.
  • BTree索引保留消息在日志文件中的引用(基于它们消息Id的索引).索引包含了队列的FIFO数据结构和订阅者在主题消息中的指针.redo日志用于如果ActiveMq代理没有正常的关闭,以及确保BTree索引维护的完整性.

         1.2.2 KahaDb消息存储的目录结构

          当你启动一个配置KahaDb的ActiveMq代理时,一个目录用于存储持久化信息的目录将会被创建.目录结构如下图所示.

                                                                                  

  •  归档目录—只有归档属性被激活才会存在.它用于存储不在被KahaDb需要的日志文件.它使得消息可以被在之后的某个时刻恢复.如果归档没有被启用(默认),日志文件在不需要的时候会被删除.
  • db.data —文件包含被保存在数据日志的BTree索引
  • db.redo—用于恢复BTree索引.

       1.2.3  KahaDb消息存储的配置

              

KahaDb的可用配置条件
树形名称 默认值 描述
directory activemq-data 用于KahaDb的目录路径
indexWriteBatchSize 1000 批量写入磁盘的索引页数量
indexWriteBatchSize 10000 缓存在内存中的索引页数量
enableIndexWriteAsync false 如果设置,将会同步写入索引
journalMaxFileLength 32mb 设置每个消息数据日志的最大大小
enableJournalDiskSyncs true 保证每个非事务日志写入紧随磁盘同步自后
cleanupInterval 30000 移除不需要的消息日志文件的时间延迟
checkpointInterval 5000 检查日志的时间间隔
ignoreMissingJournalfiles false 如果启用将会忽略丢失的消息日志文件
checkForCorruptJournalFiles false 如果启用,在启动时会验证消息日志文件时候被破坏
checksumJournalFiles false 如果启用将会为每个消息日志文件提供一个报头校验
archiveDataLogs false 如果启用,将会移动消息日志文件到归档目录而不是删除它
directoryArchive null 定义一个当他们包含的所有消息被消费后数据日志被移动的目录
databaseLockedWaitDelay 10000 获取数据库锁的间隔
maxAsyncJobs 10000 最大数量的排队等待存储的异步消息数量.
concurrentStoreAndDispatchTransactions true 允许分发消息和事务存储同时发生
concurrentStoreAndDispatchTransactions true 允许分发主题消息和消息存储同时发生
concurrentStoreAndDispatchQueues true 允许分发队列消息和消息存储同时发生

              

       ActiveMq为消息存储提供了一个扩展式的Api,有三个额外的实现

  •  AMQ消息存储 --为新能设计的基于文件的消息存储
  • JDBC消息存储 --基于JDbcDE 消息存储
  • 内存消息存储--基于内存的消息存储

   

1.3  AMQ消息存储

     

       类似KahaDb,拥有高性能的索引,可以适用吞吐量大的系统.但是因为它对每个索引都创建两个单独的文件,每个destination创建一个单独的索引,它就不适用于一个代理下挂着成百上千个队列的情况.如果没有正常的关闭,恢复也是很慢的.因为所有的索引需要被重建,这又需要代理遍历所有的数据日志来准确的建立索引.

  

      1.3.1 AMQ消息存储的内部结构

           
          

  •       data logs ---消息日志
  •       cache ----在被写入日志之后,它提供消息在内存中的快速检索
  •      The reference store    --- 它们保存在日志中被它们的id索引的消息引用

    1.3.2  AMQ消息存储目录结构

          

          它包含了所有在机器上运行的代理的子目录.所以强烈建议每一个代理使用唯一的名字(默认是localhost).下图是它的具体结构.

      
                                                       

  •        lock file --确保在任何时间只有一个代理可以访问数据.它用于在同一个系统中存储多个拥有相同名字的代理的热备份.
  •      tmp-storage--用来存储不在被存储在代理内存中的非持久化消息(等待被一下比较慢的消费者消费).
  •     kr-store 
  •    the journal directory -- 包含数据日志和数据控制文件(包含元信息).它被作为计数引用,当所有的信息被发送,它就会被删除或者归档.
  •  archive --如果归档属性开启,它就会存在.用来存储归档文件

   1.3.3  AMQ消息存储的配置

         

 

     以下是在activemq.xml中配置amq的例子:

    

<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker xmlns="http://activemq.apache.org/schema/core">
  <persistenceAdapter>
    <amqPersistenceAdapter
          directory="target/Broker2-data/activemq-data"          syncOnWrite="true"  indexPageSize="16kb"  indexMaxBinSize="100"maxFileLength="10mb" />
  </persistenceAdapter>
</broker>
</beans>
 

 

1.4  JDBC消息存储

      略;

 

1.5  内存消息存储

      略;

1.6  为消费者缓存消息

 

    ActiveMq提供了为那些在代理中使用消息缓存提供了一种订阅恢复策略.它决定了哪种消息会被缓存,会被缓存多久

 

     1.6.1  消息缓存如何为消费者工作.

          Mq消息代理为每个topic在内存中缓存消息.唯一不支持的是临时topic和advisory topic.被代理缓存的消息只发送给可追溯的消费者,不发送给持久订阅者.

         设置为可追溯的消费者的代码如下:

Topic topic = session.createTopic("TEST.TOPIC?consumer.retroactive=true");
 

 

    1.6.2 Mq订阅恢复政策

  • 固定大小订阅恢复政策----它限制topic缓存消息的数量基于它们使用的内存.它是默认的订阅恢复政策.

     
     
  • 固定数量订阅恢复政策----它限制topic缓存消息的数量基于一个基本的静态值.

     
  • 基于查询的订阅恢复政策----它限制topic缓存消息的数量基于一个JMS的属性选择器.

     
  • 基于时间的订阅恢复政策----它限制topic缓存消息的数量基于一个有效期时间.它可以被MessageProducer设置

     
  • 最近的订阅恢复政策--它只缓存最后一个消息.
  • 无订阅恢复政策--禁用消息缓存
   1.6.3  如何配置订阅恢复政策
         
<?xml version="1.0" encoding="UTF-8"?>
<beans>
	<broker brokerName="test-broker" persistent="true" useShutdownHook="false" deleteAllMessagesOnStartup="true" xmlns="http://activemq.apache.org/schema/core">
		<transportConnectors>
			<transportConnector uri="tcp://localhost:61635"/>
		</transportConnectors>
		<destinationPolicy>
			<policyMap>
				<policyEntries>
					<policyEntry topic="Topic.FixedSizedSubs.>">
						<subscriptionRecoveryPolicy>
							<fixedSizeSubscriptionRecoveryPolicy maximumSize="2000000" useSharedBuffer="false"/>
						</subscriptionRecoveryPolicy>
					</policyEntry>
					<policyEntry topic="Topic.LastImageSubs.>">
						<subscriptionRecoveryPolicy>
							<lastImageSubscriptionRecoveryPolicy/>
						</subscriptionRecoveryPolicy>
					</policyEntry>
					<policyEntry topic="Topic.NoSubs.>">
						<subscriptionRecoveryPolicy>
							<noSubscriptionRecoveryPolicy/>
						</subscriptionRecoveryPolicy>
					</policyEntry>
					<policyEntry topic="Topic.TimedSubs.>">
						<subscriptionRecoveryPolicy>
							<timedSubscriptionRecoveryPolicy recoverDuration="25000"/>
						</subscriptionRecoveryPolicy>
					</policyEntry>
				</policyEntries>
			</policyMap>
		</destinationPolicy>
	</broker>
</beans
 
  • 大小: 22.4 KB
  • 大小: 36.2 KB
  • 大小: 47.1 KB
  • 大小: 18.7 KB
  • 大小: 28.7 KB
  • 大小: 31.5 KB
  • 大小: 24.4 KB
  • 大小: 51.7 KB
  • 大小: 11.5 KB
  • 大小: 7.2 KB
  • 大小: 6.9 KB
  • 大小: 7.6 KB
分享到:
评论

相关推荐

    activeMQ视频

    4. **持久化**:ActiveMQ支持消息的持久化存储,确保消息在网络故障或系统重启后不会丢失。 5. **管理工具**:提供Web控制台和命令行工具,方便用户监控、管理和调试消息代理。 6. **事务支持**:JMS事务允许...

    apache-activemq-5.15.8-bin.tar.gz

    3. **lib** 目录:这里包含了运行ActiveMQ所需的库文件,包括JMS实现、网络通信、持久化存储等第三方依赖库。 4. **data** 目录(可能需要用户手动创建):默认情况下,ActiveMQ会将消息存储在这个目录下,包括日志...

    Manning.ActiveMQ.in.Action.Mar.2011

    - **第6章:保护ActiveMQ的安全性** 安全性是任何企业级应用的关键组成部分。本章重点讨论了如何配置ActiveMQ以确保消息传递的安全性,包括身份验证、授权和加密等方面的技术细节。 #### 核心知识点 1. **面向...

    activemq安装包

    - **消息持久化**:了解ActiveMQ如何在磁盘上存储消息,以及如何在故障后恢复消息。 - **消息确认**:理解不同的确认模式,如自动确认、客户端确认和DUPS_OK确认,以及它们在不同场景下的应用。 - **消息优先级和...

    window系统搭建activeMQ集群和操作步骤

    - 启动第一个节点:在命令行中,进入ActiveMQ目录并执行`bin\activemq start`命令。 - 启动后续节点:在其他实例中,修改`conf/jetty.xml`以使用不同的端口,避免冲突,然后同样启动。 5. **验证集群** - 使用...

    Delphi ActiveMQ 使用帮助

    - 持久化消息存储。 - 支持多种传输协议,如 STOMP、AMQP 等。 - 高度可配置的安全性和管理功能。 #### 四、HabariActiveMQClient 许可证 HabariActiveMQClient 遵循特定的许可证协议,用户在使用前需要了解其许可...

    基于ActiveMQ的消息总线逻辑与物理架构设计详解

    ActiveMQ是一个开源的消息代理和中间件,属于Apache软件基金会下的一个项目,其主要功能是提供高效可靠的消息传递系统,能够支持企业级应用的异步通信。消息总线(Message Bus)是一种设计模式,用于模块间的通信,...

    2021Java字节跳动面试题——面向字节_ActiveMQ.pdf

    - **非持久化消息堆积**:当客户端发送大量非持久化消息导致内存压力时,可能会触发 ActiveMQ 将消息写入临时文件的操作,此操作会阻塞所有其他操作,可能导致 `java.net.SocketException` 异常,进而导致消息丢失。...

    linux 下apache-activemq.zip

    这个文件定义了ActiveMQ的核心配置,包括消息存储、网络连接等。根据需求,你可以调整这些设置以优化性能或安全性。 3. **启动和停止Apache ActiveMQ**: 使用bin目录下的脚本来启动和停止服务: ``` cd /opt/...

    0927分布式消息通信-ActiveMQ1

    6. **域模型**:点对点模型适用于一对一的消息传递,而发布/订阅模型支持一对多的消息广播。 7. **可靠性机制**:ActiveMQ支持事务型和非事务型消息确认,如自动确认、客户端确认和重复数据确认。此外,本地事务和...

    activeMQ所需的头文件和静态库

    6. **事务和持久化**:理解如何在ActiveMQ中使用事务来保证消息的一致性,以及如何启用持久化以防止消息丢失。 7. **故障恢复和高可用性**:了解ActiveMQ的集群和网络恢复策略,确保服务的连续性和可靠性。 8. **...

    activemq-all-5.15.2.jar 和 jms-1.1.jar

    同时,ActiveMQ通过优化的消息存储和调度策略,能够处理高并发和大规模的消息量,保持良好的性能。 7. **扩展性与社区支持**: 由于Apache ActiveMQ是一个开源项目,因此它拥有广泛的社区支持和活跃的开发者参与,这...

    activemq5.15.0-3

    在JMS规范中,ActiveMQ作为消息代理,它接收、存储和转发消息。它支持多种协议,包括OpenWire、AMQP、STOMP、MQTT和WS-Notification,这使得它能够与各种不同类型的客户端和应用程序无缝集成。 ActiveMQ 5.15.0-3的...

    linux下使用的apache-activemq-5.9.0-bin包.zip

    Apache ActiveMQ是Apache软件基金会的一款开源消息中间件,它基于Java Message Service (JMS) 规范,提供了高效、可靠的异步消息传递功能。在Linux环境下使用`apache-activemq-5.9.0-bin.tar.gz`压缩包,我们可以...

    apache-activemq-5.6.0-bin.tar.gz

    Apache ActiveMQ是世界上最流行的开源消息代理和集成中间件,它基于Java消息服务(JMS)规范,提供高效、可靠的异步通信。这个压缩包“apache-activemq-5.6.0-bin.tar.gz”包含了Apache ActiveMQ 5.6.0版本的可执行...

    activemq-5.15.6

    6. **examples目录**:提供了各种示例,展示了如何使用ActiveMQ创建生产者、消费者,以及如何设置不同的消息传输选项,如点对点、发布/订阅模式等。 7. **webapps目录**:ActiveMQ内置了一个基于HTTP的Web控制台,...

    activemq1.14.4

    - **持久化**: ActiveMQ提供消息的持久化存储,即使在服务器重启后也能保证消息不丢失。 - **高性能**: 通过优化的内存管理和多线程处理,ActiveMQ可以处理高并发的消息传递。 - **集群**: 支持多台服务器组成的集群...

    ActiveMQ 代码

    3. **持久化**:ActiveMQ 提供消息的持久化存储,即使在服务器宕机后也能恢复未处理的消息。 4. **集群和高可用性**:ActiveMQ 可以组建集群,提供故障转移和负载均衡,保证服务的高可用性。 5. **网络传输**:支持...

    ActiveMQDemo

    在C++MQDemo中,我们可以看到开发者如何利用C++与ActiveMQ进行交互,尽管ActiveMQ主要以Java API闻名,但通过适配器或第三方库,C++同样可以接入ActiveMQ的消息传递功能。 首先,我们需要了解ActiveMQ的基本概念。...

Global site tag (gtag.js) - Google Analytics