import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* Created by EA on 2016/8/24.
*/
object Test3 {
def main(args: Array[ String ]) {
val conf = new SparkConf().setAppName("ScalaTest").setMaster("local")
val sc = new SparkContext(conf)
val rddInt: RDD[ Int ] = sc.parallelize(List(1, 2, 3, 4, 5), 3)
//定义一个Int类型的List,初始值为:1~5,分布在3个分区上
val rddAggr1: (Int, Int) =
rddInt.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
//( ⊙o⊙ )?)一看懵了,我们分开来,一个一个看,就比较好理解了:
//aggregate 返回一个2元组,元组内第一个值为List中所有数据求和,第二个值为List中的数据个数
//aggregate 其中(0,0) Int二元组,是参与计算的初始值
// 第一个0是参与求和的初始值,第二个0是参与求个数的初始值
//((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))分成两部分看:
//(x, y) => (x._1 + y, x._2 + 1) 单线程求和过程;
//(x, y) => (x._1 + y._1, x._2 + y._2) 各分区求和过程;
// (x, y) => (x._1 + y, x._2 + 1)理解如下:
//x._1为元组的第一个值,y为List中的各个值
//x._2为元组的第二个值,1为每次加1
//计算过程:
//x._1 + y 取元组(0,0)中 的第一个值0作为加数,与List中的第一个值1作为被加数相加,得到结果1;
//x._2 + 1 取元组(0,0)中的第二个值0作为加数,1作为被加数,得到结果1;
//以上两结果构成新元组(1,1),再以元组(1,1)与List中第二个值2求和(1+2=3,1+1=2)
//得到又一个新元组(3,2),以此类推,计算完所有数据求和,得到最终元组(15,5)
//(x, y) => (x._1 + y._1, x._2 + y._2)理解如下:
//以上求和过程为单线程计算过程,实际spark执行是分布式的
// 即,可能会把Lit(1,2,3,4,5)分为多个区,如:P1(1,2),P2(3,4),P3(5)
//各区计算结果:(3,2),(7,2),(5,1)
//每个区以单线程方式计算结果,最后再将各个区的计算结果合并
//(x, y) => (x._1 + y._1, x._2 + y._2)为每个区的计算结果元组相加
// 即,(3+7+5,2+2+1)得到(15,5)作为最终返回的二元组
//这里的x,y与前面的x,y没有关系
//此处的x._1可以理解为第一个分区的计算结果二元组的第一个值,x._2第二个值
//y._1与y._2可以理解为第二个分区的计算结果二元组的两个值
// 两个分区的计算结果合并后,作为新的元组,再与第三个分区合并,得到最终结果
println("rddAggr1:" + rddAggr1) // (15,5)
//理解了上面的过程,改成下面的计算就更清楚了(列表中数据乘积,并求个数)
val rddAggr2: (Int, Int) =
rddInt.aggregate((1, 0))((x, y) => (x._1 * y, x._2 + 1), (x, y) => (x._1 * y._1, x._2 + y._2))
println("rddAggr2:" + rddAggr2) // (120,5)
}
}
相关推荐
aggregate函数是RDD API中非常重要的一个动作操作,它可以对数据进行复杂的聚合计算。aggregate函数接受两个参数:一个是初始值(zeroValue),另一个是两个操作函数(seqOp和combOp)。初始值用于在每个分区和分区...
标题:“Spark RDD API”说明了本文档将专注于Apache Spark中弹性分布式数据集(RDD)的API。RDD是Spark的核心概念,它是一个容错的、并行的数据结构,使得用户能够处理大数据集。本文档将基于Scala语言中的RDD实现...
当Spark对数据操作和转换时,会自动将RDD中的数据分发到集群,并将操作并行化执行。 Spark中的RDD是一个不可变的分布式对象集合。每个RDD都倍分为多个分区,这些分区运行在集群中的不同节点。RDD可以包含Python、...
Spark中的aggregate函数和aggregateByKey函数是两个重要的聚合操作,它们可以对RDD中的元素进行聚合操作,生成新的RDD或值。本文将详细介绍aggregate函数和aggregateByKey函数的原理、使用方法和实例代码。 一、...
`reduce`用于聚合RDD中的所有元素,它接受一个函数 `(x, y) => (x+y)`,将RDD中的元素两两配对并应用该函数,最终得到一个单一的结果。例如,我们可以创建一个键值对RDD,通过`reduce`计算所有键的组合以及值的总和...
11. **aggregate(zeroValue: U)(seqOp: (U, T) => U, comOp: (U, U) => U)**: 对 RDD 中的元素进行聚合操作,使用一个初始值 `zeroValue`,`seqOp` 和 `comOp` 来指定如何聚合元素。 12. **fold** : 类似于 `reduce...
根据提供的信息,我们可以深入探讨Spark中的复杂算子及其在Scala中的应用。这部分内容重点在于`mapPartitionsWithIndex`、`aggregate`以及`aggregateByKey`等高级算子的理解与使用。 ### mapPartitionsWithIndex `...
- **DataFrame**:在Spark SQL中,DataFrame是一个表或关系的概念,它具有列名和列类型,可以看作是schemaRDD的升级版,提供了一种更高级别的抽象,使得数据处理更高效且易于理解。 - **DataSet**:DataFrame的类型...
**Resilient Distributed Datasets (RDD)**是Spark中最基础的数据抽象,它支持丰富的操作,包括转换和行动操作。接下来将详细介绍RDD API中的各个函数及其应用场景。 ##### 3.1 aggregate `aggregate`函数用于聚合...
1. **map()**: 这是Spark中最基本的转换操作,它接受一个函数作为参数,对数据集中的每个元素应用这个函数,并返回一个新的数据集。 2. **filter()**: 这个算子用于根据给定的条件筛选数据集中的元素。它接收一个...
例如,通过`reduce()`或`aggregate()`函数实现数据聚合,通过`join()`进行数据合并,通过`groupByKey()`或`groupBy()`进行分组统计。此外,Spark的窗口函数提供了一种处理时间序列数据或需要按时间顺序计算的方法。 ...
3. **转换与操作**:Spark SQL提供了丰富的函数库,可以方便地对Hive表进行转换和操作,如`join`、`group by`、`aggregate`等。 ### 四、学习资源 1. **《Hive简明教程.pdf》**:这个PDF文档应该包含了Hive的基础...
2. **函数式原语**:如map、filter、aggregate和join等操作,是构建流处理逻辑的基础。原语的丰富程度直接影响到应用的开发效率和可扩展性。 3. **状态管理**:许多流处理应用需要维护中间状态。框架应提供可靠的...
- 支持 Map、Reduce、Join、CoGroup、Union、Iterate、DeltaIterate、Filter、FlatMap、GroupReduce、Project、Aggregate、Distinct、Vertex-Update 等算子。 - 数据集(Dataset)和数据流(DataStream)模型。 -...
2.5 Spark RDD数据分享 ............................................................................................... 10 2.6 Spark RDD 迭代操作 ...........................................................