`
zhangwei_david
  • 浏览: 477939 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Spring 之 JMS 监听JMS消息

    博客分类:
  • JMS
 
阅读更多

    在调用JMS消息消费者的receive()方法接收消息时,调用线程在消息可用之前一直阻塞。该线程出了等待还是等待,无所事事。这样的消息接收是同步消息接收,因为只用等到消息到达才能接收线程的工作。

     有同步的消息接收就有异步的消息接收,异步的消息接收就是注册一个消息监听器,该消息监听器必须实现javax.jms.MessageListener接口,当消息到达时将调用onMessage()方法,以消息作为方法的参数。

 

    原生的接口是javax.jms.MessageListener,除了这个原生的接口外,Spring 还提供了 SessionAwareMessageListener和MessageListenerAdapter.

 

/**
 *
 * @author zhangwei_david
 * @version $Id: MessageListener.java, v 0.1 2015年1月31日 下午9:06:02 zhangwei_david Exp $
 */
public class MailMessageListener implements MessageListener {

    private MessageConverter messageConverter;

    /**
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    public void onMessage(Message msg) {
        try {
            System.out.println("on message:" + messageConverter.fromMessage(msg));
        } catch (JMSException e) {
        }
    }

    /**
     * Setter method for property <tt>messageConverter</tt>.
     *
     * @param messageConverter value to be assigned to property messageConverter
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

}

     Spring 中提供了多种消息监听器的容器,常用的容器有SimpleMessageListenerContainer 和DefaultMessageListenerContainer。SimpleMessageListenerContainer 是一个最简单的容器,不提供事务的支持,DefaultMessageListenerContainer是默认的容器实现,支持事务。

 

/**
 *
 * @author zhangwei_david
 * @version $Id: ToStringBase.java, v 0.1 2015年2月2日 下午7:41:52 zhangwei_david Exp $
 */
public class ToStringBase {

    /**
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
    }

}

 

定义一个基本的对象

/**
 *
 * @author zhangwei_david
 * @version $Id: Mail.java, v 0.1 2015年2月2日 下午7:25:24 zhangwei_david Exp $
 */
public class Mail extends ToStringBase {

    /**id**/
    private String mailId;

    private String from;

    private String to;

    private String content;

    /**
     * Getter method for property <tt>mailId</tt>.
     *
     * @return property value of mailId
     */
    public String getMailId() {
        return mailId;
    }

    /**
     * Setter method for property <tt>mailId</tt>.
     *
     * @param mailId value to be assigned to property mailId
     */
    public void setMailId(String mailId) {
        this.mailId = mailId;
    }

    /**
     * Getter method for property <tt>from</tt>.
     *
     * @return property value of from
     */
    public String getFrom() {
        return from;
    }

    /**
     * Setter method for property <tt>from</tt>.
     *
     * @param from value to be assigned to property from
     */
    public void setFrom(String from) {
        this.from = from;
    }

    /**
     * Getter method for property <tt>to</tt>.
     *
     * @return property value of to
     */
    public String getTo() {
        return to;
    }

    /**
     * Setter method for property <tt>to</tt>.
     *
     * @param to value to be assigned to property to
     */
    public void setTo(String to) {
        this.to = to;
    }

    /**
     * Getter method for property <tt>content</tt>.
     *
     * @return property value of content
     */
    public String getContent() {
        return content;
    }

    /**
     * Setter method for property <tt>content</tt>.
     *
     * @param content value to be assigned to property content
     */
    public void setContent(String content) {
        this.content = content;
    }

}

 

/**
 *
 * @author zhangwei_david
 * @version $Id: ProducerImpl.java, v 0.1 2015年1月31日 下午8:25:36 zhangwei_david Exp $
 */
@Component
public class ProducerImpl implements Producer {
    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     */
    @Transactional
    public void send(Mail mail) {
        System.out.println("sende->" + mail);
        jmsTemplate.convertAndSend(mail);

    }
}

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:aop="http://www.springframework.org/schema/aop" 
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/shcema/jms
		http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
		http://www.springframework.org/schema/context
		http://www.springframework.org/schema/context/spring-context-3.0.xsd
		http://www.springframework.org/schema/aop 
		http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
		">
	<aop:aspectj-autoproxy />

	<context:annotation-config />
	<context:component-scan base-package="com.cathy.demo.jms.*" />
	<!-- connectionFactory -->
	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616"/>
	</bean>
	<!-- mailMessage converter -->
	<bean id="mailMessageConverter" class="com.cathy.demo.jms.convert.MailMessageConverter"/>
	<!-- jmsTemplate -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory"/>
		<property name="defaultDestination" ref="topic"/>
		<property name="receiveTimeout" value="60000"/>
		<property name="pubSubDomain" value="true"/>
		<property name="sessionTransacted" value="true"/>   
		<property name="messageConverter" ref="mailMessageConverter"/>
	</bean>
	<!--
	<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="message.queue"/>
	</bean>
	-->
	<!-- 主题 -->
	<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="notifyTopic"/>
	</bean>
	<bean id="defaultMessageListener" class="com.cathy.demo.jms.listener.MailMessageListener">
		<property name="messageConverter" ref="mailMessageConverter"/>
	</bean>
	<!-- 消息接收监听器用于异步接收消息-->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory"/>  
        <property name="destination" ref="topic"/>
        <property name="sessionTransacted" value="true"/>
        <property name="messageListener" ref="defaultMessageListener"/>  
   </bean>
   
</beans>

 

/**
 *
 * @author zhangwei_david
 * @version $Id: Sender.java, v 0.1 2015年1月31日 下午8:47:18 zhangwei_david Exp $
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:META-INF/spring/jms-beans.xml")
public class SenderAndReciver {
    @Autowired
    private Producer producer;

    @Test
    public void testSend() {
        Mail mail = new Mail();
        mail.setMailId("testId");
        mail.setTo("david");
        mail.setFrom("cathy");
        mail.setContent("Hello");
        producer.send(mail);
    }

}

 测试的结果是:

log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
sende->Mail[mailId=testId,from=cathy,to=david,content=Hello]
on message:Mail[mailId=testId,from=cathy,to=david,content=Hello]

 

MessageListenerAdapter

 

消息监听器适配器代理消息的处理目标 通过反射监听方法,具有灵活的消息类型转换。允许监听器的方法来对邮件内容类型进行操作,完全独立于JMS API

 

定义一个消息处理目标方法

 

/**
 *
 * @author zhangwei_david
 * @version $Id: ListenerDelegate.java, v 0.1 2015年2月3日 下午2:32:11 zhangwei_david Exp $
 */
public class ListenerDelegate {

    public void handleMessage(@SuppressWarnings("rawtypes") Map map) {
        System.out.println("receive  MapMessage->" + map);
    }
}
 
<bean id="defaultListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
         <constructor-arg ref="listenerDelegate"/>
     </bean>
	
	<!-- 消息接收监听器用于异步接收消息-->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory"/>  
        <property name="destination" ref="topic"/>
        <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
        <property name="messageListener" ref="defaultListener"/> 
        <property name="sessionTransacted" value="true"/> 
   </bean>
 执行测试方法的结果是:

 

sende->Mail[mailId=testId,from=cathy,to=david,content=Hello]
on message:Mail[mailId=testId,from=cathy,to=david,content=Hello]
receive  MapMessage->{mailId=testId, from=cathy, to=david, content=Hello}
 那么在MessageListenerAdapter中又是如何处理消息的呢?
public void onMessage(Message message, Session session) throws JMSException {
		// 获取代理方法
		Object delegate = getDelegate();
		// 如果代理方法不是当前类
		if (delegate != this) {
			// 如果代理方法是实现SessionAwareMessageListener接口的,直接将消息和session交给代理方法处理
			if (delegate instanceof SessionAwareMessageListener) {
				if (session != null) {
					((SessionAwareMessageListener) delegate).onMessage(message, session);
					return;
				}
				// session为null,且该代理对象未实现MessageListener接口则抛出一个异常
				else if (!(delegate instanceof MessageListener)) {
					throw new javax.jms.IllegalStateException("MessageListenerAdapter cannot handle a " +
							"SessionAwareMessageListener delegate if it hasn't been invoked with a Session itself");
				}
			}
			// 如果代理对象实现了MessageListener接口则将消息交给代理对象处理
			if (delegate instanceof MessageListener) {
				((MessageListener) delegate).onMessage(message);
				return;
			}
		}

		//将消息体转换为一个对象
		Object convertedMessage = extractMessage(message);
		// 获取这个消息的处理方法
		String methodName = getListenerMethodName(message, convertedMessage);
		if (methodName == null) {
			throw new javax.jms.IllegalStateException("No default listener method specified: " +
					"Either specify a non-null value for the 'defaultListenerMethod' property or " +
					"override the 'getListenerMethodName' method.");
		}

		// 反射调用处理方法处理消息
		Object[] listenerArguments = buildListenerArguments(convertedMessage);
		Object result = invokeListenerMethod(methodName, listenerArguments);
		if (result != null) {
			handleResult(result, message, session);
		}
		else {
			logger.trace("No result object given - no result to handle");
		}
	}
 
0
0
分享到:
评论

相关推荐

    spring-jms入门

    Spring-JMS是Spring框架的一部分,专门用于处理Java消息服务(JMS)的集成。它提供了一个简单的API,使得开发者能够方便地在应用中使用消息传递功能。本文将深入探讨Spring-JMS的基础知识,包括它的核心概念、配置...

    Spring发送接收JMS消息

    **Spring与JMS消息传递** 在Java世界中,Java Message Service (JMS) 是一个标准接口,用于在分布式环境中发送和接收消息。Spring框架提供了一种简单而强大的方式来集成JMS,使得开发者可以轻松地在应用中实现异步...

    spring_jms

    Spring提供了`MessageListenerContainer`接口,允许我们定义监听JMS消息的策略,例如基于`DefaultMessageListenerContainer`的实现,它可以自动启动和停止消息监听。 接下来,JMS是Java平台的标准API,用于在分布式...

    spring整合jms+activemq

    此外,Spring还支持JMS监听器容器,使得消息的消费可以以非阻塞的方式进行,提高了系统的可扩展性和并发性能。 接下来,我们来看看如何集成ActivemQ。首先,需要在项目中引入ActivemQ的相关依赖,通常是在pom.xml...

    Java网络编程--基于Spring的JMS编程

    在Java中,Spring框架提供了强大而灵活的支持,使得开发人员能够轻松地实现基于Java消息服务(JMS)的网络编程。JMS是一个标准接口,它允许应用程序创建、发送、接收和读取消息,这些消息可以在不同的应用之间传递,...

    spring-jms-4.3.4.RELEASE.zip

    在Spring的配置文件中,开发者可以轻松定义消息模板、监听容器和消息驱动的POJO,从而降低与JMS相关的代码复杂性。 核心概念包括: 1. **MessageTemplate**:这是Spring JMS提供的主要工具类,用于发送JMS消息。它...

    SpringJMS示例代码

    SpringJMS是Spring框架的一部分,它提供了一种与Java消息服务(JMS)进行交互的简单方式。在本文中,我们将深入探讨SpringJMS的基本概念、如何与ActiveMQ集成,以及如何通过示例代码理解其工作原理。 1. **Spring...

    spring-jms源码

    MessageListenerContainer是Spring JMS用于异步处理消息的组件,它可以监听一个或多个队列或主题,并在接收到消息时调用预先注册的MessageListener。这使得应用程序可以专注于业务逻辑,而无需关心消息的接收和处理...

    Spring+Weblogic JMS

    在本项目中,Spring与WebLogic JMS(Java消息服务)的集成展示了如何在Spring环境中使用消息队列进行通信。 WebLogic JMS是Oracle WebLogic Server提供的消息中间件,它遵循JMS规范,用于在分布式环境中传递消息,...

    spring-jms

    Spring-JMS是Spring框架的一部分,专门用于处理Java消息服务(JMS)的集成。这个jar包,即`spring-jms-3.1.1.RELEASE.jar`,包含了Spring对JMS API的抽象和扩展,使得在Spring应用中使用JMS变得更加简单和灵活。 **...

    activemq与spring整合发送jms消息入门实例

    在Java世界中,ActiveMQ和Spring的整合是企业级应用中常见的消息中间件解决方案,用于实现JMS(Java Message Service)消息传递。本教程将深入探讨如何将这两个强大的工具结合在一起,以创建一个简单的发送JMS消息的...

    Spring+JMS+ActiveMQ+Tomcat jar下载

    然后,通过Spring的JMS模板发送消息,并注册MessageListener监听特定的消息。ActiveMQ服务器配置需要指定存储、网络连接等相关参数。最后,将整个应用打包成WAR文件,部署到Tomcat服务器上。 总结起来,"Spring+JMS...

    Spring集成JMS

    在实际应用中,`SpringJMS`可能包含以下示例代码片段: ```xml &lt;bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"&gt; &lt;!-- 这里配置具体的JMS提供商实现 --&gt;...

    JMS_Spring集成所需jar

    2. **Spring Framework JMS模块** - Spring的核心模块之一,提供对JMS的支持。如`spring-jms.jar`,包含了Spring对JMS的抽象和配置支持。 3. **Java消息服务API** - `jms.jar` 或 `javax.jms-api.jar`,这是Java...

    jms整合spring工程

    - **Spring JMS模块**:Spring框架提供了一组JMS模板和监听器容器,简化了消息的生产和消费。 - **ConnectionFactory**:JMS API的核心对象,用于创建连接到消息代理的连接。 - **Destination**:表示消息的目的...

    Spring整合JMS——实现收发消息

    在IT行业中,Spring框架是Java领域最常用的轻量级应用框架之一,而JMS(Java Message Service)则是Java平台上的消息中间件标准,用于应用程序之间的异步通信。本篇文章将详细探讨如何通过Spring框架整合JMS,特别是...

    JMS之Spring +activeMQ实现消息队列

    总结起来,"JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步...

    Spring JMS 消息处理-基于JNDI

    4. **消息监听器**:Spring JMS支持声明式和编程式的消息监听器。博客会展示如何定义MessageListener接口的实现,并将其注册到Spring容器中,以便在接收到消息时自动调用。 5. **发送和接收消息**:博主将演示如何...

    JMS整合Spring实例

    在Spring的配置文件(如applicationContext.xml)中,我们需要配置JMS连接工厂、目的地(Topic或Queue)以及消息监听器容器。 ```xml &lt;bean id="jmsTemplate" class="org.springframework.jms.core....

Global site tag (gtag.js) - Google Analytics