`
gaojingsong
  • 浏览: 1201737 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

【Mycat1.6之加载Schema的table 源码解读(一)】

阅读更多

一、schema属性介绍

<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100" dataNode="">

<table name="tab" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />

<table name="user1" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />

<table name="user"  dataNode="dn1,dn2,dn3" rule="auto-sharding-long"

nameSuffix=""   primaryKey=""   autoIncrement=""   needAddLimit=""   type=""   ruleRequired=""   subTables=""/>

</schema>

 

primaryKey记录主键,用于之后路由分析,以及启用自增长主键

nameSuffix路由, 增加对动态日期表的支持

autoIncrement记录是否主键自增,默认不是,(启用全局sequence handler)

needAddLimit记录是否需要加返回结果集限制,默认需要加

type记录type,是否为global

ruleRequired记录是否绑定有分片规则

subTables分表功能

 

二、源码解读

private void load(String dtdFile, String xmlFile) {
	InputStream dtd = null;
	InputStream xml = null; 
	dtd = XMLSchemaLoader.class.getResourceAsStream(dtdFile);
	xml = XMLSchemaLoader.class.getResourceAsStream(xmlFile);
	Element root = ConfigUtil.getDocument(dtd, xml).getDocumentElement();
	//先加载所有的DataHost
	loadDataHosts(root);
	//再加载所有的DataNode
	loadDataNodes(root);
	//最后加载所有的Schema
	loadSchemas(root);	 
}

<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100" dataNode="">
	<table name="tab" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />
	<table name="user1" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />
	<table name="user"  dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />
</schema>
private void loadSchemas(Element root) {
	NodeList list = root.getElementsByTagName("schema");
	for (int i = 0, n = list.getLength(); i < n; i++) {
		Element schemaElement = (Element) list.item(i);
		//读取各个属性
		String name = schemaElement.getAttribute("name");
		String dataNode = schemaElement.getAttribute("dataNode");
		String checkSQLSchemaStr = schemaElement.getAttribute("checkSQLschema");
		String sqlMaxLimitStr = schemaElement.getAttribute("sqlMaxLimit");
		int sqlMaxLimit = -1;
		//读取sql返回结果集限制
		if (sqlMaxLimitStr != null && !sqlMaxLimitStr.isEmpty()) {
			sqlMaxLimit = Integer.parseInt(sqlMaxLimitStr);
		}
		
		// check dataNode already exists or not,看schema标签中是否有datanode
		String defaultDbType = null;
		//校验检查并添加dataNode
		if (dataNode != null && !dataNode.isEmpty()) {
			List dataNodeLst = new ArrayList(1);
			dataNodeLst.add(dataNode);
			checkDataNodeExists(dataNodeLst);
			String dataHost = dataNodes.get(dataNode).getDataHost();
			defaultDbType = dataHosts.get(dataHost).getDbType();
		} else {
			dataNode = null;
		}
		//加载schema下所有tables
		Map<String, TableConfig> tables = loadTables(schemaElement);
		//判断schema是否重复
		if (schemas.containsKey(name)) {
			throw new ConfigException("schema " + name + " duplicated!");
		}

		// 设置了table的不需要设置dataNode属性,没有设置table的必须设置dataNode属性
		if (dataNode == null && tables.size() == 0) {
			throw new ConfigException(
					"schema " + name + " didn't config tables,so you must set dataNode property!");
		}

		SchemaConfig schemaConfig = new SchemaConfig(name, dataNode,
				tables, sqlMaxLimit, "true".equalsIgnoreCase(checkSQLSchemaStr));

		//设定DB类型,这对之后的sql语句路由解析有帮助
		if (defaultDbType != null) {
			schemaConfig.setDefaultDataNodeDbType(defaultDbType);
			if (!"mysql".equalsIgnoreCase(defaultDbType)) {
				schemaConfig.setNeedSupportMultiDBType(true);
			}
		}

		// 判断是否有不是mysql的数据库类型,方便解析判断是否启用多数据库分页语法解析
		for (TableConfig tableConfig : tables.values()) {
			if (isHasMultiDbType(tableConfig)) {
				schemaConfig.setNeedSupportMultiDBType(true);
				break;
			}
		}
		//记录每种dataNode的DB类型
		Map<String, String> dataNodeDbTypeMap = new HashMap<>();
		for (String dataNodeName : dataNodes.keySet()) {
			DataNodeConfig dataNodeConfig = dataNodes.get(dataNodeName);
			String dataHost = dataNodeConfig.getDataHost();
			DataHostConfig dataHostConfig = dataHosts.get(dataHost);
			if (dataHostConfig != null) {
				String dbType = dataHostConfig.getDbType();
				dataNodeDbTypeMap.put(dataNodeName, dbType);
			}
		}
		schemaConfig.setDataNodeDbTypeMap(dataNodeDbTypeMap);
		schemas.put(name, schemaConfig);
	}
}
private Map<String, TableConfig> loadTables(Element node) {
	// Map<String, TableConfig> tables = new HashMap<String, TableConfig>();
	// 支持表名中包含引号[`] BEN GONG
	Map<String, TableConfig> tables = new TableConfigMap();
	NodeList nodeList = node.getElementsByTagName("table");
	for (int i = 0; i < nodeList.getLength(); i++) {
		Element tableElement = (Element) nodeList.item(i);
		String tableNameElement = tableElement.getAttribute("name").toUpperCase();

		//TODO:路由, 增加对动态日期表的支持
		String tableNameSuffixElement = tableElement.getAttribute("nameSuffix").toUpperCase();
		if ( !"".equals( tableNameSuffixElement ) ) {				
			
			if( tableNameElement.split(",").length > 1 ) {
				throw new ConfigException("nameSuffix " + 
tableNameSuffixElement + ", require name parameter cannot multiple breaks!");
			}
			//前缀用来标明日期格式
			tableNameElement = doTableNameSuffix(tableNameElement, tableNameSuffixElement);
		}
		//记录主键,用于之后路由分析,以及启用自增长主键
		String[] tableNames = tableNameElement.split(",");
		String primaryKey = tableElement.hasAttribute("primaryKey") ? 
tableElement.getAttribute("primaryKey").toUpperCase() : null;
		//记录是否主键自增,默认不是,(启用全局sequence handler)
		boolean autoIncrement = false;
		if (tableElement.hasAttribute("autoIncrement")) {
			autoIncrement = Boolean.parseBoolean(tableElement.getAttribute("autoIncrement"));
		}
		//记录是否需要加返回结果集限制,默认需要加
		boolean needAddLimit = true;
		if (tableElement.hasAttribute("needAddLimit")) {
			needAddLimit = Boolean.parseBoolean(tableElement.getAttribute("needAddLimit"));
		}
		//记录type,是否为global
		String tableTypeStr = tableElement.hasAttribute("type") ? 
tableElement.getAttribute("type") : null;
		int tableType = TableConfig.TYPE_GLOBAL_DEFAULT;
		if ("global".equalsIgnoreCase(tableTypeStr)) {
			tableType = TableConfig.TYPE_GLOBAL_TABLE;
		}
		//记录dataNode,就是分布在哪些dataNode上
		String dataNode = tableElement.getAttribute("dataNode");
		TableRuleConfig tableRule = null;
		if (tableElement.hasAttribute("rule")) {
			String ruleName = tableElement.getAttribute("rule");
			tableRule = tableRules.get(ruleName);
			if (tableRule == null) {
				throw new ConfigException("rule " + ruleName + " is not found!");
			}
		}
		
		boolean ruleRequired = false;
		//记录是否绑定有分片规则
		if (tableElement.hasAttribute("ruleRequired")) {
			ruleRequired = Boolean.parseBoolean(tableElement.getAttribute("ruleRequired"));
		}

		if (tableNames == null) {
			throw new ConfigException("table name is not found!");
		}
		//distribute函数,重新编排dataNode
		String distPrex = "distribute(";
		boolean distTableDns = dataNode.startsWith(distPrex);
		if (distTableDns) {
			dataNode = dataNode.substring(distPrex.length(), dataNode.length() - 1);
		}
		//分表功能
		String subTables = tableElement.getAttribute("subTables");
		
		for (int j = 0; j < tableNames.length; j++) {

			String tableName = tableNames[j];
			TableRuleConfig	tableRuleConfig=tableRule ;
			  if(tableRuleConfig!=null) {
				//对于实现TableRuleAware的function进行特殊处理  根据每个表新建个实例
				  RuleConfig rule= tableRuleConfig.getRule();
				  if(rule.getRuleAlgorithm() instanceof TableRuleAware)  {
					  tableRuleConfig = (TableRuleConfig) ObjectUtil.copyObject(tableRuleConfig);
					  tableRules.remove(tableRuleConfig.getName())   ;
					  String newRuleName = tableRuleConfig.getName() + "_" + tableName;
					  tableRuleConfig. setName(newRuleName);
					  TableRuleAware tableRuleAware= (TableRuleAware) 
tableRuleConfig.getRule().getRuleAlgorithm();
					  tableRuleAware.setRuleName(newRuleName);
					  tableRuleAware.setTableName(tableName);
					  tableRuleConfig.getRule().getRuleAlgorithm().init();
					  tableRules.put(newRuleName,tableRuleConfig);
				  }
			  }

			TableConfig table = new TableConfig(tableName, primaryKey,
					autoIncrement, needAddLimit, tableType, dataNode,
					getDbType(dataNode),
					(tableRuleConfig != null) ? tableRuleConfig.getRule() : null,
					ruleRequired, null, false, null, null,subTables);
			
			checkDataNodeExists(table.getDataNodes());
			// 检查分片表分片规则配置是否合法
			if(table.getRule() != null) {
				checkRuleSuitTable(table);
			}
			
			if (distTableDns) {
				distributeDataNodes(table.getDataNodes());
			}
			//检查去重
			if (tables.containsKey(table.getName())) {
				throw new ConfigException("table " + tableName + " duplicated!");
			}
			//放入map
			tables.put(table.getName(), table);
		}
		//只有tableName配置的是单个表(没有逗号)的时候才能有子表
		if (tableNames.length == 1) {
			TableConfig table = tables.get(tableNames[0]);
			// process child tables  递归调用
			processChildTables(tables, table, dataNode, tableElement);
		}
	}
	return tables;
}
	
	
/**
* 处理动态日期表, 支持 YYYYMM、YYYYMMDD 两种格式
* 
* YYYYMM格式: 	  yyyymm,2015,01,60   
* YYYYMMDD格式:  yyyymmdd,2015,01,10,50
* 
* @param tableNameElement
* @param tableNameSuffixElement
 * @return
 */
