`

多数据源--同一事物中访问不同数据库即分库事物的实现

 
阅读更多

前一节总结了如何配置动态路由,本节讨论如何在同一事物中访问不同数据库即分库事物的实现。

   由于同一个事物只能绑定一个数据源连接,当切换数据源时需要解除老数据源连接的绑定,将新数据源绑定到当前线程,访问完毕后在将老数据源绑定回线程。

datasource-config.xml如下:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context"

xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans  

http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  

http://www.springframework.org/schema/context  

 http://www.springframework.org/schema/context/spring-context-2.5.xsd  

http://www.springframework.org/schema/aop   

http://www.springframework.org/schema/aop/spring-aop-2.5.xsd  

http://www.springframework.org/schema/tx   

http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">

 

<!-- AOP aspectj配置 可用于异常处理,权限,及参数验证等 -->

<aop:aspectj-autoproxy proxy-target-class="true" />

<bean id="multiDbAspect" class="com.aop.MultiDbAspect">

<property name="dataSource" ref="dataSource" />

</bean>

 

 

<!-- 数据源配置 -->

<bean id="dataSourceFirst" class="org.apache.commons.dbcp.BasicDataSource"

destroy-method="close">

<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />

<property name="url" value="jdbc:oracle:thin:@10.20.151.4:1521:ptdev" />

<property name="username" value="pt" />

<property name="password" value="pt" />

<property name="maxActive" value="200" />

<property name="maxIdle" value="5" />

<property name="poolPreparedStatements" value="true" />

<property name="removeAbandoned" value="true" />

<property name="removeAbandonedTimeout" value="300" />

</bean>

 

<bean id="dataSourceSecond" class="org.apache.commons.dbcp.BasicDataSource"

destroy-method="close">

<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />

<property name="url" value="jdbc:oracle:thin:@10.20.151.12:1521:pt10g" />

<property name="username" value="pt" />

<property name="password" value="pt" />

<property name="maxActive" value="200" />

<property name="maxIdle" value="5" />

<property name="poolPreparedStatements" value="true" />

<property name="removeAbandoned" value="true" />

<property name="removeAbandonedTimeout" value="300" />

</bean>

 

<bean id="dataSource" class="com.common.bean.RoutingDataSource">

<property name="targetDataSources">

<map>

<entry key="1" value-ref="dataSourceFirst" />

<entry key="2" value-ref="dataSourceSecond" />

</map>

</property>

<property name="defaultTargetDataSource">

<ref local="dataSourceFirst" />

</property>

</bean>

 

<bean id="transactionManager"

class="org.springframework.jdbc.datasource.DataSourceTransactionManager">

<property name="dataSource">

<ref local="dataSource" />

</property>

</bean>

 

<bean id="lobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"

lazy-init="true" />

 

<bean id="sqlMapClient" class="org.springframework.orm.ibatis.SqlMapClientFactoryBean">

<property name="dataSource" ref="dataSource" />

<property name="lobHandler" ref="lobHandler" />

<property name="configLocations" value="classpath*:/ibatis/config/sql-map.xml" />

</bean>

 

<bean id="txAttributeSource"

class="org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource">

<property name="properties">

<props>

<prop key="add*">PROPAGATION_REQUIRED,-PtServiceException</prop>

<prop key="update*">PROPAGATION_REQUIRED,-PtServiceException</prop>

<prop key="delete*">PROPAGATION_REQUIRED,-PtServiceException</prop>

<prop key="batch*">PROPAGATION_REQUIRED,-PtServiceException</prop>

<prop key="get*">PROPAGATION_REQUIRED,-PtServiceException</prop>

</props>

</property>

</bean>

 

<bean id="transactionManagerProxy" class="org.springframework.aop.framework.ProxyFactoryBean">

<property name="proxyTargetClass">

<value>true</value>

</property>

<property name="target">

<ref bean="transactionManager" />

</property>

</bean>

 

<bean id="transactionDefinition"

class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean"

abstract="true">

<property name="transactionManager">

<ref bean="transactionManagerProxy" />

</property>

<property name="transactionAttributeSource">

<ref bean="txAttributeSource" />

</property>

<!--若不设置该属性,则运行main方法会报:

Exception in thread "main" java.lang.ClassCastException: $Proxy5 cannot be cast to com.service.impl.DbInfoServiceImpl

at com.transaction.TransactionTest.main(TransactionTest.java:16)
错误-->

<property name="proxyTargetClass" value="true">

</property>

</bean>

 

<bean id="baseDao" class="com.common.dao.impl.BaseDaoImpl">

<property name="sqlMapClient">

<ref bean="sqlMapClient" />

</property>

<property name="dataSource">

<ref bean="dataSource" />

</property>

</bean>

<--配置事务-->

<bean id="dbInfoService" parent="transactionDefinition">

<property name="target">

<bean class="com.service.impl.DbInfoServiceImpl">

<property name="dbInfoDao" ref="dbInfoDao" />

</bean>

</property>

</bean>

<bean id="dbInfoDao" class="com.service.dao.impl.DbInfoDaoImpl">

<property name="baseDao" ref="baseDao"></property>

</bean>

</beans>

 

 

public class DbInfoDaoImpl implements DbInfoDao{

public BaseDao baseDao;

 

public void setBaseDao(BaseDao baseDao) {

this.baseDao = baseDao;

}

 

public List<Map<String, Object>> addUserInfo(User user, Integer db) {

baseDao.add("login.addUser", user);

List<Map<String, Object>> result = baseDao.getList("login.getUserInfo",

user.getName());

return result;

}

}

 

public class DbInfoServiceImpl implements DbInfoService{
public DbInfoDao dbInfoDao;
public void setDbInfoDao(DbInfoDao dbInfoDao) {
this.dbInfoDao = dbInfoDao;
}

public List<Map<String,Object>> getUserInfo(User user)
{
                //数据源1
List<Map<String,Object>> result1=dbInfoDao.addUserInfo(user,1);
                //数据源2
List<Map<String,Object>> result2=dbInfoDao.addUserInfo(user,2);

return result1;
}

}

 

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import javax.sql.DataSource;

import org.apache.log4j.Logger;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.jdbc.datasource.ConnectionHolder;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.jdbc.datasource.DelegatingDataSource;
import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
import org.springframework.transaction.UnexpectedRollbackException;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import com.common.bean.Shard;
import com.common.bean.ThreadInfoHolder;
import com.common.bean.RoutingDataSource;
@Aspect
public class MultiDbAspect {
public final static long NOT_FIND = -1L;
   private final Logger     logger   = Logger.getLogger(MultiDbAspect.class);
   private DataSource       dataSource;
   
   @Around("execution (* com.service.dao.impl.*Impl.*(..))")
   public Object getDataFromAllDb(ProceedingJoinPoint jointPoint) throws Throwable {
       List<Object> returnList = new ArrayList<Object>();
       Object[] args = jointPoint.getArgs();
       Integer db=1;
       for (Object arg : args) {
           if (arg != null && Integer.class.isAssignableFrom(arg.getClass())) {
           	db = (Integer) arg;
               break;
           }
       }
                    //设置数据源
           Shard shard = new Shard();
           shard.setDbId(db.intValue());

           //动态数据源同步对象
           DynamicTransactionSynchonization syn = new DynamicTransactionSynchonization();
           try {
               syn.initDynamicRouting(shard);
               Object retVal = jointPoint.proceed();
               returnList.add(retVal);
           } catch (Throwable e) {
               this.logger.error(e.getMessage(), e);
               throw e;
           } finally {
               syn.endDynamicRouting();
           }
       return returnList;

   }


   private Shard getShard(Long accountId, List<Shard> shards) {
       for (Shard shard : shards) {
           if (shard.getAccountId().longValue() == accountId.longValue()) {
               return shard;
           }
       }

       return null;
   }


   /**
    * 动态事务源同步类,以实现跟其它数据源事务同步,实现类JTA功能
    * 
    * @author xiaoming.niexm
    */
   final private class DynamicTransactionSynchonization extends TransactionSynchronizationAdapter {
       private Logger                           logger = Logger
                                                               .getLogger(DynamicTransactionSynchonization.class);

       private ConnectionHolder                 connectionHolder;

       private int                              order;

       private ConnectionHolder                 oldConnectionHolder;

       private List<TransactionSynchronization> oldSynchronizations;

       public int getOrder() {
           return this.order;
       }

       public DynamicTransactionSynchonization() {
           this.order = this.getConnectionSynchronizationOrder(MultiDbAspect.this.dataSource);
       }

       /**
        * 暂停线程上绑定的事务资源,如Connection
        * 分库访问时,需要暂时本线程上的已有事务资源,从DataSource上获取新Connection(否则只能拿到原事务connection),
        */
       @SuppressWarnings("unchecked")
       private void suspendThreadTransactionResource() {
           //获取线程绑定的事务资源
           this.oldConnectionHolder = (ConnectionHolder) TransactionSynchronizationManager
                   .getResource(dataSource);
           this.oldSynchronizations = TransactionSynchronizationManager.getSynchronizations();

           //清除线程绑定的事务资源,之后操作须从DataSource获取新的Connection
           if (oldConnectionHolder != null) {
               TransactionSynchronizationManager.unbindResource(dataSource);
           }

       }

       /**
        * 根据shard指定的数据源动态获取Connection
        * 
        * @throws SQLException
        */
       private void initDynamicRouting1(Shard shard) throws SQLException {
       	ThreadInfoHolder.addCurrentThreadShard(shard);
       }
       private void initDynamicRouting(Shard shard) throws SQLException {
           //在当前线程threadLocalMap里面,存储分片信息,以便PtRoutingDataSource获取
           ThreadInfoHolder.addCurrentThreadShard(shard);

           //业务方法已配置事务
           if (TransactionSynchronizationManager.hasResource(dataSource)) {
               String currentThreadDb = getCurrentDbId();
               String changeToDb = ((RoutingDataSource) dataSource).getTargetDbId();

               //将要切换的DB为当前DB,不做事务资源切换,等同于默认的require事务
               if (currentThreadDb != null && currentThreadDb.equals(changeToDb)) {
                   return;
               }

               Integer isolationLevel = TransactionSynchronizationManager
                       .getCurrentTransactionIsolationLevel();

               //暂停线程上绑定的事务资源,如Connection
               this.suspendThreadTransactionResource();

               //事务数据源标志
               boolean transactionAware = (dataSource instanceof TransactionAwareDataSourceProxy);

               //根据 上一步设置的shard信息,根据shard指定的数据源动态获取Connection(会自动绑定到线程,调用XxxDao.xxx()方法访问数据库时,使用绑定的Connection)
               if (transactionAware) {
                   dataSource.getConnection();
               } else {
                   DataSourceUtils.doGetConnection(dataSource);
               }

               //获取上一步绑定到线程的事务资源
               this.connectionHolder = (ConnectionHolder) TransactionSynchronizationManager
                       .getResource(dataSource);

               if (logger.isDebugEnabled()) {
                   Connection con = this.connectionHolder.getConnection();
                   logger.debug("PT change con:" + con.toString() + " URL:"
                           + con.getMetaData().getURL());
               }

               //自动创建的connection为autocommit,须设置Connection属性后重新绑定
               TransactionSynchronizationManager.unbindResource(dataSource);

               //设置Connection连接属性   
               if (isolationLevel != null) {
                   DefaultTransactionDefinition txnDef = new DefaultTransactionDefinition();
                   txnDef.setIsolationLevel(isolationLevel);
                   DataSourceUtils.prepareConnectionForTransaction(this.connectionHolder
                           .getConnection(), txnDef);
               }

               this.connectionHolder.getConnection().setAutoCommit(false);

               //重新绑定
               TransactionSynchronizationManager.bindResource(dataSource, this.connectionHolder);

               //主要删除新connection的Synchronization
               if (TransactionSynchronizationManager.isSynchronizationActive()) {
                   TransactionSynchronizationManager.clearSynchronization();
               }

               //向业务方法事务注册同步对象,以便业务方法事务提交/回滚前,调用本类动态创建的Connection相关事务
               TransactionSynchronizationManager.initSynchronization();
               TransactionSynchronizationManager.registerSynchronization(this);

           }
           //else 业务方法没有配置事务,Ibatis执行时会自动从连接池获取一个新的autocommit Connection

       }

       /**
        * 动态获取Datasource连接操作结束后,恢复
        */
       private void endDynamicRouting() {
           //清空当前线程threadLocalMap里面存储的分片信息,防止影响线程其它方法调用
           ThreadInfoHolder.cleanCurrentThreadShard();

           //恢复initDynamicRouting()暂停的事务资源,如Connection(业务事务资源)
           if (this.oldConnectionHolder != null) {
               //解除initDynamicRouting()方法中绑定的动态数据源(注,提交、回滚操作在外层事务提交,回滚时自动调用beforeCompletion()完成,清除资源自动调用DataSourceUtils.ConnectionSynchronization.afterCompletion()
               TransactionSynchronizationManager.unbindResource(dataSource);

               TransactionSynchronizationManager.bindResource(dataSource, oldConnectionHolder);
           }

           if (TransactionSynchronizationManager.isSynchronizationActive()
                   && this.oldSynchronizations != null && this.oldSynchronizations.size() > 0) {
               for (TransactionSynchronization oldSynchronization : oldSynchronizations) {
                   TransactionSynchronizationManager.registerSynchronization(oldSynchronization);
               }
           }

       }

       /**
        * 动态事务提交
        */
       private void commit() throws UnexpectedRollbackException {
           try {
               this.connectionHolder.getConnection().commit();
           } catch (SQLException e) {
               String msg = "Commit dynamic transaction error:" + e.getMessage();
               logger.error(msg, e);
               throw new UnexpectedRollbackException(msg, e);
           }

       }

       /**
        * 动态事务回滚
        */
       private void rollback() throws UnexpectedRollbackException {
           try {
               this.connectionHolder.getConnection().rollback();
           } catch (SQLException e) {
               String msg = "Rollback dynamic transaction error:" + e.getMessage();
               logger.error(msg, e);
               throw new UnexpectedRollbackException(msg, e);
           }
       }

       /**
        * 获取外层事务资源
        * 
        * @return
        */
       private ConnectionHolder getOutTxnCollectionHolder() {
           return (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
       }

       /**
        * 获取外层事务是否已回滚
        * 
        * @return
        */
       private boolean isOutTransactionRollbackOnly() {
           ConnectionHolder outTxnHolder = this.getOutTxnCollectionHolder();
           return (outTxnHolder != null && outTxnHolder.isRollbackOnly());
       }

       /**
        * 提交、回滚动态创建的事务(外层事务提交、回滚之前调用本方法)
        */
       @Override
       public void beforeCompletion() {
           //获取外层事务(PowerTrace 系统XXXService.xxx()方法的事务)提交、回滚状态
           boolean isOutTxnRollback = this.isOutTransactionRollbackOnly();

           //业务逻辑出错需要回滚,动态创建的事务也要跟着回滚
           if (isOutTxnRollback) {
               this.rollback();

           }
           //业务逻辑正常完成提交,动态创建的事务也要跟着提交
           else {
               this.commit();//
           }
       }

       @Override
       public void afterCompletion(int status) {
           if (this.connectionHolder != null) {
               Connection con = null;
               String url = null;
               try {
                   con = this.connectionHolder.getConnection();

                   if (con != null) {
                       url = con.getMetaData().getURL();

                       if (logger.isDebugEnabled()) {
                           logger.debug("PT release con:" + con.toString() + " ,URL:" + url);
                       }

                       connectionHolder.released();
                       DataSourceUtils.releaseConnection(con, dataSource);
                   }
               } catch (SQLException e) {
                   logger.error("Close connection error:" + con.toString() + " ,URL:" + url, e);
               }

               this.connectionHolder.reset();
               this.connectionHolder = null;
           }
       }

       private String getCurrentDbId() throws SQLException {
           ConnectionHolder currentHolder = (ConnectionHolder) TransactionSynchronizationManager
                   .getResource(MultiDbAspect.this.dataSource);

           if (currentHolder.getConnectionHandle() != null) {
               Connection conn = currentHolder.getConnectionHandle().getConnection();
               if (conn != null) {
                   //SimpleConnectionHandle: jdbc:oracle:thin:@10.20.151.4:1521:ptdev, UserName=xx, Oracle JDBC driver
                   String connectionDesc = conn.getMetaData().getURL();

                   int beginIdx = connectionDesc.indexOf("@") + 1;
                   int endIdx = connectionDesc.indexOf(":", beginIdx);

                   return connectionDesc.substring(beginIdx, endIdx);
               }
           }

           return null;
       }

       /**
        * 获取连接同步对象调用顺序号
        * 
        * @param dataSource
        * @return
        */
       private int getConnectionSynchronizationOrder(DataSource dataSource) {
           //一定要比DataSourceUtils.ConnectionSynchronization同步对象Order值小,要比该对象先运行(该对象主要用来清理获取的资源,如Collection.close)
           int order = DataSourceUtils.CONNECTION_SYNCHRONIZATION_ORDER - 1;
           DataSource currDs = dataSource;
           while (currDs instanceof DelegatingDataSource) {
               order--;
               currDs = ((DelegatingDataSource) currDs).getTargetDataSource();
           }
           return order;
       }

   }

   /**
    * @param dataSource the dataSource to set
    */
   public void setDataSource(DataSource dataSource) {
       this.dataSource = dataSource;
   }
}

 

import java.sql.Connection;
import java.sql.SQLException;

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;


public class RoutingDataSource extends AbstractRoutingDataSource {

    protected Object determineCurrentLookupKey() {
        //获取当前线程处理的账号对应分片信息
        Shard shard = ThreadInfoHolder.getCurrentThreadShard();

        //动态选定DataSource
        String dbId = shard == null ? null : String.valueOf(shard.getDbId());

        return dbId;
    }

    @Override
    public String toString() {
        //获取当前线程处理的账号对应分片信息
        Shard shard = ThreadInfoHolder.getCurrentThreadShard();

        //动态选定DataSource
        String dbId = shard == null ? null : String.valueOf(shard.getDbId());

        return "DB ID " + dbId + ":" + super.toString();
    }

    public String getTargetDbId() throws SQLException {
        Connection conn = null;
        try {
            //jdbc:oracle:thin:@10.20.151.4:1521:ptdev, UserName=xx, Oracle JDBC driver
            conn = determineTargetDataSource().getConnection();

            if (conn != null) {
                String connectionDesc = conn.getMetaData().getURL();
                int beginIdx = connectionDesc.indexOf("@") + 1;
                int endIdx = connectionDesc.indexOf(":", beginIdx);

                return connectionDesc.substring(beginIdx, endIdx);
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
        }

        return null;
    }

}

 

import java.util.List;
import java.util.Map;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.common.dao.model.User;
import com.service.impl.DbInfoServiceImpl;

public class TransactionTest {

public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath*:spring/datasource-config.xml");
DbInfoServiceImpl db=(DbInfoServiceImpl)ctx.getBean("dbInfoService");
User user=new User();
user.setName("xj");
user.setPassword("123");
List<Map<String,Object>> result =db.getUserInfo(user);
System.out.println(result);
}

}

 

 

分享到:
评论

相关推荐

    数据库与数据仓库.pptx

    数据库是更高层次的数据组织形式,它可以包含多个不同类型的文件,并通过关系模型进行组织和管理。 数据库与数据库管理系统(DBMS)是现代数据管理的核心。数据库是一种结构化的数据集合,用于存储、管理和检索信息...

    系统集成项目管理工程师学习笔记

    - 多态:同一操作作用于不同的对象,可以有不同的解释。 - 接口:规定了类的行为标准。 - 消息:对象之间的交互方式。 - 组件:可重用的软件单元。 - 模式:解决特定问题的设计模式。 - 复用:重复使用已有的代码或...

    java 编程常用英语单词 解释

    抽象类是不能被实例化的类,通常包含一个或多个未实现(即没有具体实现代码)的方法。抽象方法是没有主体的方法,即没有具体的实现细节。 - **用途**:抽象类和方法主要用于提供一个通用的模板或者接口,子类可以...

    计算机英语

    - **Fetch**:获取,指的是从数据库或其他数据源检索数据的过程。 - **ICONIFIED**:最小化状态,通常指窗口被最小化到任务栏的状态。 - **Expire**:过期,指数据或对象的有效期结束。 - **Extract**:提取,从文件...

    C++教材(钱能版).doc

    - **多态性**:同一种操作作用于不同的对象可以有不同的解释,导致不同的执行结果。 #### 五、C++语言简介 - **C++**:一种强大的编程语言,它包含了C语言的所有特性,并且支持面向对象编程。 - **过程性语言部分**...

    C#试题库()附-参考答案.docx

    37. **SQL选择语句**:`SELECT`关键字用于从数据库中选择数据。 38. **C#源代码文件**:通常以`.cs`为扩展名。 39. **continue语句**:在循环中遇到`continue`会结束当前循环迭代,进入下一次循环。 40. **集合类...

    C#试题库(全)附 参考答案.docx

    这篇文档包含了多个C#编程语言的基础知识,涵盖了类与对象、关键字、控制流、数据类型、接口、异常处理、数据库操作、循环结构、数组、集合类、控件使用以及SQL语句等多个方面。以下是这些知识点的详细说明: 1. 虚...

    SCJP必掌握的英语词汇

    在编程中,这可能指数据结构中的重复项或代码库中的重复代码。 #### 三维数组 (Three-Dimensional Array) **三维数组**是指具有三个维度的数组结构。它可以用来表示三维空间中的数据。 #### 约束 (Restrict) **...

    软件工程-理论与实践(许家珆)习题答案

    C) 数据源条目、数据流条目、数据处理条目、数据文件条目 D) 数据流条目、数据文件条目、数据池条目、加工条目 9. 在需求分析阶段主要采用图形工具来描述的原因是(B C)。 A) 图形的信息量大,便于描述规模大的...

    二十三种设计模式【PDF版】

    可扩展的使用 JDBC针对不同的数据库编程,Facade提供了一种灵活的实现. 设计模式之 Composite(组合) 就是将类用树形结构组合成一个单位.你向别人介绍你是某单位,你是单位中的一个元素,别人和你做买卖,相当于 和...

    abc

    多态则允许不同类型的对象对同一消息作出不同的响应。 4. **异常处理**:Java使用try-catch-finally结构来处理程序运行时可能出现的异常,这有助于提高程序的健壮性。 5. **集合框架**:Java集合框架提供了一组...

Global site tag (gtag.js) - Google Analytics