测试例子使用的数据:
test01:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
a a
b b
c c
d d
e e
f f
g g
test02:
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
11
22
33
44
55
66
a a
b b
c c
d d
e e
f f
1、union(otherRDD)
union() 将两个rdd简单结合在一起,与mysql中的union 操作类似只不过它是操作的rdd,它不会改变partition中的数据
spark sql 测试:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
./spark-shell
sc
val t01 = sc.textFile("hdfs://user/data_spark/test01")
val t02 = sc.textFile("hdfs://user/data_spark/test02")
t01.union(t01) foreach println
结果:
a a
e e
b b
a a
f f
b b
c c
c c
g g
d d
d d
e e
f f
g g
多次测试union,结果顺序都是随机的,所以,union只是简单的将两个rdd的数据拼接到一起
2、groupByKey(numPartitions)
普通的RDD 类是没有这个方法的,org.apache.spark.rdd.PairRDDFunctions 这个pairRdd提供这个方法;
顾名思义,这个方法是将相同的key的records聚合在一起,类似mysql中的groupby操作,通过ShuffledRDD将每个partition中fetch过来,shuffle机制默认用的是hashShuffle,spark1.1版本引入sorted shuffle,速度更快。shuffle操作后面接着mapPartition()操作,生成MapPartitionRDD。这就是groupbykey的结果了。
同一个key的值聚合以后,将所有的value放到一个arraylist,新的arraylist 作为value
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
val wc = t01.union(t01).flatMap(l=>l.split(" ")).map(w=>(w,1))
wc foreach println
结果:
(e,1)
(e,1)
(e,1)
(e,1)
(f,1)
(f,1)
(f,1)
(f,1)
(g,1)
(g,1)
(g,1)
(g,1)
(a,1)
(a,1)
(a,1)
(a,1)
(b,1)
(b,1)
(b,1)
(b,1)
(c,1)
(c,1)
(c,1)
(c,1)
(d,1)
(d,1)
(d,1)
(d,1)
wc.groupByKey foreach println
结果:
(d,CompactBuffer(1, 1, 1, 1))
(g,CompactBuffer(1, 1, 1, 1))
(c,CompactBuffer(1, 1, 1, 1))
(b,CompactBuffer(1, 1, 1, 1))
(f,CompactBuffer(1, 1, 1, 1))
(e,CompactBuffer(1, 1, 1, 1))
(a,CompactBuffer(1, 1, 1, 1))
ok,groupByKey 之后,将同一个key的值都放到一个列表中
3、reduceByKey(func,numPartition)
这个操作的作用类似mapreduce中的reduce操作,对相同的key的值加上func的操作,比如要做wordcount的操作:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
map(x=>(x,1)).reduceByKey(_+_, 5)
reduceByKey默认开启map端的combine,上面的groupByKey默认没有开启map端的combine操作,可以人工设置一下。
接上面的测试
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc.reduceByKey(_+_) foreach println
结果:
(d,4)
(b,4)
(f,4)
(g,4)
(c,4)
(e,4)
(a,4)
4、distinct(numPartitions)
将 parent rdd 的数据去重,放到新的numPartitions,还是要通过shuffle操作,如果是kv pair 的数据 则直接进行shuffle 操作,如果只有key,那么spark先将数据转换成再进行shuffle。其实后面调用的是reduceByKey()
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc.distinct(1) foreach println
结果:
(g,1)
(b,1)
(f,1)
(d,1)
(a,1)
(e,1)
(c,1)
5、cogroup(otherRDD,numPartitions)
与groupByKey不同的地方,cogroup 是将多个rdd的数据聚合到一起,过程跟groupByKey 类似.
但是结果是一个包含多个arraylist 的arraylist,每一个rdd 的value放到一个arraylist,然后,将这些arraylist放到一个元素的arraylist的arraylist。
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
val wc01 = t01.flatMap(l=>l.split(" ")).map(w=>(w,1))
val wc02 = t02.flatMap(l=>l.split(" ")).map(w=>(w,1))
wc01.cogroup(wc02,1) foreach println
结果:
(d,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(e,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(4,(CompactBuffer(),CompactBuffer(1, 1)))
(5,(CompactBuffer(),CompactBuffer(1, 1)))
(a,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(6,(CompactBuffer(),CompactBuffer(1, 1)))
(b,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(2,(CompactBuffer(),CompactBuffer(1, 1)))
(3,(CompactBuffer(),CompactBuffer(1, 1)))
(f,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(1,(CompactBuffer(),CompactBuffer(1, 1)))
(g,(CompactBuffer(1, 1),CompactBuffer()))
(c,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
6、intersection(otherRDD)
这个操作值保留两个rdd中都包含的数据,首先将rdd的数据转化成,后面调用cogroup()操作。
然后, 对cogroup结果进行过滤,由前面cogroup 的结果格式介绍可知,会生成包含两个arraylist元素的arraylist,只保留结果中两个arraylist都不为空的,最后取出key,便是最终的结果。
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.intersection(wc02) foreach println
结果:
(d,1)
(e,1)
(b,1)
(f,1)
(a,1)
(c,1)
只有两个rdd共同的部分 kv 对
7、join(otherRDD, numPartitions)
将两个RDD[ K, V ] 安装sql中的join方式聚合。类似intersection,先进行cogroup操作,得到 的MappedValuesRDD。
将 Iterable[v1] 和 Iterable[v2] 做笛卡尔集,并将集合flat()化,生成FlatMappedValuesRDD。
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc01.join(wc02,1) foreach println
结果:
(d,(1,1))
(d,(1,1))
(d,(1,1))
(d,(1,1))
(e,(1,1))
(e,(1,1))
(e,(1,1))
(e,(1,1))
(a,(1,1))
(a,(1,1))
(a,(1,1))
(a,(1,1))
(b,(1,1))
(b,(1,1))
(b,(1,1))
(b,(1,1))
(f,(1,1))
(f,(1,1))
(f,(1,1))
(f,(1,1))
(c,(1,1))
(c,(1,1))
(c,(1,1))
(c,(1,1))
这个jion应该对应于mysql的inner join,只包含双方共有的数据
8、sortByKey(ascending,numPartitions)
将RDD [ k, v ] 按照key进行排序,如果ascending=true表示升序,false表示降序。
先通过shuffle将数据聚合到一起,然后将聚合的数据按照key排序
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.sortByKey(true,1) foreach println
结果:
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)
9、cartesian(otherRDD)
求两个rdd的笛卡尔集,生成的CartesianRDD中的partition个数为两个rdd的partition的个数乘积。
逻辑类似join
笛卡尔乘积,这个很简单,不过数据量大的话就不要这么做了
10、coalesce(numPartitions, shuffle = false)
合并,对一个rdd,两种方式,一种需要shuffle,一种直接将多个partitions的内容合并到一起,不需要shuffle。
这个方法的主要作用就是调整 parentRDD 的partition数量。合并因素除了考虑partition的个数外,还应该考虑locality 和 balance的问题
这个操作的逻辑比较难理解:
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc01.coalesce(1) foreach println
结果:
14/10/2717:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:0+14
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
14/10/2717:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:14+14
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)
wc01.coalesce(2) foreach println
结果:
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(e,1)
(d,1)
(e,1)
(d,1)
(f,1)
(f,1)
(g,1)
(g,1)
11、repartition(numPartitions)
等价于coalesce(numPartitions, shuffle = true)
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.repartition(1) foreach println
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)
虽然mapreduce 相当于 spark 的 map + reduceByKey, 但是 mapreduce中的reduce可以灵活的操作,加入一些自己的逻辑,所以,各有所长。
但是,spark 确实很方便
分享到:
相关推荐
**Action** 是 Spark 中的另一种操作类型,与 Transformation 不同的是,Action 会立即执行计算。下面介绍几个常用的 Action 操作。 ##### 2.1 reduce `reduce` 函数用于对 RDD 中的所有元素应用一个特定的操作,...
- **Local模式**:主要用于本地开发和测试场景。在这种模式下,所有Spark任务都在单台机器上运行,无需与其他节点通信。 - **Standalone模式**: - **概述**:Standalone模式是一种独立的部署模式,不需要依赖外部...
1. **RDD的生成**:对于每一种转换操作(transformation),都需要确定其是否会产生新的RDD,以及这个新RDD的具体类型。 2. **依赖关系的建立**:在生成新的RDD时,还需要考虑这些RDD之间的依赖关系,即它们是如何...
【Spark MLlib】是Apache Spark框架中的机器学习库,它为大数据分析提供了丰富的机器学习算法和工具,旨在简化机器学习的实现过程。机器学习是一种人工智能的分支,关注于让计算机通过学习数据或经验来改进其执行...
通过以上解析,我们可以看出 Spark 中的很多核心概念如 RDD、DAG、Task 等都在 Driver 端进行管理和调度,而具体的计算逻辑和数据处理则主要发生在 Executor 端。这种设计模式有效地实现了计算任务的分布与并行处理...
通过代码分析和性能测试,我们可以不断迭代优化,提高Spark框架在处理长DAG计算时的效率。 总的来说,对Spark框架进行定制分析是解决DAG计算流程长问题的关键。通过深入理解Spark的内部机制,结合Java编程,我们...
1. Transformation(转化):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,只是记住了数据集的逻辑操作。 2. Action(执行):Action触发Spark作业的运行,真正触发转换算子的计算。...