- 浏览: 35408 次
- 性别:
- 来自: 合肥
博客专栏
-
我和Java数据库操作的那...
浏览量:9540
最新评论
-
ivanlw:
楼主用的这个snapito好神奇啊……是直接把图片链接设置成他 ...
ibookmark.me上线了! -
succinite:
使用TortoiseGit, 出现以下错误。fatal: ht ...
一个简单的JAVA后台程序框架 -
mazhiyuan:
引用很显然,之前所说的梦想,并非是真正心中所想,而只不过想“找 ...
也谈梦想 -
cevin15:
说到我心里去了。现在处于离职状态。也是对前途一片迷茫~
也谈梦想 -
lwjlaser:
<div class="quote_title ...
我和JAVA数据库操作的那些事儿(3)
笔者近期参与一个分析log的项目。主要流程是:读取Log文件,对每一行Text解析成对应的Object,解析器会将多个Object存放到一个List中并发送到ActiveMQ的Queue中,即Queue中的一个Message即应一个Objects List。然后数据处理thread会consume存放在Queue中的Message,并将处理的结果保存到db。
采用JMS来实现读取Log和分析Log之间的异步运行,使用ActiveMQ的可持久化的Queue,当Message被放进Queue中并持久化后,就会更新Log的读取进度,这样即使程序break down,也不会导致数据被漏掉。
使用Spring来配置JMS,则可以简化和方便JMS的使用,同时还可以使用到Transaction Management。
由于程序并不是很复杂,同时也不需要单独提供ActiveMQ server来运行,所以这里使用的是embed的方式。
总的来说,程序运行起来之后,将会启动以下几个Service:
1. JMS service,这个service会启动一个embed broker。
2. Data Reader & Parser service,这个service会引用一个JMS Message Provider,用于发送Message到Queue中。
3. 使用Message Listener Containers来监听Queue,并使用MDP(Message Drive POJO)的方法,来处理并消费掉Message。
本文主要说明如何通过Spring的配置来实现JMS和ActiveMQ的应用,对于这个程序的其他的代码不涉及。
第一步,JMS Service并start embed broker
public class JMSService extends AbstractService { private BrokerService broker; private String mqConfigFile = "xbean:activemq.xml"; public void start() throws Exception { if(broker == null) { broker = BrokerFactory.createBroker(new URI(mqConfigFile)); broker.start(); broker.waitUntilStarted(); } super.start(); } public void stop() throws Exception { if(broker != null && broker.isStarted()) { broker.stop(); broker.waitUntilStopped(); } super.stop(); } }
这里使用BrokerService broker = BrokerFactory.createBroker(new URI(someURI));来创建一个broker,关于someURI的配置,详见这里:http://activemq.apache.org/broker-configuration-uri.html
我这里的xbean:activemq.xml如下:
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <!-- Allows us to use system properties as variables in this configuration file --> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <value>mw.properties</value> </property> </bean> <!-- The <broker> element is used to configure the ActiveMQ broker. --> <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" brokerName="${activemq.broker.name}" dataDirectory="${activemq.broker.datadir}"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" producerFlowControl="true" memoryLimit="10mb"> <pendingSubscriberPolicy> <vmCursor /> </pendingSubscriberPolicy> </policyEntry> <policyEntry queue="mdp.queue." producerFlowControl="true" memoryLimit="128mb"> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <!-- The managementContext is used to configure how ActiveMQ is exposed in JMX. By default, ActiveMQ uses the MBean server that is started by the JVM. For more information, see: http://activemq.apache.org/jmx.html --> <managementContext> <managementContext createConnector="true" /> </managementContext> <!-- Configure message persistence for the broker. The default persistence mechanism is the KahaDB store (identified by the kahaDB tag). For more information, see: http://activemq.apache.org/persistence.html --> <persistenceAdapter> <kahaDB directory="${activemq.broker.datadir}/kahadb" journalMaxFileLength="96mb" indexWriteBatchSize="10000" indexCacheSize="100000" /> </persistenceAdapter> <plugins> <!-- Configure authentication; Username, passwords and groups --> <simpleAuthenticationPlugin> <users> <authenticationUser username="${activemq.userName}" password="${activemq.password}" groups="users,admins" /> <authenticationUser username="admin" password="pass" groups="users" /> <authenticationUser username="guest" password="pass" groups="guests" /> </users> </simpleAuthenticationPlugin> <!-- Lets configure a destination based authorization mechanism --> <authorizationPlugin> <map> <authorizationMap> <authorizationEntries> <authorizationEntry queue=">" read="admins" write="admins" admin="admins" /> <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" /> <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" /> <authorizationEntry queue="TEST.Q" read="guests" write="guests" /> <authorizationEntry topic=">" read="admins" write="admins" admin="admins" /> <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" /> <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" /> <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users" /> </authorizationEntries> </authorizationMap> </map> </authorizationPlugin> </plugins> <!-- The systemUsage controls the maximum amount of space the broker will use before slowing down producers. For more information, see: http://activemq.apache.org/producer-flow-control.html --> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="2048 mb" /> </memoryUsage> <storeUsage> <storeUsage limit="32 gb" name="mdp" /> </storeUsage> <tempUsage> <tempUsage limit="1024 mb" /> </tempUsage> </systemUsage> </systemUsage> <!-- The transport connectors expose ActiveMQ over a given protocol to clients and other brokers. For more information, see: http://activemq.apache.org/configuring-transports.html --> <transportConnectors> <transportConnector name="openwire" uri="${activemq.broker.url}" /> </transportConnectors> </broker> <!-- Uncomment to enable Camel Take a look at activemq-camel.xml for more details <import resource="camel.xml"/> --> <!-- Enable web consoles, REST and Ajax APIs and demos Take a look at activemq-jetty.xml for more details <import resource="jetty.xml"/> --> </beans>
第二步,配置JMS ConnectionFactory和Destination
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>vm://localhost:61616</value> </property> </bean> <!-- Queue --> <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0"> <value>UserMessageQueue</value> </constructor-arg> </bean>
第三步,配置JMS Template,以及Message converter
<!-- Spring JmsTemplate config --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <!-- lets wrap in a pool to avoid creating a connection per send --> <bean class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="jmsConnectionFactory" /> </bean> </property> <!-- custom MessageConverter --> <!-- no message converter configured, so it'll use the SimpleMessageConverter --> </bean>
注意,这里使用了一个 SingleConnectionFactory,这是spring对ConnectionFactory的一个实现,这个实现会在调用createConnection的地方返回相同的Connection并且忽略所有的close()方法,这样多个JMSTemplate可以共用一个相同的connection,避免每次都重复创建connection造成资源的浪费。
并且,我没有配置MessageConverter,而且使用Spring自带的SimpleMessageConverter,这是默认选项,不配就是使用SimpleMessageConverter。
第四步,配置JMS Message producer
<!-- POJO which send Message uses Spring JmsTemplate --> <bean id="userMessageProducer" class="cn.lettoo.meetingwatch.jms.UserMessageProducer"> <property name="template" ref="jmsTemplate" /> <property name="destination" ref="destinationQueue" /> </bean>
Message Producer通过JMS Template和destination来发送Message。下面是代码实现:
public class UserMessageProducer implements IMessageProducer { protected JmsTemplate template; protected Queue destination; public void setTemplate(JmsTemplate template) { this.template = template; } public void setDestination(Queue destination) { this.destination = destination; } public void send(Object object) { template.convertAndSend(this.destination, object); } }
因为我并没有为JMS template指定特定的MessageConverter,所以这里template.convertAndSend()应该就是SimpleMessageConverter.convertAndSend()来实现的。
第五步,配置JMS Message Consumer
我并没有在spring的配置文件单独配置一个Message Consumer,这是一个非常简单的POJO,这里只是为测试写的一个简单示例:只是简单把读到的Message打印出来。
public class UserMessageConsumer { public void printUser(User user) { user.getId(); user.getName(); System.out.println(user); } }
第六步,配置JMS Message Listener Container以及Listerer
<!-- Message Driven POJO --> <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg> <bean class="cn.lettoo.meetingwatch.jms.UserMessageConsumer"> </bean> </constructor-arg> <property name="defaultListenerMethod" value="printUser" /> </bean> <!-- listener container --> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="destination" ref="destinationQueue" /> <property name="messageListener" ref="messageListener" /> </bean>
这里使用org.springframework.jms.listener.adapter.MessageListenerAdapter来实现一个Listener,这样做的好处是,可以使用非常简单的POJO来作为一个consumer,就是上面的UserMessageConsumer一样。当Queue里有一条Message的时候,会使用UserMessageConsumer的printUser来消费掉这条Message。
Spring JMS提供3种ListenerContainer,最常使用的是DefaultMessageListenerContainer。这里把Listener注入,来实现对Message的Listener。
参考:
1. Chapter 19. JMS (Java Message Service)
2. ActiveMQ5.0实战三:使用Spring发送,消费topic和queue消息
发表评论
-
ibookmark.me上线了!
2012-04-10 23:47 1550记得上次是2009年,接触到了python,gae,于是乎动手 ... -
使用virtualenv开发django应用
2012-01-12 11:03 1789Virtualenv是一个非常好的virtual python ... -
在Amazon EC2上试用play framework
2011-11-11 17:00 1425几个月以前,我在 ... -
一个Log生成工具小项目的实现
2011-11-01 17:32 1386这两天的主要工作是用java写一个log生成工具,用于 ... -
自定义log4j生成的log文件名
2011-11-01 13:57 2992很多时候,log4j的RollingFileAppen ... -
我和java操作数据库那些事儿(5)
2011-10-26 16:56 1672引用 半自动化武器来了:Spring JdbcTemplate ... -
我和JAVA数据库操作的那些事儿(4)
2011-10-24 16:21 1502通过前面几篇的介绍,对于JDBC的使用应该基本上够上项目开发的 ... -
我和JAVA数据库操作的那些事儿(3)
2011-10-20 15:35 2548在前面的两篇文章中,第一篇主要是讲了在jdbc编程中容 ... -
我和JAVA数据库操作的那些事儿(2)
2011-10-20 11:15 1581摘要 写道 上一篇提到的几个问题,在本篇有具体的代码。本篇后 ... -
图说事务隔离级别
2011-10-19 21:38 1451我们经常说的事务隔离级别,一般指的是SQL-92 ... -
我和JAVA数据库操作的那些事儿(1)
2011-10-19 15:26 2237摘要 我开始接触jdbc的 ... -
复习:观察者模式
2011-10-17 17:02 1003观察者模式(有时又被称为发布/订阅模式)是软件设计模式 ... -
复习:代理模式
2011-10-17 15:53 748代理模式是常用的Java 设计模式,它的特征是代理类与 ... -
对chainsaw中一个简单Job Scheduler的扩展
2011-10-14 23:36 1080今天在看apache chainsaw这个项目的源代码 ... -
Spring RMI 简单实现
2011-10-14 13:47 1066好久没有写java代码了,最近工作项目上需要做一个 ... -
一道关于树的面试题
2011-10-13 15:31 949记得不久以前有道面试题,要求下面的数据结构 ... -
一个简单的JAVA后台程序框架
2011-10-13 09:31 1788本项目已经通过git进行版本管理,checkout ... -
测试驱动开发:红、绿、重构
2011-10-12 23:01 901在读Ruby on Rails Tutorial: ...
相关推荐
详细内容: SpringJMS整合ActiveMQ.doc 详细说明文档 apache-activemq-5.8.0-bin.zip ActiveMQ安装包 JMSTest.rar MyEclipse8.5下web工程
综上所述,Spring整合JMS和ActivemQ提供了一套完整的解决方案,帮助开发者轻松地在应用中实现消息的发送和接收。通过这种方式,可以构建出高可用、松耦合、可扩展的分布式系统,提高系统的稳定性和响应速度。在实际...
基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示:Spring 3.2.0,ActiveMQ 5.4.3,Tomcat 6.0.43。本例通过详细的说明和注释,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以...
本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问。 一. 简介 1. 什么是Apache ActiveMq Apache ActiveMq是最流行和最强大的开源消息和集成服务器。同时Apache ActiveMq...
标题 "jms Spring+ActiveMQ 5.4.2" 涉及的是Java消息服务(JMS)在Spring框架中的应用,以及ActiveMQ作为消息代理的使用。在这个主题下,我们将深入探讨JMS的基本概念、Spring对JMS的支持以及ActiveMQ 5.4.2版本的...
Spring、JMS、ActiveMQ和Tomcat是Java开发中常用的技术组件,它们分别扮演着不同的角色,构建出高效的企业级消息通信系统。本教程将详细阐述这些技术的使用及其相互结合的方式。 首先,Spring是一个开源的Java应用...
在IT行业中,Spring框架...Spring简化了JMS的集成和管理,ActiveMQ作为强大的消息中间件,保证了消息的稳定传输。通过理解和掌握这一技术栈,开发者可以构建出高可用、松耦合的应用系统,提高系统的整体性能和稳定性。
集成SpringJMS和ActiveMQ首先需要在Spring配置中定义ConnectionFactory和Destination(Topic或Queue)。ConnectionFactory是JMS客户端用来创建与消息服务器的连接的工厂,而Destination是消息的目的地。Spring的`...
1. 添加依赖:在项目的Maven或Gradle配置文件中引入ActiveMQ和Spring JMS的相关库。 2. 配置ConnectionFactory:这是连接到ActiveMQ服务器的关键,可以通过XML配置或Java配置来定义。 3. 创建Destination:JMS中的...
基于Spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务。
标题“jms+activeMq+spring学习简单例子”表明这个压缩包包含了一些示例代码,用于演示如何在Spring框架中集成JMS和ActiveMQ,以便于理解和学习。通过这个例子,开发者可以了解如何在实际应用中实现基于消息的通信。...
基于Spring+JMS+ActiveMQ+Tomcat的整合ActiveMQSpringDemo实例源码,此实例基于Spring+JMS+ActiveMQ+Tomcat,注解的完整实例,包含jar包,可供学习及设计参考。
ActiveMQ的主要特点包括支持多种编程语言和协议的客户端、完全支持JMS 1.1和J2EE 1.4规范、对Spring的支持、支持多种传输协议以及持久化和事务处理能力。 在实际的项目中,消息队列经常被用于将一些耗时的操作,如...
将Spring与ActiveMQ整合,可以轻松地在Spring应用中实现消息队列的功能,提高系统的可扩展性和可靠性。 首先,让我们了解Spring框架如何支持消息传递。Spring提供了JmsTemplate类,这是一个模板类,用于简化发送和...
**JMS(Java Message Service)** 是Java平台中用于企业级消息传递的一种API,它提供了一种标准的方式来...通过深入学习和实践,你可以理解如何在Java应用中使用ActiveMQ进行消息传递,并利用Spring框架简化这一过程。
Spring整合JMS基于ActiveMQ实现是一项常见的企业级应用开发任务,它涉及到Spring框架、Java消息服务(JMS)以及ActiveMQ消息中间件的使用。在本文中,我们将深入探讨这三个关键概念,以及如何将它们有效地结合在一起...
通过以上知识点的讲解,我们可以看到,ActiveMQ与Spring的整合使得在Java应用中使用JMS变得更加简单和高效。无论是消息的生产还是消费,都能通过Spring的抽象和ActiveMQ的稳定性能得到很好的支持。在实际项目中,...
10. **Tomcat服务器**:Tomcat是一个流行的Java Web服务器,它可以部署和运行使用Spring和ActiveMQ的Web应用程序。 通过上述知识点,我们可以理解如何在Spring环境中利用ActiveMQ进行消息传递,实现高并发、解耦的...
### Spring+JMS+ActiveMQ+Tomcat 实现消息服务 #### 一、技术栈介绍 在本案例中,我们采用的技术栈为Spring 2.5、ActiveMQ 5.4.0 和 Tomcat 6.0.30。这些技术的结合能够有效地构建一个可靠的消息传递系统。 - **...
标题“spring2 ...分析这些文件可以帮助理解如何将Spring、ActiveMQ和Tomcat集成在一起,实现消息传递功能。学习这个例子可以加深对JMS、Spring集成以及Web应用部署的理解,对于提升Java EE开发技能非常有帮助。