`
lhx1026
  • 浏览: 306766 次
  • 性别: Icon_minigender_2
  • 来自: 广州
社区版块
存档分类
最新评论

spring JMS、activemq中消费者收不到生产者发送的消息的原因解析

阅读更多

    我们使用jms一般是使用spring-jms和activemq相结合,通过spring的JmsTemplate发送消息到指定的Destination。

 

    首先定义一个activemq的连接池:

 

	<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL"
					value="failover:(tcp://192.168.20.23:61616?wireFormat.maxInactivityDuration=0)&amp;maxReconnectDelay=1000" />
			</bean>
		</property>
		<property name="maxConnections" value="1"></property>
	</bean>

 

定义jmsTempalte的实例:

 

	<bean id="oamTmpTopic" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="oamTmpTopic" />
	</bean>

	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="defaultDestination" ref="oamTmpTopic" />
		<property name="explicitQosEnabled" value="true" />
		<property name="deliveryMode" value="1" />
	</bean>
 

定义生产者SendMessage.java:

 

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class SendMessage {

	private JmsTemplate jmsTemplate;

	private String topicName;

	private Topic topic;

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	public void setTopicName(String topicName) {
		this.topicName = topicName;
	}

	public void sendMessage(final String message) {

		try {
			if (topic == null) {
				topic = jmsTemplate.getConnectionFactory().createConnection()
						.createSession(false, Session.AUTO_ACKNOWLEDGE)
						.createTopic(topicName);
			}
			jmsTemplate.send(topic,new MessageCreator() {

				@Override
				public Message createMessage(Session session)
						throws JMSException {

					TextMessage textMessage = session
							.createTextMessage(message);
					return textMessage;
				}
			});
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}
 

定义消费者TestListener.java:

 

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class TestListener implements MessageListener{
	
	private JmsTemplate jmsTemplate;
	
	private String topicName;
	
	public TestListener(JmsTemplate jmsTemplate,String topicName){
		
		this.jmsTemplate = jmsTemplate;
		
		this.topicName = topicName;
		
		Topic topic;
		try {
			topic = this.jmsTemplate.getConnectionFactory().createConnection().createSession(false,
					Session.AUTO_ACKNOWLEDGE).createTopic(this.topicName);
			
			DefaultMessageListenerContainer dmc = new DefaultMessageListenerContainer();
			dmc.setPubSubDomain(true);
			dmc.setDestination(topic);
			dmc.setConnectionFactory(this.jmsTemplate.getConnectionFactory());
			dmc.setPubSubNoLocal(true);
			dmc.setMessageListener(this);
			dmc.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
			dmc.initialize();
			dmc.start();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void onMessage(Message message) {
		
		System.out.println(message);
	}

}

 

然后在spring的配置文件中定义相关的bean:

 

	<bean id="testListener" class="net.kentop.test.jms.TestListener">
	<constructor-arg ref="jmsTemplate"></constructor-arg>
	<constructor-arg value="testTopic"></constructor-arg>
	</bean>
	
	<bean id="sendMessage" class="net.kentop.test.jms.SendMessage">
	<property name="jmsTemplate" ref="jmsTemplate"></property>
	<property name="topicName" value="testTopic"></property>
	</bean>

 

编写测试代码BeanTest.java:

 

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class BeanTest {

	public static ApplicationContext context = new ClassPathXmlApplicationContext("infrastructure-config.xml");
	
	public static void main(String args[]){
		
		SendMessage sendMessage = (SendMessage) context.getBean("sendMessage");
		
		sendMessage.sendMessage("hahahha,我来测试了");
		sendMessage.sendMessage("dfsdfsfsdfsdfsdf");
		sendMessage.sendMessage("come on baby!");
		sendMessage.sendMessage("hahahha,我来测试了2");
		sendMessage.sendMessage("dfsdfsfsdfsdfsdf2");
		sendMessage.sendMessage("come on baby!2");
		sendMessage.sendMessage("hahahha,我来测试了3");
		sendMessage.sendMessage("dfsdfsfsdfsdfsdf3");
		sendMessage.sendMessage("come on baby!3");
		sendMessage.sendMessage("hahahha,我来测试了4");
		sendMessage.sendMessage("dfsdfsfsdfsdfsdf4");
		sendMessage.sendMessage("come on baby!4");
	}
}
 

    但是这个时候会发现,消费者是无法接收到消费者消息的。因为我们在定义消费者时,定义了以下的代码:

 

			DefaultMessageListenerContainer dmc = new DefaultMessageListenerContainer();
			dmc.setPubSubDomain(true);
			dmc.setDestination(topic);
			dmc.setConnectionFactory(this.jmsTemplate2.getConnectionFactory());	
                        dmc.setPubSubNoLocal(true); 			
                        dmc.setMessageListener(this);
			dmc.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
			dmc.initialize();
			dmc.start();
 

   上面的代码中的:

 

dmc.setPubSubNoLocal(true);

 

    当设置pubSubNoLocal为true时,消费者不会接收来自同一个连接的消息。因为我们在上面的配置文件中定义了连接池的最大连接数为1,因此每次使用的连接都是同一个连接,所以就消费者就接收不到消息。只有当pubSubNoLocal为false时,消费者才能接收到来自同一个连接的消息。

 

    当然,也可以设置连接池的最大连接数为多个,比如为10,这样就可能不会每次都是用同一个连接,消费者也可以接收到消息。但是这样的话,不是每个消息都可以接收到,因为这样的话不排除有时候消费者和生产者有使用同一个连接的可能。如果一定要设置pubSubNoLocal为true的话,那么就必须要使用不同的连接。

 

    在这里也要注意的是:

 

dmc.setPubSubDomain(true);

 

    当消费者要接收topic的消息时,pubSubDomain必须设置为true。当消费者要接收queue的消失时,pubSubDomain必须设置为false。

 

    当然也可以使用两个不同的连接,一个连接被生产者使用,另外一个连接被消费者使用。这样的话,即使设置:

 

dmc.setPubSubNoLocal(true);

 

    pubSubNoLocal为true,消费者也可以接收到消息。

 

    比如,我们再增加一个activemq的连接池,这个连接池的最大连接数为1。

 

		<bean id="connectionFactory2" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL"
					value="failover:(tcp://192.168.20.23:61616?wireFormat.maxInactivityDuration=0)&amp;maxReconnectDelay=1000" />
			</bean>
		</property>
		<property name="maxConnections" value="1"></property>
	</bean>
 

再定义一个使用该连接池的JmsTemplate:

 

		<bean id="jmsTemplate2" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory2" />
		<property name="defaultDestination" ref="oamTmpTopic" />
		<property name="explicitQosEnabled" value="true" />
		<property name="deliveryMode" value="1" />
	</bean>
 

修改一下消费者,让消费者使用第二个连接池来接收消息:

 

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class TestListener implements MessageListener{
	
	private JmsTemplate jmsTemplate;
	
	private JmsTemplate jmsTemplate2;
	
	private String topicName;
	
	public TestListener(JmsTemplate jmsTemplate,String topicName,JmsTemplate jmsTemplate2){
		
		this.jmsTemplate = jmsTemplate;
		
		this.topicName = topicName;
		
		this.jmsTemplate2 = jmsTemplate2;
		
		Topic topic;
		try {
			topic = this.jmsTemplate.getConnectionFactory().createConnection().createSession(false,
					Session.AUTO_ACKNOWLEDGE).createTopic(this.topicName);
			
			DefaultMessageListenerContainer dmc = new DefaultMessageListenerContainer();
			dmc.setPubSubDomain(true);
			dmc.setDestination(topic);
			dmc.setConnectionFactory(this.jmsTemplate2.getConnectionFactory());
			dmc.setPubSubNoLocal(true);
			dmc.setMessageListener(this);
			dmc.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
			dmc.initialize();
			dmc.start();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void onMessage(Message message) {
		
		System.out.println(message);
	}

}

 

    修改相关的bean定义:

 

	<bean id="testListener" class="net.kentop.test.jms.TestListener">
	<constructor-arg ref="jmsTemplate"></constructor-arg>
	<constructor-arg value="testTopic"></constructor-arg>
	<constructor-arg ref="jmsTemplate2"></constructor-arg>
	</bean>

 

    这样的话,启动测试程序,即使pubSubNoLocal为true,但是因为消费者和生产者使用的不是同一个连接,所以消费者可以接收到生产者的消息。

0
9
分享到:
评论
2 楼 lhx1026 2011-02-10  
kimmking 写道
1、参数都写配置文件里不就完了

2、很少见过同一个连接的场景

3、只有一个server,failover没用


1、参数是可以写在配置文件里面,不过有时候是需要写在代码中。
2、我现在写的一个项目就是使用同一个连接的呢。
1 楼 kimmking 2011-01-25  
1、参数都写配置文件里不就完了

2、很少见过同一个连接的场景

3、只有一个server,failover没用

相关推荐

    spring 整合 activemq 生产者和消费者 案例源码

    Spring整合ActiveMQ是Java消息服务(JMS)在Spring框架中的应用,用于实现生产者与消费者的解耦。在这个案例中,我们将深入探讨如何配置和使用这两个组件,以便于理解它们的工作原理。 首先,ActiveMQ是Apache软件...

    jms Spring+ActiveMQ 5.4.2

    Spring的JMS模块允许开发者声明式地配置消息生产者(发送者)和消费者(接收者),并且支持多种JMS供应商,如ActiveMQ、RabbitMQ和Apache Qpid等。通过使用Spring的`JmsTemplate`类,我们可以方便地发送和接收消息,...

    spring使用activeMQ实现消息发送

    4. **消息消费者(Consumer)**:`consumer`目录中的代码则负责接收消息。这通常涉及到定义一个消息监听器接口,比如`MessageListener`,并在其中实现`onMessage(Message message)`方法。Spring会自动调用这个方法来...

    JMS之Spring +activeMQ实现消息队列

    "JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步处理中的...

    Spring+ActiveMQ消息队列+前台接收消息

    2. **创建消息生产者**:在Spring中,你可以使用`JmsTemplate`作为消息生产者,发送消息到ActiveMQ的队列或主题。配置`JmsTemplate`并设置ActiveMQ的连接工厂,然后在需要发送消息的地方调用其`convertAndSend`方法...

    Spring和ActiveMQ整合的完整实例

    3. **编写消息生产者**:在Spring应用中,使用JmsTemplate发送消息到ActiveMQ的队列或主题。可以使用convertAndSend方法,该方法会自动将对象转换为适合的消息类型。 4. **编写消息消费者**:创建实现了Message...

    Spring 实现远程访问详解——jms和activemq

    把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。 JMS 支持两种消息传递模型: 点对点(point-to-point,简称 PTP)...

    ActiveMQ整合Spring(多消费者)

    4. **消息消费者**: 消费者从队列中接收消息。Spring提供了两种方式实现:监听器容器(`DefaultMessageListenerContainer`)和基于注解的消费(`@JmsListener`)。监听器容器可以配置多个消费者,实现多线程并发...

    spring+jms+activemq

    6. 集成测试:编写测试用例,确保消息能够正确地从生产者发送到消费者,并验证消息的内容和类型。 在实际开发中,我们可能还需要考虑消息的事务性、消息的持久化、错误处理和重试机制等高级特性。例如,Spring的`...

    Spring和ActiveMQ的整合实例源码

    5. **消息消费者(Consumer)**:Spring提供两种消费消息的方式:基于监听器的容器和基于回调的方法。前者通过实现`MessageListener`接口并在`@JmsListener`注解的回调方法中处理消息。后者则使用`JmsTemplate`的`...

    ActiveMQ-P2P文本消息+Spring和ActiveMQ的整合实例源码

    在这种模式下,消息生产者发送消息到队列,而消息消费者从队列中接收消息。队列具有先进先出(FIFO)的特性,即每个消息只能被一个消费者消费一次,一旦被消费,消息就会从队列中移除。 **Spring 和 ActiveMQ 的...

    spring配置activemq详解

    - `activemq-consumer.xml`通常包含消息消费者的配置,例如定义MessageListenerContainer,它监听特定的Queue或Topic,并通过实现MessageListener接口处理接收到的消息。 - `activemq-produce.xml`则包含了消息...

    spring整合JMS-居于ActiveMQ实现

    在JMS集成方面,Spring提供了`org.springframework.jms`包,用于简化消息生产者和消费者的创建与配置。 Java消息服务(JMS)是一种API,它定义了应用程序如何创建、发送、接收和读取消息。JMS允许应用程序在不关心...

    自己实现的 ActiveMQ 多线程客户端 包含生产消息客户端和消费者消息客户端

    - **生产者(Producer)**:负责创建和发送消息到消息队列的客户端。 - **消费者(Consumer)**:从消息队列中接收并处理消息的客户端。 2. **多线程技术**: - **生产者多线程**:Amq_Producer_mt.cpp 文件可能...

    spring整合Activemq源码

    - **消息生产者**:在SpringMVC的控制器中,我们可以创建一个方法,利用JmsTemplate发送消息到ActiveMQ的队列或主题。 - **消息消费者**:创建一个实现了MessageListener接口的类,Spring会自动将这个类注册为消息...

    spring 与ACTIVEMQ整合

    2. **创建消息消费者**:创建一个实现了`MessageListener`接口的Bean,重写`onMessage`方法,当接收到消息时执行相应的业务逻辑。 3. **注解方式消费**:使用`@JmsListener`注解,直接在方法上声明消息监听,简化...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    4. 定义消息消费者:创建一个监听特定主题的MessageListener。当有消息到达时,这个监听器会被触发并处理消息。 5. 编写消息发送接口:为了将消息发送逻辑封装起来,可以创建一个服务类,提供发送消息的方法,这些...

    Spring整合Blazeds实现ActiveMQ JMS消息服务

    标题中的“Spring整合Blazeds实现ActiveMQ JMS消息服务”指的是在Java应用程序中使用Spring框架与Blazeds(一个Flex和Java之间的消息传递中间件)集成,通过ActiveMQ(一个流行的开源JMS提供商)来实现消息队列服务...

    spring2 activemq5 tomcat6构建jms

    3. **生产者(Producer)**: 在Spring应用中创建一个生产者,它负责创建消息并发送到目的地。这通常通过调用JmsTemplate的方法实现。 4. **消费者(Consumer)**: 创建消费者来接收和处理消息。消费者可以是监听...

Global site tag (gtag.js) - Google Analytics