`

ActiveMQ5.0实战三:使用Spring发送,消费topic和queue消息

    博客分类:
  • java
阅读更多
ActiveMQ5.0实战一: 安装配置ActiveMQ5.0
ActiveMQ5.0实战二: 基本配置

简介

实战一 , 实战二 介绍了ActiveMQ的基本概念和配置方式.

本篇将通过一个实例介绍使用spring发送,消费topic, queue类型消息的方法. 不懂topic和queue的google 之.

 

如图示, TOPIC和QUEUE分别代表一个topic和一个queue消息通道.

  1. TopicMessageProducer向topic发送消息, TopicConsumerA和TopicConsumerB则从topic消费消息.
  2. QueueMessageProducer向Queue发送消息, QueueConsumer从Queue中消费消息

Spring整合JMS

就像对orm, web的支持一样, spring同样支持jms, 为整合jms到已有的项目提供了很多便利的方法. 本篇主要讲实战, 是所以先从配置开始, spring配置jms基本上需要8个部分.

  1. ConnectionFactory. 和jms服务器的连接, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker.
  2. Destination. 有topic和queue两种方式.
  3. JmsTemplate. spring提供的jms模板.
  4. MessageConverter. 消息转换器.
  5. MessageProducer. 消息生产者.
  6. MessageConsumer. 消息消费者.
  7. MessageListener. 消息监听器
  8. MessageListenerContainer. 消息监听容器

下面以实例的方式介绍上面8个部分.

1. ConnectionFactory

<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />

 brokerURL是指要连接的activeMQ server的地址, activeMQ提供了多种brokerURL, 集体可参见文档.一般我们使用嵌套的ActiveMQ server. 配置如下, 这个配置使用消息的存储机制, 服务器重启也不会丢失消息.

<!--  embedded ActiveMQ Broker -->
	<amq:broker useJmx="false" persistent="true">
		<amq:persistenceAdapter>
			<amq:amqPersistenceAdapter directory="d:/amq"/>
		</amq:persistenceAdapter>
		<amq:transportConnectors>
			<amq:transportConnector uri="tcp://localhost:61616" />
                       <amq:transportConnector uri="vm://localhost:0" />
		</amq:transportConnectors>
	</amq:broker>

 2. Destination

 在实例中我们使用了两种destination

<!--  ActiveMQ destinations  -->
<!--  使用topic方式-->
<amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />
<!--  使用Queue方式-->
<amq:queue name="QUEUE" physicalName="JMS-TEST-QUEUE" />

 3. JmsTemplate

<!--  Spring JmsTemplate config -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory">
			<!--  lets wrap in a pool to avoid creating a connection per send -->
			<bean class="org.springframework.jms.connection.SingleConnectionFactory">
				<property name="targetConnectionFactory" ref="jmsConnectionFactory" />
			</bean>
		</property>
		<!-- custom MessageConverter -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>

  4. MessageConverter

   MessageConverter实现的是org.springframework.jms.support.converter.MessageConverter接口, 提供消息的转换功能. DefaultMessageConverter的实现见附件.

<bean id="defaultMessageConverter" class="com.andyao.activemq.DefaultMessageConverter" />

  5. MessageProducer

   实例拥有两个消息生产者, 消息生产者都是POJO, 实现见附件.

<!-- POJO which send Message uses  Spring JmsTemplate -->
	<bean id="topicMessageProducer" class="com.andyao.activemq.TopicMessageProducer">
		<property name="template" ref="jmsTemplate" />
		<property name="destination" ref="TOPIC" />
	</bean>
	<bean id="queueMessageProducer" class="com.andyao.activemq.QueuMessageProducer">
		<property name="template" ref="jmsTemplate" />
		<property name="destination" ref="QUEUE" />
	</bean>

 6. MessageConsumer

 TOPIC通道有两个消息消费者, QUEUE有一个消息消费者

<!--  Message Driven POJO (MDP) -->
    <!-- consumer1 for topic a -->
    <bean id="topicConsumerA" class="com.andyao.activemq.TopicConsumerA" />
    <!-- consumer2 for topic a -->
    <bean id="topicConsumerB" class="com.andyao.activemq.TopicConsumerB" />
    <!-- consumer for queue -->
    <bean id="queueConsumer" class="com.andyao.activemq.QueueConsumer" />

  7. MessageListener

每一个消息消费者都对应一个MessageListener

<bean id="topicListenerA" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="topicConsumerA" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>

	<bean id="topicListenerB" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="topicConsumerB" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>

    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="queueConsumer" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>

 8. MessageListenerContainer

 有几个MessageListener既有几个MessageListenerContainer

<bean id="topicListenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="TOPIC" />
		<property name="messageListener" ref="topicListenerA" />
	</bean>

    <bean id="topicListenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="TOPIC" />
		<property name="messageListener" ref="topicListenerB" />
	</bean>
    
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="QUEUE" />
		<property name="messageListener" ref="queueListener" />
	</bean>

  Summary

写spring配置文件的时候, 要把MessageProducer, MessageConsumer,MessageListener,MessageListenerContainer几个地方弄清楚:

  1. 可以有一个或者多个消息生产者向同一个destination发送消息.
  2. queue类型的只能有一个消息消费者.
  3. topic类型的可以有多个消息消费者.
  4. 每个消费者对应一个MessageListener和一个MessageListenerContainer.

 

 

 

 

分享到:
评论
46 楼 y806839048 2014-10-20  
启动activemq我试了,也不行
45 楼 y806839048 2014-10-20  
是不是要另外启动activemq
44 楼 y806839048 2014-10-20  
为什么我的两个应用中,不能实现通信
43 楼 westboy172887564 2013-04-24  
Queue支持存在多个消费者,楼主是不是最后那块写的有问题?
42 楼 zh286091487 2012-02-28  
4年前的文章,2012才享受到,悲剧,写楼主了
41 楼 xiaxiaorui2003 2012-02-22  
我的这个GenericBeanFactoryAccessor也是找不到,spring和activemq的所有jar都导入了,还是找不到
40 楼 bluethink 2011-11-20  
TestMain里面的
org.springframework.beans.factory.generic.GenericBeanFactoryAccessor;

依赖的是哪个包,我现在用的是Spring 3.0.5,并且加载了spring的所有包,还是提示
The import org.springframework.beans.factory.generic cannot be resolved,我下载的是
xbean-spring-3.5.jar 仍然找不到这个类,麻烦楼主把所有需要的jar包给列出来,谢谢
39 楼 wmj2003 2009-08-17  

package com.work.activemq;

public interface OrderNotifyInterface {
	/**
	 *  通知用户订单已经发送!生成者接口
	 * @param order
	 */
	public void notifyOrder(Order order);
	
	/**
	 * 通过topic的方式发送信息
	 * @param order
	 */
	public void notifyTopic(Order order);
}

实现如下:
package com.work.activemq;

/**
 * 用来测试mq的。
 * @author wangmingjie
 *
 */
public class OrderNotifyImpl implements OrderNotifyInterface {
	private OrderMessageProducer orderMessageProducer;
	public void setOrderMessageProducer(OrderMessageProducer orderMessageProducer) {
		this.orderMessageProducer = orderMessageProducer;
	}
	
	private TopicMessageProducer  topicMessageProducer;

	public void setTopicMessageProducer(TopicMessageProducer topicMessageProducer) {
		this.topicMessageProducer = topicMessageProducer;
	}
	
	/**
	 * 向顾客的邮箱发送订单通知,使用JMS发送.
	 */
	public void notifyOrder(Order order) {
		orderMessageProducer.send(order);
	}
	

	public void notifyTopic(Order order){
		topicMessageProducer.send(order);
	}
}
38 楼 wmj2003 2009-08-17  
package com.work.activemq;


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.support.converter.MessageConverter;

/**
 * 订单消息转换类. 使用数据库持久化消息的时候使用。
 * <p>实现MessageConverter使得JMS的发送与接收者可以直接发送POJO而不是JMS Message.</p>
 *
 * @author wangmj
 * @see MessageConverter
 */

public class OrderMessageConverter implements MessageConverter {
	private static final Log log = LogFactory.getLog(OrderMessageConverter.class);
	/*
	 * (non-Javadoc)
	 *
	 * @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object,
	 *      javax.jms.Session)
	 */
	public Message toMessage(Object obj, Session session) throws JMSException {
		//check Type
		if (obj instanceof Order) {
			ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();
			   HashMap<String, byte[]> map = new HashMap<String, byte[]>();  
			try{
				// POJO must implements Seralizable
				ByteArrayOutputStream bos = new ByteArrayOutputStream();
				ObjectOutputStream oos = new ObjectOutputStream(bos);
				oos.writeObject(obj);
				map.put("Order", bos.toByteArray());
				objMsg.setObjectProperty("Map", map);
			} catch (IOException e) {
				log.error("toMessage(Object, Session)", e);
			}
			return objMsg;
		} else {
			throw new JMSException("Object:[" + obj + "] is not Order");
		}

	}

	/*
	 * (non-Javadoc)
	 *
	 * @see org.springframework.jms.support.converter.MessageConverter#fromMessage(javax.jms.Message)
	 */
	@SuppressWarnings("unchecked")
	public Object fromMessage(Message msg) throws JMSException {
		if (msg instanceof ObjectMessage) {
			 HashMap<String, byte[]> map = (HashMap<String, byte[]>) ((ObjectMessage) msg).getObjectProperty("Map"); 
			try {
				// POJO must implements Seralizable
				ByteArrayInputStream bis=new ByteArrayInputStream((byte[])map.get("Order") );
				ObjectInputStream ois = new ObjectInputStream(bis);
				Object returnObject = ois.readObject();
				return returnObject;
			} catch (IOException e) {
				log.error("fromMessage(Message)", e);

			} catch (ClassNotFoundException e) {
				log.error("fromMessage(Message)", e);
			}
			
			return null;
		} else {
			throw new JMSException("Msg:[" + msg + "] is not Map");
		}
	}

}


package com.work.activemq;

import javax.jms.Topic;

import org.springframework.jms.core.JmsTemplate;

/**
 * 奇怪啊,topic方式,消息被重复消费了。每次都消费两次。
 * 一旦更换为DefaultMessageConverter.java ,控制台就看不到打印信息了。<br>
 * 但是在activemq的监控界面,可以看到消息被消费了,而是web发送信息页面也提示成功。
 * 估计是和Serializable有关系。
 * @author wangmingjie
 * @date 2009-7-26上午11:21:32
 */
public class TopicMessageProducer {
    
    private JmsTemplate template;

	private Topic destination;

	public void setTemplate(JmsTemplate template) {
		this.template = template;
	}

	public void setDestination(Topic destination) {
		this.destination = destination;
	}

	/**
	 * 发送信息
	 * @param message
	 */
	public void send(Order message) {
		template.convertAndSend(this.destination, message);
	}
}


package com.work.activemq;
/**
 * @author wangmingjie
 * @date 2009-7-26上午11:27:00
 */

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 使用多线程接受消息
 */
public class TopicConsumerA {

	ExecutorService exec = Executors.newFixedThreadPool(10);

	public void receive(final Order message) {
		//System.out.println("线程名称"+Thread.currentThread().getName());
		exec.submit((new Runnable() {
			public void run() {
				//stem.out.println(Thread.currentThread().getName());
				System.out.println("**** Topic A : " + message.getId()+message.getName());
			}
		}));
	}

}


package com.work.activemq;
/**
 * @author wangmingjie
 * @date 2009-7-26上午11:27:05
 */
public class TopicConsumerB {

	public void receive(Order message) {
		System.out.println("**** Topic B : " + message.getId()+message.getName());
	}
}


37 楼 andyao 2009-08-17  
TO: wmj2003

把你的java代码也贴出来看看
36 楼 wmj2003 2009-07-27  
我的消息每次都会重复被接受,这是为什么呢?
结果如下
13:43:43,921 INFO  [STDOUT] ****************** Topic B : ID001BUG管理需求和设计.
doc
13:43:43,921 INFO  [STDOUT] ****************** Topic B : ID001BUG管理需求和设计.
doc
13:43:43,921 INFO  [STDOUT] ************** Topic A : ID001BUG管理需求和设计.doc
13:43:43,921 INFO  [STDOUT] ************** Topic A : ID001BUG管理需求和设计.doc

=====================spring配置文件如下============================
<beans
		xmlns="http://www.springframework.org/schema/beans"
		xmlns:amq="http://activemq.org/config/1.0"
		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0-SNAPSHOT.xsd" >
 <!-- 
    	推荐版本,使用spring的listenerContainer,消息用数据库持久化保存,服务器重启不会丢失
     -->

	<!--  embedded ActiveMQ Broker -->
	<amq:broker useJmx="false" persistent="true">
		<amq:persistenceAdapter>
			<amq:jdbcPersistenceAdapter id="jdbcAdapter" dataSource="#dataSource" createTablesOnStartup="true"
										useDatabaseLock="false"/>
			<!-- 
				Mysql can setup useDatabaseLock="true",this is defualt
				HSQLDB,MSSQL plz setup useDatabaseLock="false",
				if u setup useDatabaseLock="true",u will catch error:
				MSSQL Error Info:FOR UPDATE clause allowed only for DECLARE CURSOR 
				HSQLDB Error Info:FOR in statement [SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE]

				see http://www.nabble.com/ActiveMQ-JDBC-Persistence-with-SQL-Server-tf2022248.html#a5560296
			-->
		</amq:persistenceAdapter>
		<amq:transportConnectors>
			<amq:transportConnector uri="tcp://localhost:0"/>
		</amq:transportConnectors>
	</amq:broker>
     <!-- 连接外部的activeMQ
	<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61616?wireFormat.maxInactivityDuration=0" />
	  ActiveMQ connectionFactory  -->
	  
	<!--  ActiveMQ connectionFactory  连接内部的-->
	<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost"/>

	<!--  ActiveMQ destinations  使用Queue方式 -->
	<amq:queue name="destination" physicalName="org.apache.activemq.spring.Test.spring.embedded"/>
	<!--  使用topic方式-->
	<amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />

	<!-- The msSQL Datasource that will be used by the Broker
	<bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
		<property name="driverClassName" value="net.sourceforge.jtds.jdbc.Driver"/>
		<property name="url">
			<value>jdbc:jtds:sqlserver://localhost:1433/wmjqxgl;SelectMethod=cursor;charset=GBK;tds=8.0;lastupdatecount=true</value>
		</property>
		<property name="username" value="sa"/>
		<property name="password" value="sa"/>
		<property name="poolPreparedStatements" value="true"/>
	</bean>	
 	-->
	<!--  Spring JmsTemplate config -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory">
			<!--  lets wrap in a pool to avoid creating a connection per send -->
			<bean class="org.springframework.jms.connection.SingleConnectionFactory">
				<property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
			</bean>
		</property>
		<!-- custom MessageConverter -->
		<property name="messageConverter" ref="orderMessageConverter"/>
	</bean>

	<!--  OrderMessage converter  -->
	<bean id="orderMessageConverter" class="com.work.activemq.OrderMessageConverter"/>

	<!-- POJO which send Message uses  Spring JmsTemplate -->
	<bean id="orderMessageProducer" class="com.work.activemq.OrderMessageProducer">
		<property name="template" ref="jmsTemplate"/>
		<property name="destination" ref="destination"/>
	</bean>
	<!-- topic 方式信息发送者 -->
	<bean id="topicMessageProducer" class="com.work.activemq.TopicMessageProducer">
		<property name="template" ref="jmsTemplate" />
		<property name="destination" ref="TOPIC" />
	</bean>
		
    <!-- consumer1 for topic a 消息消费者 -->
    <bean id="topicConsumerA" class="com.work.activemq.TopicConsumerA" />

    <!-- consumer2 for topic a -->
    <bean id="topicConsumerB" class="com.work.activemq.TopicConsumerB" />	
    
    
    <!-- Message Listener for  -->
	<bean id="topicListenerA" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="topicConsumerA" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="orderMessageConverter" />
	</bean>

	<bean id="topicListenerB" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="topicConsumerB" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="orderMessageConverter" />
	</bean>
    
	<!--  Message Driven POJO (MDP)  通过queue的方式发送消息,一个发送一个接收-->
	<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg>
			<bean class="com.work.activemq.OrderMessageConsumer">
				<!-- <property name="mailService" ref="mailService"/>  -->
			</bean>
		</constructor-arg>
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="sendEmail"/>
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="orderMessageConverter"/>
	</bean>

	<!--  listener container,MDP无需实现接口 -->
	<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory"/>
		<property name="destination" ref="destination"/>
		<property name="messageListener" ref="messageListener"/>
	</bean>
	
	<!--  listener container,MDP无需实现接口 -->
	<bean id="topicListenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="TOPIC" />
		<property name="messageListener" ref="topicListenerA" />
	</bean>

    <bean id="topicListenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="TOPIC" />
		<property name="messageListener" ref="topicListenerB" />
	</bean>
	
	<bean id="orderNotify" class="com.work.activemq.OrderNotifyImpl">
		<property name="orderMessageProducer" ref="orderMessageProducer" />
		<property name="topicMessageProducer" ref="topicMessageProducer" />
	</bean>
	<!--  -->

	
</beans>

==================
35 楼 chinaway 2009-05-22  
谢谢博主,我的问题解决了,要想在http://127.0.0.1:8161/admin/queues.jsp中看到,必须把<!--  embedded ActiveMQ Broker --> 
    <amq:broker useJmx="false" persistent="true"> 
        <amq:persistenceAdapter> 
            <amq:amqPersistenceAdapter directory="d:/amq"/> 
        </amq:persistenceAdapter> 
        <amq:transportConnectors> 
            <amq:transportConnector uri="tcp://localhost:61616" /> 
                       <amq:transportConnector uri="vm://localhost:0" /> 
        </amq:transportConnectors> 
    </amq:broker>
这一段中的<amq:transportConnectors> 
            <amq:transportConnector uri="tcp://localhost:61616" /> 
                       <amq:transportConnector uri="vm://localhost:0" /> 
        </amq:transportConnectors>
注释掉,即使用外部的ActiveMQ服务
34 楼 chinaway 2009-05-22  
谢谢博主,可能是我运行的有问题,或者对ActiveMQ理解不够,只能看到消息发送、接收在控制台的输出,如果不单独启动ActiveMQ,连http://localhost:8161/admin都无法访问。
我的理解是Spring整合ActiveMQ后,运行TestMain后,ActiveMQ的broker会自动启动,但是为什么不能通过http://localhost:8161/admin访问。
33 楼 andyao 2009-05-21  
chinaway 写道
博主你好,
我怎么在http://127.0.0.1:8161/admin/topics.jsp中看不到JMS-TEST-TOPIC消息传递的效果呢,消息数量和名称等都看不到
附加本例子所需要的jar包
activemq-all-5.2.0.jar
commons-logging-1.1.jar
log4j-1.2.14.jar
spring2.5.5.jar
xbean-spring-2.8.jar(2.8以上)
jdk
jee

这些jar包都可以在activemq的解压包中找到


给个贴图看看
32 楼 chinaway 2009-05-20  
博主你好,
我怎么在http://127.0.0.1:8161/admin/topics.jsp中看不到JMS-TEST-TOPIC消息传递的效果呢,消息数量和名称等都看不到
附加本例子所需要的jar包
activemq-all-5.2.0.jar
commons-logging-1.1.jar
log4j-1.2.14.jar
spring2.5.5.jar
xbean-spring-2.8.jar(2.8以上)
jdk
jee

这些jar包都可以在activemq的解压包中找到
31 楼 creativity 2009-02-05  
谢谢楼主,已解决,原因是我的TestBean忘了序列化接口(我用了defaultMessageConvertor),在我加上LOG后才发现问题,实现了序列化接口就好了,谢谢!!
30 楼 andyao 2009-02-05  
http://127.0.0.1:8161/admin/这个界面中你可以查看queue和topic的信息列表

Name  topic或者queue的名字 
Number Of Pending Messages   pend消息的数目
Number Of Consumers   消费者的数目
Messages Sent   发送的消息数目
Messages Received  接收的消息数目

你要看有多少消息被接受了, 看Messages Received  条目
29 楼 creativity 2009-02-03  
楼主你好,我看了你的例子,完全可以跑通,我改了下,现有A,B2个web应用系统,都起在一个web服务器(tomcat)下,A应用作为producer,B应用consumer,现在可以看到A应用发的消息,但是在B应用怎么看到已经得到或没有得到这个消息呢?有些不明白怎么处理,我不知道我这样写对不对,请指教,配置如下:
A应用:applicationContext-activemq.xml

<amq:broker useJmx="false" persistent="true">
<amq:persistenceAdapter>
<amq:amqPersistenceAdapter directory="c:/amq"/>
</amq:persistenceAdapter>
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0" />
</amq:transportConnectors>
</amq:broker>

<!-- 连接外部的activeMQ -->
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61616" />

<!--  ActiveMQ destinations  -->
<!--  使用topic方式-->
<amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />

<!--  Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!--  lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory" />
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

<!-- converter  -->
<bean id="defaultMessageConverter" class="com.me.jms.DefaultMessageConverter" />

<!-- POJO which send Message uses  Spring JmsTemplate -->
<bean id="topicMessageProducer" class="com.me.jms.TopicMessageProducer">
<property name="template" ref="jmsTemplate" />
<property name="destination" ref="TOPIC" />
</bean>
A应用producer
public class TopicMessageProducer {
   
    private JmsTemplate template;

private Topic destination;

public void setTemplate(JmsTemplate template) {
this.template = template;
}

public void setDestination(Topic destination) {
this.destination = destination;
}

public void send(TestBean message) {
System.out.println("=====> message send! ");
template.convertAndSend(this.destination, message);
}
}
A应用中的Action
public class TestAction extends ActionSupport{

private TestBean tb;
private TopicMessageProducer topicMessageProducer;

public String test(){
System.out.println("===> username:" + tb.getUserName());
System.out.println("===> username:" + tb.getPassWord());
topicMessageProducer.send(tb);

return "success";
}
...geter(),seter();
}

B应用applicationContext-activemq.xml
<amq:connectionFactory id="jmsConnectionFactory"
brokerURL="tcp://localhost:61616" />

<amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />

<!-- converter  -->
<bean id="defaultMessageConverter"
class="com.me.jms.DefaultMessageConverter" />

<!--  Message Driven POJO (MDP) -->
    <!-- consumer1 for topic a -->
    <bean id="topicConsumerA" class="com.me.jms.TopicConsumerA" />

    <!-- consumer2 for topic a -->
    <bean id="topicConsumerB" class="com.me.jms.TopicConsumerB" />
   
    <!-- Message Listener for  -->
<bean id="topicListenerA" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerA" />
<!--  may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

<bean id="topicListenerB" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerB" />
<!--  may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

<!--  listener container,MDP无需实现接口 -->
<bean id="topicListenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="TOPIC" />
<property name="messageListener" ref="topicListenerA" />
</bean>

    <bean id="topicListenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="TOPIC" />
<property name="messageListener" ref="topicListenerB" />
</bean>
B应用的consumerA和consumerB都一样
public class TopicConsumerA {
public void receive(TestBean message) {
System.out.println("************************************** Topic A userName: " + message.getUserName());
System.out.println("************************************** Topic A passWord: " + message.getPassWord());
}
}
目前是在tomcat控制台和http://127.0.0.1:8161/admin/中可以看消息已发送了,但是在tomcat控制台中和http://127.0.0.1:8161/admin/上看不到是否已经接受到了消息,我该如何查看或修改,谢谢!!!
28 楼 cys6736873 2008-10-31  
楼主能否给个 文档 链接或者下载,感激不尽啊
27 楼 cys6736873 2008-10-30  
andyao 写道
philyes 写道
A应用
<amq:broker useJmx="false" persistent="true">

<amq:persistenceAdapter>
<amq:amqPersistenceAdapter directory="d:/amq"/>
</amq:persistenceAdapter>

<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0" />
</amq:transportConnectors>
</amq:broker>

  
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />

    <!--  使用Queue方式-->
    <amq:queue name="QUEUE" physicalName="JMS-TEST-QUEUE" />

<!--  Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!--  lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory" />
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

<!-- converter  -->
<bean id="defaultMessageConverter" class="com.andyao.activemq.DefaultMessageConverter" />

<bean id="queueMessageProducer" class="com.andyao.activemq.QueueMessageProducer">
<property name="template" ref="jmsTemplate" />
<property name="destination" ref="QUEUE" />
</bean>
B应用
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />

<amq:queue name="QUEUE" physicalName="JMS-TEST-QUEUE" />
 
 
<bean id="queueConsumer" class="com.andyao.activemq.QueueConsumer" />
    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="queueConsumer" />
<!--  may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

   <!-- converter -->
<bean id="defaultMessageConverter" class="com.andyao.activemq.DefaultMessageConverter" />
   
   
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="QUEUE" />
<property name="messageListener" ref="queueListener" />
</bean>
启动tomcat时,出现如下信息
信息: ActiveMQ JMS Message Broker (localhost, ID:6b184e6c407c476-1632-1221635612890-0:0) started(此应是是A应用 brokerID)
2008-9-17 15:13:34 org.springframework.web.context.ContextLoader initWebApplicationContext
......
信息: ActiveMQ JMS Message Broker (localhost, ID:6b184e6c407c476-1642-1221635632312-0:0) started
2008-9-17 15:13:53 org.apache.activemq.broker.TransportConnector start
信息: Connector vm://localhost Started(此应是是B应用 brokerID)
.......
我在通过页面得提交给一个servlet一个信息,如123,在servlet调用如下代码
String text = request.getParameter("text");
FooMessage foo = new FooMessage();
int msg = Integer.parseInt(text);
foo.setId(msg);
System.out.println("********start send*********");
WebApplicationContext   ctx=WebApplicationContextUtils.getRequiredWebApplicationContext(this.getServletContext());
QueueMessageProducer sender   =   (QueueMessageProducer)ctx.getBean( "queueMessageProducer");
sender.send(foo);

System.out.println("******end send ***********");
结果显示
********start send*********
queue send start
2008-9-17 15:14:16 org.apache.activemq.broker.TransportConnector start
信息: Connector vm://localhost Started
2008-9-17 15:14:16 org.springframework.jms.connection.SingleConnectionFactory initConnection
信息: Established shared JMS Connection: ActiveMQConnection {id=ID:6b184e6c407c476-1632-1221635612890-2:0,clientId=null,started=false}
queue send end
******end send ***********
B应用没有接收到,这是为什么,B应用的配置对吗,对的话为什么broker是两个不同的id,又为什么收不到A的消息呢,恳请楼主指点


A,B没有链接同一个activeMq

A 中改为
<amq:transportConnector uri="tcp://localhost:61616" />


B 中
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />

改为
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61616" />



前面你也说到
transportConnector是定義鏈接activeMQ的brokerURL. 你可以為一個activeMQ server指定多個transportconnector, 比如vm, tcp, ssl, stomp, xmpp等.
那我A多定义几个,B是不是也有不同的brokerURL?那他们又是怎么链接同一个activeMq?

相关推荐

    activeMq5.0

    在学习和使用ActiveMQ 5.0时,你可能需要了解如何创建和配置消息代理、如何编写生产者和消费者代码、如何设置安全策略、如何使用管理工具监控系统状态等。通过实践这些基本操作,你可以深入理解消息中间件的工作原理...

    ActiveMQ5.0 监视的JSP支持中文

    **标题:“ActiveMQ5.0 监视的JSP支持中文”** ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能强大的消息中间件,广泛应用于分布式系统中的异步通信。在ActiveMQ 5.0版本中,对于中文支持的改进是一项...

    Apache ActiveMQ Queue Topic 详解

    ### Apache ActiveMQ Queue & Topic 详解 #### 一、特性及优势 Apache ActiveMQ 是一款高性能、功能丰富的消息中间件,具有以下显著特点: 1. **实现 JMS 1.1 规范**:支持 J2EE 1.4 及以上版本,这意味着它可以...

    activeMq 实战

    - 提供事务性上下文,使得一组消息的发送和接收操作可以作为单一的原子操作来执行。 ##### 1.4 目的地 (Destination) **目的地** 是客户端用来指定消息的发送目标和接收源的对象。根据 JMS 1.0.2 规范,有两种消息...

    ActiveMQ整合Spring(多消费者)

    同时,也需要配置目的地(Topic或Queue),可以使用`jms:topic`或`jms:queue`标签。 3. **消息生产者**: 生产者负责将消息放入消息队列。在Spring中,可以使用`JmsTemplate`的`send`方法来发送消息。需要指定目的...

    activemq与spring整合源代码

    5. 创建消息消费者:设置一个监听器容器,监听特定的队列或主题,当有新消息到达时,调用预先定义好的MessageListener进行处理。 四、实例分析 在提供的"mq"文件中,可能包含了一个示例项目,该项目展示了如何在...

    spring使用activeMQ实现消息发送

    5. **队列和主题的区别**:在ActiveMQ中,消息可以发送到队列(Queue)或主题(Topic)。队列遵循一对一模型,每个消息仅被一个消费者接收;主题遵循一对多模型,一个消息可以被多个订阅者接收。在上述示例中,我们...

    springboot2整合activemq的demo内含queue消息和topic消息

    - 在本地运行这个demo,首先确保安装并启动了ActiveMQ服务器,然后修改配置,运行Spring Boot应用,测试消息发送和接收功能。 - 部署时,可能需要考虑ActiveMQ集群、持久化存储、安全性等因素。 通过深入理解和...

    Spring平台整合消息队列ActiveMQ实现发布订阅、生产者消费者模型(适合新手或者开发人员了解学习ActiveMQ机制)

    本项目基于Spring这一平台,整合流行的开源消息队列中间件ActiveMQ,实现一个向ActiveMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。 包含的特性如下: 1.开启activeMQ,访问...

    Spring和ActiveMQ的整合实例源码

    5. **消息消费者(Consumer)**:Spring提供两种消费消息的方式:基于监听器的容器和基于回调的方法。前者通过实现`MessageListener`接口并在`@JmsListener`注解的回调方法中处理消息。后者则使用`JmsTemplate`的`...

    ActiveMQ-P2P文本消息+Spring和ActiveMQ的整合实例源码

    1. **Spring JMS**: Spring 提供了 JmsTemplate 类,它是发送和接收 JMS 消息的主要工具。通过配置,JmsTemplate 可以连接到 ActiveMQ 并进行消息操作。 2. **ConnectionFactory**: 这是 JMS 规范中的一个接口,...

    activemq_spring.rar_Spring和ActiveMQ_spring_消息中间件_消息发布订阅_消息订阅

    3. **配置Spring的JMS模板**:Spring的JmsTemplate是用于发送和接收消息的主要工具,我们需要在Spring配置文件中定义并配置它。 4. **创建消息生产者**:使用JmsTemplate的send方法可以发布消息到特定的队列或主题...

    activemq与spring整合发送jms消息入门实例

    3. **创建消息模板**:Spring的`JmsTemplate`是发送和接收JMS消息的核心工具。在配置文件中创建一个`JmsTemplate`的bean,并注入连接工厂。 ```xml &lt;bean id="jmsTemplate" class="org.springframework.jms.core....

    activemq 虚拟topic与路由功能

    在ActiveMQ消息中间件中,为了更好地实现消息的分发和管理,引入了虚拟Topic(Virtual Topic)的概念。本文将深入探讨虚拟Topic及路由功能,帮助读者理解如何利用这些特性来优化消息传递机制。 #### 一、虚拟Topic...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    4. 定义消息消费者:创建一个监听特定主题的MessageListener。当有消息到达时,这个监听器会被触发并处理消息。 5. 编写消息发送接口:为了将消息发送逻辑封装起来,可以创建一个服务类,提供发送消息的方法,这些...

    SpringActiveMQ.rar

    4. **消息消费者**:使用Spring的监听容器(如`DefaultMessageListenerContainer`),我们可以创建一个消息监听器来处理接收到的消息。监听器通常是一个实现了`MessageListener`接口的类,其中的`onMessage`方法会在...

    spring 整合 activemq 生产者和消费者 案例源码

    根据需求,配置相应的`Queue`或`Topic`,这将作为生产者发送消息和消费者接收消息的目标。 5. **配置MessageProducer和MessageConsumer**:在Spring配置中,定义`JmsTemplate`作为生产者,它可以发送消息到定义的...

    spring集成activemq演示queue和topic 持久化

    在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...

    spring 与ACTIVEMQ整合

    2. **创建消息消费者**:创建一个实现了`MessageListener`接口的Bean,重写`onMessage`方法,当接收到消息时执行相应的业务逻辑。 3. **注解方式消费**:使用`@JmsListener`注解,直接在方法上声明消息监听,简化...

Global site tag (gtag.js) - Google Analytics