- 浏览: 386657 次
- 性别:
- 来自: 印度
文章分类
最新评论
-
天天来注册:
...
多线程的死锁 -
memoryisking:
可以看看这篇文章,这个是struts教程网上一个简单的例子,构 ...
Java5中的线程池实例讲解 -
a123159521:
菜鸟不再菜 写道楼主很明显没有说明守护线程到底是怎么服务Use ...
守护线程总结 -
jjruanlili:
要搞个executor和nio的结合,差不多
Java5中的线程池实例讲解 -
josico:
纠正楼主一个问题‘如果四个队员都在忙时,再有新的任务,这个小组 ...
线程池ThreadPoolExecutor使用简介
简介:
[1]
在介绍ActiveMQ之前,首先简要介绍一下JMS规范。
JMS的简介:
(1)
JMS(Java Message Service,Java消息服务)是一组Java应用程序接口(Java API),它提供创建、发送、接收、读取消息的服务。JMS 使您能够通过消息收发服务从一个 JMS 客户机向另一个 JML 客户机交流消息。
JMS是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC (Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ.
(2)
JMS典型的应用场景:
操作可异步执行.
发email了, 发msn消息了.
或者一些比较耗时的操作, 比如要在某目录下生成一个大报表. 操作者把指令发出去就完事.
[2]
JMS的基本构件:
(1)
Broker
什么是Broker呢?可以把JMS Brokers 看成是服务器端。这个服务器可以独立运行.也可以随着其他容器
以内嵌方式云心,如下配置:
使用显示的Java代码创建
BrokerService broker = new BrokerService();
// configure the broker
broker.addConnector("tcp://localhost:61616");
broker.start();
使用BrokerFacotry创建
BrokerService broker = BrokerFactory.getInstance().createBroker(someURI);
使用Spring Bean创建
<bean id=”broker” class=”org.apache.activemq.xbean.BrokerFactoryBean”>
<property name=”config” value=”classpath:org/apache/activemq/xbean/activemq.xml” />
<property name=”start” value=”true” />
</bean>
还可以使用XBean或Spring 2.0等多种配置方式配置,
通过ActiveMQConnectionFactory还可以隐含的创建内嵌的broker,这个broker就不是一个独立的服务了。
<bean id=”jmsTemplate” class=”org.springframework.jms.core.JmsTemplate”>
<property name=”connectionFactory” ref=”jmsFactory”/>
<property name=”defaultDestination” ref=”destination” />
<property name=”destinationResolver” ref=”默认是DynamicDestionResolver” />
<property name=”pubSubDomain”><value>true or false默认是false,
false是QueneDestination, true是TopicDestination</value>
</bean>
上面的defaultDestination是指默认发送和接收的目的地,我们也可以不指定,而是通过目的地名称让jmsTemplate自动帮我们创建.
(2)
1 连接工厂
连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。
2 连接
JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。
3 会话
JMS Session是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。
(3)
目的地:
目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种
消息传递域:Point-to-Point消息(P2P),点对点;发布订阅消息(Publish Subscribe messaging,简称Pub/Sub)
两者的区别:
P2P消息模型是在点对点之间传递消息时使用。如果应用程序开发者希望每一条消息都能够被处理,那么应该使用P2P消息模型。与Pub/Sub消息模型不同,P2P消息总是能够被传送到指定的位置。
P2P消息,每个消息只能有一个消费者。
Pub/Sub模型在一到多的消息广播时使用。如果一定程度的消息传递的不可靠性可以被接受的话,那么应用程序开发者也可以使用Pub/Sub消息模型。换句话说,它适用于所有的消息消费程序并不要求能够收到所有的信息或者消息消费程序并不想接收到任何消息的情况。
Pub/Sub,每个消息可以有多个消费者。
在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。
(3)
3.1
消息生产者
消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。
3.2
消息消费者
消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种
方法之一:
? 异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。(异步操作)
? 同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。
3.3
消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:
简单文本 (TextMessage)、可序列化的对象 (ObjectMessage)、属性集合
(MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
(4)
JMS定义了从0到9的优先级路线级别,0是最低的优先级而9则是最高的。更特殊的是0到4是正常优先级的变化幅度,而5到9是加快的优先级的变化幅度。
[3]
ActiveMQ简介:
ActiveMQ 是开源的JMS实现,Geronimo应用服务器就是使用的ActiveMQ提供JMS服务。
安装
在http://activemq.apache.org/download.html 下载5.0.0发行包,解压即可,
启动
window环境运行解压目录下的/bin/activemq.bat
测试
ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动
window环境运行 netstat -an|find "61616"
监控
ActiveMQ5.0版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。
admin:http://127.0.0.1:8161/admin/
demo:http://127.0.0.1:8161/demo/
点击demo应用中的“ Market data publisher ”,就会发一些测试的消息。转到admin页面的topics menu下面(queue和topic的区别见 http://andyao.iteye.com/blog/153173 ),可以看到消息在增长。
ActiveMQ5.0的配置文件在解压目录下的/conf目录下面。主要配置文件为activemq.xml.
说明:
(2)
VM Transport
VM transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连接不是socket连接,而是直接地方法调用。第一个创建VM 连接的客户会启动一个embed VM broker,接下来所有使用相同的broker name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。
TCP Transport
TCP transport 允许客户端通过TCP socket连接到远程的broker。以下是配置语法:
tcp://hostname:port?transportOptions
tcp://localhost:61616
(3)
3.1
启动activeMQ后,用户创建的queues会被保存在activeMQ解压目录下的\data\kr-store\data中.
3.2
创建queue,可以在代码中创建,也可以直接进入http://localhost:8161/admin--->点击queue--->在上面的field中填下你要创建的queue名-->点击创建即可.
3.3
若用户创建的queue,不是持久化的,则在重启activeMQ后,数据文件中的内容会被清空,但文件仍存在.
(4)
messageProducer发送消息后,会在保存在目的地,即上面的queue中,也即就是在\data\kr-store\data目录下的文件中.
messageReceiver(连接到相同目的地的接收者),不需要立即接收.只要activeMQ的服务端不关闭,当运行接收者,连接到activeMQ的服务端时,就可以获取activeMQ服务端上已发送的消息.
发送/接收的消息情况及数量及消息的内容与处理(删除),可以在
http://localhost:8161/admin/queue.jsp中查看,操作.
(5)
可以将activeMQ的服务端放于一PC中,发送者位于另一PC,接收者也位于另一PC中.
只要:tcp://activeMQ的服务端IP:activeMQ的服务端口,进行连接即可.
(6)
queue消息,只被消费一次.
实例二:
异步的电子邮件(将topic的应用实例也放在一起)
(4.1)
背景说明:
以传统的方式发送电子邮件(作为同步请求的一部分)会引起一些问题。首先,连接到电子邮件服务器需要一次网络往返,速度可能会很慢,尤其是在服务器非常繁忙的时候。过载的电子邮件服务器甚至可以使依赖于电子邮件的服务暂时不可用。
xa 事务支持
另一个显而易见的问题是,电子邮件服务器通常在本质上是非事务性的。当事务被回滚时,这可以导致出现不一致的通知——把一条消息放入队列之后不能取消它。幸运的是, jms 支持事务,而且可以通过把消息的发送延迟到提交底层事务的时候来解决这个问题。
(4.2)
实现效果:
用户在更改密码后,系统会发送邮件通知用户,为了避免发送邮件时程序对用户操作的阻塞,可以用JMS异步发送邮件.
(4.3)
实现流程:
当用户更改密码后
--1->调用JMS的发送者,发送者会利用jmsTemplate发送消息到目的地
--2->系统,执行原系统的程序.
--2->当消息发送后,messageListener侦听到消息,接收后执行相应的方法handleMessage().在此方法中执行发送email.
好处:
从而实现了异步发送邮件,避免用户等待单一线程完成,使原程序的执行更快,提升用户体验
安装注意事项:amq:broker无法认识,如下图修改
applicationContext-jms.xml
如果是持久化到mysql,spring配置为:
注意:要把mysql驱动放到应用的lib目录下
注释掉消费者配置,然后就可以看到数据库中存在未发送的消息,取消注释,重新启动应用,消息重新发送出去,表的记录也被删除
[1]
在介绍ActiveMQ之前,首先简要介绍一下JMS规范。
JMS的简介:
(1)
JMS(Java Message Service,Java消息服务)是一组Java应用程序接口(Java API),它提供创建、发送、接收、读取消息的服务。JMS 使您能够通过消息收发服务从一个 JMS 客户机向另一个 JML 客户机交流消息。
JMS是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC (Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ.
(2)
JMS典型的应用场景:
操作可异步执行.
发email了, 发msn消息了.
或者一些比较耗时的操作, 比如要在某目录下生成一个大报表. 操作者把指令发出去就完事.
[2]
JMS的基本构件:
(1)
Broker
什么是Broker呢?可以把JMS Brokers 看成是服务器端。这个服务器可以独立运行.也可以随着其他容器
以内嵌方式云心,如下配置:
使用显示的Java代码创建
BrokerService broker = new BrokerService();
// configure the broker
broker.addConnector("tcp://localhost:61616");
broker.start();
使用BrokerFacotry创建
BrokerService broker = BrokerFactory.getInstance().createBroker(someURI);
使用Spring Bean创建
<bean id=”broker” class=”org.apache.activemq.xbean.BrokerFactoryBean”>
<property name=”config” value=”classpath:org/apache/activemq/xbean/activemq.xml” />
<property name=”start” value=”true” />
</bean>
还可以使用XBean或Spring 2.0等多种配置方式配置,
通过ActiveMQConnectionFactory还可以隐含的创建内嵌的broker,这个broker就不是一个独立的服务了。
<bean id=”jmsTemplate” class=”org.springframework.jms.core.JmsTemplate”>
<property name=”connectionFactory” ref=”jmsFactory”/>
<property name=”defaultDestination” ref=”destination” />
<property name=”destinationResolver” ref=”默认是DynamicDestionResolver” />
<property name=”pubSubDomain”><value>true or false默认是false,
false是QueneDestination, true是TopicDestination</value>
</bean>
上面的defaultDestination是指默认发送和接收的目的地,我们也可以不指定,而是通过目的地名称让jmsTemplate自动帮我们创建.
(2)
1 连接工厂
连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。
2 连接
JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。
3 会话
JMS Session是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。
(3)
目的地:
目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种
消息传递域:Point-to-Point消息(P2P),点对点;发布订阅消息(Publish Subscribe messaging,简称Pub/Sub)
两者的区别:
P2P消息模型是在点对点之间传递消息时使用。如果应用程序开发者希望每一条消息都能够被处理,那么应该使用P2P消息模型。与Pub/Sub消息模型不同,P2P消息总是能够被传送到指定的位置。
P2P消息,每个消息只能有一个消费者。
Pub/Sub模型在一到多的消息广播时使用。如果一定程度的消息传递的不可靠性可以被接受的话,那么应用程序开发者也可以使用Pub/Sub消息模型。换句话说,它适用于所有的消息消费程序并不要求能够收到所有的信息或者消息消费程序并不想接收到任何消息的情况。
Pub/Sub,每个消息可以有多个消费者。
在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。
(3)
3.1
消息生产者
消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。
3.2
消息消费者
消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种
方法之一:
? 异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。(异步操作)
? 同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。
3.3
消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:
简单文本 (TextMessage)、可序列化的对象 (ObjectMessage)、属性集合
(MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
(4)
JMS定义了从0到9的优先级路线级别,0是最低的优先级而9则是最高的。更特殊的是0到4是正常优先级的变化幅度,而5到9是加快的优先级的变化幅度。
[3]
ActiveMQ简介:
ActiveMQ 是开源的JMS实现,Geronimo应用服务器就是使用的ActiveMQ提供JMS服务。
安装
在http://activemq.apache.org/download.html 下载5.0.0发行包,解压即可,
启动
window环境运行解压目录下的/bin/activemq.bat
测试
ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动
window环境运行 netstat -an|find "61616"
监控
ActiveMQ5.0版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。
admin:http://127.0.0.1:8161/admin/
demo:http://127.0.0.1:8161/demo/
点击demo应用中的“ Market data publisher ”,就会发一些测试的消息。转到admin页面的topics menu下面(queue和topic的区别见 http://andyao.iteye.com/blog/153173 ),可以看到消息在增长。
ActiveMQ5.0的配置文件在解压目录下的/conf目录下面。主要配置文件为activemq.xml.
说明:
(2)
VM Transport
VM transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连接不是socket连接,而是直接地方法调用。第一个创建VM 连接的客户会启动一个embed VM broker,接下来所有使用相同的broker name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。
TCP Transport
TCP transport 允许客户端通过TCP socket连接到远程的broker。以下是配置语法:
tcp://hostname:port?transportOptions
tcp://localhost:61616
(3)
3.1
启动activeMQ后,用户创建的queues会被保存在activeMQ解压目录下的\data\kr-store\data中.
3.2
创建queue,可以在代码中创建,也可以直接进入http://localhost:8161/admin--->点击queue--->在上面的field中填下你要创建的queue名-->点击创建即可.
3.3
若用户创建的queue,不是持久化的,则在重启activeMQ后,数据文件中的内容会被清空,但文件仍存在.
(4)
messageProducer发送消息后,会在保存在目的地,即上面的queue中,也即就是在\data\kr-store\data目录下的文件中.
messageReceiver(连接到相同目的地的接收者),不需要立即接收.只要activeMQ的服务端不关闭,当运行接收者,连接到activeMQ的服务端时,就可以获取activeMQ服务端上已发送的消息.
发送/接收的消息情况及数量及消息的内容与处理(删除),可以在
http://localhost:8161/admin/queue.jsp中查看,操作.
(5)
可以将activeMQ的服务端放于一PC中,发送者位于另一PC,接收者也位于另一PC中.
只要:tcp://activeMQ的服务端IP:activeMQ的服务端口,进行连接即可.
(6)
queue消息,只被消费一次.
实例二:
异步的电子邮件(将topic的应用实例也放在一起)
(4.1)
背景说明:
以传统的方式发送电子邮件(作为同步请求的一部分)会引起一些问题。首先,连接到电子邮件服务器需要一次网络往返,速度可能会很慢,尤其是在服务器非常繁忙的时候。过载的电子邮件服务器甚至可以使依赖于电子邮件的服务暂时不可用。
xa 事务支持
另一个显而易见的问题是,电子邮件服务器通常在本质上是非事务性的。当事务被回滚时,这可以导致出现不一致的通知——把一条消息放入队列之后不能取消它。幸运的是, jms 支持事务,而且可以通过把消息的发送延迟到提交底层事务的时候来解决这个问题。
(4.2)
实现效果:
用户在更改密码后,系统会发送邮件通知用户,为了避免发送邮件时程序对用户操作的阻塞,可以用JMS异步发送邮件.
(4.3)
实现流程:
当用户更改密码后
--1->调用JMS的发送者,发送者会利用jmsTemplate发送消息到目的地
--2->系统,执行原系统的程序.
--2->当消息发送后,messageListener侦听到消息,接收后执行相应的方法handleMessage().在此方法中执行发送email.
好处:
从而实现了异步发送邮件,避免用户等待单一线程完成,使原程序的执行更快,提升用户体验
安装注意事项:amq:broker无法认识,如下图修改
applicationContext-jms.xml
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <!-- 第一步: 创建broker,jmsFactory,destination,messageConverter,jmsTemplate(用于发送JMS消息). 然后将jmsFactory,Destination,MessageConverter放入jmsTemplate中. --> <!-- 在Spring中配置嵌入式的 activemq broker,这样就不用在使用JMS时,要手动启动activeMQ broker --> <amq:broker useJmx="false" persistent="false"> <amq:transportConnectors> <amq:transportConnector uri="tcp://localhost:61616" /> </amq:transportConnectors> </amq:broker> <!-- 消息的存储机制, 服务器重启也不会丢失消息. <amq:broker useJmx="false" persistent="true"> <amq:persistenceAdapter> <amq:amqPersistenceAdapter directory="d:/amq"/> </amq:persistenceAdapter> <amq:transportConnectors> <amq:transportConnector uri="tcp://localhost:61616" /> <amq:transportConnector uri="vm://localhost:0" /> </amq:transportConnectors> </amq:broker> --> <!-- Connection Factory。 --> <bean id="jmsFactory2" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <!-- 也可直接用: <amq:connectionFactory id="jmsFactory2" brokerURL="tcp://localhost:61616" /> --> <!-- 配置消息发送目的地: --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="MY.topic" /> </bean> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="MY.queue" /> </bean> <!-- 也可直接用: <amq:topic name="topicDestination" physicalName="MY.topic"/> <amq:queue name="queueDestination" physicalName="MY.queue"/> --> <!-- 配置Spring中消息发送的JMS Template --> <bean id="producerJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <bean class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="jmsFactory2" /> </bean> </property> <!-- 因为要实现此模板同时用于queue与topic,所以目的地要放于发送者中.若单独使用,放于此,更方便! <property name="defaultDestination" ref="queueDestination" /> --> <property name="messageConverter" ref="userMessageConverter" /> </bean> <!-- 消息转换器 --> <bean id="userMessageConverter" class="com.test.jms.UserMessageConverter" /> <!-- 第二步:配置发送消息: --> <!-- 发送queue消息 --> <bean id="userMessageProducer" class="com.test.jms.UserMessageProducer"> <property name="jmsTemplate" ref="producerJmsTemplate" /> <property name="defaultDestination" ref="queueDestination" /> </bean> <!-- 发送topic消息 --> <bean id="topicMessageProducer" class="com.test.jms.TopicMessageProducer"> <property name="jmsTemplate" ref="producerJmsTemplate" /> <property name="defaultDestination" ref="topicDestination" /> </bean> <!-- 定义消息消费者,然后直接在messageListener中调用. 消费者,不用加入jmsTemplate属性,jmsTemplate只用于发送消息 --> <!-- queue消息消费者,只能一个 --> <bean id="userMessageConsumer" class="com.test.jms.UserMessageConsumer" /> <!-- topic消息消费者,可以多个 --> <bean id="topicConsumerA" class="com.test.jms.TopicConsumerA" /> <bean id="topicConsumerB" class="com.test.jms.TopicConsumerB" /> <!--配置messageListener,用它来接收消息.(用jmsTemplate来发送消息) --> <!-- 定义queue,topic(A/B consumer)各自的侦听器 --> <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="userMessageConsumer" /> <property name="defaultListenerMethod" value="handleMessage" /> <property name="messageConverter" ref="userMessageConverter" /> </bean> <bean id="topicListenerA" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="topicConsumerA" /> <property name="defaultListenerMethod" value="receiveA" /> <property name="messageConverter" ref="userMessageConverter" /> </bean> <bean id="topicListenerB" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="topicConsumerB" /> <property name="defaultListenerMethod" value="receiveB" /> <property name="messageConverter" ref="userMessageConverter" /> </bean> <!-- 配置消息侦听容器,并指定我们定义的消息侦听器。 --> <!-- 定义queue,topic(A/B consumer)各自的侦听容器 --> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="concurrentConsumers" value="5" /> <property name="connectionFactory" ref="jmsFactory2" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="messageListener" /> </bean> <bean id="topicListenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="concurrentConsumers" value="5" /> <property name="connectionFactory" ref="jmsFactory2" /> <property name="destination" ref="topicDestination" /> <property name="messageListener" ref="topicListenerA" /> </bean> <bean id="topicListenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="concurrentConsumers" value="5" /> <property name="connectionFactory" ref="jmsFactory2" /> <property name="destination" ref="topicDestination" /> <property name="messageListener" ref="topicListenerB" /> </bean> </beans>
package com.test.jms; import java.io.Serializable; public class PersonInfo implements Serializable { private static final long serialVersionUID = -1018868413858723239L; String id; String name; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String toString() { return this.id + ", " + this.name; } }
package com.test.jms; import javax.jms.JMSException; public class TopicConsumerA { public void receiveA(PersonInfo personInfo) throws JMSException { System.out.println("TopicConsumerA收到TopicProducer的消息---->personInfo的用户名是:" + personInfo.getName()); } }
package com.test.jms; import javax.jms.JMSException; public class TopicConsumerB { public void receiveB(PersonInfo personInfo) throws JMSException { System.out.println("TopicConsumerB收到TopicProducer的消息---->personInfo的用户名是:" + personInfo.getName()); } }
package com.test.jms; import javax.jms.Topic; import org.springframework.jms.core.JmsTemplate; public class TopicMessageProducer { private JmsTemplate jmsTemplate; private Topic defaultDestination; public void sendTopicMessage(PersonInfo personInfo) { getJmsTemplate().convertAndSend(this.defaultDestination, personInfo); } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public Topic getDefaultDestination() { return defaultDestination; } public void setDefaultDestination(Topic defaultDestination) { this.defaultDestination = defaultDestination; } }
package com.test.jms; import javax.jms.JMSException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class UserMessageConsumer { private static transient Log logger = LogFactory.getLog(UserMessageConsumer.class); public void handleMessage(PersonInfo personInfo) throws JMSException { if (logger.isDebugEnabled()) { logger.debug("Receive a User object from ActiveMQ: " + personInfo.toString()); } // mailSender.send(personInfo, "h***7@126.com"); System.out.println("UserMessageConsumer recive : " + personInfo); } }
package com.test.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.jms.support.converter.MessageConverter; public class UserMessageConverter implements MessageConverter { private static transient Log logger = LogFactory.getLog(UserMessageConverter.class); public Object fromMessage(Message message) throws JMSException { if (logger.isDebugEnabled()) { logger.debug("Receive JMS message: " + message); } if (message instanceof ObjectMessage) { ObjectMessage oMsg = (ObjectMessage) message; if (oMsg instanceof ActiveMQObjectMessage) { ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) oMsg; try { PersonInfo personInfo = (PersonInfo) aMsg.getObject(); return personInfo; } catch (Exception e) { logger.error("Message:[" + message + "] is not a instance of personInfo."); throw new JMSException("Message:[" + message + "] is not a instance of personInfo."); } } else { logger.error("Message:[" + message + "] is not " + "a instance of ActiveMQObjectMessage[personInfo]."); throw new JMSException("Message:[" + message + "] is not " + "a instance of ActiveMQObjectMessage[personInfo]."); } } else { logger.error("Message:[" + message + "] is not a instance of ObjectMessage."); throw new JMSException("Message:[" + message + "] is not a instance of ObjectMessage."); } } public Message toMessage(Object obj, Session session) throws JMSException { if (logger.isDebugEnabled()) { logger.debug("Convert User object to JMS message: " + obj); } if (obj instanceof PersonInfo) { ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session.createObjectMessage(); msg.setObject((PersonInfo) obj); return msg; } else { logger.error("Object:[" + obj + "] is not a instance of PersonInfo."); throw new JMSException("Object:[" + obj + "] is not a instance of PersonInfo."); } } }
package com.test.jms; import javax.jms.Queue; import org.springframework.jms.core.JmsTemplate; public class UserMessageProducer { private JmsTemplate jmsTemplate; // 因为"目的地"属性放于jmsTemplate中,则不用添加此属性. private Queue defaultDestination; public void sendUserLoginInformationMail(PersonInfo personInfo) { // 若"目的地"属性放于jmsTemplate中,则用此方式 // getJmsTemplate().convertAndSend(personInfo); getJmsTemplate().convertAndSend(this.defaultDestination, personInfo); } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public Queue getDefaultDestination() { return defaultDestination; } public void setDefaultDestination(Queue defaultDestination) { this.defaultDestination = defaultDestination; } }
package com.test.jms; import java.io.IOException; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.springframework.context.ApplicationContext; import org.springframework.web.context.support.WebApplicationContextUtils; public class TestJms extends HttpServlet { private static final long serialVersionUID = -7800122914813997741L; public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { this.doPost(request, response); } public void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { PersonInfo p = new PersonInfo(); p.setId("11"); p.setName("11name"); ApplicationContext context = WebApplicationContextUtils.getRequiredWebApplicationContext(this.getServletContext()); UserMessageProducer userMessageProducer = (UserMessageProducer) context.getBean("userMessageProducer"); userMessageProducer.sendUserLoginInformationMail(p); TopicMessageProducer topicMessageProducer = (TopicMessageProducer) context.getBean("topicMessageProducer"); topicMessageProducer.sendTopicMessage(p); } }
如果是持久化到mysql,spring配置为:
<!-- 消息的存储机制, 服务器重启也不会丢失消息. --> <amq:broker useJmx="false" persistent="true"> <amq:persistenceAdapter> <amq:jdbcPersistenceAdapter dataSource="#mysql-ds"/> </amq:persistenceAdapter> <amq:transportConnectors> <amq:transportConnector uri="tcp://localhost:61616" /> <amq:transportConnector uri="vm://localhost:0" /> </amq:transportConnectors> </amq:broker> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="poolPreparedStatements" value="true"/> </bean>
注意:要把mysql驱动放到应用的lib目录下
注释掉消费者配置,然后就可以看到数据库中存在未发送的消息,取消注释,重新启动应用,消息重新发送出去,表的记录也被删除
发表评论
-
通过spring开发ActiveMQ简单应用
2011-01-23 20:27 1422使用spring的支持类开发JMS程序可以简化代码,确保开发质 ... -
IOC控制反转和DI依赖注入区别
2010-08-10 15:54 2338IOC控制反转:说的是创建对象实例的控制权从代码控制剥离到IO ... -
Spring2.5注解实现AOP
2010-08-03 16:23 1006http://hypgr.iteye.com/blog/418 ... -
AOP原理简介
2010-08-03 16:03 21311. 面向切面编程(AOP)原理以及Helloworl ... -
Spring IOC/DI/注解-理论与实例并存
2010-08-03 13:57 3233一、定义:Spring 是一个开源的控制反转(Inversio ... -
SpringAOP结合MemCached做缓存的设想
2010-07-08 08:54 13261, 给DAO的方法上加SpringAOP的Around通知, ... -
Spring jdbc 对象Mapper的简单封装
2010-07-07 14:06 2297一般查询实体的时候,都需要这么使用: /** *//** ... -
spring的通用事务管理机制
2010-05-16 18:28 1191<tx:advice id="service ... -
Spring 的优秀工具类盘点
2010-04-26 21:43 1061http://www.ibm.com/developerwor ... -
Struts与Spring整合
2010-04-19 00:21 1055如果需要使用第三方MVC框架,则不能在web.xml文件中配置 ...
相关推荐
Struts2.1.8+Spring2.5.6+Hibernate3.3.2是经典的Java Web开发框架组合,常被称为SSH。这三个框架协同工作,为开发者提供了强大的模型-视图-控制器(MVC)架构支持,使得企业级应用的开发更加高效和模块化。 Struts...
一、Spring MVC环境搭建:(Spring 2.5.6 + Hibernate 3.2.0) 1. jar包引入 Spring 2.5.6:spring.jar、spring-webmvc.jar、commons-logging.jar、cglib-nodep-2.1_3.jar Hibernate 3.6.8:hibernate3.jar...
spring 2.5.6 + struts 2.2.1 + hibernate 3.5.5 内有 AOP 的切面、全部的注解的 M V C 结构,之外还在上网上找了一个同学的学习笔记一起打包,感觉好不错!
综上所述,Struts2.1.6+Spring2.5.6+Hibernate3.3.2的整合提供了全面的Java Web开发解决方案,它简化了开发流程,提高了代码的可维护性和可扩展性。开发者可以通过掌握这三个框架的整合,提升自身在企业级应用开发中...
Struts2.1.6+Spring2.5.6+Hibernate3.3.1框架整合开发 http://www.cnitblog.com/intrl/archive/2009/04/13/56322.aspx 这篇文章的源码由于他的上传的空间用户可能下载失败,作者相当牛而且具有奉献精神
标题和描述中提到的"hibernate3.3.2+spring2.5.6+struts2.1.6整合包+anntations"是指一个经典的Java Web开发框架集成,通常被称为SSH(Struts2、Spring、Hibernate)框架。这个整合包包含这三个组件的特定版本,用于...
在深入探讨Struts2.1.6+Spring2.5.6+Hibernate3.3.1全注解的实现细节之前,我们先回顾一下这三个框架的基本概念及其在JavaEE企业级开发中的作用。 ### Struts2框架 Struts2是一个开源的Web应用框架,它遵循MVC...
在"struts2+spring2.5.6+hibernate3.0+ext"的整合开发中,通常会使用Spring来管理Struts2的Action类,实现业务逻辑的解耦。同时,Spring可以作为Hibernate的事务管理器,处理数据库操作。Ext则用于创建前端用户界面...
Tapestry5.1+Spring2.5.6+Hibernate3.2.5写得简单CURD的程序,借鉴了SpringSide3.1.4.2的配置及数据层。实现了添加,列表,修改,删除,显示等操作。内有源码及生成的WAR文件。可以直接使用!
struts2.2.3+spring2.5.6+hibernate3.2 内部WEB-INF/lib有所用的全部JAR,不用自己另外再找一些JAR webroot/database下有mysql所用的数据库创建脚本和已经存在的一些数据 student_data.sql 已有数据 student.sql...
Struts2+Spring2.5.6+Hibernate3 用到的jar包 antlr-2.7.6.jar asm.jar backport-util-concurrent.jar cglib-nodep-2.1_3.jar commons-collections-3.1.jar commons-dbcp.jar commons-fileupload-1.2.1.jar commons-...
Struts1、Spring 2.5.6 和 JDBC 是经典的Java Web开发技术组合,它们各自在应用程序架构中扮演着不同的角色。Struts1是MVC(Model-View-Controller)框架,Spring则是一个全面的轻量级应用框架,而JDBC(Java ...
《基于Struts2+Hibernate3.3+Spring2.5.6+ExtJS3.2的图书管理系统详解》 在当今信息化社会,图书管理系统的构建是图书馆自动化、网络化的重要一环。本系统采用了一套经典的Java技术栈,即Struts2作为MVC框架、...
标题中的"spring2.5.6 +struts2.1.8+hiernate3.3.2(jar)"指的是一个集成开发环境,它包含了三个关键的Java Web框架:Spring 2.5.6、Struts 2.1.8和Hibernate 3.3.2。这些框架在Java应用开发中扮演着重要的角色,尤其...
这个集合包含了Spring 2.5.6、Struts2 2.1.6、Hibernate 3.3.2以及适用于MySQL的驱动程序,这些都是构建基于Java的动态Web应用程序的关键组件。 首先,Spring框架是Java企业级应用开发的核心框架,版本2.5.6。...
### Struts2.1.6 + Spring2.5.6 + Hibernate3.3.1 整合 在本文中,我们将详细介绍如何将Struts2.1.6、Spring2.5.6以及Hibernate3.3.1进行整合,并构建一个简单的MVC应用。通过这个过程,读者将了解到这三种框架的...
JQuery1.4.2+Struts2.1.8+JSON0.34+Spring2.5.6+Hibernate3.5+XFire1.2.6整合实例(已上传) 1、JSON0.34使用的是struts2附带的struts2-json-plugin-2.1.8.1.jar 2、db是mysql,名字为test,用户名root,密码空 3、...
"Jboss4.2.2+Spring2.5.6+Hibernate+JTA事务的实现"就是一个典型的例子,它涉及到四个关键的技术栈,即JBoss Application Server 4.2.2、Spring 2.5.6、Hibernate ORM以及Java Transaction API(JTA)。这些技术的...
标题 "spring2.5.6+hibernate3.3.2+struts2.1.8" 提供了一个经典的Java Web开发技术栈,这个组合被称为S2SH(Spring、Struts2和Hibernate)。这个版本的集成对于初学者来说是一个很好的起点,因为它包含了三个主要的...
总结来说,"struts 2+spring 2.5.6+ibatis2.3.4集成"是一个经典的Java Web开发组合,它利用各框架的优势,构建出稳定、高效的Web应用程序。这个集成方案在当时被广泛应用,并为许多开发者提供了构建复杂系统的基石。...