`
lettoo
  • 浏览: 35623 次
  • 性别: Icon_minigender_1
  • 来自: 合肥
博客专栏
58ccff5b-5ca6-387a-9c99-a277f31a9e51
我和Java数据库操作的那...
浏览量:9585
社区版块
存档分类
最新评论

Spring JMS和ActiveMQ的应用

阅读更多

    笔者近期参与一个分析log的项目。主要流程是:读取Log文件,对每一行Text解析成对应的Object,解析器会将多个Object存放到一个List中并发送到ActiveMQ的Queue中,即Queue中的一个Message即应一个Objects List。然后数据处理thread会consume存放在Queue中的Message,并将处理的结果保存到db。

 

    采用JMS来实现读取Log和分析Log之间的异步运行,使用ActiveMQ的可持久化的Queue,当Message被放进Queue中并持久化后,就会更新Log的读取进度,这样即使程序break down,也不会导致数据被漏掉。

 

    使用Spring来配置JMS,则可以简化和方便JMS的使用,同时还可以使用到Transaction Management。

 

    由于程序并不是很复杂,同时也不需要单独提供ActiveMQ server来运行,所以这里使用的是embed的方式。

 

    总的来说,程序运行起来之后,将会启动以下几个Service:

 

    1. JMS service,这个service会启动一个embed broker。

    2. Data Reader & Parser service,这个service会引用一个JMS Message Provider,用于发送Message到Queue中。

    3. 使用Message Listener Containers来监听Queue,并使用MDP(Message Drive POJO)的方法,来处理并消费掉Message。

 

    本文主要说明如何通过Spring的配置来实现JMS和ActiveMQ的应用,对于这个程序的其他的代码不涉及。

 

    第一步,JMS Service并start embed broker

public class JMSService extends AbstractService {

    private BrokerService broker;

    private String mqConfigFile = "xbean:activemq.xml";
    
    public void start() throws Exception {
        if(broker == null) {            
            broker = BrokerFactory.createBroker(new URI(mqConfigFile));
            broker.start();
            broker.waitUntilStarted();
        }
        super.start();
    }

    public void stop() throws Exception {
        if(broker != null && broker.isStarted()) {
            broker.stop();
            broker.waitUntilStopped();
        }
        super.stop();
    }

}

 

    这里使用BrokerService broker = BrokerFactory.createBroker(new URI(someURI));来创建一个broker,关于someURI的配置,详见这里:http://activemq.apache.org/broker-configuration-uri.html

    我这里的xbean:activemq.xml如下:

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at
   
    http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration 
        file -->
    <bean
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>mw.properties</value>
        </property>
    </bean>

    <!-- The <broker> element is used to configure the ActiveMQ broker. -->
    <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core"
        brokerName="${activemq.broker.name}" dataDirectory="${activemq.broker.datadir}">
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="true"
                        memoryLimit="10mb">
                        <pendingSubscriberPolicy>
                            <vmCursor />
                        </pendingSubscriberPolicy>
                    </policyEntry>
                    <policyEntry queue="mdp.queue." producerFlowControl="true"
                        memoryLimit="128mb">
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!-- The managementContext is used to configure how ActiveMQ is exposed 
            in JMX. By default, ActiveMQ uses the MBean server that is started by the 
            JVM. For more information, see: http://activemq.apache.org/jmx.html -->
        <managementContext>
            <managementContext createConnector="true" />
        </managementContext>

        <!-- Configure message persistence for the broker. The default persistence 
            mechanism is the KahaDB store (identified by the kahaDB tag). For more information, 
            see: http://activemq.apache.org/persistence.html -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.broker.datadir}/kahadb"
                journalMaxFileLength="96mb" indexWriteBatchSize="10000"
                indexCacheSize="100000" />
        </persistenceAdapter>

        <plugins>
            <!-- Configure authentication; Username, passwords and groups -->
            <simpleAuthenticationPlugin>
                <users>
                    <authenticationUser username="${activemq.userName}"
                        password="${activemq.password}" groups="users,admins" />
                    <authenticationUser username="admin" password="pass"
                        groups="users" />
                    <authenticationUser username="guest" password="pass"
                        groups="guests" />
                </users>
            </simpleAuthenticationPlugin>


            <!-- Lets configure a destination based authorization mechanism -->
            <authorizationPlugin>
                <map>
                    <authorizationMap>
                        <authorizationEntries>
                            <authorizationEntry queue=">" read="admins"
                                write="admins" admin="admins" />
                            <authorizationEntry queue="USERS.>" read="users"
                                write="users" admin="users" />
                            <authorizationEntry queue="GUEST.>" read="guests"
                                write="guests,users" admin="guests,users" />

                            <authorizationEntry queue="TEST.Q" read="guests"
                                write="guests" />

                            <authorizationEntry topic=">" read="admins"
                                write="admins" admin="admins" />
                            <authorizationEntry topic="USERS.>" read="users"
                                write="users" admin="users" />
                            <authorizationEntry topic="GUEST.>" read="guests"
                                write="guests,users" admin="guests,users" />

                            <authorizationEntry topic="ActiveMQ.Advisory.>"
                                read="guests,users" write="guests,users" admin="guests,users" />
                        </authorizationEntries>
                    </authorizationMap>
                </map>
            </authorizationPlugin>
        </plugins>

        <!-- The systemUsage controls the maximum amount of space the broker will 
            use before slowing down producers. For more information, see: http://activemq.apache.org/producer-flow-control.html -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="2048 mb" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="32 gb" name="mdp" />
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="1024 mb" />
                </tempUsage>
            </systemUsage>
        </systemUsage>


        <!-- The transport connectors expose ActiveMQ over a given protocol to 
            clients and other brokers. For more information, see: http://activemq.apache.org/configuring-transports.html -->
        <transportConnectors>
            <transportConnector name="openwire" uri="${activemq.broker.url}" />
        </transportConnectors>

    </broker>

    <!-- Uncomment to enable Camel Take a look at activemq-camel.xml for more 
        details <import resource="camel.xml"/> -->

    <!-- Enable web consoles, REST and Ajax APIs and demos Take a look at activemq-jetty.xml 
        for more details <import resource="jetty.xml"/> -->
</beans>
 

    第二步,配置JMS ConnectionFactory和Destination

	<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL">
			<value>vm://localhost:61616</value>
		</property>
	</bean>

	<!-- Queue -->
	<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0">
			<value>UserMessageQueue</value>
		</constructor-arg>
	</bean>

 

    第三步,配置JMS Template,以及Message converter

	<!-- Spring JmsTemplate config -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory">
			<!-- lets wrap in a pool to avoid creating a connection per send -->
			<bean class="org.springframework.jms.connection.SingleConnectionFactory">
				<property name="targetConnectionFactory" ref="jmsConnectionFactory" />
			</bean>
		</property>
		<!-- custom MessageConverter -->
		<!-- no message converter configured, so it'll use the SimpleMessageConverter  -->
	</bean>


    注意,这里使用了一个 SingleConnectionFactory,这是spring对ConnectionFactory的一个实现,这个实现会在调用createConnection的地方返回相同的Connection并且忽略所有的close()方法,这样多个JMSTemplate可以共用一个相同的connection,避免每次都重复创建connection造成资源的浪费。

    并且,我没有配置MessageConverter,而且使用Spring自带的SimpleMessageConverter,这是默认选项,不配就是使用SimpleMessageConverter。

 

    第四步,配置JMS Message producer

	<!-- POJO which send Message uses Spring JmsTemplate -->
	<bean id="userMessageProducer" class="cn.lettoo.meetingwatch.jms.UserMessageProducer">
		<property name="template" ref="jmsTemplate" />
		<property name="destination" ref="destinationQueue" />
	</bean>

    Message Producer通过JMS Template和destination来发送Message。下面是代码实现:

public class UserMessageProducer implements IMessageProducer {

    protected JmsTemplate template; 

    protected Queue destination; 

    public void setTemplate(JmsTemplate template) { 
       this.template = template; 
    } 

    public void setDestination(Queue destination) { 
       this.destination = destination; 
    }

    public void send(Object object) {
        template.convertAndSend(this.destination, object); 
    } 

}

 因为我并没有为JMS template指定特定的MessageConverter,所以这里template.convertAndSend()应该就是SimpleMessageConverter.convertAndSend()来实现的。

 

    第五步,配置JMS Message Consumer

    我并没有在spring的配置文件单独配置一个Message Consumer,这是一个非常简单的POJO,这里只是为测试写的一个简单示例:只是简单把读到的Message打印出来。

public class UserMessageConsumer {

    public void printUser(User user) {
        user.getId();
        user.getName();
        System.out.println(user);
    }

}
 

    第六步,配置JMS Message Listener Container以及Listerer

	<!-- Message Driven POJO -->
	<bean id="messageListener"
		class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg>
			<bean class="cn.lettoo.meetingwatch.jms.UserMessageConsumer">
			</bean>
		</constructor-arg>
		<property name="defaultListenerMethod" value="printUser" />
	</bean>

	<!-- listener container -->
	<bean id="listenerContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="destinationQueue" />
		<property name="messageListener" ref="messageListener" />
	</bean>

    这里使用org.springframework.jms.listener.adapter.MessageListenerAdapter来实现一个Listener,这样做的好处是,可以使用非常简单的POJO来作为一个consumer,就是上面的UserMessageConsumer一样。当Queue里有一条Message的时候,会使用UserMessageConsumer的printUser来消费掉这条Message。

    Spring JMS提供3种ListenerContainer,最常使用的是DefaultMessageListenerContainer。这里把Listener注入,来实现对Message的Listener。

 

参考:

1. Chapter 19. JMS (Java Message Service)

2. ActiveMQ5.0实战三:使用Spring发送,消费topic和queue消息

分享到:
评论

相关推荐

    SpringJMS整合ActiveMQ

    详细内容: SpringJMS整合ActiveMQ.doc 详细说明文档 apache-activemq-5.8.0-bin.zip ActiveMQ安装包 JMSTest.rar MyEclipse8.5下web工程

    spring整合jms+activemq

    综上所述,Spring整合JMS和ActivemQ提供了一套完整的解决方案,帮助开发者轻松地在应用中实现消息的发送和接收。通过这种方式,可以构建出高可用、松耦合、可扩展的分布式系统,提高系统的稳定性和响应速度。在实际...

    Spring+JMS+ActiveMQ+Tomcat实现消息服务的demo

    基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示:Spring 3.2.0,ActiveMQ 5.4.3,Tomcat 6.0.43。本例通过详细的说明和注释,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以...

    Spring 实现远程访问详解——jms和activemq

    本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问。 一. 简介 1. 什么是Apache ActiveMq Apache ActiveMq是最流行和最强大的开源消息和集成服务器。同时Apache ActiveMq...

    jms Spring+ActiveMQ 5.4.2

    标题 "jms Spring+ActiveMQ 5.4.2" 涉及的是Java消息服务(JMS)在Spring框架中的应用,以及ActiveMQ作为消息代理的使用。在这个主题下,我们将深入探讨JMS的基本概念、Spring对JMS的支持以及ActiveMQ 5.4.2版本的...

    Spring+JMS+ActiveMQ+Tomcat jar下载

    Spring、JMS、ActiveMQ和Tomcat是Java开发中常用的技术组件,它们分别扮演着不同的角色,构建出高效的企业级消息通信系统。本教程将详细阐述这些技术的使用及其相互结合的方式。 首先,Spring是一个开源的Java应用...

    spring+jms+activemq

    在IT行业中,Spring框架...Spring简化了JMS的集成和管理,ActiveMQ作为强大的消息中间件,保证了消息的稳定传输。通过理解和掌握这一技术栈,开发者可以构建出高可用、松耦合的应用系统,提高系统的整体性能和稳定性。

    SpringJMS示例代码

    集成SpringJMS和ActiveMQ首先需要在Spring配置中定义ConnectionFactory和Destination(Topic或Queue)。ConnectionFactory是JMS客户端用来创建与消息服务器的连接的工厂,而Destination是消息的目的地。Spring的`...

    JMS之Spring +activeMQ实现消息队列

    1. 添加依赖:在项目的Maven或Gradle配置文件中引入ActiveMQ和Spring JMS的相关库。 2. 配置ConnectionFactory:这是连接到ActiveMQ服务器的关键,可以通过XML配置或Java配置来定义。 3. 创建Destination:JMS中的...

    基于Spring+JMS+ActiveMQ+Tomcat整合

    基于Spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务。

    jms+activeMq+spring学习简单例子

    标题“jms+activeMq+spring学习简单例子”表明这个压缩包包含了一些示例代码,用于演示如何在Spring框架中集成JMS和ActiveMQ,以便于理解和学习。通过这个例子,开发者可以了解如何在实际应用中实现基于消息的通信。...

    基于Spring+JMS+ActiveMQ+Tomcat的整合ActiveMQSpringDemo实例源码.zip

    基于Spring+JMS+ActiveMQ+Tomcat的整合ActiveMQSpringDemo实例源码,此实例基于Spring+JMS+ActiveMQ+Tomcat,注解的完整实例,包含jar包,可供学习及设计参考。

    MQ、JMS以及ActiveMQ关系的理解

    ActiveMQ的主要特点包括支持多种编程语言和协议的客户端、完全支持JMS 1.1和J2EE 1.4规范、对Spring的支持、支持多种传输协议以及持久化和事务处理能力。 在实际的项目中,消息队列经常被用于将一些耗时的操作,如...

    Spring和ActiveMQ整合的完整实例

    将Spring与ActiveMQ整合,可以轻松地在Spring应用中实现消息队列的功能,提高系统的可扩展性和可靠性。 首先,让我们了解Spring框架如何支持消息传递。Spring提供了JmsTemplate类,这是一个模板类,用于简化发送和...

    JMS-activemq 实例(分ppt,eclipse工程,说明三部分)

    **JMS(Java Message Service)** 是Java平台中用于企业级消息传递的一种API,它提供了一种标准的方式来...通过深入学习和实践,你可以理解如何在Java应用中使用ActiveMQ进行消息传递,并利用Spring框架简化这一过程。

    spring整合JMS-居于ActiveMQ实现

    Spring整合JMS基于ActiveMQ实现是一项常见的企业级应用开发任务,它涉及到Spring框架、Java消息服务(JMS)以及ActiveMQ消息中间件的使用。在本文中,我们将深入探讨这三个关键概念,以及如何将它们有效地结合在一起...

    JMS之ActiveMQ与Spring整合源码

    通过以上知识点的讲解,我们可以看到,ActiveMQ与Spring的整合使得在Java应用中使用JMS变得更加简单和高效。无论是消息的生产还是消费,都能通过Spring的抽象和ActiveMQ的稳定性能得到很好的支持。在实际项目中,...

    Spring和ActiveMQ的整合实例源码

    10. **Tomcat服务器**:Tomcat是一个流行的Java Web服务器,它可以部署和运行使用Spring和ActiveMQ的Web应用程序。 通过上述知识点,我们可以理解如何在Spring环境中利用ActiveMQ进行消息传递,实现高并发、解耦的...

    Spring+JMS+ActiveMQ+Tomcat实现消息服务_服务器应用

    ### Spring+JMS+ActiveMQ+Tomcat 实现消息服务 #### 一、技术栈介绍 在本案例中,我们采用的技术栈为Spring 2.5、ActiveMQ 5.4.0 和 Tomcat 6.0.30。这些技术的结合能够有效地构建一个可靠的消息传递系统。 - **...

    spring2 activemq5 tomcat6构建jms

    标题“spring2 ...分析这些文件可以帮助理解如何将Spring、ActiveMQ和Tomcat集成在一起,实现消息传递功能。学习这个例子可以加深对JMS、Spring集成以及Web应用部署的理解,对于提升Java EE开发技能非常有帮助。

Global site tag (gtag.js) - Google Analytics