`
不平凡的人
  • 浏览: 35784 次
  • 性别: Icon_minigender_1
  • 来自: 嘉峪关
社区版块
存档分类
最新评论

ActiveMQ+Mysql持久化存储

 
阅读更多

本文简单介绍ActiveMQ使用Mysql数据库实现消息的持久化存储

 

一、ActiveMQ配置修改

二、代码示例

 

 

一、ActiveMQ配置修改

对于ActiveMQ需要保证消息的可靠性,需要持久化进行存储,默认情况下使用kahadb进行数据的默认持久化存储技术,同时也可以使用leveldb、mysql、oracle

此次,使用mysql对消息进行持久化操作。

 

1、active.xml文件的修改

(1)数据源配置信息

  

        <persistenceAdapter>
		    <!-- 默认使用kahadb进行持久化操作,保证消息的可靠性 -->
            <kahaDB directory="${activemq.data}/kahadb"/> 
			-->
			
		    <!-- 当前使用mysql进行数据的持久化操作 -->
			<jdbcPersistenceAdapter dataSource="#mysql-ds" />
			
        </persistenceAdapter>

 

<!-- 设置mysql数据源的配置信息 -->
	<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
	      <property name="driverClassName" value="com.mysql.jdbc.Driver" />
		  <property name="url" value="jdbc:mysql://localhost:3306/test" />
		  <property name="username" value="root" />
		  <property name="password" value="***"/>
		  <property name="maxActive" value="200" />
		  <property name="poolPreparedStatements" value="true" />
	</bean>

 

 

(2)消息队列中消息的优先级

              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
				
				<!-- 指定队列中消息的优先级  queue 自定义的队列的名称-->
				<policyEntry queue="persistMysql"  prioritizedMessages="true"/>
				
              </policyEntries>

 

消息优先级说明:

消息的优先级有0-9十个级别的优先级,0-4为普通的消息,5-9为加急消息,如果不指定优先级,默认为4。JMS不严格按照这十个优先级发送消息,但必须保证单次加急消息要先于普通消息到达,并不能保证顺序消费机制。

 

 

2、ActiveMQ安转目录lib下需要添加如下jar

mysql-connector-java-5.1.45.jar

commons-dbcp-1.4.jar

commons-pool-1.5.4.jar

 

二、代码示例

(1)生产者

package com.chinasoft.activemqv1;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ消息的生产者
 * 
 * 存储技术使用Mysql保证数据的可靠性
 * 
 * @author Freedom
 *
 */
public class Producer {

	private ConnectionFactory f = null;
	private Connection c = null;
	private Session session = null;
	private Destination d = null;
	private MessageProducer p = null;

	public Producer() {

		try {

			f = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
					ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");
			c = f.createConnection(); // 创建连接对象
			// 创建好连接对象后要打开连接
			c.start();

			session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
			d = session.createQueue("persistMysql");
			p = session.createProducer(null); // 发送消息时指定消息的目的地

		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void sender() {

		// 创建一个 MapMessage类型的消息
		try {
			MapMessage msg = session.createMapMessage();
			msg.setStringProperty("name", "cc");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤
			msg.setIntProperty("age", 26);
			msg.setIntProperty("salary", 5600);

			MapMessage msg1 = session.createMapMessage();
			msg1.setStringProperty("name", "zs");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤
			msg1.setIntProperty("age", 22);
			msg1.setIntProperty("salary", 4000);

			MapMessage msg2 = session.createMapMessage();
			msg2.setStringProperty("name", "lsi");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤
			msg2.setIntProperty("age", 21);
			msg2.setIntProperty("salary", 9100);

			MapMessage msg3 = session.createMapMessage();
			msg3.setStringProperty("name", "nb");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤
			msg3.setIntProperty("age", 19);
			msg3.setIntProperty("salary", 3600);

			MapMessage msg4 = session.createMapMessage();
			msg4.setStringProperty("name", "ww");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤
			msg4.setIntProperty("age", 27);
			msg4.setIntProperty("salary", 7600);

			// 生产者发送消息
			// 默认情况,数据时需要进行持久化操作,可以指定DeliveryMode不进行初始化操作
			// 2指定消息的优先级,需要active.xml配置 <policyEntity />

			p.send(d, msg1, DeliveryMode.PERSISTENT, 2, 1000 * 60 * 1L);
			p.send(d, msg2);
			p.send(d, msg3);
			p.send(d, msg4);

		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			if (c != null) {
				try {
					c.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}

	}

	public static void main(String[] args) {

		Producer pro = new Producer();
		pro.sender();

	}

}

 

(2)消费者

消费者中使用了消息监听机制,监听MQ上的消息,并处理消息

package com.chinasoft.activemqv1;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ消费者
 * 
 * @author Freedom
 *
 */
public class Consumer {

	private static final String SELECTOR = "age>21 and salary>4000";

	private ConnectionFactory f = null;
	private Connection c = null;
	private Session session = null;
	private Destination d = null;
	private MessageConsumer mc = null;

	public Consumer() {

		try {

			f = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
					ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");
			c = f.createConnection();
			c.start();

			session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
			d = session.createQueue("persistMysql");
			mc = session.createConsumer(d, SELECTOR);// 第二个参数为一个selctor选择器用于筛选数据,满足SQL92规范

		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void recevice() {

		// 消费端创建一个监听类,监听MQ上消息并读取消息
		try {
			mc.setMessageListener(new MessageListener() {

				@Override
				public void onMessage(Message m) {

					if (m instanceof MapMessage) {

						System.out.println("消费者接受到的消息****  " + m);
					}

				}
			});
		} catch (JMSException e) {
			e.printStackTrace();
		}

	}

	public static void main(String[] args) {
		Consumer c = new Consumer();
		c.recevice();
	}

}

 

执行完成,打开数据库表,会生成三个表


 

xxx_msgs表记录生产者发送的消息信息,如果消费者消费完成,则会清空表中的消息



 
 

 

  • 大小: 67.1 KB
  • 大小: 3.8 KB
  • 大小: 19.9 KB
分享到:
评论

相关推荐

    ActiveMQ配置Mysql8为持久化方式所需Jar包.rar

    本主题主要探讨如何将ActiveMQ配置为使用MySQL 8作为其持久化存储方式,以及在这个过程中所需的Jar包。 1. **ActiveMQ与持久化**: - ActiveMQ允许用户选择不同的持久化机制,包括文件系统(KahaDB)和关系数据库...

    activeMQ mysql 持久化

    MySQL作为持久化存储,可以提供高可靠性和可扩展性。通过配置ActiveMQ,我们可以设置将消息写入MySQL的数据表中,当服务重新启动时,可以从数据库中读取这些消息,继续处理。 描述中提到的博客链接可能详细解释了...

    SpringMvc+Eclipselink+JPA+ActiveMQ+MySQL在线聊天

    - 在这个项目中,Eclipselink用于将聊天记录等数据持久化到MySQL数据库中。 3. **JPA (Java Persistence API)**: - JPA是Java标准,定义了如何在Java应用中管理和持久化对象。 - 它提供了一套API来映射Java类到...

    管理系统系列--基于SSH+easyUI+ActiveMQ+MySQL的校园宿舍管理系统。 分为系统管理员、楼宇管理.zip

    Hibernate则是一个持久化框架,它简化了数据库操作,将Java对象与数据库表映射,实现了对象关系映射(ORM)。 **easyUI**: EasyUI是一个基于jQuery的UI库,主要用于快速构建美观、响应式的用户界面。它包含丰富的...

    activemq-5.15+mysqljdbc配置.zip

    总结来说,"activemq-5.15+mysqljdbc配置.zip"提供了ActiveMQ的一个定制化版本,它集成了MySQL数据库作为持久化存储,并使用Durid作为连接池。这种配置适用于那些需要强大数据持久化和恢复能力的场景,但也需要对...

    activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml.pdf

    标题中的“activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml”指的是Apache ActiveMQ的一个特定版本(5.15.15)配置文件,该配置文件用于实现消息队列的数据持久化,通过JDBC连接MySQL 8.0以上的版本。ActiveMQ...

    activemq消息持久化所需Jar包

    4. **KahaDB存储引擎**:这是ActiveMQ内置的默认持久化存储,不需要额外的数据库。KahaDB是一个轻量级、高效的日志文件系统,提供消息持久化。`kahaDB-store.jar` 是与KahaDB相关的库。 5. **OpenJPA**:OpenJPA是...

    activemq持久化jdbc所需jar包.zip

    在描述中提到的"activemq消息持久化所需Jar包,亲测可用"表明这些jar文件是经过验证的,可以确保在ActiveMQ配置为使用JDBC存储时正常工作。持久化是确保消息在系统故障后仍然可恢复的关键特性,JDBC持久化利用数据库...

    spring集成activemq演示queue和topic 持久化

    你可以通过修改`activemq.xml`配置文件来设置持久化存储,比如将消息存储到MySQL数据库。在配置中指定数据源,如下所示: ```xml ${activemq.data} &lt;driverClassName&gt;...

    Active mq jdbc持久化所需要的包.rar

    在ActiveMQ中,JDBC持久化意味着消息存储和检索依赖于关系型数据库,如MySQL,这提供了高可靠性和数据安全性。 针对"Active mq jdbc持久化所需要的包.rar"这个压缩文件,我们可以推断它包含了实现ActiveMQ使用JDBC...

    activeMQ使用JDBC所需要的jar包

    在ActiveMQ中,为了实现消息持久化,我们通常会利用JDBC(Java Database Connectivity)来存储消息数据。这确保了即使在服务器重启或故障后,消息依然能够被恢复,从而保持系统的高可用性和可靠性。本主题将详细讲解...

    基于Springboot+Mybatis+Redis+MySql+MQ的校园医疗管理系统.zip

    在本系统中,MySql 存储了所有核心业务数据,如用户信息、医疗记录、预约详情等,为系统的数据持久化提供了支持。 5. **MQ(Message Queue)**: 消息队列如 RabbitMQ 或者 ActiveMQ 在系统中起着重要的通信角色。...

    自己写的ActiveMQ的Demo例子

    2. **持久化到数据库**:为了提高可靠性,ActiveMQ 还可以配置为将消息存储在关系型数据库(如 MySQL 或 PostgreSQL)中。同样,这需要在 `activemq.xml` 配置文件中进行设置,包括连接数据库的详细信息以及具体的...

    3.1 JDBC消息存储持久化jdbc persistenceFactory高速缓存1

    总结来说,ActiveMQ的JDBC消息持久化和高速缓存是通过配置数据源,使用MySQL存储消息,同时利用内存中的高速缓存提高性能。在实践中,根据实际需求调整配置,如数据目录的位置,可以优化系统性能并确保数据的安全性...

    ActiveMQ的activemq.xml详细配置讲解

    对于与数据库的集成,如`activemq数据库,验证持久化标准配置.txt`所示,ActiveMQ支持使用JDBC进行持久化,确保在故障恢复时数据的完整性。 总之,`activemq.xml`配置文件是管理ActiveMQ核心行为的核心,通过细致地...

    ActiveMQ5.12.1 安装与配置.docx

    - 本文档提到使用MySQL作为ActiveMQ的持久化存储。首先,需要在MySQL中创建一个用户账号并授予必要的权限。 ```sql CREATE USER 'activemq'@'%' IDENTIFIED BY 'activemq'; GRANT ALL PRIVILEGES ON activemq.* ...

    ActiveMQ Master/Slave 主从配置

    配置Master/Slave环节中,需要修改ActiveMQ配置文件activemq.xml,在其中添加数据源配置,并调整persistenceAdapter的设置以适应JDBC方式的持久化。配置文件修改完成后,需要将修改后的配置文件复制到另一台虚拟机的...

    apache-activemq-5.14.5。windows版本

    在ActiveMQ 5.14.5中,重点优化了消息持久化机制,确保即使在服务器宕机或硬件故障后,已经发送但未被消费的消息也能被安全恢复。这依赖于其高效的数据存储模型,可以使用文件系统或数据库(如MySQL或PostgreSQL)来...

    activemq-store-jdbm-1.5.jar.zip

    此文件名表明该压缩包用于在ActiveMQ中集成JDBM数据库作为持久化存储。 ActiveMQ作为消息中间件,主要职责是接收、存储和转发消息,确保消息的可靠传输。它支持多种协议,如OpenWire、STOMP、AMQP、MQTT等,并且...

Global site tag (gtag.js) - Google Analytics