参考文档:
我们的需求达到的目标和现有的条件:
- 不同类型数据源都可能存在master和slave区分;
- 数据源之间已经可以通过package区分,不同package对应的service也不同;
- aop在service层面,对应不同数据源的service之间可能存在互相调用;
- 最外层方法的名称决定了该数据源应该使用master(可写)还是slave数据源(不可写);
- 在嵌套使用其他service的过程中,根据情况分析该service方法是否使用slave数据源;
我们在spring中的配置文件中使用了切面式的配置来定义声明式事务:
<tx:advice id="txAdvice" transaction-manager="transactionManager"> <tx:attributes> <tx:method name="save*" propagation="REQUIRED"/> <tx:method name="add*" propagation="REQUIRED"/> <tx:method name="create*" propagation="REQUIRED"/> <tx:method name="insert*" propagation="REQUIRED"/> <tx:method name="update*" propagation="REQUIRED"/> <tx:method name="merge*" propagation="REQUIRED"/> <tx:method name="del*" propagation="REQUIRED"/> <tx:method name="remove*" propagation="REQUIRED"/> <!--hibernate4必须配置为开启事务 否则 getCurrentSession()获取不到 --> <tx:method name="get*" propagation="SUPPORTS" read-only="true"/> <tx:method name="count*" propagation="SUPPORTS" read-only="true"/> <tx:method name="find*" propagation="SUPPORTS" read-only="true"/> <tx:method name="list*" propagation="SUPPORTS" read-only="true"/> <tx:method name="*" propagation="SUPPORTS" read-only="true"/> </tx:attributes> </tx:advice> <!-- 只对业务逻辑层实施事务 --> <aop:config expose-proxy="true"> <aop:pointcut id="txPointcut" expression="(execution(* com.api.example.*.*.service..*.*(..))) or (execution(* com.api.example.*.service..*.*(..)))"/> <aop:advisor advice-ref="txAdvice" pointcut-ref="txPointcut"/> </aop:config>
在aop:config中,只对业务逻辑层实施事务管理,此时需要定义pointCut:用于确定实行动态织入用到的方法条件,和advisor:用于确定方法中使用到的事务管理器,事务管理器中定义了各种类型方法前缀所定义的事务范围和传播属性。
我们的数据源需要动态定义,需要在事务开启之前,切入数据源去选择该方法执行过程到底是该使用读库还是写库来开启该事务,因此这个切面需要在org.springframework.transaction.interceptor.TransactionInterceptor之前就要起作用。
实现方案
完成的整体类图如下:
从AbstractDataSource中继承,增加写库以及多个从库,以便于在spring配置文件中能够配置该数据源,由于需要支持多个数据源的master/slave主从库配置,所以MasterSlaveDataSourceDecision中不能简单地定义静态ThreadLocal变量来维持当前事务状态,而且每个TransactionManager都需要定义对应的数据源决策类。
public class MasterSlaveDataSource extends AbstractDataSource implements InitializingBean { private static final Logger log = LoggerFactory.getLogger(MasterSlaveDataSource.class); private DataSource masterDataSource; private Map<String, DataSource> slaveDataSourceMap;
重写其中的getConnection()方法:
@Override public Connection getConnection() throws SQLException { return determineDataSource().getConnection(); } @Override public Connection getConnection(String username, String password) throws SQLException { return determineDataSource().getConnection(username, password); }
通过determineDataSource()方法来决定使用写库还是从库,对于多个从库来说,可以采用其他算法来支持,也可以根据线程ID,让同一个线程能够使用同一从库(当前实现并没有这么做):
public DataSource determineDataSource() { if (masterSlaveDataSourceDecision.isChoiceWrite()) { log.debug("current determine write datasource"); return masterDataSource; } else if (masterSlaveDataSourceDecision.isChoiceNone()) { log.debug("no choice read/write, default determine write datasource"); return masterDataSource; } else { return selectReadDataSource(); } }
定义切换数据源使用到的切面方法,当进行到需要启动事务的方法时,根据需要选择。我们一般会定义一个txAdvice,用于声明式事务的传播属性以及readonly属性,如果我们需要使用到该属性,需要利用spring的Bean加载完成通知,实现BeanPostProcessor接口中的postProcessAfterInitialization方法
public class MasterSlaveDataSourceProcessor implements BeanPostProcessor { private Map<String, Boolean> readWriteMethodMap = new HashMap<String, Boolean>(); private String txAdviceName; private MasterSlaveDataSourceDecision masterSlaveDataSourceDecision; public void setTxAdviceName(String txAdviceName) { this.txAdviceName = txAdviceName; } public void setMasterSlaveDataSourceDecision(MasterSlaveDataSourceDecision masterSlaveDataSourceDecision) { this.masterSlaveDataSourceDecision = masterSlaveDataSourceDecision; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (txAdviceName.equalsIgnoreCase(beanName)) { try { TransactionInterceptor transactionInterceptor = (TransactionInterceptor) bean; NameMatchTransactionAttributeSource transactionAttributeSource = (NameMatchTransactionAttributeSource) transactionInterceptor.getTransactionAttributeSource(); Field nameMapField = ReflectionUtils.findField(NameMatchTransactionAttributeSource.class, "nameMap"); nameMapField.setAccessible(true); Map<String, TransactionAttribute> nameMap = (Map<String, TransactionAttribute>) nameMapField .get(transactionAttributeSource); for (Entry<String, TransactionAttribute> entry : nameMap.entrySet()) { RuleBasedTransactionAttribute attr = (RuleBasedTransactionAttribute) entry.getValue(); // 仅对read-only的处理 String methodName = entry.getKey(); if (attr.isReadOnly()) { if (forceChoiceReadWhenWrite) { // 不管之前操作是写,默认强制从读库读 (设置为NOT_SUPPORTED即可) // NOT_SUPPORTED会挂起之前的事务 attr.setPropagationBehavior(Propagation.NOT_SUPPORTED.value()); } else { // 否则 设置为SUPPORTS(这样可以参与到写事务) attr.setPropagationBehavior(Propagation.SUPPORTS.value()); } } log.info("read/write transaction process method:{} force read:{}", methodName, forceChoiceReadWhenWrite); readWriteMethodMap.put(methodName, attr.isReadOnly()); } } catch (Exception e) { throw new ReadWriteDataSourceTransactionException("process read/write transaction error", e); } } return bean; }
由于我们的环境中允许存在多个数据源的主从库设置,也存在多个事务管理器,当然也会有多个txAdvice,这里在其中设置一个属性txAdvice名称,每个不同数据源的MasterSlaveDataSourceProcessor监听不同的txAdvice。
我们将其中的tx method属性分成两种类型,一种为只读(从库),一种为写(写库),将其放置到对应的readWriteMethodMap中,
<tx:method name="save*" propagation="REQUIRED"/> <tx:method name="add*" propagation="REQUIRED"/> <tx:method name="create*" propagation="REQUIRED"/> <tx:method name="insert*" propagation="REQUIRED"/> <tx:method name="update*" propagation="REQUIRED"/> <tx:method name="merge*" propagation="REQUIRED"/> <tx:method name="del*" propagation="REQUIRED"/> <tx:method name="remove*" propagation="REQUIRED"/> <tx:method name="get*" propagation="SUPPORTS" read-only="true"/> <tx:method name="count*" propagation="SUPPORTS" read-only="true"/> <tx:method name="find*" propagation="SUPPORTS" read-only="true"/> <tx:method name="list*" propagation="SUPPORTS" read-only="true"/> <tx:method name="*" propagation="SUPPORTS" read-only="true"/>
根据切面的方法名称,以及刚才获得的readWriteMethodMap,来确定该方法是否可以读从库来减轻压力:
public Object selectDataSource(ProceedingJoinPoint pjp) throws Throwable { if (isChoiceReadDB(pjp.getSignature().getName())) { masterSlaveDataSourceDecision.markRead(); } else { masterSlaveDataSourceDecision.markWrite(); } try { return pjp.proceed(); } catch (Throwable t) { masterSlaveDataSourceDecision.reset(); throw t; } finally { masterSlaveDataSourceDecision.pop(); } }
spring可以通过内置的PatternMatchUtils工具类,来进行简单匹配工作,实现最长路径匹配,找到最合适的matchName(该代码是从NameMatchTransactionAttributeSource.getTransactionAttribute()中获得)。
private boolean isChoiceReadDB(String methodName) { String bestNameMatch = null; for (String mappedName : this.readWriteMethodMap.keySet()) { if (PatternMatchUtils.simpleMatch(mappedName, methodName)&& (bestNameMatch == null || bestNameMatch.length() <= mappedName.length())) { bestNameMatch = mappedName; } } // 默认走写库 boolean currentRead = (bestNameMatch == null ? false : readWriteMethodMap.get(bestNameMatch)); // 如果当前为读库,并且设置了强制读,则忽略当前主库写状态 if (currentRead && forceChoiceReadWhenWrite) { return true; } // 如果之前选择了写库,则当前使用写库 if (masterSlaveDataSourceDecision.isChoiceWrite()) { return false; } return currentRead; }
由于需要支持service方法之间的嵌套操作,MasterSlaveDataSourceDecision需要使用ThreadLocal<Stack>的方法保存当前上下文对应的数据源配置:
public class MasterSlaveDataSourceDecision { public enum DataSourceType { write, read; } private final ThreadLocal<Stack<DataSourceType>> holder = new ThreadLocal<Stack<DataSourceType>>() { @Override protected Stack<DataSourceType> initialValue() { return new Stack<>(); } }; public void markWrite() { holder.get().push(DataSourceType.write); } public void markRead() { holder.get().push(DataSourceType.read); } public void reset() { holder.get().clear(); } public boolean isChoiceNone() { return holder.get().isEmpty(); } public boolean isChoiceWrite() { return !isChoiceNone() && DataSourceType.write == holder.get().peek(); } public boolean isChoiceRead() { return !isChoiceNone() && DataSourceType.read == holder.get().peek(); } public DataSourceType pop() { return isChoiceNone() ? null : holder.get().pop(); } }
配置方法
数据源定义应该使用我们定义的主从DataSource,其中包含了写库以及从库的相关配置:
<bean id="masterSlaveDataSource" class="com.api.example.tx.MasterSlaveDataSource"> <property name="masterSlaveDataSourceDecision" ref="masterSlaveDataSourceDecision"/> <property name="masterDataSource" ref="dataSource"/> <property name="slaveDataSourceMap"> <map><entry key="slave1" value-ref="dataSourceSlave"/></map> </property> </bean>
定义transactionManager使用该数据源:
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="masterSlaveDataSource"/> </bean>
定义txAdvice,关联该transactionManager,并定义事务的传播属性,以及readOnly属性
<tx:advice id="txAdvice" transaction-manager="transactionManager"> <tx:attributes> <tx:method name="save*" propagation="REQUIRED"/> <tx:method name="add*" propagation="REQUIRED"/> <tx:method name="create*" propagation="REQUIRED"/> <tx:method name="insert*" propagation="REQUIRED"/> <tx:method name="update*" propagation="REQUIRED"/> <tx:method name="merge*" propagation="REQUIRED"/> <tx:method name="del*" propagation="REQUIRED"/> <tx:method name="remove*" propagation="REQUIRED"/> <!--hibernate4必须配置为开启事务 否则 getCurrentSession()获取不到 --> <tx:method name="get*" propagation="SUPPORTS" read-only="true"/> <tx:method name="count*" propagation="SUPPORTS" read-only="true"/> <tx:method name="find*" propagation="SUPPORTS" read-only="true"/> <tx:method name="list*" propagation="SUPPORTS" read-only="true"/> <tx:method name="*" propagation="SUPPORTS" read-only="true"/> </tx:attributes> </tx:advice>
定义对应的aop config,使用aop:aspect,注意要设置order,保证该interceptor在事务启动之前能够选择到对应的dataSource,
<!-- 只对业务逻辑层实施事务 --> <aop:config expose-proxy="true"> <aop:pointcut id="txPointcut" expression="execution(* com.api.example.service..*.*(..))"/> <aop:advisor advice-ref="txAdvice" pointcut-ref="txPointcut"/> <aop:aspect order="-1" ref="masterSlaveDataSourceProcessor"> <aop:around method="selectDataSource" pointcut-ref="txPointcut"/> </aop:aspect> </aop:config>
加入Processor,以及decision,注意DataSourceProcessor需要关联对应的decision类,以及txAdvice名称(其id名称),以保证Processor会在对应的txAdvice加载完成后使用其定义的txAttributes属性信息,用于判断该事务的方法是否读主库,或从库。
<bean id="masterSlaveDataSourceProcessor" class="com.api.example.tx.MasterSlaveDataSourceProcessor"> <property name="txAdviceName" value="txAdvice"/> <property name="masterSlaveDataSourceDecision" ref="masterSlaveDataSourceDecision"/> </bean> <bean id="masterSlaveDataSourceDecision" class="com.api.example.tx.MasterSlaveDataSourceDecision"/>
此外com.api.example.tx.MasterSlaveDataSourceProcessor中forceChoiceReadWhenWrite属性用于控制该执行该方法时,是否需要强制读操作(如果存在嵌套事务,将当前事务挂起)。
经过测试,可以满足我们的需求,达到根据service包中方法名称动态切换主从数据源的目的。
相关推荐
在 `thc-datasources` 压缩包中,可能包含了更详细的配置示例、数据源切换注解的实现以及相关的辅助工具类,如 `DynamicDataSourceContextHolder`。读者可以结合这些资源,进一步理解和实践多数据源动态切换的实现。
创建一个自定义的数据源切换注解,比如`@SwitchDataSource`,并在需要切换数据源的方法上使用。通过AspectJ的切面处理,我们可以在方法执行前后动态改变ThreadLocal中的数据源引用。 3. **Spring Cloud Config ...
Spring Boot + MyBatis-Plus 实现 MySQL 主从复制动态数据源切换
本项目——"spring-boot-easy-connection-pool-master",主要探讨了如何在Spring Boot环境下配置和管理连接池,并且实现了动态配置多数据源的功能,这对于大数据部门和数据中台服务尤其重要。 首先,让我们深入理解...
基于 SpringBoot 多数据源 动态数据源 主从分离 快速启动器 支持分布式事务。一个基于springboot的快速集成多数据源的启动器。支持 数据源分组 ,适用于多种场景 纯粹多库 读写分离 一主多从 混合模式。支持数据库...
在实际应用中,为了实现双数据源切换,我们可能需要定义两个数据源,分别对应主库和从库,并在业务逻辑中根据读写需求选择合适的数据源。此外,还可以使用Spring的DataSourceRouter或AbstractRoutingDataSource等...
在许多实际项目中,我们可能需要连接并操作多个数据库,比如主从数据库、读写分离、不同环境的数据隔离等,这时就需要用到Spring的多数据源支持。 Spring多数据源允许我们在一个应用中同时管理多个数据库连接,通过...
计算机技术、IT咨询、人工智能AI理论介绍,学习参考资料计算机技术、IT咨询、人工智能AI理论介绍,学习参考资料计算机技术、IT咨询、人工智能AI理论介绍,学习参考资料计算机技术、IT咨询、人工智能AI理论介绍,学习...
5. **切换数据源**:在业务代码中,根据需求切换数据源。例如,在插入、更新、删除操作时使用主库,而在查询操作时使用从库。这通常通过AOP(面向切面编程)实现,定义一个切点和通知,根据方法的注解或其他条件判断...
在这个类中,我们将使用`@ConfigurationProperties`注解来绑定上面的YAML配置,然后通过`@Primary`和`@Qualifier`注解来指定主从数据源: ```java @Configuration @EnableJpaRepositories( basePackages = {...
基于Spring的 AbstractRoutingDataSource 进行简单的封装,方便进行数据源的切换,目前主要用于主从数据库的读写切换上。切换spring数据源的工具,使用aop注解方式进行快速切换,减少编码的入侵
同时,为了保证数据一致性,我们需要在主从数据源之间进行同步,确保写入主库的数据最终能够反映到从库。 工具方面,有很多开源工具可以帮助我们实现多数据源的管理和切换。例如,MySql的Replication Manager可以...
本示例主要讲解如何使用Spring Boot结合MyBatis实现多数据源切换,并确保AOP事务管理仍然有效。 首先,我们需要配置多数据源。在Spring Boot中,可以使用`DataSource`接口的实现类,如`HikariCP`或`Druid`,创建两...
4. 配置数据源切换:在`@Configuration`注解的类中,使用`@Bean`注解创建一个`PrimaryDataSourceBean`和`SecondaryDataSourceBean`,返回DruidDataSource实例。同时,创建一个`DataSource`的`@Bean`,返回`...
7. **测试与监控**: 实现动态数据源切换后,需要进行充分的测试以确保所有业务场景都能正确处理。同时,监控系统中的数据源切换情况,例如切换频率、失败率等,对于及时发现和解决问题至关重要。 8. **数据库复制与...
一个基于springboot的快速集成多数据源的启动器简介dynamic-datasource-spring-boot-starter是一个基于springboot的快速集成多数据源的启动器。其支持Jdk 1.7 +,SpringBoot 1.4.x 1.5.x 2.xx。文件| 文献资料|特性...
支持 多层数据源嵌套切换 。(ServiceA >>> ServiceB >>> ServiceC)。 提供 基于seata的分布式事务方案 。 支持 数据源分组 ,适用于多种场景 纯粹多库 读写分离 一主多从 混合模式。 支持数据库敏感配置信息 加
当项目需要连接并操作多个不同的数据库时,我们可以通过动态数据源切换来实现这一需求。本文将深入探讨如何在Spring Boot应用中利用注解实现多数据源的动态切换,以满足后端开发的需求。 首先,我们需要理解什么是...
##多数据源动态切换 #####支持数据源分组和简单的负载均衡(轮询) 作者:kyrin(云中鹤) ##项目简介 在公司开发的过程中,用到主从库的切换,于是就想自己动手实现一个可以实现多库之间的随意切 换。于是该项目就...
通过以上步骤,我们已经成功地在SpringBoot应用中实现了基于AOP的多数据源切换。在业务代码中,只需在需要切换数据源的方法上添加`@SwitchDataSource`注解,并指定相应的数据源类型,AOP切面就会自动处理数据源的...