`
weitao1026
  • 浏览: 1052844 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Spring整合JMS

阅读更多
Spring提供了一个JmsTransactionManager用于对JMS ConnectionFactory做事务管理。这将允许JMS应用利用Spring的事务管理特性。JmsTransactionManager在执行本地资源事务管理时将从指定的ConnectionFactory绑定一个ConnectionFactory/Session这样的配对到线程中。JmsTemplate会自动检测这样的事务资源,并对它们进行相应操作。

在Java EE环境中,ConnectionFactory会池化Connection和Session,这样这些资源将会在整个事务中被有效地重复利用。在一个独立的环境中,使用Spring的SingleConnectionFactory时所有的事务将公用一个Connection,但是每个事务将保留自己独立的Session。

JmsTemplate可以利用JtaTransactionManager和能够进行分布式的 JMS ConnectionFactory处理分布式事务。

       在Spring整合JMS的应用中,如果我们要进行本地的事务管理的话非常简单,只需要在定义对应的消息监听容器时指定其sessionTransacted属性为true,如:


Xml代码  收藏代码

    <bean id="jmsContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
        <property name="connectionFactory" ref="connectionFactory" /> 
        <property name="destination" ref="queueDestination" /> 
        <property name="messageListener" ref="consumerMessageListener" /> 
        <property name="sessionTransacted" value="true"/> 
    </bean> 



       该属性值默认为false,这样JMS在进行消息监听的时候就会进行事务控制,当在接收消息时监听器执行失败时JMS就会对接收到的消息进行回滚,对于SessionAwareMessageListener在接收到消息后发送一个返回消息时也处于同一事务下,但是对于其他操作如数据库访问等将不属于该事务控制。

这里我们可以来做一个这样的测试:我们如上配置监听在queueDestination的消息监听容器的sessionTransacted属性为true,然后把我们前面提到的消息监听器ConsumerMessageListener改成这样:


Java代码  收藏代码

    public class ConsumerMessageListener implements MessageListener { 
      
        public void onMessage(Message message) { 
                //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage 
                TextMessage textMsg = (TextMessage) message; 
                System.out.println("接收到一个纯文本消息。"); 
                try { 
                    System.out.println("消息内容是:" + textMsg.getText()); 
                    if (1 == 1) { 
                        throw new RuntimeException("Error"); 
                    } 
                } catch (JMSException e) { 
                    e.printStackTrace(); 
                } 
        } 
      
    } 

       

我们可以看到在上述代码中我们的ConsumerMessageListener在进行消息接收的时候抛出了一个RuntimeException,根据我们上面说的,因为我们已经在对应的监听容器上定义了其sessionTransacted属性为true,所以当这里抛出异常的时候JMS将对接收到的消息进行回滚,即下次进行消息接收的时候该消息仍然能够被接收到。为了验证这一点,我们先执行一遍测试代码,往queueDestination发送一个文本消息,这个时候ConsumerMessageListener在进行接收的时候将会抛出一个RuntimeException,已经接收到的纯文本消息将进行回滚;接着我们去掉上面代码中抛出异常的语句,即ConsumerMessageListener能够正常的进行消息接收,这个时候我们再运行一次测试代码,往ConsumerMessageListener监听的queueDestination发送一条消息。如果之前在接手时抛出了异常的那条消息已经回滚了的话,那么这个时候将能够接收到两条消息,控制台将输出接收到的两条消息的内容。具体结果有兴趣的朋友可以自己验证一下。

       如果想接收消息和数据库访问处于同一事务中,那么我们就可以配置一个外部的事务管理同时配置一个支持外部事务管理的消息监听容器(如DefaultMessageListenerContainer)。要配置这样一个参与分布式事务管理的消息监听容器,我们可以配置一个JtaTransactionManager,当然底层的JMS ConnectionFactory需要能够支持分布式事务管理,并正确地注册我们的JtaTransactionManager。这样消息监听器进行消息接收和对应的数据库访问就会处于同一数据库控制下,当消息接收失败或数据库访问失败都会进行事务回滚操作。


Xml代码  收藏代码

    <bean id="jmsContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
        <property name="connectionFactory" ref="connectionFactory" /> 
        <property name="destination" ref="queueDestination" /> 
        <property name="messageListener" ref="consumerMessageListener" /> 
        <property name="transactionManager" ref="jtaTransactionManager"/> 
    </bean> 
     
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> 

       

当给消息监听容器指定了transactionManager时,消息监听容器将忽略sessionTransacted的值。

       关于使用JtaTransactionManager来管理上述分布式事务,我们这里也可以来做一个试验。

       首先:往Spring配置文件applicationContext.xml中添加如下配置:

 
Xml代码  收藏代码

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> 
        <property name="dataSource" ref="dataSource"/> 
    </bean> 
     
    <jee:jndi-lookup jndi-name="jdbc/mysql" id="dataSource"/> 
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> 
     
    <tx:annotation-driven transaction-manager="jtaTransactionManager"/> 

 

       我们可以看到,在这里我们引入了一个jndi数据源,定义了一个JtaTransactionManager,定义了Spring基于注解的声明式事务管理,定义了一个Spring提供的进行Jdbc操作的工具类jdbcTemplate。



       接下来把我们的ConsumerMessageListener改为如下形式:

Java代码  收藏代码

    public class ConsumerMessageListener implements MessageListener { 
      
        @Autowired 
        private TestDao testDao; 
         
        private int count = 0; 
         
        public void onMessage(Message message) { 
                //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage 
                TextMessage textMsg = (TextMessage) message; 
                System.out.println(new Date().toLocaleString() + "接收到一个纯文本消息。"); 
                try { 
                    String text = textMsg.getText(); 
                    System.out.println("消息内容是:" + text); 
                    System.out.println("当前count的值是:" + count); 
                    testDao.insert(text + count); 
                    if (count == 0) { 
                        count ++; 
                        throw new RuntimeException("Error! 出错啦!"); 
                    } 
                } catch (JMSException e) { 
                    e.printStackTrace(); 
                } 
        } 
      
    } 



       我们可以看到,在ConsumerMessageListener中我们定义了一个实例变量count,其初始值为0;在onMessage里面,我们可以看到我们把接收到的消息内容作为参数调用了testDao的insert方法;当count值为0,也就是进行第一次消息接收的时候会将count的值加1,同时抛出一个运行时异常。那么我们这里要测试的就是进行第一次接收的时候testDao已经把相关内容插入数据库了,接着在onMessage里面抛出了一个异常同时count加1,我们预期的结果应该是此时数据库进行回滚,同时JMS也回滚,这样JMS将继续尝试接收该消息,此时同样会调用testDao的insert方法将内容插入数据库,再接着count已经不为0了,所以此时将不再抛出异常,JMS成功进行消息的接收,testDao也成功的将消息内容插入到了数据库。要证明这个预期我们除了看数据库中插入的数据外,还可以看控制台的输出,正常情况控制台将输出两次消息接收的内容,且第一次时count为0,第二次count为1。

       TestDao是一个接口,其TestDaoImpl对insert的方法实现如下:



Java代码  收藏代码

    @Transactional(readOnly=false) 
    public void insert(final String name) { 
         
        jdbcTemplate.update("insert into test(name) values(?)", name); 
     
    } 

      

这里我们使用支持JtaTransactionManager的Weblogic来进行测试,因为是Web容器,所以我们这里定义了一个Controller来进行消息的发送,具体代码如下:


Java代码  收藏代码

    @Controller 
    @RequestMapping("test") 
    public class TestController { 
      
        @Autowired 
        @Qualifier("queueDestination") 
        private Destination destination; 
         
        @Autowired 
        private ProducerService producerService; 
         
        @RequestMapping("first") 
        public String first() { 
            producerService.sendMessage(destination, "你好,现在是:" + new Date().toLocaleString()); 
            return "/test/first"; 
        } 
         
    } 

     

接下来就是启用Weblogic服务器,进入其控制台,定义一个名叫“jdbc/mysql”的JNDI数据源,然后把该项目部署到Weblogic服务器上并进行启动。接下来我们就可以访问/test/first.do访问到上述first方法。之后控制台会输出如下信息:



        我们可以看到当count为0时接收了一次,并随后抛出了异常,之后count为1又接收了一次,这说明在count为0时抛出异常后我们的JMS进行回滚了,那么我们的数据库是否有进行回滚呢?接着我们来看数据库中的内容:



        我们可以看到数据库表中只有一条记录,而且最后一位表示count的值的为1,这说明在JMS进行消息接收抛出异常时我们的数据库也回滚了。关于使用JtaTransactionManager进行分布式事务管理的问题就说到这里了,有兴趣的朋友可以自己试验一下。
分享到:
评论

相关推荐

    Spring整合JMS(四)——事务管理

    在Spring整合JMS的过程中,我们通常会使用`DefaultMessageListenerContainer`或`SimpleMessageListenerContainer`类来消费消息。这些容器提供了与JMS提供者(如ActiveMQ、RabbitMQ等)的连接管理,并且支持事务性...

    spring整合jms+activemq

    综上所述,Spring整合JMS和ActivemQ提供了一套完整的解决方案,帮助开发者轻松地在应用中实现消息的发送和接收。通过这种方式,可以构建出高可用、松耦合、可扩展的分布式系统,提高系统的稳定性和响应速度。在实际...

    Spring集成JMS

    Spring集成JMS是Java消息服务(Java Message Service)与Spring框架的结合,它提供了一种在分布式系统中高效处理异步消息传递的方式。Spring通过其强大的IoC(Inversion of Control,控制反转)和AOP(Aspect ...

    Spring整合JMS——实现收发消息

    总结来说,Spring整合JMS和ActiveMQ的过程包括:配置ConnectionFactory,定义Destination,创建MessageListener容器,以及使用JmsTemplate发送消息。通过这种方式,你可以构建一个健壮的、异步的消息传递系统,提高...

    Spring整合JMS.doc

    Spring整合JMS与ActiveMQ深度解析 一、JMS概览与原理 Java Message Service (JMS) 是Java平台中的消息服务应用接口,为应用程序之间的通信提供消息传递机制。JMS支持两种通信模型:点对点(Point-to-Point, PTP)...

    spring整合JMS-居于ActiveMQ实现

    Spring整合JMS基于ActiveMQ实现是一项常见的企业级应用开发任务,它涉及到Spring框架、Java消息服务(JMS)以及ActiveMQ消息中间件的使用。在本文中,我们将深入探讨这三个关键概念,以及如何将它们有效地结合在一起...

    Spring整合JMS(三)——MessageConverter介绍

    总结,Spring整合JMS中的MessageConverter是实现数据在Java对象和JMS消息间转换的关键。正确地选择和配置MessageConverter,能有效提高系统通信的效率和灵活性,同时降低开发复杂度。在实际应用中,根据业务需求选择...

    消息中间件ActiveMQ及Spring整合JMS.docx

    【ActiveMQ和Spring整合JMS】的文档主要介绍了消息中间件的基本概念,特别是重点讨论了ActiveMQ和JMS的相关知识。消息中间件是用于不同分布式系统之间数据交流的工具,通过消息传递机制来扩展进程间的通信。ActiveMQ...

    spring集成JMS

    Spring框架作为一个强大的企业级应用开发框架,提供了对JMS的全面支持,使得开发者能够方便地在Spring应用中集成JMS,实现异步消息处理和高可用性。 一、JMS简介 JMS是一种API,用于在分布式环境中发送、接收和管理...

    spring-jms入门

    Spring-JMS是Spring框架的一部分,专门用于处理Java消息服务(JMS)的集成。它提供了一个简单的API,使得开发者能够方便地在应用中使用消息传递功能。本文将深入探讨Spring-JMS的基础知识,包括它的核心概念、配置...

    JMS实例,整合spring,含jar

    5. **事务管理**:Spring提供了集成JMS事务的能力,可以在发送消息时开启事务,确保消息的原子性和一致性。 6. **测试与运行**:项目中的“完整可运行”意味着包含了运行所需的全部资源,包括配置文件、源代码、...

    spring_jms

    Spring JMS(Java Message Service)是Spring框架的一部分,专门用于集成JMS消息传递系统,以实现异步通信和解耦应用程序组件。在这个入门级实例中,我们将探讨如何使用Maven、Spring和ActiveMQ来构建一个简单的...

    jms简单demo,集成spring和不集成

    1. Spring集成JMS的配置文件,如`applicationContext.xml`或`application.yml`,配置`ActiveMQConnectionFactory`、`JmsTemplate`等。 2. 使用`@JmsListener`注解的消息消费者类,处理从队列或主题接收到的消息。 3....

    spring整合JMS实现同步收发消息(基于ActiveMQ的实现)

    Spring 整合 JMS 实现同步收发消息(基于 ActiveMQ 的实现) Spring 整合 JMS 实现同步收发消息是指在 Spring 框架下使用 JMS(Java Message Service)协议来实现消息的同步收发。JMS 是一个基于 Java 的消息服务 ...

    spring-jms-4.3.4.RELEASE.zip

    综上所述,Spring JMS 4.3.4.RELEASE为开发者提供了一套完整的JMS集成方案,通过抽象和封装JMS API,使得消息处理更加简单且易于维护。配合Spring Framework的其他模块,如Spring JDBC、Spring AOP等,可以构建出...

    JMS_Spring集成所需jar

    通过这些库文件和正确的配置,Spring可以帮助开发者轻松地集成JMS,实现可靠的消息传递,提升系统的可扩展性和容错性。同时,Spring的声明式事务管理可以确保消息传递的一致性,使得在整个系统中实现高可用和健壮的...

    spring-jms

    Spring-JMS是Spring框架的一部分,专门用于处理Java消息服务(JMS)的集成。这个jar包,即`spring-jms-3.1.1.RELEASE.jar`,包含了Spring对JMS API的抽象和扩展,使得在Spring应用中使用JMS变得更加简单和灵活。 **...

    Spring-JMS把企业消息处理变容易.doc

    Spring JMS 是一个强大的框架,它极大地简化了Java企业级消息处理。它通过提供一套抽象和模板类,使得开发者能够更加便捷地使用Java消息服务(JMS),并与各种JMS提供者,如IBM的WebSphere MQ进行集成。本文将深入...

Global site tag (gtag.js) - Google Analytics