- 浏览: 74035 次
在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或者avg或者min的值:
数据是
身高 性别
这样的一个组合大概有几百万个值
刚开始是使用reducebykey去做计算, 后来发现网上有agg里面直接进行排序获取值的做法, 特地看了一下为什么传进去一个Map(column -> Expression)就能得到想要的结果
首先还是直接进到agg的方法里面:
看到他是执行groupBy返回对象的agg方法, 可以看到groupBy是一个GroupData:
GroupedData的agg方法:
可以看到他是使用toDF方法构建一个DataFrame, 看一下strToExpr里面其实是做了一个unresolvedFunction:
看一下toDF是怎么写的:
在groupBy方法里面我们其实可以看到传入的grouptype是GroupedData.GroupByType
所以这里会去执行:
case GroupedData.GroupByType =>
DataFrame(
df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
Aggregate方法继承自UnaryNode, 也就是一个LogicPlan
这个logicplan包含了我们传入的表达式, 比如说hight-> max这样的。 经过这几步后, 一个DataFrame被创建了, 按照之前的那片文章来看, DF会做下面这几步去优化logicplan直到一个可执行的物理计划为止: (包含对unresolvedFunction的优化)
1.通过Sqlparse 转成unresolvedLogicplan
2.通过Analyzer转成 resolvedLogicplan
3.通过optimizer转成 optimzedLogicplan
4.通过sparkplanner转成physicalLogicplan
5.通过prepareForExecution 转成executable logicplan
6.通过toRDD等方法执行executedplan去调用tree的doExecute
既然这样, 那么我们看一下unresolvedFunction是怎么会和max min avg等expression关联起来的, 进入analyzer, 看到SQLContext里面创建Analyzer时候传入了一个registry:
在这个FunctionRegistry里面包含了所有的expression:
这样当Analyzer在执行execute方法, 对所有的node进行Rule的时候, 有一个Rule叫ResolveFunctions, 下面是analyzer里面定义的batch:
在ResolveFunctions 是这样定义的:
看到这个方法会对所有的expression进行遍历:
registry.lookupFunction(name, children) match{
...
}
如果我们传入的是max或者min, 或者不属于这两者的, 那么直接就能返回aggregateexpression:
AggregateExpression(max, Complete, isDistinct = false)
AggregateExpression(min, Complete, isDistinct = false)
AggregateExpression(agg, Complete, isDistinct)
这样我们传入的max min就被registryFunction里面的expression代替了, 继续通过其他Rule执行来变成resolvedaggreFunction。
可以看到我们定义的max min或者avg其实在构建DataFrame的时候已经在其最总的执行计划里面了, 就不难理解为什么我们这样传入参数就能得到这些结果。
根据测试结果, 传入agg的expression的方法远比rdd计算获取结果快的多。 目前来看, 如果能用agg这样去获取想要的结果, 那么就不要用rdd去进行计算了。
如果有什么不对的地方, 请指正
ps:可以试一下传入的参数不在registryFunction里面的话会由checkAnalysis(resolvedAggregate)这个方法发现及抛出异常
malePPL.agg(Map("height" -> "max", "sex" -> "count")).show
数据是
身高 性别
这样的一个组合大概有几百万个值
刚开始是使用reducebykey去做计算, 后来发现网上有agg里面直接进行排序获取值的做法, 特地看了一下为什么传进去一个Map(column -> Expression)就能得到想要的结果
首先还是直接进到agg的方法里面:
/** * (Scala-specific) Aggregates on the entire [[DataFrame]] without groups. * {{{ * // df.agg(...) is a shorthand for df.groupBy().agg(...) * df.agg(Map("age" -> "max", "salary" -> "avg")) * df.groupBy().agg(Map("age" -> "max", "salary" -> "avg")) * }}} * @group dfops * @since 1.3.0 */ def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs)
看到他是执行groupBy返回对象的agg方法, 可以看到groupBy是一个GroupData:
@scala.annotation.varargs def groupBy(cols: Column*): GroupedData = { GroupedData(this, cols.map(_.expr), GroupedData.GroupByType) }
GroupedData的agg方法:
def agg(exprs: Map[String, String]): DataFrame = { toDF(exprs.map { case (colName, expr) => strToExpr(expr)(df(colName).expr) }.toSeq) }
可以看到他是使用toDF方法构建一个DataFrame, 看一下strToExpr里面其实是做了一个unresolvedFunction:
private[this] def strToExpr(expr: String): (Expression => Expression) = { val exprToFunc: (Expression => Expression) = { (inputExpr: Expression) => expr.toLowerCase match { // We special handle a few cases that have alias that are not in function registry. case "avg" | "average" | "mean" => UnresolvedFunction("avg", inputExpr :: Nil, isDistinct = false) case "stddev" | "std" => UnresolvedFunction("stddev", inputExpr :: Nil, isDistinct = false) // Also special handle count because we need to take care count(*). case "count" | "size" => // Turn count(*) into count(1) inputExpr match { case s: Star => Count(Literal(1)).toAggregateExpression() case _ => Count(inputExpr).toAggregateExpression() } case name => UnresolvedFunction(name, inputExpr :: Nil, isDistinct = false) } } (inputExpr: Expression) => exprToFunc(inputExpr) }
看一下toDF是怎么写的:
private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = { val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) { groupingExprs ++ aggExprs } else { aggExprs } val aliasedAgg = aggregates.map(alias) groupType match { case GroupedData.GroupByType => DataFrame( df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan)) case GroupedData.RollupType => DataFrame( df.sqlContext, Rollup(groupingExprs, df.logicalPlan, aliasedAgg)) case GroupedData.CubeType => DataFrame( df.sqlContext, Cube(groupingExprs, df.logicalPlan, aliasedAgg)) case GroupedData.PivotType(pivotCol, values) => val aliasedGrps = groupingExprs.map(alias) DataFrame( df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan)) } }
在groupBy方法里面我们其实可以看到传入的grouptype是GroupedData.GroupByType
所以这里会去执行:
case GroupedData.GroupByType =>
DataFrame(
df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
Aggregate方法继承自UnaryNode, 也就是一个LogicPlan
case class Aggregate( groupingExpressions: Seq[Expression], aggregateExpressions: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { override lazy val resolved: Boolean = { val hasWindowExpressions = aggregateExpressions.exists ( _.collect { case window: WindowExpression => window }.nonEmpty ) !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions } override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) }
这个logicplan包含了我们传入的表达式, 比如说hight-> max这样的。 经过这几步后, 一个DataFrame被创建了, 按照之前的那片文章来看, DF会做下面这几步去优化logicplan直到一个可执行的物理计划为止: (包含对unresolvedFunction的优化)
1.通过Sqlparse 转成unresolvedLogicplan
2.通过Analyzer转成 resolvedLogicplan
3.通过optimizer转成 optimzedLogicplan
4.通过sparkplanner转成physicalLogicplan
5.通过prepareForExecution 转成executable logicplan
6.通过toRDD等方法执行executedplan去调用tree的doExecute
既然这样, 那么我们看一下unresolvedFunction是怎么会和max min avg等expression关联起来的, 进入analyzer, 看到SQLContext里面创建Analyzer时候传入了一个registry:
protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy() protected[sql] lazy val analyzer: Analyzer = new Analyzer(catalog, functionRegistry, conf) { override val extendedResolutionRules = ExtractPythonUDFs :: PreInsertCastAndRename :: (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil) override val extendedCheckRules = Seq( datasources.PreWriteCheck(catalog) ) }
在这个FunctionRegistry里面包含了所有的expression:
object FunctionRegistry { type FunctionBuilder = Seq[Expression] => Expression val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map( // misc non-aggregate functions expression[Abs]("abs"), expression[CreateArray]("array"), expression[Coalesce]("coalesce"), expression[Explode]("explode"), expression[Greatest]("greatest"), expression[If]("if"), expression[IsNaN]("isnan"), expression[IsNull]("isnull"), expression[IsNotNull]("isnotnull"), expression[Least]("least"), expression[Coalesce]("nvl"), expression[Rand]("rand"), expression[Randn]("randn"), expression[CreateStruct]("struct"), expression[CreateNamedStruct]("named_struct"), expression[Sqrt]("sqrt"), expression[NaNvl]("nanvl"), // math functions expression[Acos]("acos"), expression[Asin]("asin"), expression[Atan]("atan"), expression[Atan2]("atan2"), expression[Bin]("bin"), expression[Cbrt]("cbrt"), expression[Ceil]("ceil"), expression[Ceil]("ceiling"), expression[Cos]("cos"), expression[Cosh]("cosh"), expression[Conv]("conv"), expression[EulerNumber]("e"), expression[Exp]("exp"), expression[Expm1]("expm1"), expression[Floor]("floor"), expression[Factorial]("factorial"), expression[Hypot]("hypot"), expression[Hex]("hex"), expression[Logarithm]("log"), expression[Log]("ln"), expression[Log10]("log10"), expression[Log1p]("log1p"), expression[Log2]("log2"), expression[UnaryMinus]("negative"), expression[Pi]("pi"), expression[Pow]("pow"), expression[Pow]("power"), expression[Pmod]("pmod"), expression[UnaryPositive]("positive"), expression[Rint]("rint"), expression[Round]("round"), expression[ShiftLeft]("shiftleft"), expression[ShiftRight]("shiftright"), expression[ShiftRightUnsigned]("shiftrightunsigned"), expression[Signum]("sign"), expression[Signum]("signum"), expression[Sin]("sin"), expression[Sinh]("sinh"), expression[Tan]("tan"), expression[Tanh]("tanh"), expression[ToDegrees]("degrees"), expression[ToRadians]("radians"), // aggregate functions expression[HyperLogLogPlusPlus]("approx_count_distinct"), expression[Average]("avg"), expression[Corr]("corr"), expression[Count]("count"), expression[First]("first"), expression[First]("first_value"), expression[Last]("last"), expression[Last]("last_value"), expression[Max]("max"), expression[Average]("mean"), expression[Min]("min"), expression[StddevSamp]("stddev"), expression[StddevPop]("stddev_pop"), expression[StddevSamp]("stddev_samp"), expression[Sum]("sum"), expression[VarianceSamp]("variance"), expression[VariancePop]("var_pop"), expression[VarianceSamp]("var_samp"), expression[Skewness]("skewness"), expression[Kurtosis]("kurtosis"), // string functions expression[Ascii]("ascii"), expression[Base64]("base64"), expression[Concat]("concat"), expression[ConcatWs]("concat_ws"), expression[Encode]("encode"), expression[Decode]("decode"), expression[FindInSet]("find_in_set"), expression[FormatNumber]("format_number"), expression[GetJsonObject]("get_json_object"), expression[InitCap]("initcap"), expression[JsonTuple]("json_tuple"), expression[Lower]("lcase"), expression[Lower]("lower"), expression[Length]("length"), expression[Levenshtein]("levenshtein"), expression[RegExpExtract]("regexp_extract"), expression[RegExpReplace]("regexp_replace"), expression[StringInstr]("instr"), expression[StringLocate]("locate"), expression[StringLPad]("lpad"), expression[StringTrimLeft]("ltrim"), expression[FormatString]("format_string"), expression[FormatString]("printf"), expression[StringRPad]("rpad"), expression[StringRepeat]("repeat"), expression[StringReverse]("reverse"), expression[StringTrimRight]("rtrim"), expression[SoundEx]("soundex"), expression[StringSpace]("space"), expression[StringSplit]("split"), expression[Substring]("substr"), expression[Substring]("substring"), expression[SubstringIndex]("substring_index"), expression[StringTranslate]("translate"), expression[StringTrim]("trim"), expression[UnBase64]("unbase64"), expression[Upper]("ucase"), expression[Unhex]("unhex"), expression[Upper]("upper"), // datetime functions expression[AddMonths]("add_months"), expression[CurrentDate]("current_date"), expression[CurrentTimestamp]("current_timestamp"), expression[CurrentTimestamp]("now"), expression[DateDiff]("datediff"), expression[DateAdd]("date_add"), expression[DateFormatClass]("date_format"), expression[DateSub]("date_sub"), expression[DayOfMonth]("day"), expression[DayOfYear]("dayofyear"), expression[DayOfMonth]("dayofmonth"), expression[FromUnixTime]("from_unixtime"), expression[FromUTCTimestamp]("from_utc_timestamp"), expression[Hour]("hour"), expression[LastDay]("last_day"), expression[Minute]("minute"), expression[Month]("month"), expression[MonthsBetween]("months_between"), expression[NextDay]("next_day"), expression[Quarter]("quarter"), expression[Second]("second"), expression[ToDate]("to_date"), expression[ToUnixTimestamp]("to_unix_timestamp"), expression[ToUTCTimestamp]("to_utc_timestamp"), expression[TruncDate]("trunc"), expression[UnixTimestamp]("unix_timestamp"), expression[WeekOfYear]("weekofyear"), expression[Year]("year"), // collection functions expression[Size]("size"), expression[SortArray]("sort_array"), expression[ArrayContains]("array_contains"), // misc functions expression[Crc32]("crc32"), expression[Md5]("md5"), expression[Sha1]("sha"), expression[Sha1]("sha1"), expression[Sha2]("sha2"), expression[SparkPartitionID]("spark_partition_id"), expression[InputFileName]("input_file_name"), expression[MonotonicallyIncreasingID]("monotonically_increasing_id") )
这样当Analyzer在执行execute方法, 对所有的node进行Rule的时候, 有一个Rule叫ResolveFunctions, 下面是analyzer里面定义的batch:
lazy val batches: Seq[Batch] = Seq( Batch("Substitution", fixedPoint, CTESubstitution, WindowsSubstitution), Batch("Resolution", fixedPoint, ResolveRelations :: ResolveReferences :: ResolveGroupingAnalytics :: ResolvePivot :: ResolveUpCast :: ResolveSortReferences :: ResolveGenerate :: ResolveFunctions :: ResolveAliases :: ExtractWindowExpressions :: GlobalAggregates :: ResolveAggregateFunctions :: HiveTypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Nondeterministic", Once, PullOutNondeterministic, ComputeCurrentTime), Batch("UDF", Once, HandleNullInputsForUDF), Batch("Cleanup", fixedPoint, CleanupAliases) )
在ResolveFunctions 是这样定义的:
object ResolveFunctions extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case q: LogicalPlan => q transformExpressions { case u if !u.childrenResolved => u // Skip until children are resolved. case u @ UnresolvedFunction(name, children, isDistinct) => withPosition(u) { registry.lookupFunction(name, children) match { // DISTINCT is not meaningful for a Max or a Min. case max: Max if isDistinct => AggregateExpression(max, Complete, isDistinct = false) case min: Min if isDistinct => AggregateExpression(min, Complete, isDistinct = false) // We get an aggregate function, we need to wrap it in an AggregateExpression. case agg: AggregateFunction => AggregateExpression(agg, Complete, isDistinct) // This function is not an aggregate function, just return the resolved one. case other => other } } } } }
看到这个方法会对所有的expression进行遍历:
registry.lookupFunction(name, children) match{
...
}
如果我们传入的是max或者min, 或者不属于这两者的, 那么直接就能返回aggregateexpression:
AggregateExpression(max, Complete, isDistinct = false)
AggregateExpression(min, Complete, isDistinct = false)
AggregateExpression(agg, Complete, isDistinct)
这样我们传入的max min就被registryFunction里面的expression代替了, 继续通过其他Rule执行来变成resolvedaggreFunction。
可以看到我们定义的max min或者avg其实在构建DataFrame的时候已经在其最总的执行计划里面了, 就不难理解为什么我们这样传入参数就能得到这些结果。
根据测试结果, 传入agg的expression的方法远比rdd计算获取结果快的多。 目前来看, 如果能用agg这样去获取想要的结果, 那么就不要用rdd去进行计算了。
如果有什么不对的地方, 请指正
ps:可以试一下传入的参数不在registryFunction里面的话会由checkAnalysis(resolvedAggregate)这个方法发现及抛出异常
发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1108最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
源码跟踪executor如何写数据到blockmanager, 以及如何从blockmanager读数据
2016-08-10 19:41 1426之前看了Job怎么submit 以 ... -
Spark中Blockmanager相关代码解析
2016-08-04 19:47 1848前一段时间看了如何划分stage以及如何提交Job, 最后把结 ... -
Spark在submitStage后如何通过clustermanager调度执行task到Driver接收计算结果的代码解析
2016-08-01 14:08 1481前文: http://humingminghz.iteye.c ... -
Spark中saveAsTextFile至stage划分和job提交的源代码分析
2016-07-29 14:20 3363之前看了Spark Streaming和Spark SQL, ... -
SparkSQL SQL语句解析过程源代码浅析
2016-07-15 19:34 6648前两天一直在忙本职工 ... -
SparkSQL SQL语句解析过程浅析
2016-07-15 19:06 0前两天一直在忙本职工 ... -
SparkStreaming从启动Receiver到收取数据生成RDD的代码浅析
2016-07-08 17:54 2237前面一片文章介绍了SocketTextStream 是如何从b ... -
Sparkstreaming是如何获取数据组成Dstream的源码浅析
2016-07-08 11:23 1478前面一篇文章介绍了SparkStreaming是如何不停的循环 ... -
SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)
2016-07-06 11:24 10160前两天开始研究SparkSQL, 其主要分为HiveConte ... -
SparkStreaming是如何完成不停的循环处理的代码浅析
2016-07-02 12:26 4660一直很好奇Sparkstreaming的ssc.start是怎 ... -
SparkStreaming 对Window的reduce的方法解析
2016-06-30 11:57 4733在sparkstreaming中对窗口 ... -
Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析
2016-06-29 19:50 8796最近在玩spark streaming, 感觉到了他的强大。 ... -
关于Eclipse开发环境下 Spark+Kafka 获取topic的时候连接出错
2016-06-28 17:20 7411林林总总玩了Spark快一个月了, 打算试一下kafka的消息 ...
相关推荐
例如,`df.createOrReplaceTempView("tempView")`后,可以使用`spark.sql("SELECT * FROM tempView WHERE column = 'value'")`执行SQL查询。 6. **DataFrame优化**:Spark SQL使用 Catalyst 编译器优化查询计划,...
示例代码:using MatterHackers.Agg.UI; using System; namespace MatterHackers.Agg { public class HelloWorld : SystemWindow { public HelloWorld() : base(640, 480) { // add the ...
val avgAge = df.agg(avg("age")).first().getDouble(0) // 11. 计算 age 的最小值 val minAge = df.agg(min("age")).first().getInt(0) ``` #### 四、实验总结 通过本次实验,学生不仅掌握了Spark SQL的基本操作...
- 分组计算:`df.groupby('column').agg(func)`按列分组并应用聚合函数。 - 聚合操作:`df.sum()`, `df.mean()`, `df.median()`等。 - 数据透视表:`pd.pivot_table(data, values=None, index=None, columns=None...
•如果要用AGG的控件和窗体,要加入[AGG]\src\ctrl\*.cpp和[AGG]\src\platform\<OS>\*.cpp,头文件在[AGG]\include\ctrl和[AGG]\include\platform里 •如果要用到TrueType字体显示,要加入[AGG]\font_win32_tt目录下...
此渲染器将覆盖对api v2 .agg端点的调用的默认行为。 支持GET调用以列出以下格式的端点: endpoint.agg/?aggregate[Count]=(field to count) endpoint.agg/?aggregate[Sum]=(field to sum) endpoint.agg/?...
- 分组聚合:`df.groupby('group_column').agg(func)` 8. **合并与连接** - 合并DataFrame:`pd.concat([df1, df2], axis=0/1)` - 内连接:`df1.merge(df2, on='common_column')` - 左/右/外连接:`left_on`, `...
描述中提到的“对采集到的数据进行解析,可以直接更换数据,采集”进一步强调了解析过程对于数据操作和更新的重要性。 数据解析通常是通过编程语言实现的,例如Python,它具有强大的数据处理库,如Pandas、Numpy和...
接着,我们可以使用`.agg()`方法对每个分组执行聚合操作。聚合操作可以包括求和、平均值、最大值、最小值等。在示例中,`df.groupby('A').agg('min')`计算了每组中'B'和'C'列的最小值。`df.groupby('A').agg(['min',...
- `agg`: 这个函数主要用于**聚合运算**,它接受一个或多个函数作为参数,对DataFrame或GroupBy对象的列执行聚合操作。这些函数通常会减少数据的维度,如计算平均值、总和、最大值等。`agg`的目的是将一维数组(如...
aggregated = df.groupby('col').agg(custom_agg) ``` 47. **如何使用Pandas的MultiIndex来处理高维数据?** - 使用`.stack()`和`.unstack()`: ```python stacked = df.stack() unstacked = stacked.unstack...
- `df.groupby('A')['B'].agg(['mean', 'sum'])`:对每个分组执行多个操作。 **2.4 和 fillna 连用** - `df.groupby('A').fillna(method='ffill')`:向前填充缺失值。 #### 3. 注意事项 - 在使用 groupby 时,需要...
- 分组聚合:`groupby().agg(['function1', 'function2'])` ### 8. 时间序列分析 Pandas支持时间序列数据处理,包括日期和时间操作。 - 创建日期索引:`pd.date_range(start, end)` - 将列转换为日期:`df['date_...
agg_v2.0.0.apk
result = grouped.B.agg(['sum', 'max']) ``` ### 4. 数据合并 #### `pd.merge()` 合并两个DataFrame,类似于SQL中的JOIN操作,可以按共同的列进行连接。 ```python df2 = pd.DataFrame({'A': [1, 2], 'D': [10, ...
首先,我们需要创建一个 SparkSession 对象,它是 SparkSQL 的入口点,可以用来创建 DataFrame 和执行 SQL 查询。 ```python from pyspark.sql import SparkSession spark = SparkSession.builder.appName('...
### AGG学习手册知识点解析 #### 一、AGG简介 **AGG**(Anti-Grain Geometry)是一个开源的2D图形库,以其高效、跨平台的特点而著称。相较于GDI+(Graphics Device Interface Plus),AGG不仅提供了更为灵活的编程...
( agg-2.4-2.1.i386.rpm )