`
xiao_nan_ren
  • 浏览: 2433 次
社区版块
存档分类
最新评论

spring jms transaction manager

阅读更多
Spring整合JMS(四)——事务管理

    博客分类: Spring

SpringJMS事务sessionTransactedJtaTransactionManager


Spring提供了一个JmsTransactionManager用于对JMS ConnectionFactory做事务管理。这将允许JMS应用利用Spring的事务管理特性。JmsTransactionManager在执行本地资源事务管理时将从指定的ConnectionFactory绑定一个ConnectionFactory/Session这样的配对到线程中。JmsTemplate会自动检测这样的事务资源,并对它们进行相应操作。

在Java EE环境中,ConnectionFactory会池化Connection和Session,这样这些资源将会在整个事务中被有效地重复利用。在一个独立的环境中,使用Spring的SingleConnectionFactory时所有的事务将公用一个Connection,但是每个事务将保留自己独立的Session。

JmsTemplate可以利用JtaTransactionManager和能够进行分布式的 JMS ConnectionFactory处理分布式事务。

       在Spring整合JMS的应用中,如果我们要进行本地的事务管理的话非常简单,只需要在定义对应的消息监听容器时指定其sessionTransacted属性为true,如:


Xml代码  收藏代码

    <bean id="jmsContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
        <property name="connectionFactory" ref="connectionFactory" /> 
        <property name="destination" ref="queueDestination" /> 
        <property name="messageListener" ref="consumerMessageListener" /> 
        <property name="sessionTransacted" value="true"/> 
    </bean> 



       该属性值默认为false,这样JMS在进行消息监听的时候就会进行事务控制,当在接收消息时监听器执行失败时JMS就会对接收到的消息进行回滚,对于SessionAwareMessageListener在接收到消息后发送一个返回消息时也处于同一事务下,但是对于其他操作如数据库访问等将不属于该事务控制。

这里我们可以来做一个这样的测试:我们如上配置监听在queueDestination的消息监听容器的sessionTransacted属性为true,然后把我们前面提到的消息监听器ConsumerMessageListener改成这样:


Java代码  收藏代码

    public class ConsumerMessageListener implements MessageListener { 
      
        public void onMessage(Message message) { 
                //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage 
                TextMessage textMsg = (TextMessage) message; 
                System.out.println("接收到一个纯文本消息。"); 
                try { 
                    System.out.println("消息内容是:" + textMsg.getText()); 
                    if (1 == 1) { 
                        throw new RuntimeException("Error"); 
                    } 
                } catch (JMSException e) { 
                    e.printStackTrace(); 
                } 
        } 
      
    } 

       

我们可以看到在上述代码中我们的ConsumerMessageListener在进行消息接收的时候抛出了一个RuntimeException,根据我们上面说的,因为我们已经在对应的监听容器上定义了其sessionTransacted属性为true,所以当这里抛出异常的时候JMS将对接收到的消息进行回滚,即下次进行消息接收的时候该消息仍然能够被接收到。为了验证这一点,我们先执行一遍测试代码,往queueDestination发送一个文本消息,这个时候ConsumerMessageListener在进行接收的时候将会抛出一个RuntimeException,已经接收到的纯文本消息将进行回滚;接着我们去掉上面代码中抛出异常的语句,即ConsumerMessageListener能够正常的进行消息接收,这个时候我们再运行一次测试代码,往ConsumerMessageListener监听的queueDestination发送一条消息。如果之前在接手时抛出了异常的那条消息已经回滚了的话,那么这个时候将能够接收到两条消息,控制台将输出接收到的两条消息的内容。具体结果有兴趣的朋友可以自己验证一下。

       如果想接收消息和数据库访问处于同一事务中,那么我们就可以配置一个外部的事务管理同时配置一个支持外部事务管理的消息监听容器(如DefaultMessageListenerContainer)。要配置这样一个参与分布式事务管理的消息监听容器,我们可以配置一个JtaTransactionManager,当然底层的JMS ConnectionFactory需要能够支持分布式事务管理,并正确地注册我们的JtaTransactionManager。这样消息监听器进行消息接收和对应的数据库访问就会处于同一数据库控制下,当消息接收失败或数据库访问失败都会进行事务回滚操作。


Xml代码  收藏代码

    <bean id="jmsContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
        <property name="connectionFactory" ref="connectionFactory" /> 
        <property name="destination" ref="queueDestination" /> 
        <property name="messageListener" ref="consumerMessageListener" /> 
        <property name="transactionManager" ref="jtaTransactionManager"/> 
    </bean> 
     
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> 

       

当给消息监听容器指定了transactionManager时,消息监听容器将忽略sessionTransacted的值。

       关于使用JtaTransactionManager来管理上述分布式事务,我们这里也可以来做一个试验。

       首先:往Spring配置文件applicationContext.xml中添加如下配置:

  
Xml代码  收藏代码

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> 
        <property name="dataSource" ref="dataSource"/> 
    </bean> 
     
    <jee:jndi-lookup jndi-name="jdbc/mysql" id="dataSource"/> 
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> 
     
    <tx:annotation-driven transaction-manager="jtaTransactionManager"/> 

 

       我们可以看到,在这里我们引入了一个jndi数据源,定义了一个JtaTransactionManager,定义了Spring基于注解的声明式事务管理,定义了一个Spring提供的进行Jdbc操作的工具类jdbcTemplate。



       接下来把我们的ConsumerMessageListener改为如下形式:
Java代码  收藏代码

    public class ConsumerMessageListener implements MessageListener { 
      
        @Autowired 
        private TestDao testDao; 
         
        private int count = 0; 
         
        public void onMessage(Message message) { 
                //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage 
                TextMessage textMsg = (TextMessage) message; 
                System.out.println(new Date().toLocaleString() + "接收到一个纯文本消息。"); 
                try { 
                    String text = textMsg.getText(); 
                    System.out.println("消息内容是:" + text); 
                    System.out.println("当前count的值是:" + count); 
                    testDao.insert(text + count); 
                    if (count == 0) { 
                        count ++; 
                        throw new RuntimeException("Error! 出错啦!"); 
                    } 
                } catch (JMSException e) { 
                    e.printStackTrace(); 
                } 
        } 
      
    } 



       我们可以看到,在ConsumerMessageListener中我们定义了一个实例变量count,其初始值为0;在onMessage里面,我们可以看到我们把接收到的消息内容作为参数调用了testDao的insert方法;当count值为0,也就是进行第一次消息接收的时候会将count的值加1,同时抛出一个运行时异常。那么我们这里要测试的就是进行第一次接收的时候testDao已经把相关内容插入数据库了,接着在onMessage里面抛出了一个异常同时count加1,我们预期的结果应该是此时数据库进行回滚,同时JMS也回滚,这样JMS将继续尝试接收该消息,此时同样会调用testDao的insert方法将内容插入数据库,再接着count已经不为0了,所以此时将不再抛出异常,JMS成功进行消息的接收,testDao也成功的将消息内容插入到了数据库。要证明这个预期我们除了看数据库中插入的数据外,还可以看控制台的输出,正常情况控制台将输出两次消息接收的内容,且第一次时count为0,第二次count为1。

       TestDao是一个接口,其TestDaoImpl对insert的方法实现如下:


Java代码  收藏代码

    @Transactional(readOnly=false) 
    public void insert(final String name) { 
         
        jdbcTemplate.update("insert into test(name) values(?)", name); 
     
    } 

      

这里我们使用支持JtaTransactionManager的Weblogic来进行测试,因为是Web容器,所以我们这里定义了一个Controller来进行消息的发送,具体代码如下:


Java代码  收藏代码

    @Controller 
    @RequestMapping("test") 
    public class TestController { 
      
        @Autowired 
        @Qualifier("queueDestination") 
        private Destination destination; 
         
        @Autowired 
        private ProducerService producerService; 
         
        @RequestMapping("first") 
        public String first() { 
            producerService.sendMessage(destination, "你好,现在是:" + new Date().toLocaleString()); 
            return "/test/first"; 
        } 
         
    } 

     

接下来就是启用Weblogic服务器,进入其控制台,定义一个名叫“jdbc/mysql”的JNDI数据源,然后把该项目部署到Weblogic服务器上并进行启动。接下来我们就可以访问/test/first.do访问到上述first方法。之后控制台会输出如下信息:



        我们可以看到当count为0时接收了一次,并随后抛出了异常,之后count为1又接收了一次,这说明在count为0时抛出异常后我们的JMS进行回滚了,那么我们的数据库是否有进行回滚呢?接着我们来看数据库中的内容:



        我们可以看到数据库表中只有一条记录,而且最后一位表示count的值的为1,这说明在JMS进行消息接收抛出异常时我们的数据库也回滚了。关于使用JtaTransactionManager进行分布式事务管理的问题就说到这里了,有兴趣的朋友可以自己试验一下。
分享到:
评论

相关推荐

    Java分布式开发spring+jta+jotm

    在Spring中,Java Transaction API (JTA) 和 JOTM(Java Open Transaction Manager)是实现分布式事务管理的关键组件。 **Spring 框架** Spring 是一个开源的Java平台,它提供了一个全面的编程和配置模型,用于现代...

    CXF与Spring整合下载文件四

    - 可能需要配置CXF的JMS支持,通过Spring的`&lt;jee:jndi-lookup&gt;`和`&lt;tx:jta-transaction-manager&gt;`标签来连接消息队列和管理事务。 由于没有具体的描述,我们无法深入到代码层面去分析这个整合的具体实现,但以上...

    spring-boot-reference.pdf

    37.6. Supporting an Alternative Embedded Transaction Manager 38. Hazelcast 39. Quartz Scheduler 40. Spring Integration 41. Spring Session 42. Monitoring and Management over JMX 43. Testing 43.1. Test ...

    分布式事务JTA之实践:Spring+ATOMIKOS

    Atomikos包含了事务协调器(Transaction Manager)和资源管理器(如XAResource),可以处理跨数据库、JMS和其他JTA兼容资源的事务。Atomikos的使用使得在不依赖特定数据库厂商API的情况下,能够在不同资源之间实现两...

    Spring2.0的配置

    &lt;tx:advice id="txAdvice" transaction-manager="transactionManager"&gt; *" propagation="REQUIRED"/&gt; *" propagation="REQUIRED" rollback-for="java.lang.RuntimeException"/&gt; *" propagation="REQUIRED" ...

    spring 事务配置方式

    对于JMS或JTA等分布式事务,Spring提供了LocalContainerEntityManagerFactoryBean和LocalTransactionManager,可以处理跨数据库的事务。 5. **PlatformTransactionManager的自定义实现** 如果以上方式都不能满足...

    Spring 3.1配置文件示例(备忘)

    &lt;tx:annotation-driven transaction-manager="transactionManager"/&gt; &lt;bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"&gt; ``` 最后,资源加载是 Spring...

    Spring基础教程

    3. **数据访问抽象** (Data Access Abstraction): Spring提供了一系列用于简化数据访问的技术,包括JDBC、ORM(如Hibernate、JPA)和JMS的支持,这些技术可以帮助开发者更轻松地与数据库进行交互。 4. **事务管理**:...

    java_spring_day05.pdf

    - 通过配置`&lt;tx:annotation-driven transaction-manager="transactionManager"/&gt;`启用注解驱动的事务管理。 - 使用`@Transactional`注解来标记需要事务处理的方法或类。 2. **编程式事务管理:** - 适用于复杂的...

    基于Java的实例源码-开源事务管理器 JOTM.zip

    Java开源事务管理器JOTM(Java Open Transaction Manager)是一个强大的、完全符合X/Open XA规范的事务管理器,它提供了分布式事务处理的能力。在Java应用程序中,事务管理是确保数据一致性、完整性和原子性的重要...

    ow2-jotm-dist-2.2.1-bin.tar.gz

    JOTM,全称Java Open Transaction Manager,是一款开源的Java事务管理器,它实现了JTA(Java Transaction API)标准,用于在分布式环境中协调跨系统的事务处理。 描述中的信息同样为 "ow2-jotm-dist-2.2.1-bin.tar....

    java技术架构图汇总.docx

    Hibernate架构主要包括Session、Transaction、Query、Criteria等几个部分。 Struts2架构:Struts2是一个基于Java的开源web应用程序框架,旨在简化web应用程序的开发。Struts2架构主要包括Action、Interceptor、...

    tuxedo教程和资料

    随着技术的发展,虽然Tuxedo仍然在某些领域发挥重要作用,但也有其他中间件如Java的JMS、Spring框架的Apache CXF等成为替代选择。同时,Tuxedo也可以与其他技术如Web Services、RESTful API集成,实现更广泛的应用...

    金蝶BOSV6.1_业务组件API参考手册

    com.kingdee.bos.transaction.springframework.transaction.interceptor com.kingdee.bos.transaction.springframework.transaction.jta com.kingdee.bos.transaction.springframework.transaction.support ...

    RMI规范说明

    远程方法调用(Remote Method Invocation,RMI)是Java平台上的...尽管现代的Java技术如JMS、Web服务和分布式框架(如Spring Cloud)在某些场景下提供了更强大的解决方案,但RMI依然是理解和掌握Java分布式计算的基础。

    Java程序员面试题大全

    - 员工表查询,可以使用JOIN和IFNULL函数处理,例如:`SELECT e.id, e.name, IFNULL(m.name, '无') AS manager_name FROM employee e LEFT JOIN employee m ON e.managerid = m.id;` - Java规范,比如命名规范:...

    EhcacheUserGuide

    #### 三十、缓存管理器事件监听器(Cache Manager Event Listeners) 缓存管理器事件监听器可以用来监听缓存管理器的各种事件,如缓存创建、销毁等。这部分内容介绍了如何配置和使用缓存管理器事件监听器。 #### 三...

Global site tag (gtag.js) - Google Analytics