论坛首页 Java企业应用论坛

ActiveMQ5.0实战三:使用Spring发送,消费topic和queue消息

浏览 51733 次
该帖已经被评为良好帖
作者 正文
   发表时间:2008-08-28  
ActiveMQ5.0实战一: 安装配置ActiveMQ5.0
ActiveMQ5.0实战二: 基本配置

简介

实战一 , 实战二 介绍了ActiveMQ的基本概念和配置方式.

本篇将通过一个实例介绍使用spring发送,消费topic, queue类型消息的方法. 不懂topic和queue的google 之.

 

如图示, TOPIC和QUEUE分别代表一个topic和一个queue消息通道.

  1. TopicMessageProducer向topic发送消息, TopicConsumerA和TopicConsumerB则从topic消费消息.
  2. QueueMessageProducer向Queue发送消息, QueueConsumer从Queue中消费消息

Spring整合JMS

就像对orm, web的支持一样, spring同样支持jms, 为整合jms到已有的项目提供了很多便利的方法. 本篇主要讲实战, 是所以先从配置开始, spring配置jms基本上需要8个部分.

  1. ConnectionFactory. 和jms服务器的连接, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker.
  2. Destination. 有topic和queue两种方式.
  3. JmsTemplate. spring提供的jms模板.
  4. MessageConverter. 消息转换器.
  5. MessageProducer. 消息生产者.
  6. MessageConsumer. 消息消费者.
  7. MessageListener. 消息监听器
  8. MessageListenerContainer. 消息监听容器

下面以实例的方式介绍上面8个部分.

1. ConnectionFactory

<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />

 brokerURL是指要连接的activeMQ server的地址, activeMQ提供了多种brokerURL, 集体可参见文档.一般我们使用嵌套的ActiveMQ server. 配置如下, 这个配置使用消息的存储机制, 服务器重启也不会丢失消息.

<!--  embedded ActiveMQ Broker -->
	<amq:broker useJmx="false" persistent="true">
		<amq:persistenceAdapter>
			<amq:amqPersistenceAdapter directory="d:/amq"/>
		</amq:persistenceAdapter>
		<amq:transportConnectors>
			<amq:transportConnector uri="tcp://localhost:61616" />
                       <amq:transportConnector uri="vm://localhost:0" />
		</amq:transportConnectors>
	</amq:broker>

 2. Destination

 在实例中我们使用了两种destination

<!--  ActiveMQ destinations  -->
<!--  使用topic方式-->
<amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />
<!--  使用Queue方式-->
<amq:queue name="QUEUE" physicalName="JMS-TEST-QUEUE" />

 3. JmsTemplate

<!--  Spring JmsTemplate config -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory">
			<!--  lets wrap in a pool to avoid creating a connection per send -->
			<bean class="org.springframework.jms.connection.SingleConnectionFactory">
				<property name="targetConnectionFactory" ref="jmsConnectionFactory" />
			</bean>
		</property>
		<!-- custom MessageConverter -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>

  4. MessageConverter

   MessageConverter实现的是org.springframework.jms.support.converter.MessageConverter接口, 提供消息的转换功能. DefaultMessageConverter的实现见附件.

<bean id="defaultMessageConverter" class="com.andyao.activemq.DefaultMessageConverter" />

  5. MessageProducer

   实例拥有两个消息生产者, 消息生产者都是POJO, 实现见附件.

<!-- POJO which send Message uses  Spring JmsTemplate -->
	<bean id="topicMessageProducer" class="com.andyao.activemq.TopicMessageProducer">
		<property name="template" ref="jmsTemplate" />
		<property name="destination" ref="TOPIC" />
	</bean>
	<bean id="queueMessageProducer" class="com.andyao.activemq.QueuMessageProducer">
		<property name="template" ref="jmsTemplate" />
		<property name="destination" ref="QUEUE" />
	</bean>

 6. MessageConsumer

 TOPIC通道有两个消息消费者, QUEUE有一个消息消费者

<!--  Message Driven POJO (MDP) -->
    <!-- consumer1 for topic a -->
    <bean id="topicConsumerA" class="com.andyao.activemq.TopicConsumerA" />
    <!-- consumer2 for topic a -->
    <bean id="topicConsumerB" class="com.andyao.activemq.TopicConsumerB" />
    <!-- consumer for queue -->
    <bean id="queueConsumer" class="com.andyao.activemq.QueueConsumer" />

  7. MessageListener

每一个消息消费者都对应一个MessageListener

<bean id="topicListenerA" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="topicConsumerA" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>

	<bean id="topicListenerB" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="topicConsumerB" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>

    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="queueConsumer" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>

 8. MessageListenerContainer

 有几个MessageListener既有几个MessageListenerContainer

<bean id="topicListenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="TOPIC" />
		<property name="messageListener" ref="topicListenerA" />
	</bean>

    <bean id="topicListenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="TOPIC" />
		<property name="messageListener" ref="topicListenerB" />
	</bean>
    
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="QUEUE" />
		<property name="messageListener" ref="queueListener" />
	</bean>

  Summary

写spring配置文件的时候, 要把MessageProducer, MessageConsumer,MessageListener,MessageListenerContainer几个地方弄清楚:

  1. 可以有一个或者多个消息生产者向同一个destination发送消息.
  2. queue类型的只能有一个消息消费者.
  3. topic类型的可以有多个消息消费者.
  4. 每个消费者对应一个MessageListener和一个MessageListenerContainer.

 

 

 

 

   发表时间:2008-08-28  
