`
IXHONG
  • 浏览: 452318 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

从AbstractRoutingDataSource说分库分表实现

阅读更多

    很多人不知分库分表怎么实现,可能是把它想得复杂了。事实上,我们将复杂的事情分工后就简单了。如果仅仅是单库分表,那直接在代码中根据分表的维度得到表名后缀,如“0001”,然后比如在mybatis下,sql语句就可以这么写“select * from user_#tbIndex#”。程序中我们能够操作数据库中的表,是因为我们拿到了数据源DataSource,并由此getConnection(),因此对于分库分表,我们首先要实现的是动态数据源,我们根据路由规则确定要访问哪个数据源的哪个表。怎么实现数据源的切换呢?而且多个数据源的连接要怎么管理呢?

    Spring为我们提供了实现方案,核心类是AbstractRoutingDataSource,代码如下:

    

public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
    private Map<Object, Object> targetDataSources;
    private Object defaultTargetDataSource;
    private boolean lenientFallback = true;
    private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();
    private Map<Object, DataSource> resolvedDataSources;
    private DataSource resolvedDefaultDataSource;

    public AbstractRoutingDataSource() {
    }

    public void setTargetDataSources(Map<Object, Object> targetDataSources) {
        this.targetDataSources = targetDataSources;
    }

    public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
        this.defaultTargetDataSource = defaultTargetDataSource;
    }

    public void setLenientFallback(boolean lenientFallback) {
        this.lenientFallback = lenientFallback;
    }

    public void setDataSourceLookup(DataSourceLookup dataSourceLookup) {
        this.dataSourceLookup = (DataSourceLookup)(dataSourceLookup != null?dataSourceLookup:new JndiDataSourceLookup());
    }

    public void afterPropertiesSet() {
        if(this.targetDataSources == null) {
            throw new IllegalArgumentException("Property \'targetDataSources\' is required");
        } else {
            this.resolvedDataSources = new HashMap(this.targetDataSources.size());
            Iterator var2 = this.targetDataSources.entrySet().iterator();

            while(var2.hasNext()) {
                Entry entry = (Entry)var2.next();
                Object lookupKey = this.resolveSpecifiedLookupKey(entry.getKey());
                DataSource dataSource = this.resolveSpecifiedDataSource(entry.getValue());
                this.resolvedDataSources.put(lookupKey, dataSource);
            }

            if(this.defaultTargetDataSource != null) {
                this.resolvedDefaultDataSource = this.resolveSpecifiedDataSource(this.defaultTargetDataSource);
            }

        }
    }

    protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
        if(dataSource instanceof DataSource) {
            return (DataSource)dataSource;
        } else if(dataSource instanceof String) {
            return this.dataSourceLookup.getDataSource((String)dataSource);
        } else {
            throw new IllegalArgumentException("Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
        }
    }

    public Connection getConnection() throws SQLException {
        return this.determineTargetDataSource().getConnection();
    }

    public Connection getConnection(String username, String password) throws SQLException {
        return this.determineTargetDataSource().getConnection(username, password);
    }

    protected DataSource determineTargetDataSource() {
        Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
        Object lookupKey = this.determineCurrentLookupKey();
        DataSource dataSource = (DataSource)this.resolvedDataSources.get(lookupKey);
        if(dataSource == null && (this.lenientFallback || lookupKey == null)) {
            dataSource = this.resolvedDefaultDataSource;
        }

        if(dataSource == null) {
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
        } else {
            return dataSource;
        }
    }

    protected Object resolveSpecifiedLookupKey(Object lookupKey) {
        return lookupKey;
    }

    protected abstract Object determineCurrentLookupKey();
}

     

    AbstractRoutingDataSource实现了AbstractDataSource,该抽象类又继承了javax.sql.DataSource接口。我们常用的org.apache.commons.dbcp.BasicDataSource就是实现了这个接口,该接口的核心方法是getConnection(),AbstractRoutingDataSource实现该方法如下:

 

public Connection getConnection() throws SQLException {
    return this.determineTargetDataSource().getConnection();
}

 

显然我们要关注选择目标数据源的方法,该方法中两个重要的地方是determineCurrentLookupKey()方法和属性resolvedDataSources。determineCurrentLookupKey()是个抽象方法,需要我们自己去实现,返回的是当前要操作的数据源的标识。resolvedDataSources和resolvedDefaultDataSource是在bean实例化后的操作得到的,即afterPropertiesSet()。下面给出bean的配置:

 

<bean id="dynamicDataSource" class="org.javared.wely.dao.db.DynamicDataSource">  
   <property name="targetDataSources">     
      <map key-type="java.lang.String">     
         <entry key="db1" value-ref="dataSource1"/>     
         <entry key="db2" value-ref="dataSource2"/>     
      </map>     
   </property>     
   <property name="defaultTargetDataSource" ref="dataSource"/>    
</bean>

 

 DynamicDataSource需实现determineCurrentLookupKey()方法,代码如下:

 

public class DynamicDataSource extends AbstractRoutingDataSource {
    public DynamicDataSource() {
    }

    protected Object determineCurrentLookupKey() {
        return DbContextHolder.getDbKey(); // ThreadLocal
    }
}

 

显然,现在我们的重点是路由规则实现了,即根据某个或几个字段维度找到对应的DB和table,并把dbKey和tbIndex保存于当前线程中。

 

   <bean id="dbRouter" class="org.javared.wely.dao.db.DBRouterImpl">
		<property name="dbRules">
			<list>
				<ref bean="dbRule1" />
			</list>
		</property>
	</bean>

	<bean id="dbRule1" class="org.javared.wely.db.DbRule">
                <!-- 维度字段计算得到的long值范围 -->
		<property name="routeFieldStart" value="0"></property>
		<property name="routeFieldEnd" value="9200000000000000000"></property>
                <!-- db个数 -->
		<property name="dbNumber" value="2"></property>
                <!-- 路由规则,分表,分库,既分库又分表 -->
		<property name="routeType" value="2"></property>
                <!-- 每个库里分表个数 -->
		<property name="tableNumber" value="2"></property>
		<property name="dbKeys">
			<list>
				<value>db1</value>
				<value>db2</value>
			</list>
		</property>
	</bean>

 

public String route(String fieldId) {
	if(StringUtils.isEmpty(fieldId)) {
	    throw new IllegalArgumentException("dbsCount and tablesCount must be both positive!");
	} else {
            // base64编码得到的字符串取hashcode
	    int routeFieldInt = RouteUtils.getResourceCode(fieldId); 
	    String dbKey = getDbKey(this.dbRules, routeFieldInt);
	    return dbKey;
	}
}

public static String getDbKey(List<DbRule> rules, int routeFieldInt) {
        Object dbRule = null;
        if(rules != null && rules.size() > 0) {
            String dbKey = null;
            Iterator<DbRule> iter = rules.iterator();
            while(iter.hasNext()) {
                    DbRule item = iter.next();
                    if(item.getDbKeys() != null && item.getDbNumber() != 0) {
                        long dbIndex = 0L;
                        long tbIndex = 0L;
                        long mode = (long)item.getDbNumber();
                        String tableIndex;
                        if(item.getRouteType() == 2 && item.getTableNumber() != 0) {
                           // 分库又分表
                            mode = (long)(item.getDbNumber() * item.getTableNumber());
                            dbIndex = (long)routeFieldInt % mode / (long)item.getTableNumber();
                            tbIndex = (long)(routeFieldInt % item.getTableNumber());
                            tableIndex = getFormateTableIndex(item.getTableIndexStyle(), tbIndex);
                            DbContextHolder.setTableIndex(tableIndex);
                        } else if(item.getRouteType() == 0) { // 只分库
                            mode = (long)item.getDbNumber();
                            dbIndex = (long)routeFieldInt % mode;
                        } else if(item.getRouteType() == 1) { // 只分表
                            tbIndex = (long)(routeFieldInt % item.getTableNumber());
                            tableIndex = getFormateTableIndex(item.getTableIndexStyle(), tbIndex);
                            DbContextHolder.setTableIndex(tableIndex);
                        }

                        dbKey = (String)item.getDbKeys().get(Long.valueOf(dbIndex).intValue());
                        log.info("resource:{}------->dbkey:{},tableIndex:{},", new Object[]{Integer.valueOf(routeFieldInt), dbKey, Long.valueOf(tbIndex)});
                        DbContextHolder.setDbKey(dbKey);
                    }
                    break;
            }

            return dbKey;
        } else {
            throw new IllegalArgumentException("dbsCount and tablesCount must be both positive!");
        }
    }

 

public class RouteUtils {
    private static final Logger log = LoggerFactory.getLogger(RouteUtils.class);
    private static final String encode = "utf-8";
    private static final int resourceMax = 10000;

    public RouteUtils() {
    }

    public static int getHashCodeBase64(String routeValue) {
        int hashCode = 0;

        try {
            String e = Base64Binrary.encodeBase64Binrary(routeValue.getBytes("utf-8"));
            hashCode = Math.abs(e.hashCode());
        } catch (Exception var3) {
            log.error("hashCode 失败", var3);
        }

        return hashCode;
    }

    public static int getResourceCode(String routeValue) {
        int hashCode = getHashCodeBase64(routeValue);
        int resourceCode = hashCode % 10000;
        return resourceCode;
    }

    public static void main(String[] args) {
        String payid = "140331160123935469773";
        String resource = payid.substring(payid.length() - 4);
        int routeFieldInt = Integer.valueOf(resource).intValue();
        short mode = 1200;
        int dbIndex = routeFieldInt % mode / 200;
        int tbIndex = routeFieldInt % 200;
        System.out.println(dbIndex + "-->" + tbIndex);
    }
}

 

    应用时,先执行dbRouter.route(field),这时dynamicDataSource.getConnection()得到的就是当前线程需要对应的数据源连接,DbContextHolder.getTableIndex()得到的是当前线程需要对应的表名后缀。

 

 最后,对于dbRouter.route(field)和DbContextHolder.getTableIndex(),我们可以用注解的方式来处理,这样程序员只需在代码中加入注解即可。下面给出一种解决方案:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface DoRoute {
    String routeField() default "userId";

    String tableStyle() default "_0000";
}

@Aspect
@Component
public class DBRouterInterceptor {
    private static final Logger log = LoggerFactory.getLogger(DBRouterInterceptor.class);
    private DBRouter dBRouter;

    public DBRouterInterceptor() {
    }

    @Pointcut("@annotation( com.jd.jr.baitiao.dbrouter.annotation.DoRoute)")
    public void aopPoint() {
    }

    @Before("aopPoint()")
    public Object doRoute(JoinPoint jp) throws Throwable {
        long t1 = System.currentTimeMillis();
        boolean result = true;
        Method method = this.getMethod(jp);
        DoRoute doRoute = (DoRoute)method.getAnnotation(DoRoute.class);
        String routeField = doRoute.routeField();
        Object[] args = jp.getArgs();
        if(args != null && args.length > 0) {
            for(int i = 0; i < args.length; ++i) {
                long t2 = System.currentTimeMillis();
                String routeFieldValue = BeanUtils.getProperty(args[i], routeField);
                if(StringUtils.isNotEmpty(routeFieldValue)) {
                    if("userId".equals(routeField)) {
                        this.dBRouter.doRouteByResource("" + RouteUtils.getResourceCode(routeFieldValue));
                    } else {
                        String resource = routeFieldValue.substring(routeFieldValue.length() - 4);
                        this.dBRouter.doRouteByResource(resource);
                    }
                    break;
                }
            }
        }

        log.info("doRouteTime{}" + (System.currentTimeMillis() - t1));
        return Boolean.valueOf(result);
    }

    private Method getMethod(JoinPoint jp) throws NoSuchMethodException {
        Signature sig = jp.getSignature();
        MethodSignature msig = (MethodSignature)sig;
        return this.getClass(jp).getMethod(msig.getName(), msig.getParameterTypes());
    }

    private Class<? extends Object> getClass(JoinPoint jp) throws NoSuchMethodException {
        return jp.getTarget().getClass();
    }

    public DBRouter getdBRouter() {
        return this.dBRouter;
    }

    public void setdBRouter(DBRouter dBRouter) {
        this.dBRouter = dBRouter;
    }
}

 上面定义了一个切面,需要在spring配置文件中加上<aop:aspectj-autoproxy />,这样spring会发现切面并织入到匹配的目标bean中。

 

附:生产环境配置参数参考

 

sqlMapConfig配置
<settings cacheModelsEnabled="false" enhancementEnabled="true"
		lazyLoadingEnabled="false" errorTracingEnabled="true" maxRequests="200"
		maxSessions="60" maxTransactions="20" useStatementNamespaces="true"
		defaultStatementTimeout="2" />
 

 

 

<bean id="dataSource1" class="org.apache.commons.dbcp.BasicDataSource">
   <property name="driverClassName" value="${db.jdbc.driverClassName}" />
   <property name="url" value="${db1.jdbc.url}" />
   <property name="username" value="${db1.jdbc.username}" />
   <property name="password" value="${db1.jdbc.password}" />
   <property name="maxActive" value="20" />
   <property name="maxIdle" value="3" />
   <property name="maxWait" value="15000" />
   <property name="timeBetweenEvictionRunsMillis" value="60000" />
   <property name="minEvictableIdleTimeMillis" value="180000" />
</bean>

 

<!-- MQ发消息线程池 -->
<bean id="taskMqExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
   <!-- 核心线程数  -->
<property name="corePoolSize" value="10" />
   <!-- 最大线程数 -->
<property name="maxPoolSize" value="200" />
   <!-- 队列最大长度 -->
<property name="queueCapacity" value="500" />
   <!-- 线程池维护线程所允许的空闲时间 -->
<property name="keepAliveSeconds" value="5" />
   <!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
<property name="rejectedExecutionHandler">
      <bean class="java.util.concurrent.ThreadPoolExecutor$DiscardPolicy" />
   </property>
</bean>
分享到:
评论
2 楼 IXHONG 2017-04-14  
好,可以加我联系方式 javahongxi@qq.com
pan19849529 写道
大神,能发个demo看一下吗

1 楼 pan19849529 2017-04-14  
大神,能发个demo看一下吗

相关推荐

    spring动态数据源+mybatis分库分表

    "spring动态数据源+mybatis分库分表"是一个针对大型数据库场景的解决方案,它利用Spring框架的动态数据源功能和MyBatis的SQL映射能力,实现数据库的透明化分片。以下是这个主题的详细知识点: 1. **Spring动态数据...

    zdp-sharding-jdbc:轻量级分库分表框架

    这是一个轻量级分库分表框架&读写分离框架,无需搭建中间层代理服务,开发者只需要引入本框架的jar包,并进行相应的配置即可实现分库分表&读写分离逻辑 读写分离 读写分离的实现依然是继承了spring提供的...

    SpringBoot 多数据源

    总结来说,SpringBoot结合多数据源和MySQL的分库分表策略,能够有效地应对大数据量和高并发的挑战,提升系统的稳定性和性能。通过ShardingSphere等工具,我们可以方便地实现这些功能,简化开发过程。在实际项目中,...

    spring boot AOP注解方式实现多数据源

    在Spring Boot中,AOP(面向切面编程)和多数据源的整合是常见的应用场景,尤其是在大型企业级项目中,为了实现数据的隔离或者优化...在实际项目中,这样的设计可以满足复杂的数据访问需求,比如读写分离、分库分表等。

    基于mybatis,springboot开箱即用的读写分离插件.zip

    6. 分库分表:在大型系统中,可能还需要结合分库分表技术,进一步提高数据库的处理能力。例如ShardingSphere等工具可以帮助实现这一目标。 7. 监控与调优:为了保证系统的稳定性和性能,需要对数据库的读写分离效果...

    spring +springboot+mybatis+maven 读写分离及事务管理

    spring +springboot+mybatis+maven 读写分离,数据库采用mysql, 采用springboot 采用项目框架搭建,继承spring 中的AbstractRoutingDataSource,实现 determineCurrentLookupKey 进行数据源的动态切换,采用Spring ...

    java简单分布式架构,多个数据源,线程池多线程访问

    在Java应用中,特别是Web应用,往往需要连接到多个数据库,例如,主从数据库分离、读写分离、分库分表等场景。Spring框架的多数据源支持非常完善,可以通过AbstractRoutingDataSource实现动态数据源切换,或者使用...

    spring boot mybatis多数据源最简解决方案

    在大型分布式系统中,通常采用数据库主从复制或者分库分表策略来提高系统的可扩展性和性能。主从模式可以实现读写分离,提升读取效率;分库分表则可以分散负载,避免单一数据库成为性能瓶颈。因此,多数据源的配置是...

    多数据源应用

    4. ShardingSphere:阿里巴巴开源的数据库中间件,提供分库分表、读写分离等功能,支持多数据源。 四、优缺点 优点: 1. 提高系统灵活性:可以根据业务需求动态调整数据源。 2. 提升系统健壮性:通过数据源冗余,...

    mybatis+mysql+springmvc + multidatasource 多数据源

    通过分库分表,可以处理大规模数据,防止单一数据库成为系统瓶颈。同时,该架构也便于后期的系统升级和维护,降低了技术债务。 总之,"mybatis+mysql+springmvc + multidatasource 多数据源"的组合是企业级应用中...

    springboot实现多数据源

    在实际应用中,多数据源常用于读写分离、分库分表等场景。例如,读操作从一个数据库获取数据,写操作则提交到另一个数据库。这种设计提高了系统的并发处理能力和可用性。 总结来说,Spring Boot实现多数据源的关键...

    一套Spring+Hibernate的多个数据库切换的源码

    通过以上步骤,这套源码应该演示了如何在Spring和Hibernate环境中实现多数据源的动态切换,这对于处理分布式系统中的数据隔离或分库分表等复杂情况非常有帮助。学习并理解这个项目,可以帮助开发者掌握更高级的...

    spring-boot-mybatis-annotation-mulidatasource.zip_REVL_TSPS_myba

    本项目"spring-boot-mybatis-annotation-mulidatasource"着重于利用Spring Boot和MyBatis注解实现多数据源连接,这在处理多个数据库或者分库分表的场景中非常实用。 首先,Spring Boot提供了自动配置功能,使得我们...

    dynamic-datasource-aop-loop.rar

    动态数据源切换则是多数据源的一种高级应用,它允许在运行时动态地改变数据源,这样可以灵活应对业务变化,比如读写分离、分库分表等场景。在SpringBoot中,我们可以通过实现`AbstractRoutingDataSource`或使用第三...

    java多数据源代码实例

    4. 负载均衡:通过分库分表,可以将请求分散到多个数据库,降低单个数据库的压力。 二、Java多数据源实现方式 1. 动态切换数据源:在运行时根据业务需求动态选择合适的数据源,常见的实现方式是使用AOP(面向切面...

    数据源切换

    例如,可以轻松实现读写分离、故障切换、分库分表等高级功能。在给定的"demo"文件中,可能包含了实现这一功能的具体代码和配置示例,通过学习和研究这些代码,可以更好地理解和掌握数据源切换的实现方法。

    Spring+MyBatis多数据源配置实现

    而在多数据源环境中,一个应用可以同时连接并操作多个不同的数据库,这对于处理分布式系统或需要分库分表的场景非常有用。 1. **配置多数据源** - **DataSource配置**:Spring提供了AbstractRoutingDataSource抽象...

    data-demo:主从,分表分库

    master-slave环境:mysql、mybatis、springboot 、tkmapper1、通过MultiDataSourceConfig 创建多数据源指定@Primary初始数据库,以防报错2、通过RoutingDataSource 实现数据库路由实现AbstractRoutingDataSource接口...

    springboot整合项目

    - **分库分表**:通过多数据源实现不同业务数据的分库,提高系统性能。 - **读写分离**:主库负责写操作,从库负责读操作,提高系统读取效率。 - **数据库版本管理**:在不同环境中使用不同数据源,便于进行数据库...

Global site tag (gtag.js) - Google Analytics