private String doTableNameSuffix(String tableNameElement, String tableNameSuffixElement) {
		
	String newTableName = tableNameElement;
	
	String[] params = tableNameSuffixElement.split(",");			
	String suffixFormat = params[0].toUpperCase();		
	if ( suffixFormat.equals("YYYYMM") ) {
		
		//读取参数
		int yyyy = Integer.parseInt( params[1] );
		int mm = Integer.parseInt( params[2] );					
		int mmEndIdx =  Integer.parseInt( params[3] );
		
		//日期处理
		SimpleDateFormat yyyyMMSDF = new SimpleDateFormat("yyyyMM"); 
		
		Calendar cal = Calendar.getInstance();
		cal.set(Calendar.YEAR, yyyy );
		cal.set(Calendar.MONTH, mm - 1 );
		cal.set(Calendar.DATE, 0 );
		
		//表名改写
		StringBuffer tableNameBuffer = new StringBuffer();
		for(int mmIdx = 0; mmIdx <= mmEndIdx; mmIdx++) {						
			tableNameBuffer.append( tableNameElement );
			tableNameBuffer.append( yyyyMMSDF.format(cal.getTime()) );							
			cal.add(Calendar.MONTH, 1);
			
			if ( mmIdx != mmEndIdx) {
				tableNameBuffer.append(",");
			}						
		}
                //逗号分隔的一批表名字A1,B2,C3					
		newTableName = tableNameBuffer.toString();

	} else if ( suffixFormat.equals("YYYYMMDD") ) {
		
		//读取参数
		int yyyy = Integer.parseInt( params[1] );
		int mm = Integer.parseInt( params[2] );
		int dd =  Integer.parseInt( params[3] );
		int ddEndIdx =  Integer.parseInt( params[4] );
		
		//日期处理
		SimpleDateFormat yyyyMMddSDF = new SimpleDateFormat("yyyyMMdd"); 
		
		Calendar cal = Calendar.getInstance();
		cal.set(Calendar.YEAR, yyyy );
		cal.set(Calendar.MONTH, mm - 1 );
		cal.set(Calendar.DATE, dd );
		
		//表名改写
		StringBuffer tableNameBuffer = new StringBuffer();
		for(int ddIdx = 0; ddIdx <= ddEndIdx; ddIdx++) {					
			tableNameBuffer.append( tableNameElement );
			tableNameBuffer.append( yyyyMMddSDF.format(cal.getTime()) );
			
			cal.add(Calendar.DATE, 1);	
			
			if ( ddIdx != ddEndIdx) {
				tableNameBuffer.append(",");
			}
		}
               //逗号分隔的一批表名字A1,B2,C3					
		newTableName = tableNameBuffer.toString();
	}				
	return newTableName;		
}

 

