本文简单介绍ActiveMQ使用Mysql数据库实现消息的持久化存储
一、ActiveMQ配置修改
二、代码示例
一、ActiveMQ配置修改
对于ActiveMQ需要保证消息的可靠性,需要持久化进行存储,默认情况下使用kahadb进行数据的默认持久化存储技术,同时也可以使用leveldb、mysql、oracle
此次,使用mysql对消息进行持久化操作。
1、active.xml文件的修改
(1)数据源配置信息
<persistenceAdapter> <!-- 默认使用kahadb进行持久化操作,保证消息的可靠性 --> <kahaDB directory="${activemq.data}/kahadb"/> --> <!-- 当前使用mysql进行数据的持久化操作 --> <jdbcPersistenceAdapter dataSource="#mysql-ds" /> </persistenceAdapter>
<!-- 设置mysql数据源的配置信息 --> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://localhost:3306/test" /> <property name="username" value="root" /> <property name="password" value="***"/> <property name="maxActive" value="200" /> <property name="poolPreparedStatements" value="true" /> </bean>
(2)消息队列中消息的优先级
<policyEntries> <policyEntry topic=">" > <!-- The constantPendingMessageLimitStrategy is used to prevent slow topic consumers to block producers and affect other consumers by limiting the number of messages that are retained For more information, see: http://activemq.apache.org/slow-consumer-handling.html --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="1000"/> </pendingMessageLimitStrategy> </policyEntry> <!-- 指定队列中消息的优先级 queue 自定义的队列的名称--> <policyEntry queue="persistMysql" prioritizedMessages="true"/> </policyEntries>
消息优先级说明:
消息的优先级有0-9十个级别的优先级,0-4为普通的消息,5-9为加急消息,如果不指定优先级,默认为4。JMS不严格按照这十个优先级发送消息,但必须保证单次加急消息要先于普通消息到达,并不能保证顺序消费机制。
2、ActiveMQ安转目录lib下需要添加如下jar
mysql-connector-java-5.1.45.jar
commons-dbcp-1.4.jar
commons-pool-1.5.4.jar
二、代码示例
(1)生产者
package com.chinasoft.activemqv1; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * ActiveMQ消息的生产者 * * 存储技术使用Mysql保证数据的可靠性 * * @author Freedom * */ public class Producer { private ConnectionFactory f = null; private Connection c = null; private Session session = null; private Destination d = null; private MessageProducer p = null; public Producer() { try { f = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); c = f.createConnection(); // 创建连接对象 // 创建好连接对象后要打开连接 c.start(); session = c.createSession(false, Session.AUTO_ACKNOWLEDGE); d = session.createQueue("persistMysql"); p = session.createProducer(null); // 发送消息时指定消息的目的地 } catch (JMSException e) { e.printStackTrace(); } } public void sender() { // 创建一个 MapMessage类型的消息 try { MapMessage msg = session.createMapMessage(); msg.setStringProperty("name", "cc");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤 msg.setIntProperty("age", 26); msg.setIntProperty("salary", 5600); MapMessage msg1 = session.createMapMessage(); msg1.setStringProperty("name", "zs");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤 msg1.setIntProperty("age", 22); msg1.setIntProperty("salary", 4000); MapMessage msg2 = session.createMapMessage(); msg2.setStringProperty("name", "lsi");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤 msg2.setIntProperty("age", 21); msg2.setIntProperty("salary", 9100); MapMessage msg3 = session.createMapMessage(); msg3.setStringProperty("name", "nb");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤 msg3.setIntProperty("age", 19); msg3.setIntProperty("salary", 3600); MapMessage msg4 = session.createMapMessage(); msg4.setStringProperty("name", "ww");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤 msg4.setIntProperty("age", 27); msg4.setIntProperty("salary", 7600); // 生产者发送消息 // 默认情况,数据时需要进行持久化操作,可以指定DeliveryMode不进行初始化操作 // 2指定消息的优先级,需要active.xml配置 <policyEntity /> p.send(d, msg1, DeliveryMode.PERSISTENT, 2, 1000 * 60 * 1L); p.send(d, msg2); p.send(d, msg3); p.send(d, msg4); } catch (JMSException e) { e.printStackTrace(); } finally { if (c != null) { try { c.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void main(String[] args) { Producer pro = new Producer(); pro.sender(); } }
(2)消费者
消费者中使用了消息监听机制,监听MQ上的消息,并处理消息
package com.chinasoft.activemqv1; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * ActiveMQ消费者 * * @author Freedom * */ public class Consumer { private static final String SELECTOR = "age>21 and salary>4000"; private ConnectionFactory f = null; private Connection c = null; private Session session = null; private Destination d = null; private MessageConsumer mc = null; public Consumer() { try { f = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); c = f.createConnection(); c.start(); session = c.createSession(false, Session.AUTO_ACKNOWLEDGE); d = session.createQueue("persistMysql"); mc = session.createConsumer(d, SELECTOR);// 第二个参数为一个selctor选择器用于筛选数据,满足SQL92规范 } catch (JMSException e) { e.printStackTrace(); } } public void recevice() { // 消费端创建一个监听类,监听MQ上消息并读取消息 try { mc.setMessageListener(new MessageListener() { @Override public void onMessage(Message m) { if (m instanceof MapMessage) { System.out.println("消费者接受到的消息**** " + m); } } }); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { Consumer c = new Consumer(); c.recevice(); } }
执行完成,打开数据库表,会生成三个表
xxx_msgs表记录生产者发送的消息信息,如果消费者消费完成,则会清空表中的消息
相关推荐
本主题主要探讨如何将ActiveMQ配置为使用MySQL 8作为其持久化存储方式,以及在这个过程中所需的Jar包。 1. **ActiveMQ与持久化**: - ActiveMQ允许用户选择不同的持久化机制,包括文件系统(KahaDB)和关系数据库...
MySQL作为持久化存储,可以提供高可靠性和可扩展性。通过配置ActiveMQ,我们可以设置将消息写入MySQL的数据表中,当服务重新启动时,可以从数据库中读取这些消息,继续处理。 描述中提到的博客链接可能详细解释了...
- 在这个项目中,Eclipselink用于将聊天记录等数据持久化到MySQL数据库中。 3. **JPA (Java Persistence API)**: - JPA是Java标准,定义了如何在Java应用中管理和持久化对象。 - 它提供了一套API来映射Java类到...
Hibernate则是一个持久化框架,它简化了数据库操作,将Java对象与数据库表映射,实现了对象关系映射(ORM)。 **easyUI**: EasyUI是一个基于jQuery的UI库,主要用于快速构建美观、响应式的用户界面。它包含丰富的...
总结来说,"activemq-5.15+mysqljdbc配置.zip"提供了ActiveMQ的一个定制化版本,它集成了MySQL数据库作为持久化存储,并使用Durid作为连接池。这种配置适用于那些需要强大数据持久化和恢复能力的场景,但也需要对...
标题中的“activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml”指的是Apache ActiveMQ的一个特定版本(5.15.15)配置文件,该配置文件用于实现消息队列的数据持久化,通过JDBC连接MySQL 8.0以上的版本。ActiveMQ...
4. **KahaDB存储引擎**:这是ActiveMQ内置的默认持久化存储,不需要额外的数据库。KahaDB是一个轻量级、高效的日志文件系统,提供消息持久化。`kahaDB-store.jar` 是与KahaDB相关的库。 5. **OpenJPA**:OpenJPA是...
在描述中提到的"activemq消息持久化所需Jar包,亲测可用"表明这些jar文件是经过验证的,可以确保在ActiveMQ配置为使用JDBC存储时正常工作。持久化是确保消息在系统故障后仍然可恢复的关键特性,JDBC持久化利用数据库...
你可以通过修改`activemq.xml`配置文件来设置持久化存储,比如将消息存储到MySQL数据库。在配置中指定数据源,如下所示: ```xml ${activemq.data} <driverClassName>...
在ActiveMQ中,JDBC持久化意味着消息存储和检索依赖于关系型数据库,如MySQL,这提供了高可靠性和数据安全性。 针对"Active mq jdbc持久化所需要的包.rar"这个压缩文件,我们可以推断它包含了实现ActiveMQ使用JDBC...
在ActiveMQ中,为了实现消息持久化,我们通常会利用JDBC(Java Database Connectivity)来存储消息数据。这确保了即使在服务器重启或故障后,消息依然能够被恢复,从而保持系统的高可用性和可靠性。本主题将详细讲解...
在本系统中,MySql 存储了所有核心业务数据,如用户信息、医疗记录、预约详情等,为系统的数据持久化提供了支持。 5. **MQ(Message Queue)**: 消息队列如 RabbitMQ 或者 ActiveMQ 在系统中起着重要的通信角色。...
2. **持久化到数据库**:为了提高可靠性,ActiveMQ 还可以配置为将消息存储在关系型数据库(如 MySQL 或 PostgreSQL)中。同样,这需要在 `activemq.xml` 配置文件中进行设置,包括连接数据库的详细信息以及具体的...
总结来说,ActiveMQ的JDBC消息持久化和高速缓存是通过配置数据源,使用MySQL存储消息,同时利用内存中的高速缓存提高性能。在实践中,根据实际需求调整配置,如数据目录的位置,可以优化系统性能并确保数据的安全性...
对于与数据库的集成,如`activemq数据库,验证持久化标准配置.txt`所示,ActiveMQ支持使用JDBC进行持久化,确保在故障恢复时数据的完整性。 总之,`activemq.xml`配置文件是管理ActiveMQ核心行为的核心,通过细致地...
- 本文档提到使用MySQL作为ActiveMQ的持久化存储。首先,需要在MySQL中创建一个用户账号并授予必要的权限。 ```sql CREATE USER 'activemq'@'%' IDENTIFIED BY 'activemq'; GRANT ALL PRIVILEGES ON activemq.* ...
配置Master/Slave环节中,需要修改ActiveMQ配置文件activemq.xml,在其中添加数据源配置,并调整persistenceAdapter的设置以适应JDBC方式的持久化。配置文件修改完成后,需要将修改后的配置文件复制到另一台虚拟机的...
在ActiveMQ 5.14.5中,重点优化了消息持久化机制,确保即使在服务器宕机或硬件故障后,已经发送但未被消费的消息也能被安全恢复。这依赖于其高效的数据存储模型,可以使用文件系统或数据库(如MySQL或PostgreSQL)来...
此文件名表明该压缩包用于在ActiveMQ中集成JDBM数据库作为持久化存储。 ActiveMQ作为消息中间件,主要职责是接收、存储和转发消息,确保消息的可靠传输。它支持多种协议,如OpenWire、STOMP、AMQP、MQTT等,并且...