`

activemq持久化方式

阅读更多
一、JMS的理解
JMS(Java Message Service)是jcp组织02-03年定义了jsr914规范(http://jcp.org/en/jsr/detail?id=914),它定义了消息的格式和消息传递模式;
消息包括:消息头,消息扩展属性和消息体,其结构看起来与SOAP非常的相似,但一般情况下,SOAP主要关注远程服务调用,而消息则专注于信息的交换;
消息分为:消息生产者,消息服务器和消息消费者。生产者与消费者之间是透明的,生产者在产生消息之后,把消息发送到消息服务器,再由消息服务器发给消费者,因此它们构成了JMS的3点结构;
消息服务器再给消费者时,有2种模式:点到点(ptp: point to point)模式和发布/订阅(publish/subscribe)模式;
ptp:即生产者把消息投递到消息服务器后,这条消息只能由某一个消费者使用;
发布/订阅:顾名思义,就是共享消息了,只要愿意,消费者都可以监听消息;

二、消息服务器(ActiveMQ)
消息服务器在JMS的3点结构中起着重要作用,没有它,生产者的消息不知道如何投递出去,消费者不知道从哪里取得消息,它同样是隔离生产者和消费者的关键部分…………
JMS消息服务器有很多:ActiveMQ、Jboss MQ、Open MQ、RabbitMQ、ZeroMQ等等。
本文介绍的是开源的Java实现的Apache ActiveMQ(http://activemq.apache.org),它的特性在首页就能看到,我就不再介绍了;

1、下载AMQ:http://activemq.apache.org/download.html,最新版本是5.5.0;
2、解压apache-activemq-5.5.0-bin.zip文件到文件系统(比如D:\ActiveMQ-5.5.0);
3、执行bin/activemq.bat脚本即可启动AMQ:
INFO | ActiveMQ 5.5.0 JMS Message Broker (localhost) is starting
......
INFO | Listening for connections at: tcp://SHI-AP33382A:61616
当看到上面的日志输出时,表示AMQ已经启动了;
4、默认情况下,AMQ使用conf/activemq.xml作为配置文件,我们可修改它,然后以 bin/activemq.bat xbean:./conf/my.xml启动AMQ;

三、持久化消息(MySQL)
因为接下来我们修改AMQ的默认配置文件,所以先备份conf/activemq.xml文件;
1、建立MySQL数据库:要使用MySQL存储消息,必须告诉AMQ数据源:
/**
* 创建数据库
*/
CREATE DATABASE misc DEFAULT CHARSET=UTF8;

/**
* 创建用户和授权
*/
GRANT ALL PRIVILEGES ON misc.* TO 'misc_root'@'%' IDENTIFIED BY 'misc_root_pwd';
GRANT ALL PRIVILEGES ON misc.* TO 'misc_root'@'localhost' IDENTIFIED BY 'misc_root_pwd';
通过上面的SQL脚本,我们建立了名为misc的数据库,并且把所有权限都赋予了misc_root的用户;
由于AMQ需要在本数据库中建立数据表,因此用户的权限必须具有建表权限;
2、添加MySQL数据源:默认情况下,AMQ使用KahaDB存储(我对KahaDB不了解),注释到KahaDB的配置方式,改为MySQL的:
复制代码
<!--
<persistenceAdapter>
    <kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#MySQL-DS"/>
</persistenceAdapter>
复制代码
该配置表示,我们将要使用一个叫做“MySQL-DS”的JDBC数据源;
3、配置MySQL数据源:在</broker>节点后面,增加MySQL数据源配置:
复制代码
<!-- MySQL DataSource -->
<bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://127.0.0.1:3306/misc?useUnicode=true&amp;characterEncoding=UTF-8"/>
    <property name="username" value="misc_root"/>
    <property name="password" value="misc_root_pwd"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
复制代码
其实这就是一个Spring的Bean的配置,注意id与上面的保持一致;

整个AMQ的配置文件内容为:
复制代码
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>classpath:/META-INF/credentials.properties</value>
        </property>
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost">
        <!--
            For better performances use VM cursor and small memory limit. For more information, see: http://activemq.apache.org/message-cursors.html Also, if your producer is "hanging", it's probably due to producer flow control. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
                        <pendingSubscriberPolicy>
                            <vmCursor />
                        </pendingSubscriberPolicy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
                        <!--
                            Use VM cursor for better latency For more information, see: http://activemq.apache.org/message-cursors.html <pendingQueuePolicy> <vmQueueCursor/> </pendingQueuePolicy>
                        -->
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:
          
            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false" />
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:
          
            http://activemq.apache.org/persistence.html
        -->
        <!--
        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
        </persistenceAdapter>
        -->
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#MySQL-DS" />
        </persistenceAdapter>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:
          
            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" />
        </transportConnectors>
    </broker>

    <!-- MySQL DataSource -->
    <bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver" />
        <property name="url" value="jdbc:mysql://127.0.0.1:3306/misc?useUnicode=true&amp;characterEncoding=UTF-8" />
        <property name="username" value="misc_root" />
        <property name="password" value="misc_root_pwd" />
        <property name="poolPreparedStatements" value="true" />
    </bean>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        It also includes Camel (with its web console), see ${ACTIVEMQ_HOME}/conf/camel.xml for more info
      
        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
复制代码

四、查看MySQL数据表
重新启动AMQ,启动完成之后,我们发现,misc数据库多了3张数据表:
mysql> SHOW tables;
+----------------+
| Tables_in_misc |
+----------------+
| activemq_acks  |
| activemq_lock  |
| activemq_msgs  |
+----------------+
数据表activemq_msgs即为持久化消息表;

五、持久化消息
系统启动完毕之后,消息表中内容为空:
mysql> SELECT * FROM activemq_msgs;
Empty set
1、发送消息:打开http://127.0.0.1:8161/demo/页面,找到“Send a message”链接,打开页面(http://127.0.0.1:8161/demo/send.html),填写完表格后,点击“Send”按键,即AMQ投递了一个消息;
2、查看消息:发送之后,我们可以看到数据表中多了一条消息:
mysql> SELECT * FROM activemq_msgs;
+----+-----------------+--------------------------------------------+-----------+------------+-----+----------+
| ID | CONTAINER       | MSGID_PROD                                 | MSGID_SEQ | EXPIRATION | MSG | PRIORITY |
+----+-----------------+--------------------------------------------+-----------+------------+-----+----------+
|  1 | queue://FOO.BAR | ID:SHI-AP33382A-1486-1309840138441-2:2:1:1 |         1 |          0 | |        5 |
+----+-----------------+--------------------------------------------+-----------+------------+-----+----------+
3、取得消息:找到“Receive a message”链接,打开页面(http://127.0.0.1:8161/demo/message/FOO/BAR?readTimeout=10000&type=queue),发现该页面不是一个标准HTML页面,查看其源代码,其内容是不是就是刚才的消息内容?
4、查看消息:消息消费之后,我们可以看到数据表没有消息了:
mysql> SELECT * FROM activemq_msgs;
Empty set
5、我们可以生产多条消息,然后一条一条的消费,发现消息表中的消息一条一条的减少;
6、在发送消息页面,“Destination Type”如果选择“Topic”的话,则消息表中并没有数据,原因在于“Queue”为ptp模式消息,“Topic”为发布/订阅模式消息,当没有订阅者时,消息直接丢掉了。

JMS的内容先介绍到这里,下面我将结合Spring来启动AMQ(即AMQ与应用一同启动,上面介绍的都是单独的启动),通过测试代码来发送和消费消息,敬请期待!
转自http://www.cnblogs.com/obullxl/archive/2011/07/05/jms-activemq-persist-mysql.html
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    activemq持久化jdbc所需jar包.zip

    标题中的"activemq持久化jdbc所需jar包.zip"指的是Apache ActiveMQ消息中间件在使用JDBC(Java Database Connectivity)进行消息持久化时所需的库文件集合。ActiveMQ是一款开源、高性能、跨语言的企业级消息代理,它...

    linux环境下ActiveMQ持久化、集群环境搭建详解

    Linux 环境下 ActiveMQ 持久化、集群环境搭建详解 在 Linux 环境下搭建 ActiveMQ 持久化和集群环境是一种复杂的任务,需要对 Linux 操作系统、Java 环境、ActiveMQ 等方面有深入的了解。以下是搭建 ActiveMQ 持久化...

    activeMQ mysql 持久化

    标题中的“ActiveMQ MySQL 持久化”指的是在使用ActiveMQ消息中间件时,将消息数据存储到MySQL数据库中以实现数据的持久化。ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的消息代理,支持多种消息...

    ActiveMQ配置Mysql8为持久化方式所需Jar包.rar

    总结,配置ActiveMQ使用MySQL 8作为持久化方式涉及修改配置文件、添加必要的Jar包依赖,并确保数据库连接的正确性。这不仅增强了消息传递的可靠性,还便于数据库的管理和维护。在实际操作中,务必根据自己的环境调整...

    ActiveMQ订阅模式持久化实现

    **ActiveMQ订阅模式持久化实现** ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它遵循JMS(Java Message Service)规范,提供了多种消息传递模式,包括发布/订阅(Publish/Subscribe)模式。在发布/订阅...

    spring+activemq topic持久化订阅

    spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...

    activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml.pdf

    标题中的“activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml”指的是Apache ActiveMQ的一个特定版本(5.15.15)配置文件,该配置文件用于实现消息队列的数据持久化,通过JDBC连接MySQL 8.0以上的版本。ActiveMQ...

    activemq消息持久化所需Jar包

    Apache ActiveMQ是业界广泛使用的开源消息中间件,它支持多种协议,如AMQP、STOMP、MQTT等,且提供了消息持久化功能,确保在系统故障后仍能恢复消息,保持数据完整性。本主题主要围绕“activemq消息持久化所需Jar包...

    ActiveMQ持久化机制代码实例

    在本文中,我们将深入探讨ActiveMQ的持久化机制,并通过代码实例来展示其工作原理。 ActiveMQ的持久化机制是为了确保在系统崩溃或重启后,未被消费的消息仍然能够被恢复并继续处理。这主要涉及到两个关键概念:非...

    ActiveMQ中Topic持久化Demo

    本篇主要围绕"ActiveMQ中Topic持久化Demo"进行深入探讨,旨在帮助读者理解如何在ActiveMQ中实现Topic的持久化。 ActiveMQ 是一个功能强大的消息代理,支持多种协议,包括 OpenWire、STOMP、AMQP 和 MQTT。它提供了...

    spring集成activemq演示queue和topic 持久化

    在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...

    ActiveMQ的持久化(数据库)[归类].pdf

    4. **持久化级别**:ActiveMQ 允许用户选择不同的消息持久化级别,例如,可以选择仅持久化消息头,或者同时持久化消息头和正文。这可以根据性能和数据完整性需求进行调整。 5. **事务管理**:在 ActiveMQ 中,可以...

    activeMQ使用JDBC所需要的jar包

    在ActiveMQ中,为了实现消息持久化,我们通常会利用JDBC(Java Database Connectivity)来存储消息数据。这确保了即使在服务器重启或故障后,消息依然能够被恢复,从而保持系统的高可用性和可靠性。本主题将详细讲解...

    ActiveMQ开发规范及方案

    ActiveMQ提供了多种持久化方式,例如jdbc、kahadb等。 介绍 持久化是 ActiveMQ 的一个重要特性,用于确保消息的安全和可靠性。 持久化方式 ActiveMQ提供了多种持久化方式,例如jdbc、kahadb等。jdbc持久化方式是...

    消息和主题(持久化-非持久化)分类--持久化订阅

    标题中的“消息和主题(持久化-非持久化)分类--持久化订阅”指的是在分布式消息传递系统中,特别是基于发布/订阅模型的系统中,关于消息存储和处理的两种不同策略:持久化和非持久化订阅。在这个场景下,我们将深入...

    自己写的ActiveMQ的Demo例子

    1. **持久化到文件**:这是 ActiveMQ 默认的持久化方式,它将消息存储在文件系统中。通过修改 `activemq.xml` 配置文件,你可以配置 ActiveMQ 如何管理这些文件,例如设置数据存储路径、大小限制等。 2. **持久化到...

    7道消息队列ActiveMQ面试题!

    ActiveMQ的存储机制包括非持久化消息和持久化消息两种方式。非持久化消息存储在内存中,而持久化消息则存储在磁盘文件中。当内存中的非持久化消息过多时,ActiveMQ会将这些消息写入临时文件来腾出内存。如果磁盘空间...

    ActiveMQ的activemq.xml详细配置讲解

    对于与数据库的集成,如`activemq数据库,验证持久化标准配置.txt`所示,ActiveMQ支持使用JDBC进行持久化,确保在故障恢复时数据的完整性。 总之,`activemq.xml`配置文件是管理ActiveMQ核心行为的核心,通过细致地...

    Active mq jdbc持久化所需要的包.rar

    在ActiveMQ中,持久化是确保消息即使在服务器崩溃或重启后仍能被正确处理的关键特性。 JDBC(Java Database Connectivity)是Java编程语言中用于与数据库交互的一套标准API,它允许开发者通过统一的方式访问各种...

Global site tag (gtag.js) - Google Analytics