`

Spring整合JMS(四)——事务管理

    博客分类:
  • JMS
阅读更多

一、ActiveMQ入门实例

二、Spring整合JMS(一)——基于ActiveMQ实现

三、Spring整合JMS(二)——三种消息监听器

四、Spring整合JMS(三)——MessageConverter消息转换器 

五、Spring整合JMS(四)——事务管理

 

Spring提供了一个JmsTransactionManager用于对JMS ConnectionFactory做事务管理。这将允许JMS应用利用Spring的事务管理特性。JmsTransactionManager在执行本地资源事务管理时将从指定的ConnectionFactory绑定一个ConnectionFactory/Session这样的配对到线程中。JmsTemplate会自动检测这样的事务资源,并对它们进行相应操作。

在Java EE环境中,ConnectionFactory会池化Connection和Session,这样这些资源将会在整个事务中被有效地重复利用。在一个独立的环境中,使用Spring的SingleConnectionFactory时所有的事务将公用一个Connection,但是每个事务将保留自己独立的Session。

JmsTemplate可以利用JtaTransactionManager和能够进行分布式的 JMS ConnectionFactory处理分布式事务。

       在Spring整合JMS的应用中,如果我们要进行本地的事务管理的话非常简单,只需要在定义对应的消息监听容器时指定其sessionTransacted属性为true,如:

 

    <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
        <property name="sessionTransacted" value="true"/>
    </bean>

 

       该属性值默认为false,这样JMS在进行消息监听的时候就会进行事务控制,当在接收消息时监听器执行失败时JMS就会对接收到的消息进行回滚,对于SessionAwareMessageListener在接收到消息后发送一个返回消息时也处于同一事务下,但是对于其他操作如数据库访问等将不属于该事务控制。

这里我们可以来做一个这样的测试:我们如上配置监听在queueDestination的消息监听容器的sessionTransacted属性为true,然后把我们前面提到的消息监听器ConsumerMessageListener改成这样:

 

public class ConsumerMessageListener implements MessageListener {
 
    public void onMessage(Message message) {
            //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage
            TextMessage textMsg = (TextMessage) message;
            System.out.println("接收到一个纯文本消息。");
            try {
                System.out.println("消息内容是:" + textMsg.getText());
                if (1 == 1) {
                    throw new RuntimeException("Error");
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
    }
 
}

        

我们可以看到在上述代码中我们的ConsumerMessageListener在进行消息接收的时候抛出了一个RuntimeException,根据我们上面说的,因为我们已经在对应的监听容器上定义了其sessionTransacted属性为true,所以当这里抛出异常的时候JMS将对接收到的消息进行回滚,即下次进行消息接收的时候该消息仍然能够被接收到。为了验证这一点,我们先执行一遍测试代码,往queueDestination发送一个文本消息,这个时候ConsumerMessageListener在进行接收的时候将会抛出一个RuntimeException,已经接收到的纯文本消息将进行回滚;接着我们去掉上面代码中抛出异常的语句,即ConsumerMessageListener能够正常的进行消息接收,这个时候我们再运行一次测试代码,往ConsumerMessageListener监听的queueDestination发送一条消息。如果之前在接手时抛出了异常的那条消息已经回滚了的话,那么这个时候将能够接收到两条消息,控制台将输出接收到的两条消息的内容。具体结果有兴趣的朋友可以自己验证一下。

       如果想接收消息和数据库访问处于同一事务中,那么我们就可以配置一个外部的事务管理同时配置一个支持外部事务管理的消息监听容器(如DefaultMessageListenerContainer)。要配置这样一个参与分布式事务管理的消息监听容器,我们可以配置一个JtaTransactionManager,当然底层的JMS ConnectionFactory需要能够支持分布式事务管理,并正确地注册我们的JtaTransactionManager。这样消息监听器进行消息接收和对应的数据库访问就会处于同一数据库控制下,当消息接收失败或数据库访问失败都会进行事务回滚操作。

 

    <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
        <property name="transactionManager" ref="jtaTransactionManager"/>
    </bean>
    
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

        

当给消息监听容器指定了transactionManager时,消息监听容器将忽略sessionTransacted的值。 

       关于使用JtaTransactionManager来管理上述分布式事务,我们这里也可以来做一个试验。

       首先:往Spring配置文件applicationContext.xml中添加如下配置:

  

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSource"/>
    </bean>
 
    <jee:jndi-lookup jndi-name="jdbc/mysql" id="dataSource"/>
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
 
    <tx:annotation-driven transaction-manager="jtaTransactionManager"/>

  

       我们可以看到,在这里我们引入了一个jndi数据源,定义了一个JtaTransactionManager,定义了Spring基于注解的声明式事务管理,定义了一个Spring提供的进行Jdbc操作的工具类jdbcTemplate。

 

       接下来把我们的ConsumerMessageListener改为如下形式:

 
public class ConsumerMessageListener implements MessageListener {
 
    @Autowired
    private TestDao testDao;
    
    private int count = 0;
    
    public void onMessage(Message message) {
            //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage
            TextMessage textMsg = (TextMessage) message;
            System.out.println(new Date().toLocaleString() + "接收到一个纯文本消息。");
            try {
                String text = textMsg.getText();
                System.out.println("消息内容是:" + text);
                System.out.println("当前count的值是:" + count);
                testDao.insert(text + count);
                if (count == 0) {
                    count ++;
                    throw new RuntimeException("Error! 出错啦!");
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
    }
 
}

 

       我们可以看到,在ConsumerMessageListener中我们定义了一个实例变量count,其初始值为0;在onMessage里面,我们可以看到我们把接收到的消息内容作为参数调用了testDao的insert方法;当count值为0,也就是进行第一次消息接收的时候会将count的值加1,同时抛出一个运行时异常。那么我们这里要测试的就是进行第一次接收的时候testDao已经把相关内容插入数据库了,接着在onMessage里面抛出了一个异常同时count加1,我们预期的结果应该是此时数据库进行回滚,同时JMS也回滚,这样JMS将继续尝试接收该消息,此时同样会调用testDao的insert方法将内容插入数据库,再接着count已经不为0了,所以此时将不再抛出异常,JMS成功进行消息的接收,testDao也成功的将消息内容插入到了数据库。要证明这个预期我们除了看数据库中插入的数据外,还可以看控制台的输出,正常情况控制台将输出两次消息接收的内容,且第一次时count为0,第二次count为1。

       TestDao是一个接口,其TestDaoImpl对insert的方法实现如下: 

 

 
    @Transactional(readOnly=false)
    public void insert(final String name) {
        
        jdbcTemplate.update("insert into test(name) values(?)", name);
 
    }

       

这里我们使用支持JtaTransactionManager的Weblogic来进行测试,因为是Web容器,所以我们这里定义了一个Controller来进行消息的发送,具体代码如下:

 

@Controller
@RequestMapping("test")
public class TestController {
 
    @Autowired
    @Qualifier("queueDestination")
    private Destination destination;
    
    @Autowired
    private ProducerService producerService;
    
    @RequestMapping("first")
    public String first() {
        producerService.sendMessage(destination, "你好,现在是:" + new Date().toLocaleString());
        return "/test/first";
    }
    
}

      

接下来就是启用Weblogic服务器,进入其控制台,定义一个名叫“jdbc/mysql”的JNDI数据源,然后把该项目部署到Weblogic服务器上并进行启动。接下来我们就可以访问/test/first.do访问到上述first方法。之后控制台会输出如下信息:



        我们可以看到当count为0时接收了一次,并随后抛出了异常,之后count为1又接收了一次,这说明在count为0时抛出异常后我们的JMS进行回滚了,那么我们的数据库是否有进行回滚呢?接着我们来看数据库中的内容:



        我们可以看到数据库表中只有一条记录,而且最后一位表示count的值的为1,这说明在JMS进行消息接收抛出异常时我们的数据库也回滚了。关于使用JtaTransactionManager进行分布式事务管理的问题就说到这里了,有兴趣的朋友可以自己试验一下。

 

 

分享到:
评论

相关推荐

    Spring整合JMS——实现收发消息

    总结来说,Spring整合JMS和ActiveMQ的过程包括:配置ConnectionFactory,定义Destination,创建MessageListener容器,以及使用JmsTemplate发送消息。通过这种方式,你可以构建一个健壮的、异步的消息传递系统,提高...

    ActiveMQ整合spring的Demo

    6. **事务管理**:Spring和ActiveMQ可以结合使用JTA事务管理,确保消息的可靠传递。在事务内发送的消息只有在事务成功提交后才会被传递,这样可以保证数据的一致性。 7. **持久化与消息确认**:ActiveMQ支持消息...

    SpringMVC精品资源-- SSM框架——详细整合教程(Spring+SpringMVC+MyBatis).zip

    SSM框架,即Spring、SpringMVC和...这个"SpringMVC精品资源-- SSM框架——详细整合教程(Spring+SpringMVC+MyBatis)"压缩包可能包含了从基础概念到实战示例的全面教程,对于学习和提升SSM框架的整合应用大有裨益。

    ActiveMQ 入门实战(3)--SpringBoot 整合 ActiveMQ(csdn)————程序.pdf

    - **ActiveMQ**:Apache ActiveMQ 是一个开源的消息中间件,它实现了 Java Message Service (JMS) 规范,提供可靠的消息传递和队列管理。 - **JMS**:Java Message Service 是一个标准接口,用于在分布式环境中交换...

    【架构视角】一篇文章带你彻底吃透Spring.doc

    比如,使用Spring的声明式事务管理,开发者只需在配置中指定哪些方法需要事务支持,而不必手动编写事务管理代码。 Spring框架的外延部分则涵盖了各种服务和扩展,如数据访问支持(JDBC、ORM、OXM、JMS、...

    ActiveMQ与Spring联合使用Demo

    在将ActiveMQ与Spring整合的过程中,我们需要完成以下步骤: 1. **配置ActiveMQ**:首先,我们需要在本地或服务器上安装并运行ActiveMQ。这可以通过下载ActiveMQ的二进制包,然后启动`bin/activemq`脚本来完成。...

    spring配置全书

    本书首先会介绍Spring的基本概念,包括Spring的核心模块——Core Container,它包含了Bean工厂(BeanFactory)和ApplicationContext,这两个是管理对象生命周期和依赖关系的关键组件。通过XML配置或注解方式,你可以...

    深入Spring2-轻量级J2EE开发框架原理与实践

    读者可以期待了解到Spring2的完整功能集,包括Spring的MVC、AOP、ORM、事务管理、JMS(Java消息服务)支持等方面的知识。此外,书中可能还会涉及Spring与其他开源技术的整合,如Spring Security、Spring Batch等,以...

    spring培训学习笔记

    AOP是Spring提供的另一关键特性,它允许开发者定义“切面”——关注点的模块化,如日志、事务管理等。切面可以在不修改业务逻辑代码的情况下,横向插入到代码执行流程中。Spring支持两种AOP类型:基于代理的AOP和...

    spring+MQ消息队列

    6. **事务和持久化**:Spring和ActiveMQ结合可以支持事务性的消息发送,确保消息在事务成功提交后才真正发送。同时,ActiveMQ提供持久化机制,即使在服务器宕机后,未消费的消息也能恢复。 7. **性能优化**:...

    spring-aspects.zip

    5. 其他模块如spring-jms、spring-tx等则分别对应于消息队列和事务管理,AOP同样可以在这里发挥重要作用,如消息发送前的验证、事务边界内的操作等。 总结,Spring Aspects是Spring框架中的重要组成部分,通过面向...

    精通spring2.x企业应用开发详解

    本章主要讲解Spring的核心特性之一——依赖注入,它是实现松耦合的关键。DI允许开发者通过配置文件或注解来管理对象之间的依赖关系,而非在代码中硬编码。这使得系统更易于测试和维护。 2. **Chapter 7:AOP(面向...

    spring陈华雄学习光盘代码二部分.rar

    - **Chapter 9**:可能讲解Spring与数据访问的整合,如使用JDBC、JPA或MyBatis进行数据库操作,并涉及事务管理。 - **Chapter 10**:可能涉及Spring的单元测试和集成测试,使用JUnit、Mockito等工具进行测试驱动...

    activemq-spring-1.3.jar.zip

    《ActiveMQ与Spring整合——深度解析activemq-spring-1.3.jar》 在Java消息服务(JMS)领域,Apache ActiveMQ是一款广泛应用的消息中间件,它支持多种协议,如OpenWire、AMQP、STOMP等,为分布式系统提供可靠的消息...

    spring源码(5.1.1)

    首先,让我们关注 Spring 的核心模块——Core Container。该模块包括 Beans、Context 和 Core Support 三个子模块。Beans 模块提供了 IoC(Inversion of Control,控制反转)容器,它是 Spring 框架的核心。通过 XML...

    apahe cxf 与spring 开发例子1

    整合Apache CXF和Spring不仅简化了服务的开发,还提供了更高级的功能,如AOP安全控制、事务管理等。这样的集成是现代企业级Java应用中常见的做法,能够提高开发效率,同时保持代码的清晰性和可维护性。

    ssh整合框架包

    Spring还提供了对JDBC、JMS、JTA等的抽象层,简化了数据库操作,并支持事务管理。此外,Spring的MVC模块可以作为Web层的控制器,负责处理HTTP请求和响应。 Struts框架则专注于Web层的开发,它遵循Model-View-...

    activemq-spring-1.2_001.jar.zip

    《ActiveMQ与Spring整合——深度解析activemq-spring-1.2_001.jar.zip》 在Java世界中,消息中间件ActiveMQ扮演着至关重要的角色,它提供了高效、可靠的异步通信机制,而Spring框架则以其强大的依赖注入和面向切面...

    J2EE面试题(包含了大部分的框架面试题如:hibernate Spring Strust 等)

    本资源主要关注的是J2EE相关的面试题目,特别是与三大主流框架——Hibernate、Spring和Struts紧密相关的部分。这些框架在现代企业级应用开发中占据了核心地位,理解并掌握它们是成为合格J2EE开发者的关键。 ...

Global site tag (gtag.js) - Google Analytics