`

spark 几种transformation 的计算逻辑和测试

阅读更多

测试例子使用的数据:
test01:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
a a  
b b  
c c  
d d  
e e  
f f  
g g  

test02:
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
11
22
33
44
55
66
a a  
b b  
c c  
d d  
e e  
f f  


1、union(otherRDD)
      union() 将两个rdd简单结合在一起,与mysql中的union 操作类似只不过它是操作的rdd,它不会改变partition中的数据
spark sql 测试:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
./spark-shell  
sc  
val t01 = sc.textFile("hdfs://user/data_spark/test01")  
val t02 = sc.textFile("hdfs://user/data_spark/test02")  
t01.union(t01) foreach println  
结果:  
a a  
e e  
b b  
a a  
f f  
b b  
c c  
c c  
g g  
d d  
d d  
e e  
f f  
g g  

多次测试union,结果顺序都是随机的,所以,union只是简单的将两个rdd的数据拼接到一起

2、groupByKey(numPartitions)
      普通的RDD 类是没有这个方法的,org.apache.spark.rdd.PairRDDFunctions 这个pairRdd提供这个方法;
       顾名思义,这个方法是将相同的key的records聚合在一起,类似mysql中的groupby操作,通过ShuffledRDD将每个partition中fetch过来,shuffle机制默认用的是hashShuffle,spark1.1版本引入sorted shuffle,速度更快。shuffle操作后面接着mapPartition()操作,生成MapPartitionRDD。这就是groupbykey的结果了。
      同一个key的值聚合以后,将所有的value放到一个arraylist,新的arraylist 作为value
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
val wc = t01.union(t01).flatMap(l=>l.split(" ")).map(w=>(w,1))  
wc foreach println  
结果:  
(e,1)  
(e,1)  
(e,1)  
(e,1)  
(f,1)  
(f,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  
(g,1)  
(g,1)  
(a,1)  
(a,1)  
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(c,1)  
(c,1)  
(d,1)  
(d,1)  
(d,1)  
(d,1)  
wc.groupByKey foreach println  
结果:  
(d,CompactBuffer(1, 1, 1, 1))  
(g,CompactBuffer(1, 1, 1, 1))  
(c,CompactBuffer(1, 1, 1, 1))  
(b,CompactBuffer(1, 1, 1, 1))  
(f,CompactBuffer(1, 1, 1, 1))  
(e,CompactBuffer(1, 1, 1, 1))  
(a,CompactBuffer(1, 1, 1, 1))  
ok,groupByKey 之后,将同一个key的值都放到一个列表中

3、reduceByKey(func,numPartition)
      这个操作的作用类似mapreduce中的reduce操作,对相同的key的值加上func的操作,比如要做wordcount的操作:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
map(x=>(x,1)).reduceByKey(_+_, 5)  
      reduceByKey默认开启map端的combine,上面的groupByKey默认没有开启map端的combine操作,可以人工设置一下。
接上面的测试
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc.reduceByKey(_+_) foreach println  
结果:  
(d,4)  
(b,4)  
(f,4)  
(g,4)  
(c,4)  
(e,4)  
(a,4)  

4、distinct(numPartitions)
      将 parent rdd 的数据去重,放到新的numPartitions,还是要通过shuffle操作,如果是kv pair 的数据  则直接进行shuffle 操作,如果只有key,那么spark先将数据转换成再进行shuffle。其实后面调用的是reduceByKey()
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc.distinct(1) foreach println  
结果:  
(g,1)  
(b,1)  
(f,1)  
(d,1)  
(a,1)  
(e,1)  
(c,1)  

5、cogroup(otherRDD,numPartitions)
     与groupByKey不同的地方,cogroup 是将多个rdd的数据聚合到一起,过程跟groupByKey 类似.
     但是结果是一个包含多个arraylist 的arraylist,每一个rdd 的value放到一个arraylist,然后,将这些arraylist放到一个元素的arraylist的arraylist。
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
val wc01 = t01.flatMap(l=>l.split(" ")).map(w=>(w,1))  
val wc02 = t02.flatMap(l=>l.split(" ")).map(w=>(w,1))  
wc01.cogroup(wc02,1) foreach println  
结果:  
(d,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(e,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(4,(CompactBuffer(),CompactBuffer(1, 1)))  
(5,(CompactBuffer(),CompactBuffer(1, 1)))  
(a,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(6,(CompactBuffer(),CompactBuffer(1, 1)))  
(b,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(2,(CompactBuffer(),CompactBuffer(1, 1)))  
(3,(CompactBuffer(),CompactBuffer(1, 1)))  
(f,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(1,(CompactBuffer(),CompactBuffer(1, 1)))  
(g,(CompactBuffer(1, 1),CompactBuffer()))  
(c,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  


6、intersection(otherRDD)
     这个操作值保留两个rdd中都包含的数据,首先将rdd的数据转化成,后面调用cogroup()操作。
     然后,  对cogroup结果进行过滤,由前面cogroup 的结果格式介绍可知,会生成包含两个arraylist元素的arraylist,只保留结果中两个arraylist都不为空的,最后取出key,便是最终的结果。
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.intersection(wc02) foreach println  
结果:  
(d,1)  
(e,1)  
(b,1)  
(f,1)  
(a,1)  
(c,1)  
只有两个rdd共同的部分 kv 对

7、join(otherRDD, numPartitions)
     将两个RDD[ K, V ] 安装sql中的join方式聚合。类似intersection,先进行cogroup操作,得到 的MappedValuesRDD。
     将 Iterable[v1] 和 Iterable[v2] 做笛卡尔集,并将集合flat()化,生成FlatMappedValuesRDD。
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc01.join(wc02,1) foreach println  
结果:  
(d,(1,1))  
(d,(1,1))  
(d,(1,1))  
(d,(1,1))  
(e,(1,1))  
(e,(1,1))  
(e,(1,1))  
(e,(1,1))  
(a,(1,1))  
(a,(1,1))  
(a,(1,1))  
(a,(1,1))  
(b,(1,1))  
(b,(1,1))  
(b,(1,1))  
(b,(1,1))  
(f,(1,1))  
(f,(1,1))  
(f,(1,1))  
(f,(1,1))  
(c,(1,1))  
(c,(1,1))  
(c,(1,1))  
(c,(1,1))  
这个jion应该对应于mysql的inner join,只包含双方共有的数据

8、sortByKey(ascending,numPartitions)
     将RDD [ k, v ] 按照key进行排序,如果ascending=true表示升序,false表示降序。
     先通过shuffle将数据聚合到一起,然后将聚合的数据按照key排序
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.sortByKey(true,1) foreach println  
结果:  
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(d,1)  
(d,1)  
(e,1)  
(e,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  

9、cartesian(otherRDD)
       求两个rdd的笛卡尔集,生成的CartesianRDD中的partition个数为两个rdd的partition的个数乘积。
       逻辑类似join
笛卡尔乘积,这个很简单,不过数据量大的话就不要这么做了

10、coalesce(numPartitions, shuffle = false) 
        合并,对一个rdd,两种方式,一种需要shuffle,一种直接将多个partitions的内容合并到一起,不需要shuffle。
        这个方法的主要作用就是调整 parentRDD 的partition数量。合并因素除了考虑partition的个数外,还应该考虑locality 和 balance的问题
这个操作的逻辑比较难理解:
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
 wc01.coalesce(1) foreach println  
结果:  
14/10/2717:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:0+14
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(d,1)  
(d,1)  
14/10/2717:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:14+14
(e,1)  
(e,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  
 wc01.coalesce(2) foreach println  
结果:  
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(e,1)  
(d,1)  
(e,1)  
(d,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  

11、repartition(numPartitions)        
        等价于coalesce(numPartitions, shuffle = true)
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.repartition(1) foreach println  
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(d,1)  
(d,1)  
(e,1)  
(e,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  



虽然mapreduce 相当于 spark 的 map +  reduceByKey, 但是 mapreduce中的reduce可以灵活的操作,加入一些自己的逻辑,所以,各有所长。
但是,spark 确实很方便

分享到:
评论

相关推荐

    Spark-Transformation和Action算子.md

    **Action** 是 Spark 中的另一种操作类型,与 Transformation 不同的是,Action 会立即执行计算。下面介绍几个常用的 Action 操作。 ##### 2.1 reduce `reduce` 函数用于对 RDD 中的所有元素应用一个特定的操作,...

    spark考试练习题含答案.rar

    1. **Spark Shell**:交互式的命令行工具,可用于测试和学习Spark。 2. **Spark Job**:一系列操作的集合,由SparkContext提交执行。 3. **Spark Application**:一个完整的Spark程序,包括main方法和Job。 4. **...

    计算机专业基础理论电子书合集09----spark

    Spark作为大数据处理领域的明星框架,因其高效、灵活和易用的特点,被广泛应用于大规模数据处理、实时计算、机器学习和图形处理等多个领域。本电子书合集深入剖析了Spark的核心概念、架构设计以及实际应用,旨在帮助...

    华为大数据认证:Spark2x基于内存的分布式计算.pptx

    Spark SQL用于SQL查询和数据处理,Dataset提供类型安全的API,Spark Structured Streaming处理实时流数据,Spark Streaming处理连续数据流,MLlib支持机器学习,GraphX处理图计算,而SparkR则提供了R语言接口。...

    Spark Transformation和Action算子速查表.pdf

    在Spark中,数据通常以RDD(弹性分布式数据集)的形式存在,并通过两种类型的算子进行处理:Transformation(转换)算子和Action(行动)算子。 **Transformation算子**:这些算子用于创建一个新的RDD。它们的操作...

    spark_examples:Spark程序的示例测试程序

    - **Transformation和Action**:Transformation定义了数据处理逻辑,不触发计算;Action则触发计算,并返回结果到Driver端。 3. **Java API使用** - **创建RDD**:通过SparkContext的 parallelize() 方法将Java...

    SparkCore.docx

    Spark设计的主要目标是提供一个高效、易用且通用的大数据处理平台,与Hadoop MapReduce相比,Spark在处理迭代计算和交互式数据挖掘方面表现更优。SparkCore通过弹性分布式数据集(RDD)这一核心概念实现了这一点。 ...

    spark简介及使用

    Spark 是一种高性能、易用且灵活的大数据处理框架,尤其适合迭代计算和交互式查询。通过内存计算和高效的容错机制,Spark 相比传统 MapReduce 框架如 Hadoop 显示出显著的性能优势。此外,Spark 支持多种计算模型和...

    在Kettle(PDI)跑Apache Spark作业

    Kettle是一款开源的数据集成工具,它允许用户通过图形化界面设计数据处理流程,而Apache Spark则是一个用于大数据处理的快速、通用和可扩展的计算框架。将两者结合使用,可以在Kettle中构建和调度Spark作业,充分...

    Spark分布式计算和RDD模型研究.docx

    总的来说,Spark的RDD模型提供了一种高效、灵活的分布式内存计算方式,尤其适合于迭代计算和需要频繁数据重用的应用。通过RDD的转换和动作,用户可以轻松构建复杂的并行计算流程,同时享受到内存计算的高速和容错...

    spark官方文档中文版

    Spark 提供了 Spark Shell 作为交互式环境,方便开发者测试和调试代码。此外,Spark 支持 Scala、Java、Python 和 R 四种编程语言,让开发人员可以根据自己的偏好选择合适的工具。 Spark 还引入了动态资源调度,...

    Spark技术内幕深入解析Spark内核架构设计与实现原理

    Driver负责任务调度和计算逻辑,Executor在工作节点上运行任务,Cluster Manager协调资源分配,而RDD是Spark的基本数据抽象,是不可变的、分区的记录集合。 3. **RDD操作**:RDD支持两种基本操作——转换...

    Spark生态和安装部署

    RDD 支持两种类型的操作:转换(Transformation)和行动(Action)。转换会返回一个新的 RDD,而行动则触发实际的计算,并返回结果。 #### 二、Spark 生态有哪些? Spark 不仅仅是一个单一的框架,它还拥有一个...

    Spark源码剖析

    RDD 支持转换(Transformation)和动作(Action)两种操作,转换创建新的 RDD,动作触发实际计算。 2. **Spark 核心组件**:Spark 包含多个组件,如 Spark Core、Spark SQL、Spark Streaming、MLlib 和 GraphX。...

    spark_code_basic

    RDD提供了两种操作类型:转换(Transformation)和动作(Action)。转换创建新的RDD,而动作则触发计算并可能返回结果到驱动程序。 2. **Spark架构** Spark采用了主-从架构,由Driver Program、SparkContext、...

    spark-1.6.0-bin-hadoop2.6.tgz

    RDDs支持两种主要操作:转换(Transformation)和动作(Action)。转换创建新的RDD,而动作触发实际的计算并可能返回结果到驱动程序或存储数据。 **2. Spark组件** - **Spark Core**:Spark的基础架构,提供了任务...

    spark-scala-api

    Spark Scala API 是一个用于大数据处理的强大工具,它结合了Apache Spark的高性能计算框架与Scala编程语言的简洁性和表达力。这个zip压缩包很可能是包含了Spark的Scala开发接口及相关示例,便于开发者在Scala环境中...

    Spark官方文档指南chm版本

    RDDs支持多种操作,如转换(Transformation)和动作(Action),转换不立即执行,而是生成一个新的RDD,而动作则触发实际的计算。 文档中会详细介绍Spark的架构,包括其主节点(Driver)如何协调工作节点(Executor...

Global site tag (gtag.js) - Google Analytics