为了长时间的存储和管理消息,一般会使用数据库。在Activemq中默认使用的是Derby DB。当然也可更改配置来使用其他的DB。Activemq支持以下这些DB:
- Apache Derby
- Axion
- DB2
- HSQL
- Informix
- MaxDB
- MySQL
- Oracle
- Postgresql
- SQLServer
- Sybase
如果要使用不在上面列表中的DB,可以通过配置SQL语句和JDBC驱动来支持自己的DB。Broker在启动的时候读取配置文件,若在配置文件中指定了特定的JDBC驱动,则会在classpath路径下自动检测配置的JDBC驱动。下面是关于oracle的一个配置:
<beans ...>
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost">
...
<persistenceAdapter>
<jdbcPersistenceAdapter
dataDirectory="${activemq.base}/data"
dataSource="#oracle-ds"/>
</persistenceAdapter>
...
</broker>
<!-- Oracle DataSource Sample Setup -->
<bean id="oracle-ds"
class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>
<property name="username" value="scott"/>
<property name="password" value="tiger"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
在InstallDir/conf/activemq-jdbc.xml有一个jdbc配置样例。
在使用JDBC持久化的时候根据是否支持Activemq提供的高效的journal文件分为两种。
1、支持高效日志
在消息消费者能跟上生产者的速度时,journal文件能大大减少需要写入到DB中的消息。举个例子:生产者产生了10000个消息,这10000个消息会保存到journal文件中,但是消费者的速度很快,在journal文件还未同步到DB之前,以消费了9900个消息。那么后面就只需要写入100个消息到DB了。如果消费者不能跟上生产者的速度,journal文件可以使消息以批量的方式写入DB中,JDBC驱动进行DB写入的优化。从而提升了性能。另外,journal文件支持JMS事务的一致性。
下面是一段支持高效日志的JDBC持久化的配置:
<beans ...>
<broker ...>
...
<persistenceFactory>
<journaledJDBC journalLogFiles="5" dataSource="#mysql-ds" />
</persistenceFactory>
...
<broker>
...
<bean id="mysql-ds"
class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
journaledJDBC 节点的有以下可配置属性:
属性 | 默认值 | 描述 |
adapter | | 指定持久存储的DB |
createTablesOnStartup | true | 是否在启动的时候创建相应的表 |
dataDirectory | activemq-data | 指定Derby存储数据文件的路径 |
dataSource | #derby | 指定要引用的数据源 |
journalArchiveDirectory | | 指定归档日志文件存储的路径 |
journalLogFiles | 2 | 指定日志文件的数量 |
journalLogFileSize | 20MB | 指定每个日志文件的大小 |
journalThreadPriority | 10 | 指定写入日志的线程的优先级 |
useDatabaseLock | true | 在主从配置的时候是否使用排他锁 |
useJournal | true | 指定是否使用日志 |
2、不使用高效日志
下面是不使用高效日志的配置:
<beans ...>
<broker ...>
...
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#derby-ds" />
</persistenceAdapter>
...
<broker>
...
<bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
<property name="databaseName" value="derbydb"/>
<property name="createDatabase" value="create"/>
</bean>
</beans>
可以看出,使用高效日志的配置使在persistenceFactory节点内的。而不使用高效日志的使配置在persistenceAdapter节点内的。
下面是jdbcPersistenceAdapter 节点可配置的属性:
属性 | 默认值 | 描述 |
adapter | | 指定持久存储的DB |
cleanupPeriod | 300000 | 指定删除DB中已收到确认信息的消息的时间周期(ms) |
createTablesOnStartup | true | 是否在启动的时候创建相应的表 |
databaseLocker | DefaultDatabaseLocker instance | 防止多个broker同时访问DB的锁实例 |
dataDirectory | activemq-data | 指定Derby存储数据文件的路径 |
dataSource | #derby | 指定要引用的数据源 |
lockAcquireSleepInterval | 1000 | 等待获取锁的时间长度(ms) |
lockKeepAlivePeriod | 30000 | 定期向lock表中写入当前时间,指定了锁的存活时间 |
useDatabaseLock | true | 在主从配置的时候是否使用排他锁 |
transactionIsolation | Connection.TRANSACTION_READ_UNCOMMITTED | 指定事务隔离级别 |
如果需要配置的DB不再Activemq默认支持的DB列表中,可以通过配置SQL语句和JDBC持久层来实现。在journaledJDBC节点下配置statements 节点来指定JDBC持久层对各类数据的处理方式。下面是一个样例:
<persistenceFactory>
<journaledJDBC ...>
<statements>
<statements stringIdDataType ="VARCHAR(128)"/>
</statements>
</journaledJDBC>
</persistenceFactory>
statements 节点的各个属性:
属性 | 默认值 | 描述 |
tablePrefix | | 指定创建的表名的前缀 |
messageTableName | ACTIVEMQ_MSGS | 存储消息内容的表名 |
durableSubAcksTableName | ACTIVEMQ_ACKS | 存储持久订阅者的确认消息的表名 |
lockTableName | ACTIVEMQ_LOCK | lock表名 |
binaryDataType | BLOB | 指定存放消息的数据类型 |
containerNameDataType | VARCHAR(250) | 指定存放Destination名称的数据类型 |
msgIdDataType | VARCHAR(250) | 存放消息ID的数据类型 |
sequenceDataType | INTEGER | 存放消息序列ID的数据类型 |
longDataType | BIGINT | DB中存放JAVA中long型的数据类型 |
stringIdDataType | VARCHAR(250) | 存放长字符串的DB数据类型 |
定制自己插入和读取数据的SQL语句,可以设置下面的这些属性
- addMessageStatement
- updateMessageStatement
- removeMessageStatement
- findMessageSequenceIdStatement
- findMessageStatement
- findAllMessagesStatement
- findLastSequenceIdInMsgsStatement
- findLastSequenceIdInAcksStatement
- createDurableSubStatement
- findDurableSubStatement
- findAllDurableSubsStatement
- updateLastAckOfDurableSubStatement
- deleteSubscriptionStatement
- findAllDurableSubMessagesStatement
- findDurableSubMessagesStatement
- findAllDestinationsStatement
- removeAllMessagesStatement
- removeAllSubscriptionsStatement
- deleteOldMessagesStatement
- lockCreateStatement
- lockUpdateStatement
- nextDurableSubscriberMessageStatement
- durableSubscriberMessageCountStatement
- lastAckedDurableSubscriberMessageStatement
- destinationMessageCountStatement
- findNextMessageStatement
- createSchemaStatements
- dropSchemaStatements
设置adapter属性可以设定JDBC适配器存储和访问BLOB字段的方式,可设置的方式有:
- org.activemq.store.jdbc.adapter.BlobJDBCAdapter
- org.activemq.store.jdbc.adapter.BytesJDBCAdapter
- org.activemq.store.jdbc.adapter.DefaultJDBCAdapter
- org.activemq.store.jdbc.adapter.ImageJDBCAdapter
具体需要查看JDBC的驱动数据库的文档。
<broker persistent="true" ...>
...
<persistenceFactory>
<journaledJDBC adapter="org.activemq.store.jdbc.adapter.BlobJDBCAdapter" ... />
</persistenceFactory>
...
</broker>
分享到:
相关推荐
在分布式系统中,消息持久化是指当消息代理(如ActiveMQ)接收到消息后,会将其存储到磁盘中,即使服务器重启或出现故障,也能保证这些消息不会丢失。这在高可用性和容错性方面扮演着关键角色。 要实现ActiveMQ的...
标题中的“ActiveMQ MySQL 持久化”指的是在使用ActiveMQ消息中间件时,将消息数据存储到MySQL数据库中以实现数据的持久化。ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的消息代理,支持多种消息...
1. **配置持久化策略**:在ActiveMQ的配置文件中,需要开启消息持久化。这通常涉及修改`activemq.xml`,设置`<destinationPolicy>`元素中的`<policyEntry>`,将`persistent`属性设为`true`,以确保消息在存储和传输...
总结起来,ActiveMQ中的Topic持久化涉及到消息和订阅的持久化,通过合理的配置和编程接口,我们可以确保在系统故障后,消息传递的连续性和完整性。在实际应用中,了解和掌握这部分知识对于构建可靠和容错的分布式...
在生产环境中,为了保证消息的可靠性,通常会使用持久化存储来保存消息,即使在服务重启后也能恢复数据。本主题主要探讨如何将ActiveMQ配置为使用MySQL 8作为其持久化存储方式,以及在这个过程中所需的Jar包。 1. *...
消息队列:ActiveMQ:ActiveMQ消息持久化机制.docx
在 ActiveMQ 中,可以通过设置时间戳插件来实现消息过期时间设置。该插件可以根据消息的过期时间来删除消息。配置示例如下: ... <!-- 86,400,000ms = 1 day --> ... 其中,ttlCeiling 表示过期时间...
标题中的"activemq持久化jdbc所需jar包.zip"指的是Apache ActiveMQ消息中间件在使用JDBC(Java Database Connectivity)进行消息持久化时所需的库文件集合。ActiveMQ是一款开源、高性能、跨语言的企业级消息代理,它...
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...
ActiveMQ 持久化是指将消息队列持久化到数据库或文件中,以便在断电或崩溃后恢复消息队列。可以使用 Apache ActiveMQ 的持久化机制,例如使用 KahaDB 或 AMQP 等。 集群环境 ActiveMQ 集群环境是指多个 ActiveMQ ...
在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...
标题中的“activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml”指的是Apache ActiveMQ的一个特定版本(5.15.15)配置文件,该配置文件用于实现消息队列的数据持久化,通过JDBC连接MySQL 8.0以上的版本。ActiveMQ...
在面试中,面试官可能会问到关于ActiveMQ的一些基础和深入的问题,比如ActiveMQ的特性、消息传递机制、故障处理、消息持久化、性能调优以及消息消费等方面的知识。 1. ActiveMQ的核心概念和功能 ActiveMQ提供了多种...
4. **持久化级别**:ActiveMQ 允许用户选择不同的消息持久化级别,例如,可以选择仅持久化消息头,或者同时持久化消息头和正文。这可以根据性能和数据完整性需求进行调整。 5. **事务管理**:在 ActiveMQ 中,可以...
此外,ActiveMQ支持多种协议和特性,如topic、持久化、事务消息等,可以根据项目需求进一步探索和利用。 这个简单的Demo展示了如何在Spring Boot中集成ActiveMQ进行消息接收。通过这种方式,你可以构建出一个可靠的...
在实际应用中,ActiveMQ的配置和使用可能更复杂,需要考虑安全性、性能优化、持久化、网络拓扑等因素。同时,JMS规范也提供了许多高级特性,如消息选择器、消息组、消息优先级等,这些都可以根据业务需求进行灵活...
8. **消息持久化**:ActiveMQ支持消息持久化,即使broker重启,消息也不会丢失。这通过在消息发送时设置消息的持久性标志实现。如果消息被持久化,那么即使消费者在消息到达时未在线,当它重新连接时也能接收到消息...
当内存中的非持久化消息堆积到一定程度时,ActiveMQ会将它们写入临时文件,以腾出内存空间。如果文件大小达到配置的最大限制,消息生产者会阻塞,但消费者仍能连接并消费消息。在持久化消息达到文件大小限制时,生产...
默认情况下,ActiveMQ中的非持久化消息采用异步发送,而持久化消息则采用同步发送方式,这可能导致在某些硬件环境下发送速度缓慢。为了解决这一问题,可以通过以下方法提高效率: - 在发送持久化消息时启用事务模式...