`

ActiveMQ 中的消息持久化(二)

 
阅读更多

为了长时间的存储和管理消息,一般会使用数据库。在Activemq中默认使用的是Derby DB。当然也可更改配置来使用其他的DB。Activemq支持以下这些DB: 

  • Apache Derby
  • Axion
  • DB2
  • HSQL
  • Informix
  • MaxDB
  • MySQL
  • Oracle
  • Postgresql
  • SQLServer
  • Sybase


     如果要使用不在上面列表中的DB,可以通过配置SQL语句和JDBC驱动来支持自己的DB。Broker在启动的时候读取配置文件,若在配置文件中指定了特定的JDBC驱动,则会在classpath路径下自动检测配置的JDBC驱动。下面是关于oracle的一个配置: 

Xml代码  收藏代码
  1. <beans ...>  
  2.   <broker xmlns="http://activemq.apache.org/schema/core"  
  3.           brokerName="localhost">  
  4.     ...  
  5.     <persistenceAdapter>  
  6.        <jdbcPersistenceAdapter  
  7.             dataDirectory="${activemq.base}/data"  
  8.             dataSource="#oracle-ds"/>  
  9.     </persistenceAdapter>  
  10.     ...  
  11.   </broker>  
  12.    
  13.   <!-- Oracle DataSource Sample Setup -->  
  14.   <bean id="oracle-ds"  
  15.         class="org.apache.commons.dbcp.BasicDataSource"  
  16.         destroy-method="close">  
  17.     <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>  
  18.     <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>  
  19.     <property name="username" value="scott"/>  
  20.     <property name="password" value="tiger"/>  
  21.     <property name="maxActive" value="200"/>  
  22.     <property name="poolPreparedStatements" value="true"/>  
  23.   </bean>  
  24.   
  25. </beans>  


在InstallDir/conf/activemq-jdbc.xml有一个jdbc配置样例。 

在使用JDBC持久化的时候根据是否支持Activemq提供的高效的journal文件分为两种。 
1、支持高效日志 
在消息消费者能跟上生产者的速度时,journal文件能大大减少需要写入到DB中的消息。举个例子:生产者产生了10000个消息,这10000个消息会保存到journal文件中,但是消费者的速度很快,在journal文件还未同步到DB之前,以消费了9900个消息。那么后面就只需要写入100个消息到DB了。如果消费者不能跟上生产者的速度,journal文件可以使消息以批量的方式写入DB中,JDBC驱动进行DB写入的优化。从而提升了性能。另外,journal文件支持JMS事务的一致性。 
下面是一段支持高效日志的JDBC持久化的配置: 

Xml代码  收藏代码
  1. <beans ...>  
  2.   <broker ...>  
  3.     ...  
  4.   <persistenceFactory>  
  5.     <journaledJDBC journalLogFiles="5" dataSource="#mysql-ds" />  
  6.     </persistenceFactory>  
  7.     ...  
  8.   <broker>  
  9.   ...  
  10. <bean id="mysql-ds"  
  11.       class="org.apache.commons.dbcp.BasicDataSource"  
  12.       destroy-method="close">  
  13.     <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  
  14.     <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>  
  15.     <property name="username" value="activemq"/>  
  16.     <property name="password" value="activemq"/>  
  17.     <property name="poolPreparedStatements" value="true"/>  
  18.   </bean>  
  19. </beans>  


journaledJDBC 节点的有以下可配置属性: 

属性 默认值 描述
adapter   指定持久存储的DB
createTablesOnStartup true 是否在启动的时候创建相应的表
dataDirectory activemq-data 指定Derby存储数据文件的路径
dataSource #derby 指定要引用的数据源
journalArchiveDirectory   指定归档日志文件存储的路径
journalLogFiles 2 指定日志文件的数量
journalLogFileSize 20MB 指定每个日志文件的大小
journalThreadPriority 10 指定写入日志的线程的优先级
useDatabaseLock true 在主从配置的时候是否使用排他锁
useJournal true 指定是否使用日志


2、不使用高效日志 
下面是不使用高效日志的配置: 

Xml代码  收藏代码
  1. <beans ...>  
  2.   <broker ...>  
  3.   ...  
  4.   <persistenceAdapter>  
  5.     <jdbcPersistenceAdapter dataSource="#derby-ds" />  
  6.     </persistenceAdapter>  
  7.     ...  
  8.   <broker>  
  9.   ...  
  10. <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">  
  11.     <property name="databaseName" value="derbydb"/>  
  12.     <property name="createDatabase" value="create"/>  
  13.   </bean>  
  14. </beans>  


可以看出,使用高效日志的配置使在persistenceFactory节点内的。而不使用高效日志的使配置在persistenceAdapter节点内的。 
下面是jdbcPersistenceAdapter 节点可配置的属性: 

属性 默认值 描述
adapter   指定持久存储的DB
cleanupPeriod 300000 指定删除DB中已收到确认信息的消息的时间周期(ms)
createTablesOnStartup true 是否在启动的时候创建相应的表
databaseLocker DefaultDatabaseLocker instance 防止多个broker同时访问DB的锁实例
dataDirectory activemq-data 指定Derby存储数据文件的路径
dataSource #derby 指定要引用的数据源
lockAcquireSleepInterval 1000 等待获取锁的时间长度(ms)
lockKeepAlivePeriod 30000 定期向lock表中写入当前时间,指定了锁的存活时间
useDatabaseLock true 在主从配置的时候是否使用排他锁
transactionIsolation Connection.TRANSACTION_READ_UNCOMMITTED 指定事务隔离级别


如果需要配置的DB不再Activemq默认支持的DB列表中,可以通过配置SQL语句和JDBC持久层来实现。在journaledJDBC节点下配置statements 节点来指定JDBC持久层对各类数据的处理方式。下面是一个样例: 

Xml代码  收藏代码
  1. <persistenceFactory>  
  2.   <journaledJDBC ...>  
  3.     <statements>  
  4.       <statements stringIdDataType ="VARCHAR(128)"/>  
  5.     </statements>  
  6.   </journaledJDBC>  
  7. </persistenceFactory>  


statements 节点的各个属性: 

属性 默认值 描述
tablePrefix   指定创建的表名的前缀
messageTableName ACTIVEMQ_MSGS 存储消息内容的表名
durableSubAcksTableName ACTIVEMQ_ACKS 存储持久订阅者的确认消息的表名
lockTableName ACTIVEMQ_LOCK lock表名
binaryDataType BLOB 指定存放消息的数据类型
containerNameDataType VARCHAR(250) 指定存放Destination名称的数据类型
msgIdDataType VARCHAR(250) 存放消息ID的数据类型
sequenceDataType INTEGER 存放消息序列ID的数据类型
longDataType BIGINT DB中存放JAVA中long型的数据类型
stringIdDataType VARCHAR(250) 存放长字符串的DB数据类型



定制自己插入和读取数据的SQL语句,可以设置下面的这些属性 

  • addMessageStatement
  • updateMessageStatement
  • removeMessageStatement
  • findMessageSequenceIdStatement
  • findMessageStatement
  • findAllMessagesStatement
  • findLastSequenceIdInMsgsStatement
  • findLastSequenceIdInAcksStatement
  • createDurableSubStatement
  • findDurableSubStatement
  • findAllDurableSubsStatement
  • updateLastAckOfDurableSubStatement
  • deleteSubscriptionStatement
  • findAllDurableSubMessagesStatement
  • findDurableSubMessagesStatement
  • findAllDestinationsStatement
  • removeAllMessagesStatement
  • removeAllSubscriptionsStatement
  • deleteOldMessagesStatement
  • lockCreateStatement
  • lockUpdateStatement
  • nextDurableSubscriberMessageStatement
  • durableSubscriberMessageCountStatement
  • lastAckedDurableSubscriberMessageStatement
  • destinationMessageCountStatement
  • findNextMessageStatement
  • createSchemaStatements
  • dropSchemaStatements


设置adapter属性可以设定JDBC适配器存储和访问BLOB字段的方式,可设置的方式有: 

  • org.activemq.store.jdbc.adapter.BlobJDBCAdapter
  • org.activemq.store.jdbc.adapter.BytesJDBCAdapter
  • org.activemq.store.jdbc.adapter.DefaultJDBCAdapter
  • org.activemq.store.jdbc.adapter.ImageJDBCAdapter


具体需要查看JDBC的驱动数据库的文档。 

Xml代码  收藏代码
  1. <broker persistent="true" ...>  
  2.   ...  
  3.   <persistenceFactory>  
  4.     <journaledJDBC adapter="org.activemq.store.jdbc.adapter.BlobJDBCAdapter" ... />  
  5.   </persistenceFactory>  
  6.   ...  
  7. </broker>  
分享到:
评论

相关推荐

    activemq消息持久化所需Jar包

    在分布式系统中,消息持久化是指当消息代理(如ActiveMQ)接收到消息后,会将其存储到磁盘中,即使服务器重启或出现故障,也能保证这些消息不会丢失。这在高可用性和容错性方面扮演着关键角色。 要实现ActiveMQ的...

    activeMQ mysql 持久化

    标题中的“ActiveMQ MySQL 持久化”指的是在使用ActiveMQ消息中间件时,将消息数据存储到MySQL数据库中以实现数据的持久化。ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的消息代理,支持多种消息...

    ActiveMQ订阅模式持久化实现

    1. **配置持久化策略**:在ActiveMQ的配置文件中,需要开启消息持久化。这通常涉及修改`activemq.xml`,设置`&lt;destinationPolicy&gt;`元素中的`&lt;policyEntry&gt;`,将`persistent`属性设为`true`,以确保消息在存储和传输...

    ActiveMQ中Topic持久化Demo

    总结起来,ActiveMQ中的Topic持久化涉及到消息和订阅的持久化,通过合理的配置和编程接口,我们可以确保在系统故障后,消息传递的连续性和完整性。在实际应用中,了解和掌握这部分知识对于构建可靠和容错的分布式...

    ActiveMQ配置Mysql8为持久化方式所需Jar包.rar

    在生产环境中,为了保证消息的可靠性,通常会使用持久化存储来保存消息,即使在服务重启后也能恢复数据。本主题主要探讨如何将ActiveMQ配置为使用MySQL 8作为其持久化存储方式,以及在这个过程中所需的Jar包。 1. *...

    消息队列:ActiveMQ:ActiveMQ消息持久化机制.docx

    消息队列:ActiveMQ:ActiveMQ消息持久化机制.docx

    ActiveMQ队列消息过期时间设置和自动清除解决方案.docx

    在 ActiveMQ 中,可以通过设置时间戳插件来实现消息过期时间设置。该插件可以根据消息的过期时间来删除消息。配置示例如下: ... &lt;!-- 86,400,000ms = 1 day --&gt; ... 其中,ttlCeiling 表示过期时间...

    activemq持久化jdbc所需jar包.zip

    标题中的"activemq持久化jdbc所需jar包.zip"指的是Apache ActiveMQ消息中间件在使用JDBC(Java Database Connectivity)进行消息持久化时所需的库文件集合。ActiveMQ是一款开源、高性能、跨语言的企业级消息代理,它...

    spring+activemq topic持久化订阅

    spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...

    linux环境下ActiveMQ持久化、集群环境搭建详解

    ActiveMQ 持久化是指将消息队列持久化到数据库或文件中,以便在断电或崩溃后恢复消息队列。可以使用 Apache ActiveMQ 的持久化机制,例如使用 KahaDB 或 AMQP 等。 集群环境 ActiveMQ 集群环境是指多个 ActiveMQ ...

    spring集成activemq演示queue和topic 持久化

    在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...

    activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml.pdf

    标题中的“activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml”指的是Apache ActiveMQ的一个特定版本(5.15.15)配置文件,该配置文件用于实现消息队列的数据持久化,通过JDBC连接MySQL 8.0以上的版本。ActiveMQ...

    7道消息队列ActiveMQ面试题!

    在面试中,面试官可能会问到关于ActiveMQ的一些基础和深入的问题,比如ActiveMQ的特性、消息传递机制、故障处理、消息持久化、性能调优以及消息消费等方面的知识。 1. ActiveMQ的核心概念和功能 ActiveMQ提供了多种...

    ActiveMQ的持久化(数据库)[归类].pdf

    4. **持久化级别**:ActiveMQ 允许用户选择不同的消息持久化级别,例如,可以选择仅持久化消息头,或者同时持久化消息头和正文。这可以根据性能和数据完整性需求进行调整。 5. **事务管理**:在 ActiveMQ 中,可以...

    springboot集成activemq实现消息接收demo

    此外,ActiveMQ支持多种协议和特性,如topic、持久化、事务消息等,可以根据项目需求进一步探索和利用。 这个简单的Demo展示了如何在Spring Boot中集成ActiveMQ进行消息接收。通过这种方式,你可以构建出一个可靠的...

    activeMQ发送消息返回消息

    在实际应用中,ActiveMQ的配置和使用可能更复杂,需要考虑安全性、性能优化、持久化、网络拓扑等因素。同时,JMS规范也提供了许多高级特性,如消息选择器、消息组、消息优先级等,这些都可以根据业务需求进行灵活...

    简单的activemq点对点的同步消息模型

    8. **消息持久化**:ActiveMQ支持消息持久化,即使broker重启,消息也不会丢失。这通过在消息发送时设置消息的持久性标志实现。如果消息被持久化,那么即使消费者在消息到达时未在线,当它重新连接时也能接收到消息...

    ActiveMQ消息中间件面试专题.pdf

    当内存中的非持久化消息堆积到一定程度时,ActiveMQ会将它们写入临时文件,以腾出内存空间。如果文件大小达到配置的最大限制,消息生产者会阻塞,但消费者仍能连接并消费消息。在持久化消息达到文件大小限制时,生产...

    ActiveMQ消息中间件面试题.pdf

    默认情况下,ActiveMQ中的非持久化消息采用异步发送,而持久化消息则采用同步发送方式,这可能导致在某些硬件环境下发送速度缓慢。为了解决这一问题,可以通过以下方法提高效率: - 在发送持久化消息时启用事务模式...

Global site tag (gtag.js) - Google Analytics