`

Java事务处理全解析(一至八)

    博客分类:
  • jta
jta 
阅读更多

Java中的事务处理有多简单?在使用EJB时,事务在我们几乎察觉不到的情况下发挥着 作用;而在使用Spring时,也只需要配置一个TransactionManager,然后在需要事务的方法上加上Transactional注解就行 了。Java的事务处理之所以这么简单是因为框架在背后为我们做了太多的工作。这样,我们虽然可以快速地完成开发工作,但是一旦程序出现问题,在一阵 google和stackoverflow之后,你估计还是一筹莫展。作为一个有技术追求的程序员,你应该了解Java事务的底层工作原理。

 

这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:

git clone https://github.com/davenkin/java_transaction_workshop.git

 

本系列文章将在不依赖Spring和Java EE容器的条件下讲解Java中事务处理的基本方法与原理,包含以下文章:

 

   (一)Java事务处理的基本问题

   (二)失败的案例

   (三)丑陋的案例

   (四)成功的案例(自己实现一个线程安全的TransactionManager)

   (五)Template模式

   (六)使用动态代理(Dynamic Proxy)完成事务

   (七)像Spring一样使用Transactional注解(Annotation)

   (八)分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)

 

(一)Java事务处理的基本问题

Java通过JDBC与数据库进行交互,这是一个如今多数程序员都不会直接使用的技术,我们更倾向于使用Hibernate和Mybatis,但是,他们在底层都需要JDBC与数据库通信,事务处理亦是如此,那么,我们首先来看看JDBC提供的事务处理API。

 

(1)JDBC提供的事务处理API

JDBC提供的事务处理API非常少,请不要被Spring中事务处理的那一堆源代码所打击得信心尽失,这些框架提供的事务处理功能归根结底主要通过以Connection类的方法完成:

Connection.setAutoCommit(boolean);

Connection.commit();

Connection.rollback();

 

在Spring的事务处理源代码中,有很多都是处理多线程的,另外一些使用了一些设计模式。不要惊慌,在本系列中(除了系列八),你将看不到任何 Spring的影子,我们会通过简单的代码来学习Java事务,学完之后,你可以阅读一下Spring的事务处理源代码,然后将本系列中的事务处理原理与 Spring对比,你会发现,Spring要面临与处理的问题也是本系列文章中遇到的问题。

 

(2)本地事务和分布式事务

本地(Local Transaction)事务指只有一个数据源参与的事务,比如只有数据库或者只有JMS;分布式事务(Distributed Transaction)指有多个数据源同时参与的事务,比如一项操作需要同时访问数据库和通过JMS发送消息,或者一项操作需要同时访问两个不同数据 库。对于分布式事务,Java提供了JTA规范,它的原理与本地事务存在不同。 鉴于多数情况下Java事务为本地事务,本系列主要讲解本地事务,而在系列八中有分布式事务的入门例子。

 

(3)线程安全

线程安全是Java事务处理的一大难点,比如一个DAO类维护了一个Connection实例变量,两个线程同时使用该DAO类与数据库交互,其中 一个在使用完Connection后将其关闭,而此时另一个线程正在使用该Connection访问数据库,这时另一个线程对数据库的访问将失败。在本系 列的后续文章中,我们将学到如何处理这样的问题并开发线程安全的程序。

 

(4)Service层和DAO层

通常来说,数据持久化层又分为Service层和DAO层,Service层用于完成与业务逻辑有关的工作,并且Service层包含了工作单元 (Unit of work),也即Service层中的方法为事务作用的边界;DAO层用于完成对数据库的实际操作(增删改查)。有时在使用Hibernate或是JPA 时我们也会直接在Service层访问数据库而省略掉DAO层。在本系列中,我们会用一个BankService例子贯穿始终。该BankService 用于将用户银行账户(Bank Account)中的存款转帐到该用户的保险账户(Insurance Account)中,两个账户对应有不同的数据库表。

BankService需要两个DAO类协同起来工作,一个负责银行账户表的操作,另一个负责保险账户表操作,这是一个典型的事务处理例子。在下一篇文章中,我们将学习一个关于该BankService事务处理失败的案例。

在本系列的上一篇文章中,我们讲到了Java事务处理的基本问题,并且讲到了Service层和DAO层,在本篇文章中,我们将以BankService为例学习一个事务处理失败的案例。

请通过以下方式下载github源代码:

git clone https://github.com/davenkin/java_transaction_workshop.git

 

BankService的功能为:某个用户有两个账户,分别为银行账户和保险账户,并且有各自的账户号,BankService的transfer方法从该用户的银行账户向保险账户转帐,两个DAO分别用于对两个账户表的存取操作。

定义一个BankService接口如下:

package davenkin;

public interface BankService {
    public void transfer(int fromId, int toId, int amount);
}

 

在两个DAO对象中,我们通过传入的同一个DataSource获得Connection,然后通过JDBC提供的API直接对数据库进行操作。

定义操作银行账户表的DAO类如下:

复制代码
package davenkin.step1_failure;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class FailureBankDao {
    private DataSource dataSource;

    public FailureBankDao(DataSource dataSource) {
        this.dataSource = dataSource;
    }


    public void withdraw(int bankId, int amount) throws SQLException {
        Connection connection = dataSource.getConnection();
        PreparedStatement selectStatement = connection.prepareStatement("SELECT BANK_AMOUNT FROM BANK_ACCOUNT WHERE BANK_ID = ?");
        selectStatement.setInt(1, bankId);
        ResultSet resultSet = selectStatement.executeQuery();
        resultSet.next();
        int previousAmount = resultSet.getInt(1);
        resultSet.close();
        selectStatement.close();


        int newAmount = previousAmount - amount;
        PreparedStatement updateStatement = connection.prepareStatement("UPDATE BANK_ACCOUNT SET BANK_AMOUNT = ? WHERE BANK_ID = ?");
        updateStatement.setInt(1, newAmount);
        updateStatement.setInt(2, bankId);
        updateStatement.execute();

        updateStatement.close();
        connection.close();

    }
}
复制代码

 

FailureBankDao的withdraw方法,从银行账户表(BANK_ACCOUNT)中帐号为bankId的用户账户中取出数量为amount的金额。

采用同样的方法,定义保险账户的DAO类如下:

复制代码
package davenkin.step1_failure;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class FailureInsuranceDao {
    private DataSource dataSource;

    public FailureInsuranceDao(DataSource dataSource){
        this.dataSource = dataSource;
    }

    public void deposit(int insuranceId, int amount) throws SQLException {
        Connection connection = dataSource.getConnection();
        PreparedStatement selectStatement = connection.prepareStatement("SELECT INSURANCE_AMOUNT FROM INSURANCE_ACCOUNT WHERE INSURANCE_ID = ?");
        selectStatement.setInt(1, insuranceId);
        ResultSet resultSet = selectStatement.executeQuery();
        resultSet.next();
        int previousAmount = resultSet.getInt(1);
        resultSet.close();
        selectStatement.close();


        int newAmount = previousAmount + amount;
        PreparedStatement updateStatement = connection.prepareStatement("UPDATE INSURANCE_ACCOUNT SET INSURANCE_AMOUNT = ? WHERE INSURANCE_ID = ?");
        updateStatement.setInt(1, newAmount);
        updateStatement.setInt(2, insuranceId);
        updateStatement.execute();

        updateStatement.close();
        connection.close();
    }
}
复制代码

 

FailureInsuranceDao类的deposit方法向保险账户表(INSURANCE_ACCOUNT)存入amount数量的金额, 这样在BankService中,我们可以先调用FailureBankDao的withdraw方法取出一定金额的存款,再调用 FailureInsuranceDao的deposit方法将该笔存款存入保险账户表中,一切看似OK,实现BankService接口如下:

复制代码
package davenkin.step1_failure;

import davenkin.BankService;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

public class FailureBankService implements BankService{
    private FailureBankDao failureBankDao;
    private FailureInsuranceDao failureInsuranceDao;
    private DataSource dataSource;

    public FailureBankService(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void transfer(int fromId, int toId, int amount) {
        Connection connection = null;
        try {
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);

            failureBankDao.withdraw(fromId, amount);
            failureInsuranceDao.deposit(toId, amount);

            connection.commit();
        } catch (Exception e) {
            try {
                assert connection != null;
                connection.rollback();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
        } finally {
            try
            {
                assert connection != null;
                connection.close();
            } catch (SQLException e)
            {
                e.printStackTrace();
            }
        }
    }

    public void setFailureBankDao(FailureBankDao failureBankDao) {
        this.failureBankDao = failureBankDao;
    }

    public void setFailureInsuranceDao(FailureInsuranceDao failureInsuranceDao) {
        this.failureInsuranceDao = failureInsuranceDao;
    }
}
复制代码

 

在FailureBankService的transfer方法中,我们首先通过DataSource获得Connection,然后调用 connection.setAutoCommit(false)已开启手动提交模式,如果一切顺利,则commit,如果出现异常,则 rollback。 接下来,开始测试我们的BankService吧。

为了准备测试数据,我们定义个BankFixture类,该类负责在每次测试之前准备测试数据,分别向银行账户(1111)和保险账户(2222) 中均存入1000元。BankFixture还提供了两个helper方法(getBankAmount和getInsuranceAmount)帮助我 们从数据库中取出数据以做数据验证。我们使用HSQL数据库的in-memory模式,这样不用启动数据库server,方便测试。 BankFixture类定义如下:

复制代码
package davenkin;

import org.junit.Before;
import javax.sql.DataSource;
import java.sql.*;

public class BankFixture
{

    protected final DataSource dataSource = DataSourceFactory.createDataSource();

    @Before
    public void setUp() throws SQLException
    {
        Connection connection = dataSource.getConnection();
        Statement statement = connection.createStatement();

        statement.execute("DROP TABLE BANK_ACCOUNT IF EXISTS");
        statement.execute("DROP TABLE INSURANCE_ACCOUNT IF EXISTS");
        statement.execute("CREATE TABLE BANK_ACCOUNT (\n" +
                "BANK_ID INT,\n" +
                "BANK_AMOUNT INT,\n" +
                "PRIMARY KEY(BANK_ID)\n" +
                ");");

        statement.execute("CREATE TABLE INSURANCE_ACCOUNT (\n" +
                "INSURANCE_ID INT,\n" +
                "INSURANCE_AMOUNT INT,\n" +
                "PRIMARY KEY(INSURANCE_ID)\n" +
                ");");

        statement.execute("INSERT INTO BANK_ACCOUNT VALUES (1111, 1000);");
        statement.execute("INSERT INTO INSURANCE_ACCOUNT VALUES (2222, 1000);");

        statement.close();
        connection.close();
    }

    protected int getBankAmount(int bankId) throws SQLException
    {
        Connection connection = dataSource.getConnection();
        PreparedStatement selectStatement = connection.prepareStatement("SELECT BANK_AMOUNT FROM BANK_ACCOUNT WHERE BANK_ID = ?");
        selectStatement.setInt(1, bankId);
        ResultSet resultSet = selectStatement.executeQuery();
        resultSet.next();
        int amount = resultSet.getInt(1);
        resultSet.close();
        selectStatement.close();
        connection.close();
        return amount;
    }

    protected int getInsuranceAmount(int insuranceId) throws SQLException
    {
        Connection connection = dataSource.getConnection();
        PreparedStatement selectStatement = connection.prepareStatement("SELECT INSURANCE_AMOUNT FROM INSURANCE_ACCOUNT WHERE INSURANCE_ID = ?");
        selectStatement.setInt(1, insuranceId);
        ResultSet resultSet = selectStatement.executeQuery();
        resultSet.next();
        int amount = resultSet.getInt(1);
        resultSet.close();
        selectStatement.close();
        connection.close();
        return amount;
    }

}
复制代码

 

编写的Junit测试继承自BankFixture类,测试代码如下:

复制代码
package davenkin.step1_failure;

import davenkin.BankFixture;
import org.junit.Test;
import java.sql.SQLException;
import static junit.framework.Assert.assertEquals;

public class FailureBankServiceTest extends BankFixture
{
    @Test
    public void transferSuccess() throws SQLException
    {
        FailureBankDao failureBankDao = new FailureBankDao(dataSource);
        FailureInsuranceDao failureInsuranceDao = new FailureInsuranceDao(dataSource);

        FailureBankService bankService = new FailureBankService(dataSource);
        bankService.setFailureBankDao(failureBankDao);
        bankService.setFailureInsuranceDao(failureInsuranceDao);

        bankService.transfer(1111, 2222, 200);

        assertEquals(800, getBankAmount(1111));
        assertEquals(1200, getInsuranceAmount(2222));

    }

    @Test
    public void transferFailure() throws SQLException
    {
        FailureBankDao failureBankDao = new FailureBankDao(dataSource);
        FailureInsuranceDao failureInsuranceDao = new FailureInsuranceDao(dataSource);

        FailureBankService bankService = new FailureBankService(dataSource);
        bankService.setFailureBankDao(failureBankDao);
        bankService.setFailureInsuranceDao(failureInsuranceDao);

        int toNonExistId = 3333;
        bankService.transfer(1111, toNonExistId, 200);

        assertEquals(1000, getInsuranceAmount(2222));
        assertEquals(1000, getBankAmount(1111));
    }
}
复制代码

 

运行测试,第一个测试(transferSuccess)成功,第二个测试(transferFailure)失败。

分析错误,原因在于:我们分别从FailureBankService,FailureBankDao和FailureInsuranceDao中 调用了三次dataSource.getConnection(),亦即我们创建了三个不同的Connection对象,而Java事务是作用于 Connection之上的,所以从在三个地方我们开启了三个不同的事务,而不是同一个事务。

第一个测试之所以成功,是因为在此过程中没有任何异常发生。虽然在FailureBankService中将Connection的提交模式改为了 手动提交,但是由于两个DAO使用的是各自的Connection对象,所以两个DAO中的Connection依然为默认的自动提交模式。

在第二个测试中,我们给出一个不存在的保险账户id(toNonExistId),就是为了使程序产生异常,然后在assertion语句中验证两 张表均没有任何变化,但是测试在第二个assertion语句处出错。发生异常时,银行账户中的金额已经减少,而虽然程序发生了rollback,但是调 用的是FailureBankService中Connection的rollback,而不是FailureInsuranceDao中 Connection的,对保险账户的操作根本就没有执行,所以保险账户中依然为1000,而银行账户却变为了800。

因此,为了使两个DAO在同一个事务中,我们应该在整个事务处理过程中使用一个Connection对象,在下一篇文章中,我们将讲到通过共享Connection对象的方式达到事务处理的目的。

本系列上一篇文 章中,我们看到了一个典型的事务处理失败的案例,其主要原因在于,service层和各个DAO所使用的Connection是不一样的,而JDBC中事 务处理的作用对象正是Connection对象,所以不同DAO中的操作不在同一个事务里面,从而导致事务失败。从中我们得出了教训:要避免这种失败,我 们可以使所有操作共享一个Connection对象,这样应该就没有问题了。

 

请通过以下方式下载本系列文章的github源代码:

git clone https://github.com/davenkin/java_transaction_workshop.git

 

在本篇文章中,我们将看到一个成功的,但是丑陋的事务处理方案,它的基本思路是:在service层创建Connection对象,再将该Connection传给各个DAO类,这样就完成了Connection共享的目的。

 

修改两个DAO类,使他们都接受一个Connection对象,定义UglyBankDao类如下:

复制代码
package davenkin.step2_ugly;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class UglyBankDao
{
    public void withdraw(int bankId, int amount, Connection connection) throws SQLException
    {
        PreparedStatement selectStatement = connection.prepareStatement("SELECT BANK_AMOUNT FROM BANK_ACCOUNT WHERE BANK_ID = ?");
        selectStatement.setInt(1, bankId);
        ResultSet resultSet = selectStatement.executeQuery();
        resultSet.next();
        int previousAmount = resultSet.getInt(1);
        resultSet.close();
        selectStatement.close();

        int newAmount = previousAmount - amount;
        PreparedStatement updateStatement = connection.prepareStatement("UPDATE BANK_ACCOUNT SET BANK_AMOUNT = ? WHERE BANK_ID = ?");
        updateStatement.setInt(1, newAmount);
        updateStatement.setInt(2, bankId);
        updateStatement.execute();

        updateStatement.close();
    }
}
复制代码

 

使用同样的方法,定义UglyInsuranceDao类:

复制代码
package davenkin.step2_ugly;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class UglyInsuranceDao
{
    public void deposit(int insuranceId, int amount, Connection connection) throws SQLException
    {
        PreparedStatement selectStatement = connection.prepareStatement("SELECT INSURANCE_AMOUNT FROM INSURANCE_ACCOUNT WHERE INSURANCE_ID = ?");
        selectStatement.setInt(1, insuranceId);
        ResultSet resultSet = selectStatement.executeQuery();
        resultSet.next();
        int previousAmount = resultSet.getInt(1);
        resultSet.close();
        selectStatement.close();


        int newAmount = previousAmount + amount;
        PreparedStatement updateStatement = connection.prepareStatement("UPDATE INSURANCE_ACCOUNT SET INSURANCE_AMOUNT = ? WHERE INSURANCE_ID = ?");
        updateStatement.setInt(1, newAmount);
        updateStatement.setInt(2, insuranceId);
        updateStatement.execute();

        updateStatement.close();
    }
}
复制代码

 

然后修改Service类,在UglyBankService类的transfer方法中,首先创建一个Connection对象,然后在将该对象 依次传给UglyBankDao的withdraw方法和UglyInsuranceDao类的deposit方法,这样service层和DAO层使用 相同的Connection对象。定义UglyBankService类如下:

复制代码
package davenkin.step2_ugly;

import davenkin.BankService;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

public class UglyBankService implements BankService
{
    private DataSource dataSource;
    private UglyBankDao uglyBankDao;
    private UglyInsuranceDao uglyInsuranceDao;

    public UglyBankService(DataSource dataSource)
    {
        this.dataSource = dataSource;
    }

    public void transfer(int fromId, int toId, int amount)
    {
        Connection connection = null;
        try
        {
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);

            uglyBankDao.withdraw(fromId, amount, connection);
            uglyInsuranceDao.deposit(toId, amount, connection);

            connection.commit();
        } catch (Exception e)
        {
            try
            {
                assert connection != null;
                connection.rollback();
            } catch (SQLException e1)
            {
                e1.printStackTrace();
            }
        } finally
        {
            try
            {
                assert connection != null;
                connection.close();
            } catch (SQLException e)
            {
                e.printStackTrace();
            }
        }
    }

    public void setUglyBankDao(UglyBankDao uglyBankDao)
    {
        this.uglyBankDao = uglyBankDao;
    }

    public void setUglyInsuranceDao(UglyInsuranceDao uglyInsuranceDao)
    {
        this.uglyInsuranceDao = uglyInsuranceDao;
    }
}
复制代码

 

通过上面共享Connection对象的方法虽然可以完成事务处理的目的,但是这样做法是丑陋的,原因在于:为了完成事务处理的目的,我们需要将一 个底层的Connection类在service层和DAO层之间进行传递,而DAO层的方法也要接受这个Connection对象,这种做法显然是不好 的,这就是典型的API污染。

 

下一篇博文中,我们将讲到如何在不传递Connection对象的情况下完成和本文相同的事务处理功能。

本系列上一篇文章中我们讲到,要实现在同一个事务中使用相同的Connection对象,我们可以通过传递Connection对象的方式达到共享的目的,但是这种做法是丑陋的。在本篇文章中,我们将引入另外一种机制(ConnectionHolder)来完成事务管理。

 

这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:

git clone https://github.com/davenkin/java_transaction_workshop.git

 

ConnectionHolder的工作机制是:我们将Connection对象放在一个全局公用的地方,然后在不同的操作中都从这个地方取得 Connection,从而完成Connection共享的目的,这也是一种ServiceLocator模式,有点像JNDI。定义一个 ConnectionHolder类如下:

复制代码
package davenkin.step3_connection_holder;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

public class ConnectionHolder
{
    private Map<DataSource, Connection> connectionMap = new HashMap<DataSource, Connection>();

    public Connection getConnection(DataSource dataSource) throws SQLException
    {
        Connection connection = connectionMap.get(dataSource);
        if (connection == null || connection.isClosed())
        {
            connection = dataSource.getConnection();
            connectionMap.put(dataSource, connection);
        }

        return connection;
    }

    public void removeConnection(DataSource dataSource)
    {
        connectionMap.remove(dataSource);
    }
}
复制代码

 

从ConnectionHolder类中可以看出,我们维护了一个键为DataSource、值为Connection的Map,这主要用于使 ConnectionHolder可以服务多个DataSource。在调用getConnection方法时传入了一个DataSource对象,如果 Map里面已经存在该DataSource对应的Connection,则直接返回该Connection,否则,调用DataSource的 getConnection方法获得一个新的Connection,再将其加入到Map中,最后返回该Connection。这样在同一个事务过程中,我 们先后从ConnectionHolder中取得的Connection是相同的,除非在中途我们调用了ConnectionHolder的 removeConnection方法将当前Connection移除掉或者调用了Connection.close()将Connection关闭,然 后在后续的操作中再次调用ConnectionHolder的getConnection方法,此时返回的则是一个新的Connection对象,从而导 致事务处理失败,你应该不会做出这种中途移除或关闭Connection的事情。

 

然而,虽然我们不会自己手动地在中途移除或者关闭Conncetion对象(当然,在事务处理末尾我们应该关闭Conncetion),我们却无法 阻止其他线程这么做。比如,ConnectionHolder类是可以在多个线程中同时使用的,并且这些线程使用了同一个DataSource,其中一个 线程使用完Connection后便将其关闭,而此时另外一个线程正试图使用这个Connection,问题就出来了。因此,上面的 ConnectionHolder不是线程安全的。

 

为了获得线程安全的ConnectionHolder类,我们可以引入Java提供的ThreadLocal类,该类保证一个类的实例变量在各个线 程中都有一份单独的拷贝,从而不会影响其他线程中的实例变量。定义一个SingleThreadConnectionHolder类如下:

复制代码
package davenkin.step3_connection_holder;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

public class SingleThreadConnectionHolder
{
    private static ThreadLocal<ConnectionHolder> localConnectionHolder = new ThreadLocal<ConnectionHolder>();

    public static Connection getConnection(DataSource dataSource) throws SQLException
    {
        return getConnectionHolder().getConnection(dataSource);
    }

    public static void removeConnection(DataSource dataSource)
    {
        getConnectionHolder().removeConnection(dataSource);
    }

    private static ConnectionHolder getConnectionHolder()
    {
        ConnectionHolder connectionHolder = localConnectionHolder.get();
        if (connectionHolder == null)
        {
            connectionHolder = new ConnectionHolder();
            localConnectionHolder.set(connectionHolder);
        }
        return connectionHolder;
    }

}
复制代码

 

有了一个线程安全的SingleThreadConnectionHolder类,我们便可以在service层和各个DAO中使用该类来获取Connection对象:

        Connection connection = SingleThreadConnectionHolder.getConnection(dataSource);

 

当然,此时我们需要传入一个DataSource,这个DataSource可以作为DAO类的实例变量存在,所以我们不用像上一篇文 章那样将Connection对象直接传给DAO的方法。这里你可能要问,既然可以将DataSource作为实例变量,那么在上一篇文章中,为什么不可 以将Connection也作为实例变量呢,这样不就不会造成丑陋的API了吗?原因在于:将Connection对象作为实例变量同样会带来线程安全问 题,当多个线程同时使用同一个DAO类时,一个线程关闭了Connection而另一个正在使用,这样的问题和上面讲到的 ConnectionHolder的线程安全问题一样。

关于Bank DAO和Insurance DAO类的源代码这里就不列出了,他们和上篇文章只是获得Connection对象的方法不一样而已,你可以参考github源代码

接下来,我们再来看看TransactionManager类,在上几篇文章中,我们都是在service类中直接写和事务处理相关的代码,而更好的方式是声明一个TransactionManger类将事务处理相关工作集中管理:

复制代码
package davenkin.step3_connection_holder;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

public class TransactionManager
{
    private DataSource dataSource;

    public TransactionManager(DataSource dataSource)
    {
        this.dataSource = dataSource;
    }

    public final void start() throws SQLException
    {
        Connection connection = getConnection();
        connection.setAutoCommit(false);
    }

    public final void commit() throws SQLException
    {
        Connection connection = getConnection();
        connection.commit();
    }

    public final void rollback()
    {
        Connection connection = null;
        try
        {
            connection = getConnection();
            connection.rollback();

        } catch (SQLException e)
        {
            throw new RuntimeException("Couldn't rollback on connection[" + connection + "].", e);
        }
    }

    public final void close()
    {
        Connection connection = null;
        try
        {
            connection = getConnection();
            connection.setAutoCommit(true);
            connection.setReadOnly(false);
            connection.close();
            SingleThreadConnectionHolder.removeConnection(dataSource);
        } catch (SQLException e)
        {
            throw new RuntimeException("Couldn't close connection[" + connection + "].", e);
        }
    }

    private Connection getConnection() throws SQLException
    {
        return SingleThreadConnectionHolder.getConnection(dataSource);
    }
}
复制代码

 

可以看出,TransactionManager对象也维护了一个DataSource实例变量,并且也是通过 SingleThreadConnectionHolder来获取Connection对象的。然后我们在service类中使用该 TransactionManager:

复制代码
package davenkin.step3_connection_holder;

import davenkin.BankService;
import javax.sql.DataSource;

public class ConnectionHolderBankService implements BankService
{
    private TransactionManager transactionManager;
    private ConnectionHolderBankDao connectionHolderBankDao;
    private ConnectionHolderInsuranceDao connectionHolderInsuranceDao;

    public ConnectionHolderBankService(DataSource dataSource)
    {
        transactionManager = new TransactionManager(dataSource);
        connectionHolderBankDao = new ConnectionHolderBankDao(dataSource);
        connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource);

    }

    public void transfer(int fromId, int toId, int amount)
    {
        try
        {
            transactionManager.start();
            connectionHolderBankDao.withdraw(fromId, amount);
            connectionHolderInsuranceDao.deposit(toId, amount);
            transactionManager.commit();
        } catch (Exception e)
        {
            transactionManager.rollback();
        } finally
        {
            transactionManager.close();
        }
    }
}
复制代码

 

在ConnectionHolderBankService中,我们使用TransactionManager来管理事务,由于 TransactionManger和两个DAO类都是使用SingleThreadConnectionHolder来获取Connection,故他 们在整个事务处理过程中使用了相同的Connection对象,事务处理成功。我们也可以看到,在两个DAO的withdraw和deposit方法没有 接受和业务无关的对象,消除了API污染;另外,使用TransactionManager来管理事务,使Service层代码也变简洁了。

 

下一篇文章中,我们将讲到使用Template模式来完成事务处理。

本系列上一篇文章中,我们讲到了使用TransactionManger和ConnectionHolder完成线程安全的事务管理,在本篇中,我们将在此基础上引入Template模式进行事务管理。

 

这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:

git clone https://github.com/davenkin/java_transaction_workshop.git

 

Template模式大家应该都很熟悉,比如Spring就提供了许多Template,像JdbcTemplate和JmsTemplate等。 Template模式的基本思想是:在超类里将完成核心功能的方法声明为抽象方法,留给子类去实现,而在超类中完成一些通用操作,比如JMS的 Session的建立和数据库事务的准备工作等。

 

在本篇文章中,我们使用一个Template类来帮助管理事务,定义TransactionTemplate类如下:

复制代码
package davenkin.step4_transaction_template;

import davenkin.step3_connection_holder.TransactionManager;
import javax.sql.DataSource;

public abstract class TransactionTemplate
{
    private TransactionManager transactionManager;

    protected TransactionTemplate(DataSource dataSource)
    {
        transactionManager = new TransactionManager(dataSource);
    }

    public void doJobInTransaction()
    {
        try
        {
            transactionManager.start();
            doJob();
            transactionManager.commit();
        } catch (Exception e)
        {
            transactionManager.rollback();
        } finally
        {
            transactionManager.close();
        }
    }

    protected abstract void doJob() throws Exception;
}
复制代码

 

在TransactionTemplate类中定义一个doJobInTransaction方法,在该方法中首先使用 TransactionManager开始事务,然后调用doJob方法完成业务功能,doJob方法为抽象方法,完成业务功能的子类应该实现该方法,最 后,根据doJob方法执行是否成功决定commit事务或是rollback事务。

 

然后定义使用TransactionTemplate的TransactionTemplateBankService:

复制代码
package davenkin.step4_transaction_template;

import davenkin.BankService;
import davenkin.step3_connection_holder.ConnectionHolderBankDao;
import davenkin.step3_connection_holder.ConnectionHolderInsuranceDao;
import javax.sql.DataSource;

public class TransactionTemplateBankService implements BankService
{
    private DataSource dataSource;
    private ConnectionHolderBankDao connectionHolderBankDao;
    private ConnectionHolderInsuranceDao connectionHolderInsuranceDao;

    public TransactionTemplateBankService(DataSource dataSource)
    {
        this.dataSource = dataSource;
        connectionHolderBankDao = new ConnectionHolderBankDao(dataSource);
        connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource);
    }

    public void transfer(final int fromId, final int toId, final int amount)
    {
        new TransactionTemplate(dataSource)
        {
            @Override
            protected void doJob() throws Exception
            {
                connectionHolderBankDao.withdraw(fromId, amount);
                connectionHolderInsuranceDao.deposit(toId, amount);
            }
        }.doJobInTransaction();
    }
}
复制代码

 

在TransactionTemplateBankService的transfer方法中,我们创建了一个匿名的 TtransactionTemplate类,并且实现了doJob方法,在doJob方法中调用两个DAO完成业务操作,然后调用调用 TransactionTemplateBankService的doJobInTransaction方法。

 

由于TransactionTemplate只是对上一篇文章中事务处理的一层封装,故TransactionManager和两个DAO类都保持和上一篇中的一样,此时他们都使用SingleThreadConnectionHolder获得Connection,故事务处理成功。

 

下一篇文章中,我们会讲到使用Java的动态代理来完成事务处理,这也是Spring管理事务的典型方法。

本系列上一篇文 章中,我们讲到了使用Template模式进行事务管理,这固然是一种很好的方法,但是不那么完美的地方在于我们依然需要在service层中编写和事务 处理相关的代码,即我们需要在service层中声明一个TransactionTemplate。在本篇文章中,我们将使用Java提供的动态代理 (Dynamic Proxy)功能来完成事务处理,你将看到无论是在service层还是DAO层都不会有事务处理代码,即他们根本就意识不到事务处理的存在。使用动态代 理完成事务处理也是AOP的一种典型应用。

 

这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:

git clone https://github.com/davenkin/java_transaction_workshop.git

 

Java动态代理的基本原理为:被代理对象需要实现某个接口(这是前提),代理对象会拦截对被代理对象的方法调用,在其中可以全然抛弃被代理对象的 方法实现而完成另外的功能,也可以在被代理对象方法调用的前后增加一些额外的功能。在本篇文章中,我们将拦截service层的transfer方法,在 其调用之前加入事务准备工作,然后调用原来的transfer方法,之后根据transfer方法是否执行成功决定commit还是rollback。

 

首先定义一个TransactionEnabledProxyManager类:

复制代码
package davenkin.step5_transaction_proxy;

import davenkin.step3_connection_holder.TransactionManager;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class TransactionEnabledProxyManager
{
    private TransactionManager transactionManager;

    public TransactionEnabledProxyManager(TransactionManager transactionManager)
    {

        this.transactionManager = transactionManager;
    }

    public Object proxyFor(Object object)
    {
        return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), new TransactionInvocationHandler(object, transactionManager));
    }
}

class TransactionInvocationHandler implements InvocationHandler
{
    private Object proxy;
    private TransactionManager transactionManager;

    TransactionInvocationHandler(Object object, TransactionManager transactionManager)
    {
        this.proxy = object;
        this.transactionManager = transactionManager;
    }

    public Object invoke(Object o, Method method, Object[] objects) throws Throwable
    {
        transactionManager.start();
        Object result = null;
        try
        {
            result = method.invoke(proxy, objects);
            transactionManager.commit();
        } catch (Exception e)
        {
            transactionManager.rollback();
        } finally
        {
            transactionManager.close();
        }
        return result;
    }
}
复制代码

 

通过调用该类的proxyFor方法,传入需要被代理的对象(本例中为service对象),返回一个代理对象。此后,在调用代理对象的 transfer方法时,会自动调用TransactionIvocationHandler的invoke方法,在该方法中,我们首先开始事务,然后执 行:

 result = method.invoke(proxy, objects);

 

上面一行代码执行的是原service层的transfer方法,如果方法执行成功则commit,否则rollback事务。

 

由于与事务处理相关的代码都被转移到了代理对象中,在service层中我们只需调用DAO即可:

复制代码
package davenkin.step5_transaction_proxy;

import davenkin.BankService;
import davenkin.step3_connection_holder.ConnectionHolderBankDao;
import davenkin.step3_connection_holder.ConnectionHolderInsuranceDao;
import javax.sql.DataSource;

public class BareBankService implements BankService
{
    private ConnectionHolderBankDao connectionHolderBankDao;
    private ConnectionHolderInsuranceDao connectionHolderInsuranceDao;

    public BareBankService(DataSource dataSource)
    {
        connectionHolderBankDao = new ConnectionHolderBankDao(dataSource);
        connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource);
    }

    public void transfer(final int fromId, final int toId, final int amount)
    {
        try
        {
            connectionHolderBankDao.withdraw(fromId, amount);
            connectionHolderInsuranceDao.deposit(toId, amount);
        } catch (Exception e)
        {
            throw new RuntimeException();
        }
    }
}
复制代码

 

如何,上面的BareBankService中没有任何事务处理的影子,我们只需关注核心业务逻辑即可。

 

然后在客户代码中,我们需要先创建代理对象(这在Spring中通常是通过配置实现的):

复制代码
  @Test
    public void transferFailure() throws SQLException
    {
        TransactionEnabledProxyManager transactionEnabledProxyManager = new TransactionEnabledProxyManager(new TransactionManager(dataSource));
        BankService bankService = new BareBankService(dataSource);
        BankService proxyBankService = (BankService) transactionEnabledProxyManager.proxyFor(bankService);

        int toNonExistId = 3333;
        proxyBankService.transfer(1111, toNonExistId, 200);

        assertEquals(1000, getBankAmount(1111));
        assertEquals(1000, getInsuranceAmount(2222));
    }
复制代码

 

在上面的测试代码中,我们首先创建一个BareBankService对象,然后调用 transactionEnabledProxyManager的proxyFor方法生成对原BareBankService对象的代理对象,最后在代 理对象上调用transfer方法,测试运行成功。

 

可以看到,通过以上动态代理实现,BareBankService中的所有public方法都被代理了,即他们都被加入到事务中。这对于 service层中的所有方法都需要和数据库打交道的情况是可以的,本例即如此(有且只有一个transfer方法),然而对于service层中不需要 和数据库打交道的public方法,这样做虽然也不会出错,但是却显得多余。在下一篇文章中,我们将讲到使用Java注解(annotation)的方式来声明一个方法是否需要事务,就像Spring中的Transactional注解一样。

本系列上一篇文 章中,我们讲到了使用动态代理的方式完成事务处理,这种方式将service层的所有public方法都加入到事务中,这显然不是我们需要的,需要代理的 只是那些需要操作数据库的方法。在本篇中,我们将讲到如何使用Java注解(Annotation)来标记需要事务处理的方法。

 

这是一个关于Java事务处理的系列文章,请通过以下方式下载github源代码:

git clone https://github.com/davenkin/java_transaction_workshop.git

 

首先定义Transactional注解:

复制代码
package davenkin.step6_annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Transactional
{
}
复制代码

 

使用注解标记事务的基本原理为:依然使用上一篇中 讲到的动态代理的方式,只是在InvocationHandler的invoke方法中,首先判断被代理的方法是否标记有Transactional注 解,如果没有则直接调用method.invoke(proxied, objects),否则,先准备事务,在调用method.invoke(proxied, objects),然后根据该方法是否执行成功调用commit或rollback。定义 TransactionEnabledAnnotationProxyManager如下:

复制代码
package davenkin.step6_annotation;

import davenkin.step3_connection_holder.TransactionManager;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class TransactionEnabledAnnotationProxyManager
{
    private TransactionManager transactionManager;

    public TransactionEnabledAnnotationProxyManager(TransactionManager transactionManager)
    {

        this.transactionManager = transactionManager;
    }

    public Object proxyFor(Object object)
    {
        return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), new AnnotationTransactionInvocationHandler(object, transactionManager));
    }
}

class AnnotationTransactionInvocationHandler implements InvocationHandler
{
    private Object proxied;
    private TransactionManager transactionManager;

    AnnotationTransactionInvocationHandler(Object object, TransactionManager transactionManager)
    {
        this.proxied = object;
        this.transactionManager = transactionManager;
    }

    public Object invoke(Object proxy, Method method, Object[] objects) throws Throwable
    {
        Method originalMethod = proxied.getClass().getMethod(method.getName(), method.getParameterTypes());
        if (!originalMethod.isAnnotationPresent(Transactional.class))
        {
            return method.invoke(proxied, objects);
        }

        transactionManager.start();
        Object result = null;
        try
        {
            result = method.invoke(proxied, objects);
            transactionManager.commit();
        } catch (Exception e)
        {
            transactionManager.rollback();
        } finally
        {
            transactionManager.close();
        }
        return result;
    }
}
复制代码

 

可以看到,在AnnotationTransactionInvocationHandler的invoke方法中,我们首先获得原service 的transfer方法,然后根据originalMethod.isAnnotationPresent(Transactional.class)判 断该方法是否标记有Transactional注解,如果没有,则任何额外功能都不加,直接调用原来service的transfer方法;否则,将其加 入到事务处理中。

 

在service层中,我们只需将需要加入事务处理的方法用Transactional注解标记就行了:

复制代码
package davenkin.step6_annotation;

import davenkin.BankService;
import davenkin.step3_connection_holder.ConnectionHolderBankDao;
import davenkin.step3_connection_holder.ConnectionHolderInsuranceDao;

import javax.sql.DataSource;

public class AnnotationBankService implements BankService
{
    private ConnectionHolderBankDao connectionHolderBankDao;
    private ConnectionHolderInsuranceDao connectionHolderInsuranceDao;

    public AnnotationBankService(DataSource dataSource)
    {
        connectionHolderBankDao = new ConnectionHolderBankDao(dataSource);
        connectionHolderInsuranceDao = new ConnectionHolderInsuranceDao(dataSource);
    }

    @Transactional
    public void transfer(final int fromId, final int toId, final int amount)
    {
        try
        {
            connectionHolderBankDao.withdraw(fromId, amount);
            connectionHolderInsuranceDao.deposit(toId, amount);
        } catch (Exception e)
        {
            throw new RuntimeException();
        }
    }
}
复制代码

 

然后执行测试:

复制代码
    @Test
    public void transferFailure() throws SQLException
    {
        TransactionEnabledAnnotationProxyManager transactionEnabledAnnotationProxyManager = new TransactionEnabledAnnotationProxyManager(new TransactionManager(dataSource));
        BankService bankService = new AnnotationBankService(dataSource);
        BankService proxyBankService = (BankService) transactionEnabledAnnotationProxyManager.proxyFor(bankService);

        int toNonExistId = 3333;
        proxyBankService.transfer(1111, toNonExistId, 200);

        assertEquals(1000, getBankAmount(1111));
        assertEquals(1000, getInsuranceAmount(2222));
    }
复制代码

 

测试运行成功,如果将AnnotationBankService中transfer方法的Transactional注解删除,那么以上测试将抛 出RuntimeException异常,该异常为transfer方法中我们人为抛出的,也即由于此时没有事务来捕捉异常,程序便直接抛出该异常而终止 运行。在下一篇(本系列最后一篇)文章中,我们将讲到分布式事务的一个入门例子。

本系列先前的文章中,我们主要讲解了JDBC对本地事务的处理,本篇文章将讲到一个分布式事务的例子。

 

请通过以下方式下载github源代码:

git clone https://github.com/davenkin/jta-atomikos-hibernate-activemq.git

本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源事务(比如单个数据库),分布式事务可以处理多种异构的数据源,比如某个业务操作中同时包含了JDBC和JMS或者某个操作需要访问多个不同的数据库。

 

Java通过JTA完成分布式事务,JTA本身只是一种规范,不同的应用服务器都包含有自己的实现(比如JbossJTA),同时还存在独立于应用服务器的单独JTA实现,比如本篇中要讲到的Atomikos。对于JTA的原理,这里不细讲,读者可以通过这篇文章了解相关知识。

 

在本篇文章中,我们将实现以下一个应用场景:你在网上购物,下了订单之后,订单数据将保存在系统的数据库中,同时为了安排物流,订单信息将以消息(Message)的方式发送到物流部门以便送货。

 

以上操作同时设计到数据库操作和JMS消息发送,为了使整个操作成为一个原子操作,我们只能选择分布式事务。我们首先设计一个service层,定义OrderService接口:

package davenkin;

public interface OrderService {

    public void makeOrder(Order order);
}

 

为了简单起见,我们设计一个非常简单的领域对象Order:

复制代码
@XmlRootElement(name = "Order")
@XmlAccessorType(XmlAccessType.FIELD)
public class Order {

    @XmlElement(name = "Id",required = true)
    private long id;

    @XmlElement(name = "ItemName",required = true)
    private String itemName;

    @XmlElement(name = "Price",required = true)
    private double price;

    @XmlElement(name = "BuyerName",required = true)
    private String buyerName;

    @XmlElement(name = "MailAddress",required = true)
    private String mailAddress;

    public Order() {
    }
复制代码

 

为了采用JAXB对Order对象进行Marshal和Unmarshal,我们在Order类中加入了JAXB相关的Annotation。 我们将使用Hibernate来完成数据持久化,然后使用Spring提供的JmsTemplate将Order转成xml后以TextMessage的 形式发送到物流部门的ORDER.QUEUE中。

 

(一)准备数据库

为了方便,我们将采用Spring提供的embedded数据库,默认情况下Spring采用HSQL作为后台数据库,虽然在本例中我们将采用HSQL的非XA的DataSource,但是通过Atomikos包装之后依然可以参与分布式事务。

SQL脚本包含在createDB.sql文件中:

复制代码
CREATE TABLE USER_ORDER(
ID INT NOT NULL,
ITEM_NAME VARCHAR (100) NOT NULL UNIQUE,
PRICE DOUBLE NOT NULL,
BUYER_NAME CHAR (32) NOT NULL,
MAIL_ADDRESS VARCHAR(500) NOT NULL,
PRIMARY KEY(ID)
);
复制代码

 

在Spring中配置DataSource如下:

    <jdbc:embedded-database id="dataSource">
        <jdbc:script location="classpath:createDB.sql"/>
    </jdbc:embedded-database>

 

(二)启动ActiveMQ

我们将采用embedded的ActiveMQ,在测试之前启动ActiveMQ提供的BrokerService,在测试执行完之后关闭BrokerService。

复制代码
  @BeforeClass
    public static void startEmbeddedActiveMq() throws Exception {
        broker = new BrokerService();
        broker.addConnector("tcp://localhost:61616");
        broker.start();
    }

    @AfterClass
    public static void stopEmbeddedActiveMq() throws Exception {
        broker.stop();
    }
复制代码

 

(三)实现OrderService

创建一个DefaultOrderService,该类实现了OrderService接口,并维护一个JmsTemplate和一个Hibernate的SessionFactory实例变量,分别用于Message的发送和数据库处理。

复制代码
package davenkin;

import org.hibernate.SessionFactory;
import org.hibernate.classic.Session;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.annotation.Transactional;

public class DefaultOrderService  implements OrderService{
    private JmsTemplate jmsTemplate;
    private SessionFactory sessionFactory;

    @Override
    @Transactional
    public void makeOrder(Order order) {
        Session session = sessionFactory.getCurrentSession();
        session.save(order);
        jmsTemplate.convertAndSend(order);

    }

    @Required
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    @Required
    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }
}
复制代码

 

(四)创建Order的Mapping配置文件

复制代码
<?xml version="1.0"?>
<!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN"
        "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd">

<hibernate-mapping>
    <class name="davenkin.Order" table="USER_ORDER">
        <id name="id" type="long">
            <column name="ID" />
            <generator class="increment" />
        </id>
        <property name="itemName" type="string">
            <column name="ITEM_NAME" />
        </property>
        <property name="price" type="double">
            <column name="PRICE"/>
        </property>
        <property name="buyerName" type="string">
            <column name="BUYER_NAME"/>
        </property>
        <property name="mailAddress" type="string">
            <column name="MAIL_ADDRESS"/>
        </property>
    </class>
</hibernate-mapping>
复制代码

 

(五)配置Atomikos事务

在Spring的IoC容器中,我们需要配置由Atomikos提供的UserTransaction和TransactionManager,然后再配置Spring的JtaTransactionManager:

复制代码
  <bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init" destroy-method="shutdownForce">
        <constructor-arg>
            <props>
                <prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop>
            </props>
        </constructor-arg>
    </bean>

    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransactionService">
        <property name="forceShutdown" value="false" />
    </bean>

    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="userTransactionService">
        <property name="transactionTimeout" value="300" />
    </bean>

    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="userTransactionService">
        <property name="transactionManager" ref="atomikosTransactionManager" />
        <property name="userTransaction" ref="atomikosUserTransaction" />
    </bean>

    <tx:annotation-driven transaction-manager="jtaTransactionManager" />
复制代码

 

(六)配置JMS

对于JMS,为了能使ActiveMQ加入到分布式事务中,我们需要配置ActiveMQXAConnectionFactory,而不是 ActiveMQConnectionFactory,然后再配置JmsTemplate,此外还需要配置MessageConvertor在Order 对象和XML之间互转。

复制代码
    <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="amqConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">
        <property name="uniqueResourceName" value="XAactiveMQ" />
        <property name="xaConnectionFactory" ref="jmsXaConnectionFactory" />
        <property name="poolSize" value="5"/>
    </bean>


    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="amqConnectionFactory"/>
        <property name="receiveTimeout" value="2000" />
        <property name="defaultDestination" ref="orderQueue"/>
        <property name="sessionTransacted" value="true" />
        <property name="messageConverter" ref="oxmMessageConverter"/>
    </bean>

    <bean id="orderQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="ORDER.QUEUE"/>
    </bean>


    <bean id="oxmMessageConverter"
          class="org.springframework.jms.support.converter.MarshallingMessageConverter">
        <property name="marshaller" ref="marshaller"/>
        <property name="unmarshaller" ref="marshaller"/>
    </bean>

    <oxm:jaxb2-marshaller id="marshaller">
        <oxm:class-to-be-bound name="davenkin.Order"/>
    </oxm:jaxb2-marshaller>
复制代码

 

(七)测试

在测试中,我们首先通过(二)中的方法启动ActiveMQ,再调用DefaultOrderService,最后对数据库和QUEUE进行验证:

复制代码
   @Test
    public void makeOrder(){
        orderService.makeOrder(createOrder());
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        assertEquals(1, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));
        String dbItemName = jdbcTemplate.queryForObject("SELECT ITEM_NAME FROM USER_ORDER", String.class);
        String messageItemName = ((Order) jmsTemplate.receiveAndConvert()).getItemName();
        assertEquals(dbItemName, messageItemName);
    }

    @Test(expected = IllegalArgumentException.class)
    public void failToMakeOrder()
    {
        orderService.makeOrder(null);
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        assertEquals(0, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));
        assertNull(jmsTemplate.receiveAndConvert());
    }
复制代码

 

 

 

 

分享到:
评论

相关推荐

    java事务处理总结

    通过对Java事务处理的理解,我们可以发现,事务不仅是Java应用开发中不可或缺的一部分,更是确保数据完整性和一致性的关键所在。无论是使用JDBC还是JTA进行事务管理,都需要开发者深刻理解事务的基本原则以及具体的...

    HL7解析 Java

    HAPI是Java平台上的一个开源框架,它提供了对HL7 v2.x消息的强大支持,包括解析、创建和验证HL7消息。HAPI不仅简化了HL7消息处理,还提供了一些高级功能,如事务处理、错误处理和性能优化。 关于HAPI库的使用,以下...

    Java Spring 源码解析 Xmind 思维导图

    Java Spring 是一个广泛使用的开源框架,它为Java应用程序提供了依赖注入(DI)和面向切面编程(AOP)的功能,极大地简化了企业级应用的开发。Spring的核心是IoC(Inversion of Control)容器,它负责管理对象的生命...

    深入解析Java的Spring框架中的混合事务与bean的区分

    在Java的Spring框架中,事务管理和Bean管理是两个核心概念,它们在Web应用程序开发中扮演着至关重要的角色。本文将深入解析Spring框架中的混合事务(也称为编程式事务管理)与Bean的区别,以及如何在实际应用中有效...

    【面试必备】JAVA 最常见面试题全解析 附 PDF.pdf_java面试_javapdf_java_面试题_

    这份"【面试必备】JAVA 最常见面试题全解析 附 PDF"文档,正是为准备Java面试的开发者量身定制的资源,涵盖了Java的基础到高级知识点,旨在帮助你全面理解和掌握Java技术。 一、基础篇 1. Java语法:包括变量、数据...

    java解析dbf文件三种方法、以及解析驱动

    ### Java解析DBF文件的方法与解析驱动介绍 #### DBF文件简介 DBF(dBase File)是一种常用的数据文件格式,最初由dBase数据库应用程序创建并使用。它以表格形式存储数据,支持多种不同的数据库管理系统(DBMS)。...

    java版excel解析,并上传至mysql数据库

    在Java编程环境中,将Excel数据解析并上传到MySQL数据库是一项常见的任务,特别是在数据处理和导入导出场景下。本项目提供了完整的解决方案,包括所需的jar包、数据库SQL语句以及一个可直接在Eclipse环境中运行的...

    基于RabbitMQ的Java分布式事务设计源码解析

    本项目深入解析了利用RabbitMQ实现Java分布式事务的设计源码,包含24个文件,涵盖Java源代码、XML配置、shell脚本以及相关配置文件,适用于对分布式事务有深入了解的开发者和技术爱好者。

    Java 面试全解析:核心知识点与典型面试题.zip

    本资源包"Java 面试全解析:核心知识点与典型面试题.zip"包含了多个关键主题,帮助求职者深入理解和掌握 Java 的核心概念,以及应对面试中的各种问题。 1. **设计模式** - 34-设计模式常见面试题汇总.html 设计...

    java动态代理详细解析

    Java动态代理是一种编程技术,主要用于在运行时创建一个新的对象,该对象可以作为现有接口的实现。动态代理的主要目的是为了在不修改原有代码的基础上,为已有的接口或类添加额外的功能,比如日志、事务管理、性能...

    Java数据库连接全解析:从JDBC到实践应用

    Java提供了一种强大的方式来操作数据库,这主要得益于Java Database Connectivity(JDBC)API。JDBC是一个标准的Java API,用于执行数据库操作,它提供了一种统一的方法,使得Java程序能够独立于数据库的实现。本文...

    java应用开发解析包

    在Java应用开发中,解析包通常包含了用于处理数据解析、网络通信和数据库操作的重要库和工具。本资源集合涵盖了MySQL数据库交互、JSON数据格式处理以及HTTP网络请求等多个关键领域,对于提升Java开发者的工作效率和...

    Java 最常见 200+ 面试题全解析:面试必备.pdf

    Java面试题全解析涉及的知识点非常广泛,覆盖了Java编程的多个重要模块和主题。在详细介绍前,我们首先要明白JDK和JRE的区别。JDK即Java Development Kit,是开发Java程序的环境,包含了编译器(javac)和运行环境...

    java框架课程代码全解析

    Java框架课程代码全解析主要涵盖了Java开发中常用且重要的框架技术,这些框架是现代Java企业级应用开发的基础。本课程的目的是帮助学习者深入理解并掌握这些框架的使用,提升开发效率,为构建稳定、高效的后端服务...

    深度解析Java游戏服务器开发源代码

    《深度解析Java游戏服务器开发源代码》是一本专注于Java技术在游戏服务器开发领域的专著,旨在帮助读者深入了解和掌握如何利用Java语言构建高效、稳定的游戏服务端系统。书中的源代码提供了丰富的实践示例,涵盖了从...

    java jar包 ZIp压缩解压 数据库连接驱动 XML解析

    Java中,JDOM(Java Document Object Model)是一个开源项目,提供了对XML文档的API,使得解析和操作XML变得简单。JDOM通过构建一个DOM树来表示XML文档,允许开发者通过对象模型访问和修改XML数据。例如,我们可以...

    Java中数据库事务处理的代码清单.pdf

    以下是一份简化的Java中数据库事务处理的代码清单,以及相关的知识点解析。 1. **连接数据库**: - 在`ConnectAccess.java`类中,通过`Class.forName()`方法加载数据库驱动,这里是JDBC的Access驱动。`url`属性...

    java读取数据库表中内容转存sql文件,然后解析执行此文件

    在Java编程中,读取数据库表中的内容并将其转换为SQL文件,然后解析并执行这个文件,是一项常见的数据处理任务。这项操作可能涉及到数据库连接、数据查询、文件操作以及SQL语句的构建与执行。以下将详细介绍这个过程...

    java 解析xml 并导入数据库(dom4j )

    在Java编程中,解析XML文件并将其数据导入到数据库是一项常见的任务。在这个场景下,我们主要涉及两个核心技术:XML解析库和数据库操作。本篇将详细介绍如何使用DOM4J库来解析XML,并将解析得到的数据有效地导入到...

    Java解析Ping++ WebHooks数据

    对于Java开发者来说,解析这些WebHooks数据并从中提取所需信息是至关重要的。 在处理Ping++的WebHooks数据时,我们可以使用各种JSON库,如GSon和fastJson。由于数据结构通常比较复杂,这里我们将重点关注两种高效的...

Global site tag (gtag.js) - Google Analytics