三、动态日期表演示



<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100">

<table name="tab" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />

<table name="user1" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />

<table name="user"  dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />

 

<!-- 2015年-1月即2014年的12月开始,累计到24个月-->

<table name="test"  dataNode="dn1,dn2,dn3" rule="auto-sharding-long" nameSuffix="yyyymm,2015,01,24" />

<!-- 2015年1月1号开始,累计到100天  -->

<table name="demo"  dataNode="dn1,dn2,dn3" rule="auto-sharding-long" nameSuffix="yyyymmdd,2015,01,01,100" />

</schema>

  • 大小: 100.5 KB
0
1
分享到:
评论

相关推荐

    Mycat1.6源码

    《深入剖析Mycat 1.6源码》 Mycat是一款开源的分布式数据库中间件,它在Java平台上运行,旨在解决大数据分布式存储和处理的...对于想要涉足分布式数据库领域的开发者来说,Mycat 1.6源码无疑是一份宝贵的参考资料。

    Mycat 1.6 稳定版-linux环境 gz包.7z

    《Mycat 1.6 稳定版在Linux环境下的部署与应用》 Mycat是一款开源的、基于Java开发的分布式数据库中间件,主要用于解决大数据量、高并发场景下的数据库处理问题。它实现了MySQL协议,可以作为MySQL的一个高性能、高...

    【Mycat1.6之操作SQLServer案例】

    【Mycat1.6与SQLServer操作案例详解】 Mycat是一款开源的分布式数据库中间件,它在大型分布式系统中扮演着数据库分片的角色,能够有效地解决单个数据库性能瓶颈的问题。Mycat 1.6是其一个重要版本,提供了更稳定、...

    mycat1.6windows+linux.zip

    mycat1.6windows+linux.zip, 我就很不理解,为什么好多人上传的资料,下载都需要积分。0积分能咋地?官网能访问但是下载不了,后再巧合下在另外一个网站上下载的,我在这里给大家分享下。不需要积分也不需要花钱,...

    mycat1.6.7.1.rar

    总结,mycat1.6.7.1版本作为一款强大的数据库中间件,为大数据时代的分布式数据库解决方案提供了强大支持。其在Linux上的稳定运行,以及丰富的分片策略、事务管理机制和监控手段,使得Mycat成为应对海量数据挑战的...

    mycat1.6jar包反编译的源码

    《深入解析mycat1.6源码:一次技术探索之旅》 Mycat,作为一款开源的分布式数据库中间件,广泛应用于大型分布式系统中,它实现了数据分片、读写分离、故障切换等功能,为高并发、大数据量的场景提供了优秀的解决...

    mycat1.6.7.5.zip

    2. **解压Mycat**:解压mycat1.6.7.5.zip,并将其移动到一个合适的目录,如/usr/local/mycat。 3. **配置Mycat**:修改Mycat目录下的conf目录中的server.xml、schema.xml、rule.xml等配置文件,定义数据源、分片...

    mycat-server 1.6 源码包 可直接运行

    《深入解析mycat-server 1.6:源码探索与实战指南》 Mycat-Server 1.6 是一个开源的分布式数据库中间件,它主要用于解决大数据环境下高并发、高性能的问题,尤其在分库分表场景下表现卓越。这款源码包的特点是可...

    Mycat1.6 安装步骤

    ### Mycat 1.6 安装步骤详解 #### 一、Mycat简介与应用场景 Mycat作为一款开源的数据库中间件,主要用于解决大型系统中的数据分库分表问题,通过它能够将大量的数据分散到多个物理数据库中,以此来提升系统的并发...

    mycat-1.6.7.6_BYMONTH.zip

    基于MyCat1.6.7.6正式版的源码修改的,支持subTables的按月分表正则配置 subTables=“ tableName_$202101-?” subTableWay="BYMONTH" rule="sharding-by-month" 表示从202101月份开始进行分表处理,?表示当前日期的...

    Mycat-Server-1.6(源码)

    3. **Config模块**:配置管理模块,用于加载和管理Mycat的配置文件,如schema.xml、server.xml等。这些配置文件定义了数据库连接信息、分片规则、服务器设置等。 4. **Heartbeat模块**:心跳检测模块,定期检测...

    Mycat-1.6.7.3.zip

    其中,`conf`目录包含了一系列配置文件,如`schema.xml`,这是Mycat的核心配置文件,定义了数据节点、库、表的映射关系以及分片规则。我们需要在此文件中修改数据库的密码,确保Mycat能够正确连接到后端的数据库...

    Mycat 1.6权威指南.pdf和1.5的word版

    其次,Mycat的路由机制是其核心之一。1.5版中,路由规则根据SQL语句进行解析,将请求定向到正确的数据节点。1.6版则进一步提升了路由效率,并且增加了对复杂查询的支持,如子查询和联接查询。同时,1.6版本还加强了...

    mycat1.6.7.4

    总的来说,Mycat1.6.7.4是一款强大的数据库中间件,它通过分布式技术解决了大数据量下的存储和处理难题,为企业级应用提供了高效、稳定的数据库解决方案。无论是在电商、社交还是数据分析等领域,Mycat都能发挥其...

    linux的Mycat安装包1.6.7.4版本

    在IT行业中,Linux系统因其稳定性和安全性而广泛应用于服务器领域,而Mycat则是一款针对大数据分布式处理的开源数据库中间件,它为大型网站和企业提供了高性能的数据分片解决方案。本篇文章将详细介绍如何在Linux...

    MySql 中间件 Mycat 1.6.7.5 - release- win.tar windows 程序

    在本压缩包中,我们获得了Mycat 1.6.7.5的Windows版本,这是一款专为Windows环境设计的程序,用于在MySQL数据库上构建高效的数据处理架构。 **Mycat核心特性:** 1. **读写分离**:Mycat可以自动将读操作路由到从...

    mycat 1.6 win和linux版

    2. **分库分表**:Mycat的核心功能之一就是实现数据库的横向扩展,通过分库分表策略,将大表拆分成多个小表,分布在不同的数据库实例上,提高数据读写性能。 3. **SQL路由**:Mycat支持自定义SQL解析规则,可以根据...

    解决mycatJDBC8驱动连接Mycat1.6报错 Unknown system variable 'query_cache_size'

    标题中的问题涉及到的是在使用Mycat数据中间件时,尝试使用JDBC 8驱动连接到Mycat 1.6版本时遇到的一个错误:`Unknown system variable 'query_cache_size'`。这个问题出现的原因在于MySQL 8.0版本中移除了`query_...

    【Mycat1.6之操作Oracle案例】

    【Mycat1.6与Oracle操作案例详解】 在IT行业中,数据库管理是核心部分,尤其是在大型企业级应用中。Mycat是一个开源的分布式数据库中间件,它提供了数据库分片、读写分离、故障切换等功能,适用于高并发、大数据量...

    mycat1.6jar包

    当mycat的jar包的版本过低时,替换这个可解决问题.

Global site tag (gtag.js) - Google Analytics