`

分布式java事务一

阅读更多
Java事务处理全解析(八)——分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)
在本系列先前的文章中,我们主要讲解了JDBC对本地事务的处理,本篇文章将讲到一个分布式事务的例子。

 

请通过以下方式下载github源代码:

git clone https://github.com/davenkin/jta-atomikos-hibernate-activemq.git

本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源事务(比如单个数据库),分布式事务可以处理多种异构的数据源,比如某个业务操作中同时包含了JDBC和JMS或者某个操作需要访问多个不同的数据库。

 

Java通过JTA完成分布式事务,JTA本身只是一种规范,不同的应用服务器都包含有自己的实现(比如JbossJTA),同时还存在独立于应用服务器的单独JTA实现,比如本篇中要讲到的Atomikos。对于JTA的原理,这里不细讲,读者可以通过这篇文章了解相关知识。

 

在本篇文章中,我们将实现以下一个应用场景:你在网上购物,下了订单之后,订单数据将保存在系统的数据库中,同时为了安排物流,订单信息将以消息(Message)的方式发送到物流部门以便送货。

 

以上操作同时设计到数据库操作和JMS消息发送,为了使整个操作成为一个原子操作,我们只能选择分布式事务。我们首先设计一个service层,定义OrderService接口:

package davenkin;

public interface OrderService {

    public void makeOrder(Order order);
}
 

为了简单起见,我们设计一个非常简单的领域对象Order:


@XmlRootElement(name = "Order")
@XmlAccessorType(XmlAccessType.FIELD)
public class Order {

    @XmlElement(name = "Id",required = true)
    private long id;

    @XmlElement(name = "ItemName",required = true)
    private String itemName;

    @XmlElement(name = "Price",required = true)
    private double price;

    @XmlElement(name = "BuyerName",required = true)
    private String buyerName;

    @XmlElement(name = "MailAddress",required = true)
    private String mailAddress;

    public Order() {
    }

 

为了采用JAXB对Order对象进行Marshal和Unmarshal,我们在Order类中加入了JAXB相关的Annotation。 我们将使用Hibernate来完成数据持久化,然后使用Spring提供的JmsTemplate将Order转成xml后以TextMessage的形式发送到物流部门的ORDER.QUEUE中。

 

(一)准备数据库

为了方便,我们将采用Spring提供的embedded数据库,默认情况下Spring采用HSQL作为后台数据库,虽然在本例中我们将采用HSQL的非XA的DataSource,但是通过Atomikos包装之后依然可以参与分布式事务。

SQL脚本包含在createDB.sql文件中:


CREATE TABLE USER_ORDER(
ID INT NOT NULL,
ITEM_NAME VARCHAR (100) NOT NULL UNIQUE,
PRICE DOUBLE NOT NULL,
BUYER_NAME CHAR (32) NOT NULL,
MAIL_ADDRESS VARCHAR(500) NOT NULL,
PRIMARY KEY(ID)
);

 

在Spring中配置DataSource如下:

    <jdbc:embedded-database id="dataSource">
        <jdbc:script location="classpath:createDB.sql"/>
    </jdbc:embedded-database>
 

(二)启动ActiveMQ

我们将采用embedded的ActiveMQ,在测试之前启动ActiveMQ提供的BrokerService,在测试执行完之后关闭BrokerService。


  @BeforeClass
    public static void startEmbeddedActiveMq() throws Exception {
        broker = new BrokerService();
        broker.addConnector("tcp://localhost:61616");
        broker.start();
    }

    @AfterClass
    public static void stopEmbeddedActiveMq() throws Exception {
        broker.stop();
    }

 

(三)实现OrderService

创建一个DefaultOrderService,该类实现了OrderService接口,并维护一个JmsTemplate和一个Hibernate的SessionFactory实例变量,分别用于Message的发送和数据库处理。


package davenkin;

import org.hibernate.SessionFactory;
import org.hibernate.classic.Session;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.annotation.Transactional;

public class DefaultOrderService  implements OrderService{
    private JmsTemplate jmsTemplate;
    private SessionFactory sessionFactory;

    @Override
    @Transactional
    public void makeOrder(Order order) {
        Session session = sessionFactory.getCurrentSession();
        session.save(order);
        jmsTemplate.convertAndSend(order);

    }

    @Required
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    @Required
    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }
}

 

(四)创建Order的Mapping配置文件


<?xml version="1.0"?>
<!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN"
        "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd">

<hibernate-mapping>
    <class name="davenkin.Order" table="USER_ORDER">
        <id name="id" type="long">
            <column name="ID" />
            <generator class="increment" />
        </id>
        <property name="itemName" type="string">
            <column name="ITEM_NAME" />
        </property>
        <property name="price" type="double">
            <column name="PRICE"/>
        </property>
        <property name="buyerName" type="string">
            <column name="BUYER_NAME"/>
        </property>
        <property name="mailAddress" type="string">
            <column name="MAIL_ADDRESS"/>
        </property>
    </class>
</hibernate-mapping>

 

(五)配置Atomikos事务

在Spring的IoC容器中,我们需要配置由Atomikos提供的UserTransaction和TransactionManager,然后再配置Spring的JtaTransactionManager:


  <bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init" destroy-method="shutdownForce">
        <constructor-arg>
            <props>
                <prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop>
            </props>
        </constructor-arg>
    </bean>

    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransactionService">
        <property name="forceShutdown" value="false" />
    </bean>

    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="userTransactionService">
        <property name="transactionTimeout" value="300" />
    </bean>

    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="userTransactionService">
        <property name="transactionManager" ref="atomikosTransactionManager" />
        <property name="userTransaction" ref="atomikosUserTransaction" />
    </bean>

    <tx:annotation-driven transaction-manager="jtaTransactionManager" />

 

(六)配置JMS

对于JMS,为了能使ActiveMQ加入到分布式事务中,我们需要配置ActiveMQXAConnectionFactory,而不是ActiveMQConnectionFactory,然后再配置JmsTemplate,此外还需要配置MessageConvertor在Order对象和XML之间互转。


    <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="amqConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">
        <property name="uniqueResourceName" value="XAactiveMQ" />
        <property name="xaConnectionFactory" ref="jmsXaConnectionFactory" />
        <property name="poolSize" value="5"/>
    </bean>


    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="amqConnectionFactory"/>
        <property name="receiveTimeout" value="2000" />
        <property name="defaultDestination" ref="orderQueue"/>
        <property name="sessionTransacted" value="true" />
        <property name="messageConverter" ref="oxmMessageConverter"/>
    </bean>

    <bean id="orderQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="ORDER.QUEUE"/>
    </bean>


    <bean id="oxmMessageConverter"
          class="org.springframework.jms.support.converter.MarshallingMessageConverter">
        <property name="marshaller" ref="marshaller"/>
        <property name="unmarshaller" ref="marshaller"/>
    </bean>

    <oxm:jaxb2-marshaller id="marshaller">
        <oxm:class-to-be-bound name="davenkin.Order"/>
    </oxm:jaxb2-marshaller>

 

(七)测试

在测试中,我们首先通过(二)中的方法启动ActiveMQ,再调用DefaultOrderService,最后对数据库和QUEUE进行验证:


   @Test
    public void makeOrder(){
        orderService.makeOrder(createOrder());
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        assertEquals(1, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));
        String dbItemName = jdbcTemplate.queryForObject("SELECT ITEM_NAME FROM USER_ORDER", String.class);
        String messageItemName = ((Order) jmsTemplate.receiveAndConvert()).getItemName();
        assertEquals(dbItemName, messageItemName);
    }

    @Test(expected = IllegalArgumentException.class)
    public void failToMakeOrder()
    {
        orderService.makeOrder(null);
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        assertEquals(0, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));
        assertNull(jmsTemplate.receiveAndConvert());
    }

 
分享到:
评论

相关推荐

    java分布式事务demo

    Java分布式事务是大型分布式系统中不可或缺的一个重要组成部分,它确保在多个网络节点间的数据操作能够保持一致性和完整性。在分布式环境中,由于多个服务之间可能存在数据交互,因此需要一种机制来处理跨服务的数据...

    分布式Java应用基础与实践pdf

    分布式Java应用基础与实践是Java开发领域中的一个重要主题,它涉及到如何通过网络将多个独立的计算机节点连接起来,协同处理任务,以实现系统的高可用性、高性能和可伸缩性。在Java中,分布式系统主要依赖于一些核心...

    分布式java应用.pdf

    在分布式Java应用中,有一些框架扮演了重要角色,比如Spring框架,它提供了一个全面的编程和配置模型,支持企业级开发。其中Spring Boot简化了基于Spring的应用开发,而Spring Cloud提供了在分布式系统中实现微服务...

    分布式Java应用基础与实践源码.zip

    通过上述内容,我们可以看到分布式Java应用涉及的领域广泛,涵盖了从通信、数据存储到计算和事务处理等多个方面。源码分析是深入理解这些技术的关键,它可以帮助我们更好地掌握分布式系统的设计和实现。在实践中,...

    分布式Java应用

    《分布式Java应用:基础与实践》一书由林昊著,为学习这一领域的读者提供了宝贵的资源。 首先,分布式Java应用的基础包括网络编程、并发处理和远程方法调用(RMI)。网络编程是分布式系统的核心,Java的Socket API...

    基于Java的分布式事务解决方案myth设计源码

    该项目是一个采用消息队列解决分布式事务的开源框架,基于Java语言开发(JDK1.8),并支持dubbo、springcloud、motan等RPC框架进行分布式事务处理。通过该项目,开发者可以学习并实践分布式事务的处理,为后续的...

    分布式java应用_林昊_电子版

    分布式Java应用是现代互联网行业中广泛讨论的话题,尤其在大型企业级系统中,为了处理高并发、海量数据等问题,采用分布式架构成为了必要的选择。林昊,作为知名的Java技术专家,他的著作《分布式Java应用》深入浅出...

    分布式java应用完整版

    总的来说,分布式Java应用是一个涵盖广泛的技术领域,包括了系统架构、网络通信、数据处理等多个层面。通过学习和实践,开发者可以提升自己在大型、复杂系统中的设计和开发能力,为构建高效、稳定的企业级应用打下...

    分布式java应用基础与实践附带源码(完整版)

    分布式Java应用基础与实践是Java开发领域中的一个重要话题,它涉及到如何在多个计算机节点上协同工作,以提高系统的可扩展性、可靠性和性能。在这个完整版的资源中,包括了一本名为“分布式JAVA应用 基础与实践.pdf...

    分布式JAVA应用基础与实践(林昊)完整版pad+源码

    分布式JAVA应用基础与实践是Java开发领域中一个重要的主题,主要涵盖了如何在大规模网络环境中设计、部署和管理Java应用程序。本书由林昊编著,旨在帮助开发者深入理解分布式系统的基本概念,掌握Java在分布式环境中...

    java+spring+mybatis+mysql+RuoYi-atomikos-实现分布式事务.zip

    本项目"java+spring+mybatis+mysql+RuoYi-atomikos-实现分布式事务.zip"是一个基于若依(RuoYi)框架改造的多模块分布式事务解决方案,它利用了Atomikos这一强大的分布式事务管理器。以下将详细解析这个项目的知识点...

    分布式事务.pdf

    分布式事务是为了解决分布式系统中跨越多个节点的操作,要求这些操作要么全部成功要么全部失败的一种事务机制。它是为了保证在不同节点上的数据一致性而产生的概念。分布式事务广泛应用于微服务架构、数据库分库分表...

    分布式Java应用基础与实践[带目录书签]

    分布式Java应用基础与实践是Java开发中的重要领域,它涵盖了多台计算机之间的协作,通过网络进行数据交换和服务调用,以实现大规模系统的高效运行。在Java世界中,分布式技术广泛应用于构建可扩展、高可用的互联网...

    记录redisson实现redis分布式事务锁

    Redisson是基于Redis的Java客户端,它提供了丰富的数据结构和服务,包括分布式锁、信号量、队列、计数器等,极大地扩展了Redis在分布式系统中的应用能力。本篇文章将详细探讨如何使用Redisson实现Redis分布式事务锁...

    分布式数据库事务处理(COM+实现)

    分布式数据库事务处理是数据库系统中的一个重要概念,尤其是在大型企业级应用和互联网服务中,它能够保证数据的一致性和完整性,即使在多台计算机之间进行数据操作。COM+(Component Object Model Plus)是微软提出...

    springboot多数据源即分布式事务解决方案

    SpringBoot作为一款轻量级的框架,提供了便捷的多数据源配置和分布式事务管理方案,使得开发者能够高效地管理和操作不同的数据库。本文将详细探讨SpringBoot如何实现多数据源以及分布式事务。 首先,我们要理解什么...

    分布式JAVA应用+基础与实践(完整版)

    1. **分布式系统概念**:分布式系统是由多台独立的计算机节点通过网络互相连接,共同协作完成一个单一任务的系统。它涉及到负载均衡、数据一致性、容错机制等核心概念。 2. **Java分布式服务框架**:包括RMI...

    (开发入门)分布式java基础与实践

    分布式Java基础与实践是开发者进入大型系统开发领域的重要学习内容,尤其对于初学者而言,掌握这一领域的基础知识至关重要。本文将围绕分布式系统、Java在其中的角色以及实践应用进行深入讲解。 首先,分布式系统是...

Global site tag (gtag.js) - Google Analytics