`

ActiveMQ的持久化(数据库)

阅读更多

   利用消息队列的异步策略,可以从很大程序上缓解程序的压力,但是,如果MQ所在的机器down机了,又如果队列中的数据不是持久的就会发生数据丢失,后果是可想而知的, 所以消息的持久化是不可不讨论的话题。

 

   1) 关于ActiveMQ消息队列的持久化,主要是在ActiveMQ的配置文件中设置(看粗体部分).

 

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at
   
    http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
         <property name="locations">
            <value>file:///${activemq.base}/conf/credentials.properties</value>
         </property>      
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data">

        <!-- Destination specific policies using destination names or wildcards -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue=">" memoryLimit="5mb"/>
                    <policyEntry topic=">" memoryLimit="5mb">
                      <!-- you can add other policies too such as these
                        <dispatchPolicy>
                            <strictOrderDispatchPolicy/>
                        </dispatchPolicy>
                        <subscriptionRecoveryPolicy>
                            <lastImageSubscriptionRecoveryPolicy/>
                        </subscriptionRecoveryPolicy>
                      -->
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!-- The store and forward broker networks ActiveMQ will listen to -->
        <networkConnectors>
            <!-- by default just auto discover the other brokers -->
            <networkConnector name="default-nc" uri="multicast://default"/>
            <!-- Example of a static configuration:
            <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
            -->
        </networkConnectors>

        <persistenceAdapter>
            <amqPersistenceAdapter syncOnWrite="false" directory="${activemq.base}/data" maxFileLength="20 mb"/>
        </persistenceAdapter>

        <!-- Use the following if you wish to configure the journal with JDBC -->
        <!--
        <persistenceAdapter>
            <journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#postgres-ds"/>
        </persistenceAdapter>
        -->

        <!-- Or if you want to use pure JDBC without a journal -->
        
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
        </persistenceAdapter>
     

        <sslContext>
            <sslContext keyStore="file:${activemq.base}/conf/broker.ks" keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts" trustStorePassword="password"/>
        </sslContext>
        
        <!--  The maximum about of space the broker will use before slowing down producers -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb" name="foo"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>


        <!-- The transport connectors ActiveMQ will listen to -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
            <transportConnector name="ssl" uri="ssl://localhost:61617"/>
            <transportConnector name="stomp" uri="stomp://localhost:61613"/>
            <transportConnector name="xmpp" uri="xmpp://localhost:61222"/>
        </transportConnectors>

    </broker>

    <!--
    ** Lets deploy some Enterprise Integration Patterns inside the ActiveMQ Message Broker
    ** For more details see
    **
    ** http://activemq.apache.org/enterprise-integration-patterns.html
    -->
    <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">

        <!-- You can use a <package> element for each root package to search for Java routes -->
        <package>org.foo.bar</package>

        <!-- You can use Spring XML syntax to define the routes here using the <route> element -->
        <route>
            <from uri="activemq:example.A"/>
            <to uri="activemq:example.B"/>
        </route>
    </camelContext>

    <!--
    ** Lets configure some Camel endpoints
    **
    ** http://activemq.apache.org/camel/components.html
    -->

    <!-- configure the camel activemq component to use the current broker -->
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
        <property name="connectionFactory">
          <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="vm://localhost?create=false&amp;waitForStart=10000" />
            <property name="userName" value="${activemq.username}"/>
            <property name="password" value="${activemq.password}"/>
          </bean>
        </property>
    </bean>



    <!-- Uncomment to create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic -->
    <!--
    <commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://localhost" username="${activemq.username}" password="${activemq.password}"/>
    -->


    <!-- An embedded servlet engine for serving up the Admin console -->
    <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
        <connectors>
            <nioConnector port="8161"/>
        </connectors>

        <handlers>
            <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>
            <webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/>
            <webAppContext contextPath="/fileserver" resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/>
        </handlers>
    </jetty>

    <!--  This xbean configuration file supports all the standard spring xml configuration options -->

    <!-- Postgres DataSource Sample Setup -->
    <!--
    <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource">
      <property name="serverName" value="localhost"/>
      <property name="databaseName" value="activemq"/>
      <property name="portNumber" value="0"/>
      <property name="user" value="activemq"/>
      <property name="password" value="activemq"/>
      <property name="dataSourceName" value="postgres"/>
      <property name="initialConnections" value="1"/>
      <property name="maxConnections" value="10"/>
    </bean>
    -->

    <!-- MySql DataSource Sample Setup -->
    
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
      <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
      <property name="username" value="root"/>
      <property name="password" value=""/>
      <property name="maxActive" value="200"/>
      <property name="poolPreparedStatements" value="true"/>
    </bean>
 
    
    <!-- Oracle DataSource Sample Setup -->
    <!--
    <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
      <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
      <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>
      <property name="username" value="scott"/>
      <property name="password" value="tiger"/>
      <property name="maxActive" value="200"/>
      <property name="poolPreparedStatements" value="true"/>
    </bean>
    -->

    <!-- Embedded Derby DataSource Sample Setup -->
    <!--
    <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
      <property name="databaseName" value="derbydb"/>
      <property name="createDatabase" value="create"/>
    </bean>
    -->

</beans>
<!-- END SNIPPET: example -->

 

    改动部分主要是设置了mysql的datasource声明, 还有就是采用mysql作为persistenceAdapter,并声明如下。

 

  <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
      <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
      <property name="username" value="root"/>
      <property name="password" value=""/>
      <property name="maxActive" value="200"/>
      <property name="poolPreparedStatements" value="true"/>
    </bean>



  <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
  </persistenceAdapter>

 

  

 

 

 

 2) 把数据库的驱动放入ActiveMQ的lib中,使其能够访问相应的数据库,关于数据库的表结构,ActiveMQ会自动创建,但是前提是当ActiveMQ启动以后,声明的数据库要是存在的。

    测试的时候发现以上条件都满足ActiveMQ还是会抛异常,看了一下异常,是有一张表(activemq_acks)创建的时候出了问题,自己手动创建后好了,把表结构列出来。

 

-- phpMyAdmin SQL Dump
-- version 2.10.2
-- http://www.phpmyadmin.net
-- 
-- 主机: localhost
-- 生成日期: 2009 年 11 月 06 日 05:29
-- 服务器版本: 5.0.45
-- PHP 版本: 5.2.3

SET SQL_MODE="NO_AUTO_VALUE_ON_ZERO";

-- 
-- 数据库: `activemq`
-- 

-- --------------------------------------------------------

-- 
-- 表的结构 `activemq_acks`
-- 

CREATE TABLE `activemq_acks` (
  `SUB` varchar(250) collate utf8_bin NOT NULL,
  `CONTAINER` varchar(250) collate utf8_bin NOT NULL,
  `LAST_ACKED_ID` int(11) default NULL,
  `SE_ID` int(11) default NULL,
  `SE_CLIENT_ID` varchar(250) collate utf8_bin default NULL,
  `SE_CONSUMER_NAME` varchar(250) collate utf8_bin default NULL,
  `SE_SELECTOR` varchar(250) collate utf8_bin default NULL
) ENGINE=MyISAM DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- 
-- 导出表中的数据 `activemq_acks`
-- 


-- --------------------------------------------------------

-- 
-- 表的结构 `activemq_lock`
-- 

CREATE TABLE `activemq_lock` (
  `ID` bigint(20) NOT NULL,
  `TIME` bigint(20) default NULL,
  `BROKER_NAME` varchar(250) collate utf8_bin default NULL,
  PRIMARY KEY  (`ID`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- 
-- 导出表中的数据 `activemq_lock`
-- 

INSERT INTO `activemq_lock` (`ID`, `TIME`, `BROKER_NAME`) VALUES 
(1, 1257485355546, NULL);

-- --------------------------------------------------------

-- 
-- 表的结构 `activemq_msgs`
-- 

CREATE TABLE `activemq_msgs` (
  `ID` int(11) NOT NULL,
  `CONTAINER` varchar(250) collate utf8_bin default NULL,
  `MSGID_PROD` varchar(250) collate utf8_bin default NULL,
  `MSGID_SEQ` int(11) default NULL,
  `EXPIRATION` bigint(20) default NULL,
  `MSG` longblob,
  PRIMARY KEY  (`ID`),
  KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`),
  KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`),
  KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- 
-- 导出表中的数据 `activemq_msgs`
-- 

 

当消息发送至ActiveMQ时,数据就被持久化到mysql了,如果消息被消费,数据会自动被删除,down机后重启没影响,有一点不好的是,这个有点拖数据库,我在本地的mysql,一开启ActiveMQ, 数据库就会变得很慢,不过这个只是在本地的机子上,想必实际应用时应该好很多。

3
0
分享到:
评论

相关推荐

    activemq持久化jdbc所需jar包.zip

    标题中的"activemq持久化jdbc所需jar包.zip"指的是Apache ActiveMQ消息中间件在使用JDBC(Java Database Connectivity)进行消息持久化时所需的库文件集合。ActiveMQ是一款开源、高性能、跨语言的企业级消息代理,它...

    linux环境下ActiveMQ持久化、集群环境搭建详解

    ActiveMQ 持久化是指将消息队列持久化到数据库或文件中,以便在断电或崩溃后恢复消息队列。可以使用 Apache ActiveMQ 的持久化机制,例如使用 KahaDB 或 AMQP 等。 集群环境 ActiveMQ 集群环境是指多个 ActiveMQ ...

    activeMQ mysql 持久化

    标题中的“ActiveMQ MySQL 持久化”指的是在使用ActiveMQ消息中间件时,将消息数据存储到MySQL数据库中以实现数据的持久化。ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的消息代理,支持多种消息...

    ActiveMQ配置Mysql8为持久化方式所需Jar包.rar

    - ActiveMQ允许用户选择不同的持久化机制,包括文件系统(KahaDB)和关系数据库(如MySQL)。 - 持久化确保了消息在服务器崩溃或重启时不会丢失,增强了系统的可靠性。 2. **配置MySQL8**: - 首先,你需要在...

    ActiveMQ的持久化(数据库)[归类].pdf

    4. **持久化级别**:ActiveMQ 允许用户选择不同的消息持久化级别,例如,可以选择仅持久化消息头,或者同时持久化消息头和正文。这可以根据性能和数据完整性需求进行调整。 5. **事务管理**:在 ActiveMQ 中,可以...

    ActiveMQ持久化.docx

    **JDBC** 方式允许 ActiveMQ 将消息数据持久化到关系型数据库中,提供了高度的灵活性和可扩展性。 ##### 配置 1. **定义数据源**:首先需要在 `activemq.xml` 文件中定义一个数据源(如 MySQL 数据源)。 2. **配置...

    activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml.pdf

    总结来说,这个配置文件展示了如何配置ActiveMQ 5.15.15使用JDBC和MySQL 8.0+进行消息持久化,以及解决XML中特殊字符转义的问题,以确保ActiveMQ能够正确地连接并使用MySQL数据库存储和检索消息。这种配置适用于需要...

    activemq消息持久化所需Jar包

    3. **数据库连接驱动**:ActiveMQ支持多种持久化策略,其中一种是使用关系型数据库(如MySQL、PostgreSQL)来存储消息。因此,根据你选择的数据库,需要引入相应的数据库驱动Jar包,例如`mysql-connector-java.jar`...

    spring集成activemq演示queue和topic 持久化

    你可以通过修改`activemq.xml`配置文件来设置持久化存储,比如将消息存储到MySQL数据库。在配置中指定数据源,如下所示: ```xml ${activemq.data} &lt;driverClassName&gt;...

    activeMQ使用JDBC所需要的jar包

    总结来说,使用ActiveMQ的JDBC持久化需要合适的jar包支持,包括ActiveMQ自身的JDBC相关库、数据库驱动和连接池库。正确配置这些组件并设置好数据源,才能确保ActiveMQ能有效地利用JDBC进行消息持久化。

    activemq-oracle.rar

    描述中提到的“ActiveMQ持久化所需jar包”意味着这些jar文件是用于确保ActiveMQ的消息持久化的关键组件。持久化是确保即使在系统崩溃或断电后,消息仍然能够被正确处理的重要特性。以下是每个jar文件的功能: 1. **...

    Active mq jdbc持久化所需要的包.rar

    总之,"Active mq jdbc持久化所需要的包.rar"提供了使用MySQL数据库进行ActiveMQ消息持久化的必要组件。正确配置并使用这些组件,可以在保持消息可靠性的同时,利用数据库的优势来管理和保护消息数据。

    3.1 JDBC消息存储持久化jdbc persistenceFactory高速缓存1

    在ActiveMQ中,消息持久化和高速缓存是两个关键概念,它们确保了即使在服务器重启或网络故障等情况下,消息仍然能够被正确地存储和检索。本篇将详细讲解如何利用JDBC实现ActiveMQ的消息存储持久化以及高速缓存。 1....

    自己写的ActiveMQ的Demo例子

    ActiveMQ 提供了两种主要的持久化机制:持久化到文件和持久化到数据库。 1. **持久化到文件**:这是 ActiveMQ 默认的持久化方式,它将消息存储在文件系统中。通过修改 `activemq.xml` 配置文件,你可以配置 ...

    ActiveMQ的activemq.xml详细配置讲解

    对于与数据库的集成,如`activemq数据库,验证持久化标准配置.txt`所示,ActiveMQ支持使用JDBC进行持久化,确保在故障恢复时数据的完整性。 总之,`activemq.xml`配置文件是管理ActiveMQ核心行为的核心,通过细致地...

    activemq-5.15+mysqljdbc配置.zip

    总结来说,"activemq-5.15+mysqljdbc配置.zip"提供了ActiveMQ的一个定制化版本,它集成了MySQL数据库作为持久化存储,并使用Durid作为连接池。这种配置适用于那些需要强大数据持久化和恢复能力的场景,但也需要对...

    ActiveMQ5.12.1 安装与配置.docx

    - 发送一条消息到该队列,确保消息能够正确地持久化到数据库中。 2. **环境信息** - 操作系统: Linux Ubuntu 3.19.0-15-generic #15-Ubuntu - ActiveMQ版本: ActiveMQ 5.12.1 - 数据库: MySQL 5.6.27 - JDK...

    最新稳定版ActiveMQ5.15.0

    8. **持久化机制**:ActiveMQ支持多种持久化机制,包括本地文件系统、LevelDB、JDBC数据库等,可以根据需求选择适合的策略。 9. **消息优先级**:在某些应用场景中,消息的优先级是重要的。ActiveMQ允许为消息分配...

    apache-activemq-5.5.0

    KahaDB是ActiveMQ自有的轻量级数据库,提供了高效的日志管理和持久化,而LevelDB则是Google开发的一个键值对存储系统,提供更快的读写性能。 此外,ActiveMQ还支持主题(Topic)和队列(Queue)两种消息模式。主题...

    ActiveMQ消息服务器 v6.0.1.zip

    同时,它提供多种持久化机制,如文件系统、数据库存储,确保即使在服务中断后也能恢复消息。 5. 安全性:ActiveMQ内置了用户认证和授权机制,可以通过SSL/TLS加密传输,保障数据的安全性。 二、ActiveMQ应用场景 1....

Global site tag (gtag.js) - Google Analytics