`
gaojingsong
  • 浏览: 1182739 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

【Mycat1.6之加载Schema的DataHosts完整属性源码解读】

阅读更多

一、Mycat1.6只Schema加载DataHosts完整属性源码解读

        private final static String DEFAULT_DTD = "/schema.dtd";
	private final static String DEFAULT_XML = "/schema.xml";

	public XMLSchemaLoader(String schemaFile, String ruleFile) {
		//先读取rule.xml
		XMLRuleLoader ruleLoader = new XMLRuleLoader(ruleFile);
		//将tableRules拿出,用于这里加载Schema做rule有效判断,以及之后的分片路由计算
		this.tableRules = ruleLoader.getTableRules();
		//释放ruleLoader
		ruleLoader = null;
		this.dataHosts = new HashMap<String, DataHostConfig>();
		this.dataNodes = new HashMap<String, DataNodeConfig>();
		this.schemas = new HashMap<String, SchemaConfig>();
		//读取加载schema配置
		this.load(DEFAULT_DTD, schemaFile == null ? DEFAULT_XML : schemaFile);
	}
	
	//加载顺序:主机/节点/ Schema
	private void load(String dtdFile, String xmlFile) {
		InputStream dtd = null;
		InputStream xml = null;
		dtd = XMLSchemaLoader.class.getResourceAsStream(dtdFile);
		xml = XMLSchemaLoader.class.getResourceAsStream(xmlFile);
		Element root = ConfigUtil.getDocument(dtd, xml).getDocumentElement();
		//先加载所有的DataHost
		loadDataHosts(root);
		//再加载所有的DataNode
		loadDataNodes(root);
		//最后加载所有的Schema
		loadSchemas(root);
		 
	}
	 
       /**
<dataHost name="localhost1_gaojingsong1" maxCon="1000" minCon="1" balance="0" writeType="0" 
dbType="db2" dbDriver="jdbc" switchType="-1" slaveThreshold="" tempReadHostAvailable=""  
filters="" logTime="">
<heartbeat>select 1 from sysibm.sysdummy1</heartbeat>
<connectionInitSql>alter session set nls_date_format='yyyy-mm-dd hh24:mi:ss'
</connectionInitSql>
<writeHost host="hostM1" url="jdbc:db2://19:50000/TEST001" user="db2inst1" password="123456" >
</writeHost> 
</dataHost>
	*/
	private void loadDataHosts(Element root) {
		NodeList list = root.getElementsByTagName("dataHost");
		for (int i = 0, n = list.getLength(); i < n; ++i) {
			
			Element element = (Element) list.item(i);
			String name = element.getAttribute("name");
			//判断是否重复
			if (dataHosts.containsKey(name)) {
				throw new ConfigException("dataHost name " + name + "duplicated!");
			}
			//读取最大连接数
			int maxCon = Integer.parseInt(element.getAttribute("maxCon"));
			//读取最小连接数
			int minCon = Integer.parseInt(element.getAttribute("minCon"));
	/**
	* 读取负载均衡配置
	* 1. balance="0", 不开启分离机制,所有读操作都发送到当前可用的 writeHost 上。
	* 2. balance="1",全部的 readHost 和 stand by writeHost 参不 select 的负载均衡
	* 3. balance="2",所有读操作都随机的在 writeHost、readhost 上分发。
	* 4. balance="3",所有读请求随机的分发到 wiriterHost 对应的 readhost 执行,writerHost 不负担读压力
	 */
			int balance = Integer.parseInt(element.getAttribute("balance"));
			/**
			 * 读取切换类型
			 * -1 表示不自动切换
			 * 1 默认值,自动切换
			 * 2 基于MySQL主从同步的状态决定是否切换
			 * 心跳询句为 show slave status
			 * 3 基于 MySQL galary cluster 的切换机制
			 */
			String switchTypeStr = element.getAttribute("switchType");
	int switchType = switchTypeStr.equals("") ? -1 :Integer.parseInt(switchTypeStr);
			//读取从延迟界限
			String slaveThresholdStr = element.getAttribute("slaveThreshold");
	int slaveThreshold = slaveThresholdStr.equals("") ? -1 : Integer.parseInt(slaveThresholdStr);
			
    //如果 tempReadHostAvailable 设置大于 0 则表示写主机如果挂掉,临时的读服务依然可用
     String tempReadHostAvailableStr = element.getAttribute("tempReadHostAvailable");
     boolean tempReadHostAvailable = !tempReadHostAvailableStr.equals("")
 && Integer.parseInt(tempReadHostAvailableStr) > 0;
			/**
			 * 读取 写类型
			 * 这里只支持 0 - 所有写操作仅配置的第一个 writeHost
			 */
			String writeTypStr = element.getAttribute("writeType");
int writeType = "".equals(writeTypStr) ? PhysicalDBPool.WRITE_ONLYONE_NODE : Integer.parseInt(writeTypStr);


			String dbDriver = element.getAttribute("dbDriver");
			String dbType = element.getAttribute("dbType");
			String filters = element.getAttribute("filters");
			String logTimeStr = element.getAttribute("logTime");
long logTime = "".equals(logTimeStr) ? PhysicalDBPool.LONG_TIME : Long.parseLong(logTimeStr) ;
//读取心跳语句
String heartbeatSQL = element.getElementsByTagName("heartbeat").item(0).getTextContent();
//读取 初始化sql配置,用于oracle
NodeList connectionInitSqlList = element.getElementsByTagName("connectionInitSql");
			String initConSQL = null;
			if (connectionInitSqlList.getLength() > 0) {
				initConSQL = connectionInitSqlList.item(0).getTextContent();
			}
	//读取writeHost
	/**
	<writeHost host="hostM1" url="localhost:3306" user="root"
		 password="123456" usingDecrypt="" weight="">
	<!-- can have multi read hosts -->
	<readHost host="hostS2" url="192.168.1.200:3306" user="root" password="xxx" />
	</writeHost>
	*/
	NodeList writeNodes = element.getElementsByTagName("writeHost");
	DBHostConfig[] writeDbConfs = new DBHostConfig[writeNodes.getLength()];
	Map<Integer, DBHostConfig[]> readHostsMap = new HashMap<Integer, DBHostConfig[]>(2);
			for (int w = 0; w < writeDbConfs.length; w++) {
				Element writeNode = (Element) writeNodes.item(w);
writeDbConfs[w] = createDBHostConf(name, writeNode, dbType,dbDriver, maxCon, minCon,filters,logTime);
				NodeList readNodes = writeNode.getElementsByTagName("readHost");
				//读取对应的每一个readHost
				if (readNodes.getLength() != 0) {
DBHostConfig[] readDbConfs = new DBHostConfig[readNodes.getLength()];
					for (int r = 0; r < readDbConfs.length; r++) {
						Element readNode = (Element) readNodes.item(r);
readDbConfs[r] = createDBHostConf(name,readNode, dbType, dbDriver, maxCon, minCon,filters, logTime);
					}
					readHostsMap.put(w, readDbConfs);
				}
			}

DataHostConfig hostConf = new DataHostConfig(name, dbType, dbDriver, writeDbConfs, 
readHostsMap, switchType, slaveThreshold, tempReadHostAvailable);		
			
			hostConf.setMaxCon(maxCon);
			hostConf.setMinCon(minCon);
			hostConf.setBalance(balance);
			hostConf.setWriteType(writeType);
			hostConf.setHearbeatSQL(heartbeatSQL);
			hostConf.setConnectionInitSql(initConSQL);
			hostConf.setFilters(filters);
			hostConf.setLogTime(logTime);
			dataHosts.put(hostConf.getName(), hostConf);
		}
	}
	
	private DBHostConfig createDBHostConf(String dataHost, Element node,
String dbType, String dbDriver, int maxCon, int minCon, 
String filters, long logTime) {
		
		String nodeHost = node.getAttribute("host");
		String nodeUrl = node.getAttribute("url");
		String user = node.getAttribute("user");
		String password = node.getAttribute("password");
		String usingDecrypt = node.getAttribute("usingDecrypt");
		String passwordEncryty= DecryptUtil.DBHostDecrypt(usingDecrypt, 
nodeHost, user, password);
		
		String weightStr = node.getAttribute("weight");
		int weight = "".equals(weightStr) ? PhysicalDBPool.WEIGHT :
 Integer.parseInt(weightStr) ;
		
		String ip = null;
		int port = 0;
		if (empty(nodeHost) || empty(nodeUrl) || empty(user)) {
			throw new ConfigException(
					"dataHost "
							+ dataHost
		+ " define error,some attributes of this element is empty: "
							+ nodeHost);
		}
		if ("native".equalsIgnoreCase(dbDriver)) {
			int colonIndex = nodeUrl.indexOf(':');
			ip = nodeUrl.substring(0, colonIndex).trim();
			port = Integer.parseInt(nodeUrl.substring(colonIndex + 1).trim());
		} else {
			URI url;
			try {
				url = new URI(nodeUrl.substring(5));
			} catch (Exception e) {
				throw new ConfigException("invalid jdbc url " + nodeUrl + 
" of " + dataHost);
			}
			ip = url.getHost();
			port = url.getPort();
		}

		DBHostConfig conf = new DBHostConfig(nodeHost, ip, port, nodeUrl, user,
 passwordEncryty,password);
		conf.setDbType(dbType);
		conf.setMaxCon(maxCon);
		conf.setMinCon(minCon);
		conf.setFilters(filters);
		conf.setLogTime(logTime);
		conf.setWeight(weight); 	//新增权重
		return conf;
	}
	
	public static String DBHostDecrypt(String usingDecrypt,String host,
String user ,String passwrod){
		if("1".equals(usingDecrypt)){
			//type:host:user:password
        	//1:myhost1:test:test
        	boolean flag = false;
        	try {
        		String passwrods[] = DecryptUtil.decrypt(passwrod).split(":");
            	if("1".equals(passwrods[0]) && host.equals(passwrods[1]) && 
user.equals(passwrods[2])){
            		return passwrods[3];
            	}
            	if(flag==false){
            		 throw new ConfigException("user " + user + " passwrod need to decrype ,but decrype password is wrong !");
            	}
        	} catch (Exception e2) {
       		    throw new ConfigException("host " + host + ",user " + user + " passwrod need to decrype ,but decrype password is wrong !",e2);
			}
		}
		return passwrod;
	}

二、总结:

dataHost 完整配置如下:

/**

<dataHost name="localhost1_gaojingsong1" maxCon="1000" minCon="1" balance="0" writeType="0" 

dbType="db2" dbDriver="jdbc" switchType="-1" slaveThreshold="" tempReadHostAvailable=""  filters="" logTime="">

<heartbeat>select 1 from sysibm.sysdummy1</heartbeat>

<connectionInitSql>alter session set nls_date_format='yyyy-mm-dd hh24:mi:ss'</connectionInitSql>

<writeHost host="hostM1" url="jdbc:db2://192.168.8.129:50000/TEST001" user="db2inst1" password="123456" >

</writeHost> 

</dataHost>

*/

 

 

//读取writeHost

/**

<writeHost host="hostM1" url="localhost:3306" user="root"

 password="123456" usingDecrypt="" weight="">

<!-- can have multi read hosts -->

<readHost host="hostS2" url="192.168.1.200:3306" user="root" password="xxx"   usingDecrypt="" weight=""/>

</writeHost>

*/

 

备注:

writeType 这里只支持 0 - 所有写操作仅配置的第一个 writeHost

/** * balance读取负载均衡配置

* 1. balance="0", 不开启分离机制,所有读操作都发送到当前可用的 writeHost 上。

* 2. balance="1",全部的 readHost 和 stand by writeHost 参不 select 的负载均衡

* 3. balance="2",所有读操作都随机的在 writeHost、readhost 上分发。

* 4. balance="3",所有读请求随机的分发到 wiriterHost 对应的 readhost 执行,

writerHost 不负担读压力

*/

 

/**

* switchType读取切换类型

* -1 表示不自动切换 * 1 默认值,自动切换

* 2 基于MySQL主从同步的状态决定是否切换

* 心跳询句为 show slave status

* 3 基于 MySQL galary cluster 的切换机制

*/

 

tempReadHostAvailable 设置大于 0 则表示写主机如果挂掉, 临时的读服务依然可用 

0
2
分享到:
评论

相关推荐

    【Mycat1.6之操作SQLServer案例】

    【Mycat1.6与SQLServer操作案例详解】 Mycat是一款开源的分布式数据库中间件,它在大型分布式系统中扮演着数据库分片的角色,能够有效地解决单个数据库性能瓶颈的问题。Mycat 1.6是其一个重要版本,提供了更稳定、...

    mycat1.6windows+linux.zip

    mycat1.6windows+linux.zip, 我就很不理解,为什么好多人上传的资料,下载都需要积分。0积分能咋地?官网能访问但是下载不了,后再巧合下在另外一个网站上下载的,我在这里给大家分享下。不需要积分也不需要花钱,...

    Mycat1.6源码

    《深入剖析Mycat 1.6源码》 Mycat是一款开源的分布式数据库中间件,它在Java平台上运行,旨在解决大数据分布式存储和处理的问题。Mycat 1.6版本是其一个重要的里程碑,它在前一版本的基础上进行了一系列的优化和...

    Mycat 1.6 稳定版-linux环境 gz包.7z

    《Mycat 1.6 稳定版在Linux环境下的部署与应用》 Mycat是一款开源的、基于Java开发的分布式数据库中间件,主要用于解决大数据量、高并发场景下的数据库处理问题。它实现了MySQL协议,可以作为MySQL的一个高性能、高...

    mycat1.6jar包反编译的源码

    《深入解析mycat1.6源码:一次技术探索之旅》 Mycat,作为一款开源的分布式数据库中间件,广泛应用于大型分布式系统中,它实现了数据分片、读写分离、故障切换等功能,为高并发、大数据量的场景提供了优秀的解决...

    mycat1.6.7.5.zip

    在本压缩包"mycat1.6.7.5.zip"中,包含了在Linux环境下安装Mycat 1.6.7.5所需的所有关键组件,尤其是Java Development Kit (JDK) 1.8的安装文件,因为Mycat运行在Java平台上,所以JDK是其运行的前提。 首先,让我们...

    mycat1.6.7.1.rar

    《Mycat数据库中间件详解——基于mycat1.6.7.1版本》 Mycat是一款开源的、基于Java开发的分布式数据库中间件,主要用于解决大数据量、高并发的分布式数据库架构问题。在本篇文章中,我们将深入探讨mycat1.6.7.1版本...

    Mycat1.6 安装步骤

    ### Mycat 1.6 安装步骤详解 #### 一、Mycat简介与应用场景 Mycat作为一款开源的数据库中间件,主要用于解决大型系统中的数据分库分表问题,通过它能够将大量的数据分散到多个物理数据库中,以此来提升系统的并发...

    Mycat-1.6.7.3.zip

    其中,`conf`目录包含了一系列配置文件,如`schema.xml`,这是Mycat的核心配置文件,定义了数据节点、库、表的映射关系以及分片规则。我们需要在此文件中修改数据库的密码,确保Mycat能够正确连接到后端的数据库...

    Mycat 1.6权威指南.pdf和1.5的word版

    《Mycat 1.6权威指南》与《Mycat 1.5权威指南》是两本关于Mycat数据库中间件的重要参考资料,分别提供了详细的1.6和1.5版本的技术信息和实践指导。Mycat作为一款开源的分布式数据库系统,它解决了大数据量下的高并发...

    mycat-1.6.7.6_BYMONTH.zip

    基于MyCat1.6.7.6正式版的源码修改的,支持subTables的按月分表正则配置 subTables=“ tableName_$202101-?” subTableWay="BYMONTH" rule="sharding-by-month" 表示从202101月份开始进行分表处理,?表示当前日期的...

    mycat1.6.7.4

    【Mycat1.6.7.4:数据库中间件详解】 Mycat,作为一款强大的数据库中间件,它的出现是对阿里巴巴开源项目Cobar的一次革新与超越。Cobar是MySQL集群的一种解决方案,但随着时间的发展,社区发现它存在一些局限性,...

    linux的Mycat安装包1.6.7.4版本

    本篇文章将详细介绍如何在Linux环境下安装Mycat的1.6.7.4版本。 首先,了解Mycat的基本概念。Mycat是基于Java开发的,它的主要功能包括数据分片、读写分离、故障切换以及SQL路由等。通过数据分片,Mycat可以将大...

    mycat-server 1.6 源码包 可直接运行

    《深入解析mycat-server 1.6:源码探索与实战指南》 Mycat-Server 1.6 是一个开源的分布式数据库中间件,它主要用于解决大数据环境下高并发、高性能的问题,尤其在分库分表场景下表现卓越。这款源码包的特点是可...

    MySql 中间件 Mycat 1.6.7.5 - release- win.tar windows 程序

    3. **分布式事务**:Mycat实现了X/Open XA标准的分布式事务管理,保证了数据的一致性和完整性。 4. **SQL路由**:Mycat能解析SQL语句,并根据预设的规则将其路由到正确的数据节点执行。 5. **高可用性**:Mycat...

    Mycat-Server-1.6(源码)

    3. **Config模块**:配置管理模块,用于加载和管理Mycat的配置文件,如schema.xml、server.xml等。这些配置文件定义了数据库连接信息、分片规则、服务器设置等。 4. **Heartbeat模块**:心跳检测模块,定期检测...

    解决mycatJDBC8驱动连接Mycat1.6报错 Unknown system variable 'query_cache_size'

    标题中的问题涉及到的是在使用Mycat数据中间件时,尝试使用JDBC 8驱动连接到Mycat 1.6版本时遇到的一个错误:`Unknown system variable 'query_cache_size'`。这个问题出现的原因在于MySQL 8.0版本中移除了`query_...

    【Mycat1.6之操作Oracle案例】

    【Mycat1.6与Oracle操作案例详解】 在IT行业中,数据库管理是核心部分,尤其是在大型企业级应用中。Mycat是一个开源的分布式数据库中间件,它提供了数据库分片、读写分离、故障切换等功能,适用于高并发、大数据量...

    mycat1.6jar包

    当mycat的jar包的版本过低时,替换这个可解决问题.

    Mycat1.6.7.3版本

    《Mycat 1.6.7.3版本:分布式数据库中间件的深度解析》 Mycat,作为一款开源的、基于Java开发的数据库中间件,被广泛应用于分布式数据库系统中,它能够有效地解决大数据量下的高并发访问问题。在Mycat 1.6.7.3这个...

Global site tag (gtag.js) - Google Analytics