不知道楼主有没有测试过:failover的情况。
例如分别有两个系统:A,B(他们都使用外部的activeMQ服务启动),采用的是queue的方式。现在A系统通过发送对象存储到数据库中,测试数据1万条记录插入。中途B系统activeMQ中断。查询数据库有数据丢失,然后再次启动B服务的activeMQ。这个时候再查数据匹配不到
0 请登录后投票
   发表时间:2008-08-28  
1,2找不到呀
0 请登录后投票
   发表时间:2008-08-29  
yeshucheng 写道
不知道楼主有没有测试过:failover的情况。
例如分别有两个系统:A,B(他们都使用外部的activeMQ服务启动),采用的是queue的方式。现在A系统通过发送对象存储到数据库中,测试数据1万条记录插入。中途B系统activeMQ中断。查询数据库有数据丢失,然后再次启动B服务的activeMQ。这个时候再查数据匹配不到


failover不是这篇帖子的主题. ActiveMQ对cluster支持很好. 如果你需要failover, 那么你可以参考文档http://activemq.apache.org/clustering.html

你的这种情况是猜测是由于消息没有持久化保存而造成的, 请参考ActiveMQ5.0实战二: 基本配置中的消息持久化配置
0 请登录后投票
   发表时间:2008-08-29  
cats_tiger 写道
1,2找不到呀

已经修复了.
0 请登录后投票
   发表时间:2008-08-29  

对于

<amq:transportConnectors>  
            <amq:transportConnector uri="tcp://localhost:0" />  
</amq:transportConnectors>

 这段配置还是有点不了解。我在测试的时候把这段配置删除了,同样可以测试通过。我的疑问是:既然有了connectionFactory的属性配置来构建broker,为什么还需要它?可能对于这里的: transportConnectors,包括networkConnectors还是没有很好的理解?

0 请登录后投票
   发表时间:2008-08-29  
yeshucheng 写道

对于

<amq:transportConnectors>  
            <amq:transportConnector uri="tcp://localhost:0" />  
</amq:transportConnectors>

 这段配置还是有点不了解。我在测试的时候把这段配置删除了,同样可以测试通过。我的疑问是:既然有了connectionFactory的属性配置来构建broker,为什么还需要它?可能对于这里的: transportConnectors,包括networkConnectors还是没有很好的理解?

transportConnector是定義鏈接activeMQ的brokerURL. 你可以為一個activeMQ server指定多個transportconnector, 比如vm, tcp, ssl, stomp, xmpp等.

 

brokerURL不是由connectionFactory構建的, connectionFactory中指定的brokerUrl是指要連接該url的ActiveMQ

0 请登录后投票
   发表时间:2008-08-29  
andyao 写道
yeshucheng 写道

对于

<amq:transportConnectors>  
            <amq:transportConnector uri="tcp://localhost:0" />  
</amq:transportConnectors>

 这段配置还是有点不了解。我在测试的时候把这段配置删除了,同样可以测试通过。我的疑问是:既然有了connectionFactory的属性配置来构建broker,为什么还需要它?可能对于这里的: transportConnectors,包括networkConnectors还是没有很好的理解?

transportConnector是定義鏈接activeMQ的brokerURL. 你可以為一個activeMQ server指定多個transportconnector, 比如vm, tcp, ssl, stomp, xmpp等.

 

brokerURL不是由connectionFactory構建的, connectionFactory中指定的brokerUrl是指要連接該url的ActiveMQ


谢谢你的回复,如果定义多个transportconnector,假设设置两个:vm,tcp会有何种情况?

0 请登录后投票
   发表时间:2008-08-29  
yeshucheng 写道
andyao 写道
yeshucheng 写道

 对于

<amq:transportConnectors>  
            <amq:transportConnector uri="tcp://localhost:0" />  
</amq:transportConnectors>

 这段配置还是有点不了解。我在测试的时候把这段配置删除了,同样可以测试通过。我的疑问是:既然有了connectionFactory的属性配置来构建broker,为什么还需要它?可能对于这里的: transportConnectors,包括networkConnectors还是没有很好的理解?

transportConnector是定義鏈接activeMQ的brokerURL. 你可以為一個activeMQ server指定多個transportconnector, 比如vm, tcp, ssl, stomp, xmpp等.

 

brokerURL不是由connectionFactory構建的, connectionFactory中指定的brokerUrl是指要連接該url的ActiveMQ


谢谢你的回复,如果定义多个transportconnector,假设设置两个:vm,tcp会有何种情况?

vm是默认的, 只有一个,

定义多个transport就是说你可以通过多种方式连接到ActiveMQ server.

如果你定义了一个vm, 一个tcp

那么在同一个jvm内 你可以使用vm://localhost连接到server.

不同的jvm可以使用tcp://yourip 连接到server

0 请登录后投票
   发表时间:2008-09-15  
楼主
TestMain.java里面的
ListableBeanFactory lsb = new ClassPathXmlApplicationContext("classpath:com/andyao/activemq/applicationContext-activemq.xml");
怎么执行不过去啊 ??? 代码执行到这里就不能继续向下走了,奇怪得是也没报错,这是为什么的?
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics