- 浏览: 73882 次
前两天一直在忙本职工作, 最近才有时间闲下来看了一下SparkSql的执行过程, 记录一下。
主要是通过sqlContext.sql() 这个方法作为一个入口。
在这之前先得知道一句SQL传到 sql()这个方法里面后要经历好几次转换, 最终生成一个executedPlan去执行。
总的过程分下面几步:
1.通过Sqlparse 转成unresolvedLogicplan
2.通过Analyzer转成 resolvedLogicplan
3.通过optimizer转成 optimzedLogicplan
4.通过sparkplanner转成physicalLogicplan
5.通过prepareForExecution 转成executable logicplan
6.通过toRDD等方法执行executedplan去调用tree的doExecute
借用一个图, 懒得自己画了:
现在那么先从sqlContext.sql进来看到:
构造了一个DF, 点进去看到里面new 了一个DF:
传入的Logicalplan是在DF object里面执行parseSql(sqlText)而获取的, 那么这里是怎么拿到这个logicalplan的呢, 点进去看一下其实他调的是sqlContext里面的parseSql
这里就拿一个select语句来看一下最终的executable plan是怎么生成的:
parseSql 是 ddlParser.parse(sql, false), ddlParser其实就是:
所以调用的是DDLParser类里面的parse 方法, 看一下这个方法是怎么写的:
其实是调用了父类AbstractSparkSQLParser 的parse方法, 看一下这个方法是怎么写的:
里面主要做了两件事:
1.执行 start方法
2.通过lexical.Scanner(input) 来对input进行解析, 符合query模式就返回success
这个start方法是在DDLParser类里面定义的, 看一下:
可以看到start其实就是去判断是不是 这三种语句createTable describeTable refreshTable, 方法里面怎么写的就自己进去看一下了, 反正我们用select语句的话, 明显不是上面三种类型之一, 那么不是上面三种类型的话 #2步就不会返回success, 所以是会去执行 case failureOrError => sys.error(failureOrError.toString), 抛出一个异常, 那么在DDLParser里面接收到异常, 就会跑到catch里面 去执行:
case _ if !exceptionOnError => parseQuery(input)
看一下DDLParser里面parseQuery是怎么来的:
可以看到是在创建DDLParser的时候传入的, 那么在sqlContext里面DDLParser是怎么声明的呢:
传入的是一个sqlParser.parse(_)
所以回去调用sqlParser的parse方法, sqlParser是这样创建的:
所以会去调用SparkSQLParser里面定义的parse方法, 但是看这个类里面没有重写parser的方法, 所以一样还是调用父类AbstractSparkSQLParser的parse方法:
那样的话还是会去执行start, 这个start就是在SparkSQLParser里面定义的:
看到start里面会去判断是不是cache uncache set show desc , 如果不是就会去调用others, select语句明显不是上面的任意一种, 所以 直接去看others怎么定义的:
注意这个是lazy, 所以只有在job被提交后才会真正执行
会直接去调用fallback, fallback是在创建SparkSQLParser的时候传入的:
那么就要到SQLContext里面去看这个fallback到底是什么了:
好了 select语句又被传到了getSQLDialect().parse(_)去执行, getSQLDialect:
实际是执行DefaultParserDialect的parser方法:
可以看到他实际是执行SqlParser的parse方法, 那么SqlParser里面parse是怎么写的, 可以看到里面没有重写parse方法, 那么继续调用start, start的定义如下:
可以看到里面的start1其实就是匹配到了select语句, 那么我们的select最后会通过start1的方法去生成一个unresolvedlogicalplan
那么有了unresolvedlogicalplan后我们去看DataFrame是怎么被构造出来的:
new出来DataFrame后回去调用def this(sqlContext: SQLContext, logicalPlan: LogicalPlan)这个构造函数, 这个构造函数其实是创建了返回了DataFrame本身, 但是传入参数为sqlContext, 和一个queryexecution (qe), qe的传入参数为unresolvedlogicplan
这个queryexecution就是这部分代码:
我们看一下queryExecution是怎么写的:
这个类是理解整个sql解析过程的关键, 传入对象是一个unresolvedlogicplan, 首先他会去调用sqlContext的analyzer去生成一个resolvedlogicplan:
也是lazy的, 会在job提交后执行。 我们看一下analyzer是怎么定义的:
new了一个Analyzer, 然后重写了extendedResolutionRules , analyzer里面主要定义了一个batches, 这个batches其实就是一些对传入的tree(logicplan)进行解析的各种rule, 在analyzer里面的rule是这样的:
里面有一个ResolveRelations , 看一下这个rule就会明白为什么网上好多资料会说analyzer是吧unresolvedlogicplan和catalog绑定生成resolvedlogicplan:
那么这些batches是怎么被调用的呢, 得看anzlyer的execute方法了:
这个比较复杂, 传入的是unresolvedLogicplan, 然后会对上面定义的batches进行遍历, 确保每个rule都做一遍:
batches.foreach { ...}
再每个batch里面执行foldLeft(curPlan)
所以第一次的时候 case里面的(plan, rule)其实就是(curPlan, 第一个rule)
通过val result = rule(plan) 对当前的plan 做rule, 返回的结果当成下一次的curplan执行直到所有的rule都做完, 得到最总的curPlan
然后会去判断是否还要执行一遍, 以保证所有的node都执行到了这些rule, 有个iteration来记录执行了多少次, 然后和strategy来做对比:
strategy主要有这几种:
Once
FixedPoint(maxIterations: Int)
初始化maxIterations = 1
strategy实在batch创建的时候传入的, 列子:
当所有的rule都执行了strategy规定的次数后, 就返回一个新的sparkplan。
analyzer这边执行完后, 传入的unresolvedlogicplan就变成了resolvedlogicplan。
然后会看一下这个resolvedlogicplan是不是可以用cachedata, 如果其中有在cache里面的就直接替换掉:
然后再通过optimizer把resolvedlogicplan变成一个optimzedLogicplan:
具体调用过程和analyzer一样, 就不重复了:
再通过sparkplaner转成physicalplan:
然后通过通过prepareForExecution转成executebleplan:
到这里为止就生成了一个可以执行的物理计划, 这个物理计划会在toRdd的时候执行:
可以看到一路下来都是lazy的, 所以只有在job真正提交后才会交由spark 去做这些事,
execute方法里面其实就是调用了物理计划的toexecute方法:
里面的doExecute方法只是一个声明, 其实现实在所有的sparkplan中实现的, 比如说实在所有的LeafNode UnaryNode BinaryNode的实现类里面实现的, 随便找一个列子:
这里的doExecute就返回了一个RDD
到这里基本上理的差不多了, 以后有什么要补充的再加把
主要是通过sqlContext.sql() 这个方法作为一个入口。
在这之前先得知道一句SQL传到 sql()这个方法里面后要经历好几次转换, 最终生成一个executedPlan去执行。
总的过程分下面几步:
1.通过Sqlparse 转成unresolvedLogicplan
2.通过Analyzer转成 resolvedLogicplan
3.通过optimizer转成 optimzedLogicplan
4.通过sparkplanner转成physicalLogicplan
5.通过prepareForExecution 转成executable logicplan
6.通过toRDD等方法执行executedplan去调用tree的doExecute
借用一个图, 懒得自己画了:
现在那么先从sqlContext.sql进来看到:
def sql(sqlText: String): DataFrame = { DataFrame(this, parseSql(sqlText)) }
构造了一个DF, 点进去看到里面new 了一个DF:
private[sql] object DataFrame { def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = { new DataFrame(sqlContext, logicalPlan) } }
传入的Logicalplan是在DF object里面执行parseSql(sqlText)而获取的, 那么这里是怎么拿到这个logicalplan的呢, 点进去看一下其实他调的是sqlContext里面的parseSql
protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
这里就拿一个select语句来看一下最终的executable plan是怎么生成的:
parseSql 是 ddlParser.parse(sql, false), ddlParser其实就是:
protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
所以调用的是DDLParser类里面的parse 方法, 看一下这个方法是怎么写的:
def parse(input: String, exceptionOnError: Boolean): LogicalPlan = { try { parse(input) } catch { case ddlException: DDLException => throw ddlException case _ if !exceptionOnError => parseQuery(input) case x: Throwable => throw x } }
其实是调用了父类AbstractSparkSQLParser 的parse方法, 看一下这个方法是怎么写的:
def parse(input: String): LogicalPlan = synchronized { // Initialize the Keywords. initLexical phrase(start)(new lexical.Scanner(input)) match { case Success(plan, _) => plan case failureOrError => sys.error(failureOrError.toString) } }
里面主要做了两件事:
1.执行 start方法
2.通过lexical.Scanner(input) 来对input进行解析, 符合query模式就返回success
这个start方法是在DDLParser类里面定义的, 看一下:
protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable protected def start: Parser[LogicalPlan] = ddl
可以看到start其实就是去判断是不是 这三种语句createTable describeTable refreshTable, 方法里面怎么写的就自己进去看一下了, 反正我们用select语句的话, 明显不是上面三种类型之一, 那么不是上面三种类型的话 #2步就不会返回success, 所以是会去执行 case failureOrError => sys.error(failureOrError.toString), 抛出一个异常, 那么在DDLParser里面接收到异常, 就会跑到catch里面 去执行:
case _ if !exceptionOnError => parseQuery(input)
看一下DDLParser里面parseQuery是怎么来的:
class DDLParser(parseQuery: String => LogicalPlan) extends AbstractSparkSQLParser with DataTypeParser with Logging { ... }
可以看到是在创建DDLParser的时候传入的, 那么在sqlContext里面DDLParser是怎么声明的呢:
protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
传入的是一个sqlParser.parse(_)
所以回去调用sqlParser的parse方法, sqlParser是这样创建的:
protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
所以会去调用SparkSQLParser里面定义的parse方法, 但是看这个类里面没有重写parser的方法, 所以一样还是调用父类AbstractSparkSQLParser的parse方法:
def parse(input: String): LogicalPlan = synchronized { // Initialize the Keywords. initLexical phrase(start)(new lexical.Scanner(input)) match { case Success(plan, _) => plan case failureOrError => sys.error(failureOrError.toString) } }
那样的话还是会去执行start, 这个start就是在SparkSQLParser里面定义的:
override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | show | desc | others
看到start里面会去判断是不是cache uncache set show desc , 如果不是就会去调用others, select语句明显不是上面的任意一种, 所以 直接去看others怎么定义的:
private lazy val others: Parser[LogicalPlan] = wholeInput ^^ { case input => fallback(input) }
注意这个是lazy, 所以只有在job被提交后才会真正执行
会直接去调用fallback, fallback是在创建SparkSQLParser的时候传入的:
class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser {...}
那么就要到SQLContext里面去看这个fallback到底是什么了:
protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
好了 select语句又被传到了getSQLDialect().parse(_)去执行, getSQLDialect:
protected[sql] def dialectClassName = if (conf.dialect == "sql") { classOf[DefaultParserDialect].getCanonicalName } else { conf.dialect } protected[sql] def getSQLDialect(): ParserDialect = { try { val clazz = Utils.classForName(dialectClassName) clazz.newInstance().asInstanceOf[ParserDialect] } catch { case NonFatal(e) => // Since we didn't find the available SQL Dialect, it will fail even for SET command: // SET spark.sql.dialect=sql; Let's reset as default dialect automatically. val dialect = conf.dialect // reset the sql dialect conf.unsetConf(SQLConf.DIALECT) // throw out the exception, and the default sql dialect will take effect for next query. throw new DialectException( s"""Instantiating dialect '$dialect' failed. |Reverting to default dialect '${conf.dialect}'""".stripMargin, e) } }
实际是执行DefaultParserDialect的parser方法:
private[spark] class DefaultParserDialect extends ParserDialect { @transient protected val sqlParser = SqlParser override def parse(sqlText: String): LogicalPlan = { sqlParser.parse(sqlText) } }
可以看到他实际是执行SqlParser的parse方法, 那么SqlParser里面parse是怎么写的, 可以看到里面没有重写parse方法, 那么继续调用start, start的定义如下:
protected lazy val start: Parser[LogicalPlan] = start1 | insert | cte protected lazy val start1: Parser[LogicalPlan] = (select | ("(" ~> select <~ ")")) * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } | EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } )
可以看到里面的start1其实就是匹配到了select语句, 那么我们的select最后会通过start1的方法去生成一个unresolvedlogicalplan
那么有了unresolvedlogicalplan后我们去看DataFrame是怎么被构造出来的:
class DataFrame private[sql]( @transient override val sqlContext: SQLContext, @DeveloperApi @transient override val queryExecution: QueryExecution) extends Queryable with Serializable { // Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure // you wrap it with `withNewExecutionId` if this actions doesn't call other action. /** * A constructor that automatically analyzes the logical plan. * * This reports error eagerly as the [[DataFrame]] is constructed, unless * [[SQLConf.dataFrameEagerAnalysis]] is turned off. */ def this(sqlContext: SQLContext, logicalPlan: LogicalPlan) = { this(sqlContext, { val qe = sqlContext.executePlan(logicalPlan) if (sqlContext.conf.dataFrameEagerAnalysis) { qe.assertAnalyzed() // This should force analysis and throw errors if there are any } qe }) } .... }
new出来DataFrame后回去调用def this(sqlContext: SQLContext, logicalPlan: LogicalPlan)这个构造函数, 这个构造函数其实是创建了返回了DataFrame本身, 但是传入参数为sqlContext, 和一个queryexecution (qe), qe的传入参数为unresolvedlogicplan
这个queryexecution就是这部分代码:
val qe = sqlContext.executePlan(logicalPlan) if (sqlContext.conf.dataFrameEagerAnalysis) { qe.assertAnalyzed() // This should force analysis and throw errors if there are any } qe
我们看一下queryExecution是怎么写的:
class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed) lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical) lazy val withCachedData: LogicalPlan = { assertAnalyzed() sqlContext.cacheManager.useCachedData(analyzed) } lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData) lazy val sparkPlan: SparkPlan = { SQLContext.setActive(sqlContext) sqlContext.planner.plan(optimizedPlan).next() } // executedPlan should not be used to initialize any SparkPlan. It should be // only used for execution. lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan) /** Internal version of the RDD. Avoids copies and has no schema */ lazy val toRdd: RDD[InternalRow] = executedPlan.execute() protected def stringOrError[A](f: => A): String = try f.toString catch { case e: Throwable => e.toString } def simpleString: String = { s"""== Physical Plan == |${stringOrError(executedPlan)} """.stripMargin.trim } override def toString: String = { def output = analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") s"""== Parsed Logical Plan == |${stringOrError(logical)} |== Analyzed Logical Plan == |${stringOrError(output)} |${stringOrError(analyzed)} |== Optimized Logical Plan == |${stringOrError(optimizedPlan)} |== Physical Plan == |${stringOrError(executedPlan)} """.stripMargin.trim } }
这个类是理解整个sql解析过程的关键, 传入对象是一个unresolvedlogicplan, 首先他会去调用sqlContext的analyzer去生成一个resolvedlogicplan:
lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
也是lazy的, 会在job提交后执行。 我们看一下analyzer是怎么定义的:
protected[sql] lazy val analyzer: Analyzer = new Analyzer(catalog, functionRegistry, conf) { override val extendedResolutionRules = ExtractPythonUDFs :: PreInsertCastAndRename :: (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil) override val extendedCheckRules = Seq( datasources.PreWriteCheck(catalog) ) }
new了一个Analyzer, 然后重写了extendedResolutionRules , analyzer里面主要定义了一个batches, 这个batches其实就是一些对传入的tree(logicplan)进行解析的各种rule, 在analyzer里面的rule是这样的:
val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil lazy val batches: Seq[Batch] = Seq( Batch("Substitution", fixedPoint, CTESubstitution, WindowsSubstitution), Batch("Resolution", fixedPoint, ResolveRelations :: ResolveReferences :: ResolveGroupingAnalytics :: ResolvePivot :: ResolveUpCast :: ResolveSortReferences :: ResolveGenerate :: ResolveFunctions :: ResolveAliases :: ExtractWindowExpressions :: GlobalAggregates :: ResolveAggregateFunctions :: HiveTypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Nondeterministic", Once, PullOutNondeterministic, ComputeCurrentTime), Batch("UDF", Once, HandleNullInputsForUDF), Batch("Cleanup", fixedPoint, CleanupAliases) )
里面有一个ResolveRelations , 看一下这个rule就会明白为什么网上好多资料会说analyzer是吧unresolvedlogicplan和catalog绑定生成resolvedlogicplan:
object ResolveRelations extends Rule[LogicalPlan] { def getTable(u: UnresolvedRelation): LogicalPlan = { try { catalog.lookupRelation(u.tableIdentifier, u.alias) } catch { case _: NoSuchTableException => u.failAnalysis(s"Table not found: ${u.tableName}") } }
那么这些batches是怎么被调用的呢, 得看anzlyer的execute方法了:
def execute(plan: TreeType): TreeType = { var curPlan = plan batches.foreach { batch => val batchStartPlan = curPlan var iteration = 1 var lastPlan = curPlan var continue = true // Run until fix point (or the max number of iterations as specified in the strategy. while (continue) { curPlan = batch.rules.foldLeft(curPlan) { case (plan, rule) => val startTime = System.nanoTime() val result = rule(plan) val runTime = System.nanoTime() - startTime RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime) if (!result.fastEquals(plan)) { logTrace( s""" |=== Applying Rule ${rule.ruleName} === |${sideBySide(plan.treeString, result.treeString).mkString("\n")} """.stripMargin) } result } iteration += 1 if (iteration > batch.strategy.maxIterations) { // Only log if this is a rule that is supposed to run more than once. if (iteration != 2) { logInfo(s"Max iterations (${iteration - 1}) reached for batch ${batch.name}") } continue = false } if (curPlan.fastEquals(lastPlan)) { logTrace( s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.") continue = false } lastPlan = curPlan } if (!batchStartPlan.fastEquals(curPlan)) { logDebug( s""" |=== Result of Batch ${batch.name} === |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")} """.stripMargin) } else { logTrace(s"Batch ${batch.name} has no effect.") } } curPlan }
这个比较复杂, 传入的是unresolvedLogicplan, 然后会对上面定义的batches进行遍历, 确保每个rule都做一遍:
batches.foreach { ...}
再每个batch里面执行foldLeft(curPlan)
所以第一次的时候 case里面的(plan, rule)其实就是(curPlan, 第一个rule)
通过val result = rule(plan) 对当前的plan 做rule, 返回的结果当成下一次的curplan执行直到所有的rule都做完, 得到最总的curPlan
然后会去判断是否还要执行一遍, 以保证所有的node都执行到了这些rule, 有个iteration来记录执行了多少次, 然后和strategy来做对比:
strategy主要有这几种:
Once
FixedPoint(maxIterations: Int)
初始化maxIterations = 1
strategy实在batch创建的时候传入的, 列子:
Batch("UDF", Once, HandleNullInputsForUDF)
当所有的rule都执行了strategy规定的次数后, 就返回一个新的sparkplan。
analyzer这边执行完后, 传入的unresolvedlogicplan就变成了resolvedlogicplan。
然后会看一下这个resolvedlogicplan是不是可以用cachedata, 如果其中有在cache里面的就直接替换掉:
lazy val withCachedData: LogicalPlan = { assertAnalyzed() sqlContext.cacheManager.useCachedData(analyzed) }
然后再通过optimizer把resolvedlogicplan变成一个optimzedLogicplan:
具体调用过程和analyzer一样, 就不重复了:
lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)
再通过sparkplaner转成physicalplan:
lazy val sparkPlan: SparkPlan = { SQLContext.setActive(sqlContext) sqlContext.planner.plan(optimizedPlan).next() }
然后通过通过prepareForExecution转成executebleplan:
lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)
到这里为止就生成了一个可以执行的物理计划, 这个物理计划会在toRdd的时候执行:
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
可以看到一路下来都是lazy的, 所以只有在job真正提交后才会交由spark 去做这些事,
execute方法里面其实就是调用了物理计划的toexecute方法:
protected def doExecute(): RDD[InternalRow] final def execute(): RDD[InternalRow] = { if (children.nonEmpty) { val hasUnsafeInputs = children.exists(_.outputsUnsafeRows) val hasSafeInputs = children.exists(!_.outputsUnsafeRows) assert(!(hasSafeInputs && hasUnsafeInputs), "Child operators should output rows in the same format") assert(canProcessSafeRows || canProcessUnsafeRows, "Operator must be able to process at least one row format") assert(!hasSafeInputs || canProcessSafeRows, "Operator will receive safe rows as input but cannot process safe rows") assert(!hasUnsafeInputs || canProcessUnsafeRows, "Operator will receive unsafe rows as input but cannot process unsafe rows") } RDDOperationScope.withScope(sparkContext, nodeName, false, true) { prepare() doExecute() } }
里面的doExecute方法只是一个声明, 其实现实在所有的sparkplan中实现的, 比如说实在所有的LeafNode UnaryNode BinaryNode的实现类里面实现的, 随便找一个列子:
case class Limit(limit: Int, child: SparkPlan) extends UnaryNode { // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan: // partition local limit -> exchange into one partition -> partition local limit again /** We must copy rows when sort based shuffle is on */ private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) protected override def doExecute(): RDD[InternalRow] = { val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) { child.execute().mapPartitionsInternal { iter => iter.take(limit).map(row => (false, row.copy())) } } else { child.execute().mapPartitionsInternal { iter => val mutablePair = new MutablePair[Boolean, InternalRow]() iter.take(limit).map(row => mutablePair.update(false, row)) } } val part = new HashPartitioner(1) val shuffled = new ShuffledRDD[Boolean, InternalRow, InternalRow](rdd, part) shuffled.setSerializer(new SparkSqlSerializer(child.sqlContext.sparkContext.getConf)) shuffled.mapPartitionsInternal(_.take(limit).map(_._2)) } }
这里的doExecute就返回了一个RDD
到这里基本上理的差不多了, 以后有什么要补充的再加把
发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1105最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
源码跟踪executor如何写数据到blockmanager, 以及如何从blockmanager读数据
2016-08-10 19:41 1415之前看了Job怎么submit 以 ... -
Spark中Blockmanager相关代码解析
2016-08-04 19:47 1844前一段时间看了如何划分stage以及如何提交Job, 最后把结 ... -
Spark在submitStage后如何通过clustermanager调度执行task到Driver接收计算结果的代码解析
2016-08-01 14:08 1472前文: http://humingminghz.iteye.c ... -
Spark中saveAsTextFile至stage划分和job提交的源代码分析
2016-07-29 14:20 3360之前看了Spark Streaming和Spark SQL, ... -
SparkSQL DF.agg 执行过程解析
2016-07-19 10:21 4120在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或 ... -
SparkSQL SQL语句解析过程浅析
2016-07-15 19:06 0前两天一直在忙本职工 ... -
SparkStreaming从启动Receiver到收取数据生成RDD的代码浅析
2016-07-08 17:54 2233前面一片文章介绍了SocketTextStream 是如何从b ... -
Sparkstreaming是如何获取数据组成Dstream的源码浅析
2016-07-08 11:23 1472前面一篇文章介绍了SparkStreaming是如何不停的循环 ... -
SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)
2016-07-06 11:24 10149前两天开始研究SparkSQL, 其主要分为HiveConte ... -
SparkStreaming是如何完成不停的循环处理的代码浅析
2016-07-02 12:26 4654一直很好奇Sparkstreaming的ssc.start是怎 ... -
SparkStreaming 对Window的reduce的方法解析
2016-06-30 11:57 4727在sparkstreaming中对窗口 ... -
Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析
2016-06-29 19:50 8790最近在玩spark streaming, 感觉到了他的强大。 ... -
关于Eclipse开发环境下 Spark+Kafka 获取topic的时候连接出错
2016-06-28 17:20 7403林林总总玩了Spark快一个月了, 打算试一下kafka的消息 ...
相关推荐
通过查看其源代码,我们可以看到它如何构建SQL语句来遍历对象,并动态执行用户提供的命令。 然而,需要注意的是,`sp_MSforeachtable`不是官方支持的系统存储过程,这意味着微软可能在未来的SQL Server版本中移除...
在列的选择上,Oracle 和 SQL Server 都要求在 SELECT 语句中指定数据源,如 FROM 子句。Oracle 提供了一个特殊的表 Dual,用于在没有实际数据表的情况下执行查询。例如,如果你需要返回一个常量,你可以这样做: ...
- 提高效率:由于SQL语句在预编译阶段被转换为C代码,执行时减少了解析和编译的时间。 - 易于调试:由于程序逻辑主要在C语言中实现,调试过程与其他C程序相似。 - 强大的数据库操作能力:结合SQL和PL/SQL,可以处理...
**SQL Maps** 是iBATIS的核心,它通过XML文件定义了Java对象与SQL语句之间的映射关系。这种映射简化了数据库操作,大大减少了编写Java代码的数量。SQL Maps的优势在于其简洁性,学习成本低,且在处理复杂的数据库表...
源代码优化虽然重要,但由于涉及复杂的逻辑改变,其成本和风险较高,通常不是首选。因此,优化的重点应放在SQL语句上,避免使用影响索引效率的表达式,如IS NULL和IS NOT NULL,以及在联接列中使用非索引的方式。 ...
JDBC(Java Database Connectivity)是Java平台中用于与各种数据库进行交互的标准API,它提供了一种桥梁,使得Java应用程序能够连接到数据库,执行SQL语句以及处理结果。 **JDBC连接数据库的步骤**: 1. **加载和...
iBATIS通过XML配置文件来定义SQL语句与Java对象之间的映射关系。这样,开发者可以在XML中编写SQL,而在Java代码中直接操作对象,从而实现对数据库的CRUD(Create、Read、Update、Delete)操作。 首先,创建一个简单...
JDBC允许Java程序通过发送SQL语句来操作数据库,实现数据的增删查改。 1. JDBC技术概述 JDBC的核心是一系列的Java接口和类,这些接口定义了与数据库进行交互的各种方法。通过JDBC,开发者可以编写一次代码,就能在...
开发者可以下载这些源代码,按照书中步骤进行实践操作,加深对SQL Server 2008功能的理解。 总之,《SQL Server 2008开发人员入门》是一本全面介绍SQL Server 2008的书籍,适合初学者和希望提升SQL Server技能的...
JDBC API包括了用于执行SQL语句、管理事务、处理结果集等方法。通过JDBC,开发者可以编写一次程序,让它在任何支持Java的平台上运行,而无需针对特定数据库进行修改。 2. JDBC驱动程序分类 JDBC驱动程序分为四类: ...
- 定义了一个SQL查询语句(`strExcel`),用于从Excel文件的指定工作表(`sheet1$`)中选取所有数据。 - 使用`OleDbDataAdapter`对象执行查询,并将结果填充到一个新的`DataSet`对象中。 **动态获取工作表名称** ...
用户可以自由下载、修改源代码,以满足个性化需求。 【C/S 浏览器设计】C/S(Client/Server)架构是指客户端/服务器结构,通常用于数据库应用。在 MySQL 数据库中的 C/S 浏览器产品,如 MYSQL Front、Manager 和 ...
在Java开发中,我们需确保数据来源的规范性,例如通过SQL查询语句获取数据库中的数据时,要遵循标准的操作规程,避免脏数据的引入。此外,使用Java的API如JDBC进行数据访问时,应有完善的异常处理机制,防止因输入...
这包括 `OleDbConnection`(用于创建和管理数据库连接)、`OleDbCommand`(用于执行 SQL 语句)和 `OleDbDataAdapter`(用于填充 DataTable 或 DataSet)等类。连接字符串如 "Provider=Microsoft.Jet.OLEDB.4.0;Data...
在数据库管理领域,处理空值(NULL)是一个常见的任务,因为NULL值...了解它们的差异和使用条件,有助于在不同数据库环境下编写更加灵活和适应性强的SQL代码。在实际开发中,选择哪个函数取决于具体需求和数据库环境。