前一节总结了如何配置动态路由,本节讨论如何在同一事物中访问不同数据库即分库事物的实现。
由于同一个事物只能绑定一个数据源连接,当切换数据源时需要解除老数据源连接的绑定,将新数据源绑定到当前线程,访问完毕后在将老数据源绑定回线程。
datasource-config.xml如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd"> <!-- AOP aspectj配置 可用于异常处理,权限,及参数验证等 --> <aop:aspectj-autoproxy proxy-target-class="true" /> <bean id="multiDbAspect" class="com.aop.MultiDbAspect"> <property name="dataSource" ref="dataSource" /> </bean> <!-- 数据源配置 --> <bean id="dataSourceFirst" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" /> <property name="url" value="jdbc:oracle:thin:@10.20.151.4:1521:ptdev" /> <property name="username" value="pt" /> <property name="password" value="pt" /> <property name="maxActive" value="200" /> <property name="maxIdle" value="5" /> <property name="poolPreparedStatements" value="true" /> <property name="removeAbandoned" value="true" /> <property name="removeAbandonedTimeout" value="300" /> </bean> <bean id="dataSourceSecond" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" /> <property name="url" value="jdbc:oracle:thin:@10.20.151.12:1521:pt10g" /> <property name="username" value="pt" /> <property name="password" value="pt" /> <property name="maxActive" value="200" /> <property name="maxIdle" value="5" /> <property name="poolPreparedStatements" value="true" /> <property name="removeAbandoned" value="true" /> <property name="removeAbandonedTimeout" value="300" /> </bean> <bean id="dataSource" class="com.common.bean.RoutingDataSource"> <property name="targetDataSources"> <map> <entry key="1" value-ref="dataSourceFirst" /> <entry key="2" value-ref="dataSourceSecond" /> </map> </property> <property name="defaultTargetDataSource"> <ref local="dataSourceFirst" /> </property> </bean> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource"> <ref local="dataSource" /> </property> </bean> <bean id="lobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler" lazy-init="true" /> <bean id="sqlMapClient" class="org.springframework.orm.ibatis.SqlMapClientFactoryBean"> <property name="dataSource" ref="dataSource" /> <property name="lobHandler" ref="lobHandler" /> <property name="configLocations" value="classpath*:/ibatis/config/sql-map.xml" /> </bean> <bean id="txAttributeSource" class="org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource"> <property name="properties"> <props> <prop key="add*">PROPAGATION_REQUIRED,-PtServiceException</prop> <prop key="update*">PROPAGATION_REQUIRED,-PtServiceException</prop> <prop key="delete*">PROPAGATION_REQUIRED,-PtServiceException</prop> <prop key="batch*">PROPAGATION_REQUIRED,-PtServiceException</prop> <prop key="get*">PROPAGATION_REQUIRED,-PtServiceException</prop> </props> </property> </bean> <bean id="transactionManagerProxy" class="org.springframework.aop.framework.ProxyFactoryBean"> <property name="proxyTargetClass"> <value>true</value> </property> <property name="target"> <ref bean="transactionManager" /> </property> </bean> <bean id="transactionDefinition" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean" abstract="true"> <property name="transactionManager"> <ref bean="transactionManagerProxy" /> </property> <property name="transactionAttributeSource"> <ref bean="txAttributeSource" /> </property> <!--若不设置该属性,则运行main方法会报: Exception in thread "main" java.lang.ClassCastException: $Proxy5 cannot be cast to com.service.impl.DbInfoServiceImpl at com.transaction.TransactionTest.main(TransactionTest.java:16) 错误--> <property name="proxyTargetClass" value="true"> </property> </bean> <bean id="baseDao" class="com.common.dao.impl.BaseDaoImpl"> <property name="sqlMapClient"> <ref bean="sqlMapClient" /> </property> <property name="dataSource"> <ref bean="dataSource" /> </property> </bean> <--配置事务--> <bean id="dbInfoService" parent="transactionDefinition"> <property name="target"> <bean class="com.service.impl.DbInfoServiceImpl"> <property name="dbInfoDao" ref="dbInfoDao" /> </bean> </property> </bean> <bean id="dbInfoDao" class="com.service.dao.impl.DbInfoDaoImpl"> <property name="baseDao" ref="baseDao"></property> </bean> </beans>
public class DbInfoDaoImpl implements DbInfoDao{ public BaseDao baseDao; public void setBaseDao(BaseDao baseDao) { this.baseDao = baseDao; } public List<Map<String, Object>> addUserInfo(User user, Integer db) { baseDao.add("login.addUser", user); List<Map<String, Object>> result = baseDao.getList("login.getUserInfo", user.getName()); return result; } }
public class DbInfoServiceImpl implements DbInfoService{ public DbInfoDao dbInfoDao; public void setDbInfoDao(DbInfoDao dbInfoDao) { this.dbInfoDao = dbInfoDao; } public List<Map<String,Object>> getUserInfo(User user) { //数据源1 List<Map<String,Object>> result1=dbInfoDao.addUserInfo(user,1); //数据源2 List<Map<String,Object>> result2=dbInfoDao.addUserInfo(user,2); return result1; } }
import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import javax.sql.DataSource; import org.apache.log4j.Logger; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.jdbc.datasource.ConnectionHolder; import org.springframework.jdbc.datasource.DataSourceUtils; import org.springframework.jdbc.datasource.DelegatingDataSource; import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy; import org.springframework.transaction.UnexpectedRollbackException; import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationAdapter; import org.springframework.transaction.support.TransactionSynchronizationManager; import com.common.bean.Shard; import com.common.bean.ThreadInfoHolder; import com.common.bean.RoutingDataSource; @Aspect public class MultiDbAspect { public final static long NOT_FIND = -1L; private final Logger logger = Logger.getLogger(MultiDbAspect.class); private DataSource dataSource; @Around("execution (* com.service.dao.impl.*Impl.*(..))") public Object getDataFromAllDb(ProceedingJoinPoint jointPoint) throws Throwable { List<Object> returnList = new ArrayList<Object>(); Object[] args = jointPoint.getArgs(); Integer db=1; for (Object arg : args) { if (arg != null && Integer.class.isAssignableFrom(arg.getClass())) { db = (Integer) arg; break; } } //设置数据源 Shard shard = new Shard(); shard.setDbId(db.intValue()); //动态数据源同步对象 DynamicTransactionSynchonization syn = new DynamicTransactionSynchonization(); try { syn.initDynamicRouting(shard); Object retVal = jointPoint.proceed(); returnList.add(retVal); } catch (Throwable e) { this.logger.error(e.getMessage(), e); throw e; } finally { syn.endDynamicRouting(); } return returnList; } private Shard getShard(Long accountId, List<Shard> shards) { for (Shard shard : shards) { if (shard.getAccountId().longValue() == accountId.longValue()) { return shard; } } return null; } /** * 动态事务源同步类,以实现跟其它数据源事务同步,实现类JTA功能 * * @author xiaoming.niexm */ final private class DynamicTransactionSynchonization extends TransactionSynchronizationAdapter { private Logger logger = Logger .getLogger(DynamicTransactionSynchonization.class); private ConnectionHolder connectionHolder; private int order; private ConnectionHolder oldConnectionHolder; private List<TransactionSynchronization> oldSynchronizations; public int getOrder() { return this.order; } public DynamicTransactionSynchonization() { this.order = this.getConnectionSynchronizationOrder(MultiDbAspect.this.dataSource); } /** * 暂停线程上绑定的事务资源,如Connection * 分库访问时,需要暂时本线程上的已有事务资源,从DataSource上获取新Connection(否则只能拿到原事务connection), */ @SuppressWarnings("unchecked") private void suspendThreadTransactionResource() { //获取线程绑定的事务资源 this.oldConnectionHolder = (ConnectionHolder) TransactionSynchronizationManager .getResource(dataSource); this.oldSynchronizations = TransactionSynchronizationManager.getSynchronizations(); //清除线程绑定的事务资源,之后操作须从DataSource获取新的Connection if (oldConnectionHolder != null) { TransactionSynchronizationManager.unbindResource(dataSource); } } /** * 根据shard指定的数据源动态获取Connection * * @throws SQLException */ private void initDynamicRouting1(Shard shard) throws SQLException { ThreadInfoHolder.addCurrentThreadShard(shard); } private void initDynamicRouting(Shard shard) throws SQLException { //在当前线程threadLocalMap里面,存储分片信息,以便PtRoutingDataSource获取 ThreadInfoHolder.addCurrentThreadShard(shard); //业务方法已配置事务 if (TransactionSynchronizationManager.hasResource(dataSource)) { String currentThreadDb = getCurrentDbId(); String changeToDb = ((RoutingDataSource) dataSource).getTargetDbId(); //将要切换的DB为当前DB,不做事务资源切换,等同于默认的require事务 if (currentThreadDb != null && currentThreadDb.equals(changeToDb)) { return; } Integer isolationLevel = TransactionSynchronizationManager .getCurrentTransactionIsolationLevel(); //暂停线程上绑定的事务资源,如Connection this.suspendThreadTransactionResource(); //事务数据源标志 boolean transactionAware = (dataSource instanceof TransactionAwareDataSourceProxy); //根据 上一步设置的shard信息,根据shard指定的数据源动态获取Connection(会自动绑定到线程,调用XxxDao.xxx()方法访问数据库时,使用绑定的Connection) if (transactionAware) { dataSource.getConnection(); } else { DataSourceUtils.doGetConnection(dataSource); } //获取上一步绑定到线程的事务资源 this.connectionHolder = (ConnectionHolder) TransactionSynchronizationManager .getResource(dataSource); if (logger.isDebugEnabled()) { Connection con = this.connectionHolder.getConnection(); logger.debug("PT change con:" + con.toString() + " URL:" + con.getMetaData().getURL()); } //自动创建的connection为autocommit,须设置Connection属性后重新绑定 TransactionSynchronizationManager.unbindResource(dataSource); //设置Connection连接属性 if (isolationLevel != null) { DefaultTransactionDefinition txnDef = new DefaultTransactionDefinition(); txnDef.setIsolationLevel(isolationLevel); DataSourceUtils.prepareConnectionForTransaction(this.connectionHolder .getConnection(), txnDef); } this.connectionHolder.getConnection().setAutoCommit(false); //重新绑定 TransactionSynchronizationManager.bindResource(dataSource, this.connectionHolder); //主要删除新connection的Synchronization if (TransactionSynchronizationManager.isSynchronizationActive()) { TransactionSynchronizationManager.clearSynchronization(); } //向业务方法事务注册同步对象,以便业务方法事务提交/回滚前,调用本类动态创建的Connection相关事务 TransactionSynchronizationManager.initSynchronization(); TransactionSynchronizationManager.registerSynchronization(this); } //else 业务方法没有配置事务,Ibatis执行时会自动从连接池获取一个新的autocommit Connection } /** * 动态获取Datasource连接操作结束后,恢复 */ private void endDynamicRouting() { //清空当前线程threadLocalMap里面存储的分片信息,防止影响线程其它方法调用 ThreadInfoHolder.cleanCurrentThreadShard(); //恢复initDynamicRouting()暂停的事务资源,如Connection(业务事务资源) if (this.oldConnectionHolder != null) { //解除initDynamicRouting()方法中绑定的动态数据源(注,提交、回滚操作在外层事务提交,回滚时自动调用beforeCompletion()完成,清除资源自动调用DataSourceUtils.ConnectionSynchronization.afterCompletion() TransactionSynchronizationManager.unbindResource(dataSource); TransactionSynchronizationManager.bindResource(dataSource, oldConnectionHolder); } if (TransactionSynchronizationManager.isSynchronizationActive() && this.oldSynchronizations != null && this.oldSynchronizations.size() > 0) { for (TransactionSynchronization oldSynchronization : oldSynchronizations) { TransactionSynchronizationManager.registerSynchronization(oldSynchronization); } } } /** * 动态事务提交 */ private void commit() throws UnexpectedRollbackException { try { this.connectionHolder.getConnection().commit(); } catch (SQLException e) { String msg = "Commit dynamic transaction error:" + e.getMessage(); logger.error(msg, e); throw new UnexpectedRollbackException(msg, e); } } /** * 动态事务回滚 */ private void rollback() throws UnexpectedRollbackException { try { this.connectionHolder.getConnection().rollback(); } catch (SQLException e) { String msg = "Rollback dynamic transaction error:" + e.getMessage(); logger.error(msg, e); throw new UnexpectedRollbackException(msg, e); } } /** * 获取外层事务资源 * * @return */ private ConnectionHolder getOutTxnCollectionHolder() { return (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); } /** * 获取外层事务是否已回滚 * * @return */ private boolean isOutTransactionRollbackOnly() { ConnectionHolder outTxnHolder = this.getOutTxnCollectionHolder(); return (outTxnHolder != null && outTxnHolder.isRollbackOnly()); } /** * 提交、回滚动态创建的事务(外层事务提交、回滚之前调用本方法) */ @Override public void beforeCompletion() { //获取外层事务(PowerTrace 系统XXXService.xxx()方法的事务)提交、回滚状态 boolean isOutTxnRollback = this.isOutTransactionRollbackOnly(); //业务逻辑出错需要回滚,动态创建的事务也要跟着回滚 if (isOutTxnRollback) { this.rollback(); } //业务逻辑正常完成提交,动态创建的事务也要跟着提交 else { this.commit();// } } @Override public void afterCompletion(int status) { if (this.connectionHolder != null) { Connection con = null; String url = null; try { con = this.connectionHolder.getConnection(); if (con != null) { url = con.getMetaData().getURL(); if (logger.isDebugEnabled()) { logger.debug("PT release con:" + con.toString() + " ,URL:" + url); } connectionHolder.released(); DataSourceUtils.releaseConnection(con, dataSource); } } catch (SQLException e) { logger.error("Close connection error:" + con.toString() + " ,URL:" + url, e); } this.connectionHolder.reset(); this.connectionHolder = null; } } private String getCurrentDbId() throws SQLException { ConnectionHolder currentHolder = (ConnectionHolder) TransactionSynchronizationManager .getResource(MultiDbAspect.this.dataSource); if (currentHolder.getConnectionHandle() != null) { Connection conn = currentHolder.getConnectionHandle().getConnection(); if (conn != null) { //SimpleConnectionHandle: jdbc:oracle:thin:@10.20.151.4:1521:ptdev, UserName=xx, Oracle JDBC driver String connectionDesc = conn.getMetaData().getURL(); int beginIdx = connectionDesc.indexOf("@") + 1; int endIdx = connectionDesc.indexOf(":", beginIdx); return connectionDesc.substring(beginIdx, endIdx); } } return null; } /** * 获取连接同步对象调用顺序号 * * @param dataSource * @return */ private int getConnectionSynchronizationOrder(DataSource dataSource) { //一定要比DataSourceUtils.ConnectionSynchronization同步对象Order值小,要比该对象先运行(该对象主要用来清理获取的资源,如Collection.close) int order = DataSourceUtils.CONNECTION_SYNCHRONIZATION_ORDER - 1; DataSource currDs = dataSource; while (currDs instanceof DelegatingDataSource) { order--; currDs = ((DelegatingDataSource) currDs).getTargetDataSource(); } return order; } } /** * @param dataSource the dataSource to set */ public void setDataSource(DataSource dataSource) { this.dataSource = dataSource; } }
import java.sql.Connection; import java.sql.SQLException; import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; public class RoutingDataSource extends AbstractRoutingDataSource { protected Object determineCurrentLookupKey() { //获取当前线程处理的账号对应分片信息 Shard shard = ThreadInfoHolder.getCurrentThreadShard(); //动态选定DataSource String dbId = shard == null ? null : String.valueOf(shard.getDbId()); return dbId; } @Override public String toString() { //获取当前线程处理的账号对应分片信息 Shard shard = ThreadInfoHolder.getCurrentThreadShard(); //动态选定DataSource String dbId = shard == null ? null : String.valueOf(shard.getDbId()); return "DB ID " + dbId + ":" + super.toString(); } public String getTargetDbId() throws SQLException { Connection conn = null; try { //jdbc:oracle:thin:@10.20.151.4:1521:ptdev, UserName=xx, Oracle JDBC driver conn = determineTargetDataSource().getConnection(); if (conn != null) { String connectionDesc = conn.getMetaData().getURL(); int beginIdx = connectionDesc.indexOf("@") + 1; int endIdx = connectionDesc.indexOf(":", beginIdx); return connectionDesc.substring(beginIdx, endIdx); } } finally { if (conn != null) { conn.close(); } } return null; } }
import java.util.List; import java.util.Map; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.common.dao.model.User; import com.service.impl.DbInfoServiceImpl; public class TransactionTest { public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath*:spring/datasource-config.xml"); DbInfoServiceImpl db=(DbInfoServiceImpl)ctx.getBean("dbInfoService"); User user=new User(); user.setName("xj"); user.setPassword("123"); List<Map<String,Object>> result =db.getUserInfo(user); System.out.println(result); } }
相关推荐
数据库是更高层次的数据组织形式,它可以包含多个不同类型的文件,并通过关系模型进行组织和管理。 数据库与数据库管理系统(DBMS)是现代数据管理的核心。数据库是一种结构化的数据集合,用于存储、管理和检索信息...
- 多态:同一操作作用于不同的对象,可以有不同的解释。 - 接口:规定了类的行为标准。 - 消息:对象之间的交互方式。 - 组件:可重用的软件单元。 - 模式:解决特定问题的设计模式。 - 复用:重复使用已有的代码或...
抽象类是不能被实例化的类,通常包含一个或多个未实现(即没有具体实现代码)的方法。抽象方法是没有主体的方法,即没有具体的实现细节。 - **用途**:抽象类和方法主要用于提供一个通用的模板或者接口,子类可以...
- **Fetch**:获取,指的是从数据库或其他数据源检索数据的过程。 - **ICONIFIED**:最小化状态,通常指窗口被最小化到任务栏的状态。 - **Expire**:过期,指数据或对象的有效期结束。 - **Extract**:提取,从文件...
- **多态性**:同一种操作作用于不同的对象可以有不同的解释,导致不同的执行结果。 #### 五、C++语言简介 - **C++**:一种强大的编程语言,它包含了C语言的所有特性,并且支持面向对象编程。 - **过程性语言部分**...
37. **SQL选择语句**:`SELECT`关键字用于从数据库中选择数据。 38. **C#源代码文件**:通常以`.cs`为扩展名。 39. **continue语句**:在循环中遇到`continue`会结束当前循环迭代,进入下一次循环。 40. **集合类...
这篇文档包含了多个C#编程语言的基础知识,涵盖了类与对象、关键字、控制流、数据类型、接口、异常处理、数据库操作、循环结构、数组、集合类、控件使用以及SQL语句等多个方面。以下是这些知识点的详细说明: 1. 虚...
在编程中,这可能指数据结构中的重复项或代码库中的重复代码。 #### 三维数组 (Three-Dimensional Array) **三维数组**是指具有三个维度的数组结构。它可以用来表示三维空间中的数据。 #### 约束 (Restrict) **...
C) 数据源条目、数据流条目、数据处理条目、数据文件条目 D) 数据流条目、数据文件条目、数据池条目、加工条目 9. 在需求分析阶段主要采用图形工具来描述的原因是(B C)。 A) 图形的信息量大,便于描述规模大的...
可扩展的使用 JDBC针对不同的数据库编程,Facade提供了一种灵活的实现. 设计模式之 Composite(组合) 就是将类用树形结构组合成一个单位.你向别人介绍你是某单位,你是单位中的一个元素,别人和你做买卖,相当于 和...
多态则允许不同类型的对象对同一消息作出不同的响应。 4. **异常处理**:Java使用try-catch-finally结构来处理程序运行时可能出现的异常,这有助于提高程序的健壮性。 5. **集合框架**:Java集合框架提供了一组...