本人实现的功能为activemq将消息持久化到数据库的方法:
1:前言
这一段给公司开发消息总线有机会研究ActiveMQ,今天撰文给大家介绍一下他的持久化消息。本文只介绍三种方式,分别是持久化为文件,MYSql,Oracle。下面逐一介绍。
A:持久化为文件
这个你装ActiveMQ时默认就是这种,只要你设置消息为持久化就可以了。涉及到的配置和代码有
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue);
B:持久化为MySql
你首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar
接下来你修改配置文件
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
</persistenceAdapter>
在配置文件中的broker节点外增加
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
从配置中可以看出数据库的名称是activemq,你需要手动在MySql中增加这个库。
然后重新启动消息队列,你会发现多了3张表
1:activemq_acks
2:activemq_lock
3:activemq_msgs
C:持久化为Oracle
和持久化为MySql一样。这里我说两点
1;在ActiveMQ安装文件夹里的Lib文件夹中增加Oracle的JDBC驱动。驱动文件位于Oracle客户端安装文件中的product\11.1.0\client_1\jdbc\lib文件夹下。
2:
<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:test"/>
<property name="username" value="qdcommu"/>
<property name="password" value="qdcommu"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
这里的jdbc:oracle:thin:@10.53.132.47:1521:test按照自己实际情况设置一下就可以了,特别注意的是test是SID即服务名称而不是TNS中配置的节点名。各位同学只需要替换IP,端口和这个SID就可以了。
消息消费者的事先代码:
package easyway.activemq.app; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; /*** * 消息持久化到数据库 * @author longgangbai */ public class MessageCustomer { private static Logger logger=LogManager.getLogger(MessageProductor.class); private String username=ActiveMQConnectionFactory.DEFAULT_USER; private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD; private String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; private static String QUEUENAME="ActiveMQ.QUEUE"; protected static final int messagesExpected = 10; protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url+"?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected); /*** * 创建Broker服务对象 * @return * @throws Exception */ public BrokerService createBroker()throws Exception{ BrokerService broker=new BrokerService(); broker.addConnector(url); return broker; } /** * 启动BrokerService进程 * @throws Exception */ public void init() throws Exception{ BrokerService brokerService=createBroker(); brokerService.start(); } /** * 接收的信息 * @return * @throws Exception */ public int receiveMessage() throws Exception{ Connection connection=connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); return receiveMessages(messagesExpected,session); } /** * 接受信息的方法 * @param messagesExpected * @param session * @return * @throws Exception */ protected int receiveMessages(int messagesExpected, Session session) throws Exception { int messagesReceived = 0; for (int i=0; i<messagesExpected; i++) { Destination destination = session.createQueue(QUEUENAME); MessageConsumer consumer = session.createConsumer(destination); Message message = null; try { logger.debug("Receiving message " + (messagesReceived+1) + " of " + messagesExpected); message = consumer.receive(2000); logger.info("Received : " + message); if (message != null) { session.commit(); messagesReceived++; } } catch (Exception e) { logger.debug("Caught exception " + e); session.rollback(); } finally { if (consumer != null) { consumer.close(); } } } return messagesReceived; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } }
消息生产者的代码:
package easyway.activemq.app; import java.io.File; import java.util.Properties; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.sql.DataSource; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter; import org.apache.commons.dbcp.BasicDataSourceFactory; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import easyway.activemq.app.utils.BrokenPersistenceAdapter; /** * 消息持久化到数据库 * @author longgangbai * */ public class MessageProductor { private static Logger logger=LogManager.getLogger(MessageProductor.class); private String username=ActiveMQConnectionFactory.DEFAULT_USER; private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD; private String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; private static String queueName="ActiveMQ.QUEUE"; private BrokerService brokerService; protected static final int messagesExpected = 10; protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://localhost:61617?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected); /*** * 创建Broker服务对象 * @return * @throws Exception */ public BrokerService createBroker()throws Exception{ BrokerService broker=new BrokerService(); BrokenPersistenceAdapter jdbc=createBrokenPersistenceAdapter(); broker.setPersistenceAdapter(jdbc); jdbc.setDataDirectory(System.getProperty("user.dir")+File.separator+"data"+File.separator); jdbc.setAdapter(new MySqlJDBCAdapter()); broker.setPersistent(true); broker.addConnector("tcp://localhost:61617"); //broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); return broker; } /** * 创建Broken的持久化适配器 * @return * @throws Exception */ public BrokenPersistenceAdapter createBrokenPersistenceAdapter() throws Exception{ BrokenPersistenceAdapter jdbc=new BrokenPersistenceAdapter(); DataSource datasource=createDataSource(); jdbc.setDataSource(datasource); jdbc.setUseDatabaseLock(false); //jdbc.deleteAllMessages(); return jdbc; } /** * 创建数据源 * @return * @throws Exception */ public DataSource createDataSource() throws Exception{ Properties props=new Properties(); props.put("driverClassName", "com.mysql.jdbc.Driver"); props.put("url", "jdbc:mysql://localhost:3306/activemq"); props.put("username", "root"); props.put("password", "root"); DataSource datasource=BasicDataSourceFactory.createDataSource(props); return datasource; } /** * 启动BrokerService进程 * @throws Exception */ public void init() throws Exception{ createBrokerService(); brokerService.start(); } public BrokerService createBrokerService() throws Exception{ if(brokerService==null){ brokerService=createBroker(); } return brokerService; } public void sendMessage() throws JMSException{ Connection connection=connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for(int i=0;i<messagesExpected;i++){ logger.debug("Sending message " + (i+1) + " of " + messagesExpected); producer.send(session.createTextMessage("test message " + (i+1))); } connection.close(); } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } }
持久化适配器类
package easyway.activemq.app.utils; import java.io.IOException; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author longgangbai * */ public class BrokenPersistenceAdapter extends JDBCPersistenceAdapter { private final Logger LOG = LoggerFactory.getLogger(BrokenPersistenceAdapter.class); private boolean shouldBreak = false; @Override public void commitTransaction(ConnectionContext context) throws IOException { if ( shouldBreak ) { LOG.warn("Throwing exception on purpose"); throw new IOException("Breaking on purpose"); } LOG.debug("in commitTransaction"); super.commitTransaction(context); } public void setShouldBreak(boolean shouldBreak) { this.shouldBreak = shouldBreak; } }
测测试代码如下:
package easyway.activemq.app.test; import easyway.activemq.app.MessageProductor; public class MessageProductorTest { public static void main(String[] args) throws Exception { MessageProductor productor =new MessageProductor(); productor.init(); productor.sendMessage(); //productor.createBrokerService().stop(); } }
package easyway.activemq.app.test; import easyway.activemq.app.MessageCustomer; public class MessageCustomerTest { public static void main(String[] args) throws Exception { MessageCustomer customer=new MessageCustomer(); //customer.init(); //当两台机器在不同的服务器上启动客户端的broker进程 customer.receiveMessage(); } }
备注:运行过程为:首先执行MessageProductorTest,MessageCustomerTest。
mysql数据库activemq必须存在。关于消息持久化的表结构如下:
相关推荐
标题中的“ActiveMQ MySQL 持久化”指的是在使用ActiveMQ消息中间件时,将消息数据存储到MySQL数据库中以实现数据的持久化。ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的消息代理,支持多种消息...
4. **持久化级别**:ActiveMQ 允许用户选择不同的消息持久化级别,例如,可以选择仅持久化消息头,或者同时持久化消息头和正文。这可以根据性能和数据完整性需求进行调整。 5. **事务管理**:在 ActiveMQ 中,可以...
在分布式系统中,消息持久化是指当消息代理(如ActiveMQ)接收到消息后,会将其存储到磁盘中,即使服务器重启或出现故障,也能保证这些消息不会丢失。这在高可用性和容错性方面扮演着关键角色。 要实现ActiveMQ的...
标题中的"activemq持久化jdbc所需jar包.zip"指的是Apache ActiveMQ消息中间件在使用JDBC(Java Database Connectivity)进行消息持久化时所需的库文件集合。ActiveMQ是一款开源、高性能、跨语言的企业级消息代理,它...
- ActiveMQ允许用户选择不同的持久化机制,包括文件系统(KahaDB)和关系数据库(如MySQL)。 - 持久化确保了消息在服务器崩溃或重启时不会丢失,增强了系统的可靠性。 2. **配置MySQL8**: - 首先,你需要在...
ActiveMQ 持久化是指将消息队列持久化到数据库或文件中,以便在断电或崩溃后恢复消息队列。可以使用 Apache ActiveMQ 的持久化机制,例如使用 KahaDB 或 AMQP 等。 集群环境 ActiveMQ 集群环境是指多个 ActiveMQ ...
你可以通过修改`activemq.xml`配置文件来设置持久化存储,比如将消息存储到MySQL数据库。在配置中指定数据源,如下所示: ```xml ${activemq.data} <driverClassName>...
总结来说,这个配置文件展示了如何配置ActiveMQ 5.15.15使用JDBC和MySQL 8.0+进行消息持久化,以及解决XML中特殊字符转义的问题,以确保ActiveMQ能够正确地连接并使用MySQL数据库存储和检索消息。这种配置适用于需要...
ActiveMQ 提供了两种主要的持久化机制:持久化到文件和持久化到数据库。 1. **持久化到文件**:这是 ActiveMQ 默认的持久化方式,它将消息存储在文件系统中。通过修改 `activemq.xml` 配置文件,你可以配置 ...
在ActiveMQ中,为了实现消息持久化,我们通常会利用JDBC(Java Database Connectivity)来存储消息数据。这确保了即使在服务器重启或故障后,消息依然能够被恢复,从而保持系统的高可用性和可靠性。本主题将详细讲解...
同时,它提供多种持久化机制,如文件系统、数据库存储,确保即使在服务中断后也能恢复消息。 5. 安全性:ActiveMQ内置了用户认证和授权机制,可以通过SSL/TLS加密传输,保障数据的安全性。 二、ActiveMQ应用场景 1....
2. **持久化机制**:ActiveMQ提供基于文件的持久化,确保即使在服务器重启后,消息也不会丢失。此外,还支持JDBC和JPA等数据库持久化,增强系统的可靠性。 3. **高可用性**:通过集群和复制策略,ActiveMQ可以实现...
总之,"Active mq jdbc持久化所需要的包.rar"提供了使用MySQL数据库进行ActiveMQ消息持久化的必要组件。正确配置并使用这些组件,可以在保持消息可靠性的同时,利用数据库的优势来管理和保护消息数据。
KahaDB是ActiveMQ自有的轻量级数据库,提供了高效的日志管理和持久化,而LevelDB则是Google开发的一个键值对存储系统,提供更快的读写性能。 此外,ActiveMQ还支持主题(Topic)和队列(Queue)两种消息模式。主题...
在ActiveMQ中,消息持久化和高速缓存是两个关键概念,它们确保了即使在服务器重启或网络故障等情况下,消息仍然能够被正确地存储和检索。本篇将详细讲解如何利用JDBC实现ActiveMQ的消息存储持久化以及高速缓存。 1....
总结来说,"activemq-5.15+mysqljdbc配置.zip"提供了ActiveMQ的一个定制化版本,它集成了MySQL数据库作为持久化存储,并使用Durid作为连接池。这种配置适用于那些需要强大数据持久化和恢复能力的场景,但也需要对...
描述中提到的“ActiveMQ持久化所需jar包”意味着这些jar文件是用于确保ActiveMQ的消息持久化的关键组件。持久化是确保即使在系统崩溃或断电后,消息仍然能够被正确处理的重要特性。以下是每个jar文件的功能: 1. **...
5. **持久化**:ActiveMQ提供消息持久化,即使服务器重启,消息也不会丢失,这对于实现可靠的业务流程至关重要。 6. **网络传输优化**:ActiveMQ使用高效的网络协议OpenWire,可以降低延迟,提高消息传输效率。 7....
8. **持久化机制**:ActiveMQ支持多种持久化机制,包括本地文件系统、LevelDB、JDBC数据库等,可以根据需求选择适合的策略。 9. **消息优先级**:在某些应用场景中,消息的优先级是重要的。ActiveMQ允许为消息分配...