`
ahua186186
  • 浏览: 561997 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

oceanus-58总体框架理解

 
阅读更多
1.总体思路:
通过约定的XML规则(分表分库规则)和 封装jdbc的Connection和PreparedStatement来实现SQL解析,sql路由和sql重写。

2.  3个核心类:ConnectionWrapper(JDBC Connection包装),PreparedStatementWrapper( JDBC PreparedStatement包装),SimpleExecutor(sql执行器,类似mybatis的SimpleExecutor)

3.  3个上下文传参数:ConnectionContext,StatementContext,transactionContext。

4.  真正干活的类:

(1)DefaultStatementContextBuilder类:解析SQL并保存BatchItem到StatementContext,--作者貌似直接用的mycat里面的sql解析的代码,直接拿来主义实现价值啊。
public StatementContext build(String sql, StatementContext context)
			throws SQLException {
		if (context == null) {
			context = new StatementContext();
			StatementContext.setContext(context);
			if (logger.isDebugEnabled()) {
				logger.debug("create context!sql=" + sql);
			}
		}
		if (context.getCurrentBatch().getSql() == null) {
			context.getCurrentBatch().setSql(sql);
		}

		StatementContextHandler handler = null;
		if (context.isBatch()) {
			handler = HandlerFactory.create(StatementType.BATCH);
			StatementContext resultContext = handler.handle(sql, context);
			processPreparedValues(resultContext);
			return resultContext;
		}
		
		TrackerExecutor.trackBegin(TrackPoint.PARSE_SQL, sql);
		SQLParser parser = StatementHelper.createSQLParser();
		try {
			DMLStatementNode statementNode = (DMLStatementNode) parser
					.parseStatement(sql);
			switch (statementNode.getNodeType()) {
			case NodeTypes.CURSOR_NODE:
				handler = HandlerFactory.create(StatementType.SELECT);
				break;
			case NodeTypes.DELETE_NODE:
				handler = HandlerFactory.create(StatementType.DELETE);
				break;
			case NodeTypes.UPDATE_NODE:
				handler = HandlerFactory.create(StatementType.UPDATE);
				break;
			case NodeTypes.INSERT_NODE:
				handler = HandlerFactory.create(StatementType.INSERT);
				break;
			case NodeTypes.CALL_STATEMENT_NODE:
				handler = HandlerFactory.create(StatementType.CALLABLE);
				break;
			}
			
			StatementContext resultContext = handler.handle(statementNode,
					context);
			
			TrackerExecutor.trackEnd(TrackPoint.PARSE_SQL);
			
			processPreparedValues(resultContext);
			return resultContext;

		} catch (StandardException se) {
			System.out.println("sql parse error, sql:"+sql);
			se.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
		processPreparedValues(context);
		return context;
	}

(2)DefaultTargetDispatcher:根据batchItem和batchItem中的tableInfo获取路由信息和重写SQL,达到路由到指定的分库分表的目的

Set<RouteTarget> getSpecifyTargets(TableInfo tableInfo, BatchItem batchItem) {
		Set<RouteTarget> targetSet = new LinkedHashSet<RouteTarget>();
		Configurations configurations = Configurations.getInstance();

		/**
		 * 解析where中符合分库分表字段的值
		 */
		Map<String, List<TableColumn>> resolveColumns = RouteHelper
				.getResolveColumns(tableInfo.getOrgName(),
						batchItem.getAnalyzeResult());
		List<Map<String, Object>> parameters = RouteHelper
				.getParameterValues(resolveColumns);
		Set<Integer> indexs = new HashSet<Integer>();
		TableDescription desc = configurations.getTableDescription(tableInfo
				.getOrgName());
		List<NameNodeHolder> nameNodes = desc.getNameNodes();
		Function func = desc.getFunction();
//分库分表函数,本质就是获取table节点中namenode节点的序号
		for (Map<String, Object> item : parameters) {
			checkParameters(item, batchItem);
			int i = func.execute(nameNodes.size(), item);
			indexs.add(i);
		}

		if (indexs.size() > 0) {
			AnalyzeResult analyzeResult = batchItem.getAnalyzeResult();
			HavingInfo havingInfo = analyzeResult.getHavingInfo();
			if (havingInfo != null) {
				AnalyzerCallback callback = havingInfo.getCallback();
				if (callback != null) {
					callback.call();
				}
			}
		}
		if ((!batchItem.getAnalyzeResult().getAppendResultColumns().isEmpty() || batchItem
				.getAnalyzeResult().getLimit() != null) && indexs.size() > 1) {// 存在limit或者avg等聚集函数,需要重新生成sql
			Collection<AnalyzerCallback> analyzerCallbacks = batchItem
					.getAnalyzeResult().getAnalyzerCallbacks();
			if (batchItem.getAnalyzeResult().getLimit() != null) {
				SqlValueItem limitItem = batchItem.getAnalyzeResult()
						.getLimit();
				SqlValueItem offsetItem = batchItem.getAnalyzeResult()
						.getOffset();
				if(offsetItem==null){
					offsetItem=new SqlValueItem();
					offsetItem.setValue(0);
				}
				if (limitItem.getParameterIndex() > 0
						&& offsetItem.getParameterIndex() > 0) {// limit ?,?
					Integer limitSize = limitItem.getValue()
							+ offsetItem.getValue();
					batchItem.getCallback(limitItem.getParameterIndex())
							.setParameter(limitSize);
					batchItem.getCallback(offsetItem.getParameterIndex())
							.setParameter(0);
					
					if(limitItem.getParameterIndex() > offsetItem.getParameterIndex()){
						batchItem.getCallback(offsetItem.getParameterIndex())
							.setParameterIndex(limitItem.getParameterIndex());
						
						batchItem.getCallback(limitItem.getParameterIndex())
							.setParameterIndex(offsetItem.getParameterIndex());
					}
				} else if (limitItem.getParameterIndex() > 0) {// limit 1,?
					Integer limitSize = limitItem.getValue()
							+ offsetItem.getValue();
					batchItem.getCallback(limitItem.getParameterIndex())
							.setParameter(limitSize);
				} else if (offsetItem.getParameterIndex() > 0) {// limit ?,10
					batchItem.getCallback(offsetItem.getParameterIndex())
							.setParameter(0);
				}
			}
			for (AnalyzerCallback callback : analyzerCallbacks) {
				callback.call();
			}
		} else {// 在单库路由的情况下,如果检测到是limit ?,?,就置换Parameter顺序
			SqlValueItem limitItem = batchItem.getAnalyzeResult()
					.getLimit();
			SqlValueItem offsetItem = batchItem.getAnalyzeResult()
					.getOffset();
			
			if(limitItem !=null && offsetItem !=null && 
					limitItem.getParameterIndex() > offsetItem.getParameterIndex()){
				
				batchItem.getCallback(offsetItem.getParameterIndex())
					.setParameterIndex(limitItem.getParameterIndex());
				
				batchItem.getCallback(limitItem.getParameterIndex())
					.setParameterIndex(offsetItem.getParameterIndex());
			}
		}
		for (Integer i : indexs) {// 生成target
			NameNode nameNode = configurations.getNameNode(
					tableInfo.getOrgName(), i);
			DefaultRouteTarget target = this.createTarget(batchItem, nameNode, tableInfo);
			targetSet.add(target);
		}

		
		for (RouteTarget item : targetSet) {
			DefaultRouteTarget target = (DefaultRouteTarget) item;
			SqlExecuteInfo info = new SqlExecuteInfo();
			info.setCallbacks(new LinkedHashSet<ParameterCallback<?>>(batchItem
					.getCallbacks()));

			if (desc.isDifferentName()) {
				info.setExecuteSql(configurations.getGenerator().generate(
						(NameNodeHolder) target.getNameNode(),
						batchItem.getAnalyzeResult()));
			} else if ((!batchItem.getAnalyzeResult().getAppendResultColumns().isEmpty() || batchItem
					.getAnalyzeResult().getLimit() != null) && nameNodes.size() > 1) {// 存在limit或者avg等聚集函数,需要重新生成sql,必须要超过1个路由结果
				info.setExecuteSql(configurations.getLimitAvgGenerator()
						.generate((NameNodeHolder) target.getNameNode(),
								batchItem.getAnalyzeResult()));
			} else {
				info.setExecuteSql(batchItem.getSql());
			}
			target.setExecuteInfo(info);
		}
		return targetSet;
	}


(3)SimpleExecutor 和 HandlerFactory:

根据StatementContext的RouteTarget(路由数据),

新建事务并获取数据库连接,   实际执行JDBC curd操作的类.

兴趣点:发现doUpdate的时候有用同步工具类:

CyclicBarrier barrier = new CyclicBarrier(n);



@SuppressWarnings({ "rawtypes", "unchecked" })
public class SimpleExecutor implements Executor {
	static Logger logger = LoggerFactory.getLogger(SimpleExecutor.class);
	static final ExecuteHandler<Integer> deleteHandler = new DeleteExecuteHandler();
	static final ExecuteHandler<Integer> insertHandler = new InsertExecuteHandler();
	static final ExecuteHandler<Integer> updateHandler = new UpdateExecuteHandler();
	static final ExecuteHandler<ResultSet> queryHandler = new QueryExecuteHandler();

	@Override
	public Object execute(StatementContext context, ExecuteCallback callback)
			throws SQLException {

		switch (context.getCurrentBatch().getAnalyzeResult().getStatementType()) {
		case SELECT:
			return this.doQuery(context, callback);
		case INSERT:
		case UPDATE:
		case DELETE:
			return doUpdate(context, callback);
		default:
			break;
		}
		return null;
	}

	ExecuteHandler<?> getHandler(StatementType statementType) {
		switch (statementType) {
		case SELECT:
			return queryHandler;
		case INSERT:
			return insertHandler;
		case UPDATE:
			return updateHandler;
		case DELETE:
			return deleteHandler;
		default:
			break;
		}
		return null;
	}
...


5.集成Mybatis:

因为Mybatis获取连接是通过PooledDataSource或UnpooledDataSource获取的,所以写个插件:包装下DataSource,把oceanus的connentionWrap包装进去即可实现整合。
分享到:
评论

相关推荐

    PyPI 官网下载 | tencentcloud-sdk-python-oceanus-3.0.448.tar.gz

    《PyPI官网下载的tencentcloud-sdk-python-oceanus-3.0.448.tar.gz:深入了解腾讯云Python SDK》 在Python编程环境中,开发者经常需要与各种云服务进行交互,以实现数据存储、计算任务调度等操作。腾讯云为开发者...

    Python库 | tencentcloud-sdk-python-oceanus-3.0.386.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.386.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | tencentcloud-sdk-python-oceanus-3.0.544.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.544.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | tencentcloud-sdk-python-oceanus-3.0.547.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.547.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | tencentcloud-sdk-python-oceanus-3.0.357.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.357.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | tencentcloud-sdk-python-oceanus-3.0.385.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.385.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | tencentcloud-sdk-python-oceanus-3.0.507.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.507.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Python库 | tencentcloud-sdk-python-oceanus-3.0.330.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:tencentcloud-sdk-python-oceanus-3.0.330.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    58同城数据库中间件-58同城数据库中间件

    58同城数据库中间件 关于DB中间件 在DB存储需求中,尽管业务不同,技术难点还是类似的,开源世界有很多DB中间件,解决方案也以通用方案为主,满足业务需要为前提,支持各种类型的需求。 Oceanus致力于打造一个功能...

    oceanus架构

    58同城的分布式数据库中间件架构设计与实现的介绍

    Oceanus 使用文档1

    《Oceanus 使用文档1》是58同城针对其数据库中间件Oceanus提供的详细操作指南,旨在帮助用户理解和掌握如何有效且安全地使用Oceanus。Oceanus作为一款关键的中间件,其作用在于处理数据的分库分表需求,提供监控、...

    可自由拖拽的BI可视化系统源码.zip

    Oceanus通常不是一个标准的Java库或框架名,可能是项目团队为其BI系统选择的特定命名。在这样的系统中,我们可能会看到以下关键组成部分: 1. 数据接入:这部分负责从各种数据源(如数据库、API、CSV文件等)获取...

    oceanus

    依赖环境PHP 5.6.0版本及以上从腾讯云控制台开通相应产品获取SecretID,SecretKey以及调用地址(端点),端点为oceanus.tencentcloudapi.com,具体参考各产品说明。获取安装通过Composer安装通过Composer获取安装是...

    腾讯实时流计算平台的建设.pptx

    在这一过程中,腾讯从早期采用的JStorm逐渐过渡到基于Apache Flink构建的Oceanus平台,实现了更高效、更可靠的实时计算解决方案。 首先,腾讯实时计算的规模极其庞大,每天处理的消息总量达到20万亿条,日均消息总...

    一个基于Java可自由拖拽的BI可视化系统源码.zip

    BI是将大量数据转化为洞察力的重要工具,而可视化则是将这些数据以图形或图像形式展示,帮助决策者更好地理解和解释数据。 【描述】中的信息与标题一致,再次强调了这是一个Java实现的BI可视化系统,并且提供了源码...

    IT-运维工程师的23个细节-进阶.doc.pdf

    IT运维工程师的工作涉及众多细节,从基础架构到高级服务,都需要深入理解和熟练掌握。以下是针对运维工程师的关键知识点的详尽阐述: 1. **自动化部署**: - Bootstrapping技术如Kickstart和Cobbler可以帮助自动...

    腾讯实时流计算平台演进之路-杨华.pdf

    \n\n【Flink在腾讯的引入与应用】\n\n2017年,腾讯开始对Flink进行预研,与当时的主流实时计算框架Storm进行对比。经过评估,Flink在性能、稳定性以及易用性上展现出优势,因此腾讯在同年下半年开始内部版本的定制...

    mysql面试题及答案

    - 常用中间件:sharding-jdbc、Mycat、TDDL、Oceanus、Vitess、Atlas等。 - 面临的问题:事务管理、跨节点Join、统计计算、数据迁移、容量规划、ID生成、分片排序分页等。 5. **InnoDB与MyISAM的区别** - 事务...

    100道MySql面试题

    4. Oceanus(58 同城数据库中间件) 5. vitess(谷歌开发的数据库中间件) 6. Atlas(Qihoo 360) 分库分表可能遇到的问题: 1. 事务问题:需要用分布式事务 2. 跨节点 Join 的问题:解决这一问题可以分两次查询...

Global site tag (gtag.js) - Google Analytics