测试例子使用的数据:
test01:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
a a
b b
c c
d d
e e
f f
g g
test02:
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
11
22
33
44
55
66
a a
b b
c c
d d
e e
f f
1、union(otherRDD)
union() 将两个rdd简单结合在一起,与mysql中的union 操作类似只不过它是操作的rdd,它不会改变partition中的数据
spark sql 测试:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
./spark-shell
sc
val t01 = sc.textFile("hdfs://user/data_spark/test01")
val t02 = sc.textFile("hdfs://user/data_spark/test02")
t01.union(t01) foreach println
结果:
a a
e e
b b
a a
f f
b b
c c
c c
g g
d d
d d
e e
f f
g g
多次测试union,结果顺序都是随机的,所以,union只是简单的将两个rdd的数据拼接到一起
2、groupByKey(numPartitions)
普通的RDD 类是没有这个方法的,org.apache.spark.rdd.PairRDDFunctions 这个pairRdd提供这个方法;
顾名思义,这个方法是将相同的key的records聚合在一起,类似mysql中的groupby操作,通过ShuffledRDD将每个partition中fetch过来,shuffle机制默认用的是hashShuffle,spark1.1版本引入sorted shuffle,速度更快。shuffle操作后面接着mapPartition()操作,生成MapPartitionRDD。这就是groupbykey的结果了。
同一个key的值聚合以后,将所有的value放到一个arraylist,新的arraylist 作为value
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
val wc = t01.union(t01).flatMap(l=>l.split(" ")).map(w=>(w,1))
wc foreach println
结果:
(e,1)
(e,1)
(e,1)
(e,1)
(f,1)
(f,1)
(f,1)
(f,1)
(g,1)
(g,1)
(g,1)
(g,1)
(a,1)
(a,1)
(a,1)
(a,1)
(b,1)
(b,1)
(b,1)
(b,1)
(c,1)
(c,1)
(c,1)
(c,1)
(d,1)
(d,1)
(d,1)
(d,1)
wc.groupByKey foreach println
结果:
(d,CompactBuffer(1, 1, 1, 1))
(g,CompactBuffer(1, 1, 1, 1))
(c,CompactBuffer(1, 1, 1, 1))
(b,CompactBuffer(1, 1, 1, 1))
(f,CompactBuffer(1, 1, 1, 1))
(e,CompactBuffer(1, 1, 1, 1))
(a,CompactBuffer(1, 1, 1, 1))
ok,groupByKey 之后,将同一个key的值都放到一个列表中
3、reduceByKey(func,numPartition)
这个操作的作用类似mapreduce中的reduce操作,对相同的key的值加上func的操作,比如要做wordcount的操作:
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
map(x=>(x,1)).reduceByKey(_+_, 5)
reduceByKey默认开启map端的combine,上面的groupByKey默认没有开启map端的combine操作,可以人工设置一下。
接上面的测试
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc.reduceByKey(_+_) foreach println
结果:
(d,4)
(b,4)
(f,4)
(g,4)
(c,4)
(e,4)
(a,4)
4、distinct(numPartitions)
将 parent rdd 的数据去重,放到新的numPartitions,还是要通过shuffle操作,如果是kv pair 的数据 则直接进行shuffle 操作,如果只有key,那么spark先将数据转换成再进行shuffle。其实后面调用的是reduceByKey()
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc.distinct(1) foreach println
结果:
(g,1)
(b,1)
(f,1)
(d,1)
(a,1)
(e,1)
(c,1)
5、cogroup(otherRDD,numPartitions)
与groupByKey不同的地方,cogroup 是将多个rdd的数据聚合到一起,过程跟groupByKey 类似.
但是结果是一个包含多个arraylist 的arraylist,每一个rdd 的value放到一个arraylist,然后,将这些arraylist放到一个元素的arraylist的arraylist。
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
val wc01 = t01.flatMap(l=>l.split(" ")).map(w=>(w,1))
val wc02 = t02.flatMap(l=>l.split(" ")).map(w=>(w,1))
wc01.cogroup(wc02,1) foreach println
结果:
(d,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(e,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(4,(CompactBuffer(),CompactBuffer(1, 1)))
(5,(CompactBuffer(),CompactBuffer(1, 1)))
(a,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(6,(CompactBuffer(),CompactBuffer(1, 1)))
(b,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(2,(CompactBuffer(),CompactBuffer(1, 1)))
(3,(CompactBuffer(),CompactBuffer(1, 1)))
(f,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(1,(CompactBuffer(),CompactBuffer(1, 1)))
(g,(CompactBuffer(1, 1),CompactBuffer()))
(c,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
6、intersection(otherRDD)
这个操作值保留两个rdd中都包含的数据,首先将rdd的数据转化成,后面调用cogroup()操作。
然后, 对cogroup结果进行过滤,由前面cogroup 的结果格式介绍可知,会生成包含两个arraylist元素的arraylist,只保留结果中两个arraylist都不为空的,最后取出key,便是最终的结果。
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.intersection(wc02) foreach println
结果:
(d,1)
(e,1)
(b,1)
(f,1)
(a,1)
(c,1)
只有两个rdd共同的部分 kv 对
7、join(otherRDD, numPartitions)
将两个RDD[ K, V ] 安装sql中的join方式聚合。类似intersection,先进行cogroup操作,得到 的MappedValuesRDD。
将 Iterable[v1] 和 Iterable[v2] 做笛卡尔集,并将集合flat()化,生成FlatMappedValuesRDD。
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc01.join(wc02,1) foreach println
结果:
(d,(1,1))
(d,(1,1))
(d,(1,1))
(d,(1,1))
(e,(1,1))
(e,(1,1))
(e,(1,1))
(e,(1,1))
(a,(1,1))
(a,(1,1))
(a,(1,1))
(a,(1,1))
(b,(1,1))
(b,(1,1))
(b,(1,1))
(b,(1,1))
(f,(1,1))
(f,(1,1))
(f,(1,1))
(f,(1,1))
(c,(1,1))
(c,(1,1))
(c,(1,1))
(c,(1,1))
这个jion应该对应于mysql的inner join,只包含双方共有的数据
8、sortByKey(ascending,numPartitions)
将RDD [ k, v ] 按照key进行排序,如果ascending=true表示升序,false表示降序。
先通过shuffle将数据聚合到一起,然后将聚合的数据按照key排序
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.sortByKey(true,1) foreach println
结果:
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)
9、cartesian(otherRDD)
求两个rdd的笛卡尔集,生成的CartesianRDD中的partition个数为两个rdd的partition的个数乘积。
逻辑类似join
笛卡尔乘积,这个很简单,不过数据量大的话就不要这么做了
10、coalesce(numPartitions, shuffle = false)
合并,对一个rdd,两种方式,一种需要shuffle,一种直接将多个partitions的内容合并到一起,不需要shuffle。
这个方法的主要作用就是调整 parentRDD 的partition数量。合并因素除了考虑partition的个数外,还应该考虑locality 和 balance的问题
这个操作的逻辑比较难理解:
[java]view plaincopy
print?在CODE上查看代码片派生到我的代码片
wc01.coalesce(1) foreach println
结果:
14/10/2717:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:0+14
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
14/10/2717:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:14+14
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)
wc01.coalesce(2) foreach println
结果:
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(e,1)
(d,1)
(e,1)
(d,1)
(f,1)
(f,1)
(g,1)
(g,1)
11、repartition(numPartitions)
等价于coalesce(numPartitions, shuffle = true)
[java]view plaincopyprint?在CODE上查看代码片派生到我的代码片
wc01.repartition(1) foreach println
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)
虽然mapreduce 相当于 spark 的 map + reduceByKey, 但是 mapreduce中的reduce可以灵活的操作,加入一些自己的逻辑,所以,各有所长。
但是,spark 确实很方便
分享到:
相关推荐
**Action** 是 Spark 中的另一种操作类型,与 Transformation 不同的是,Action 会立即执行计算。下面介绍几个常用的 Action 操作。 ##### 2.1 reduce `reduce` 函数用于对 RDD 中的所有元素应用一个特定的操作,...
1. **Spark Shell**:交互式的命令行工具,可用于测试和学习Spark。 2. **Spark Job**:一系列操作的集合,由SparkContext提交执行。 3. **Spark Application**:一个完整的Spark程序,包括main方法和Job。 4. **...
Spark作为大数据处理领域的明星框架,因其高效、灵活和易用的特点,被广泛应用于大规模数据处理、实时计算、机器学习和图形处理等多个领域。本电子书合集深入剖析了Spark的核心概念、架构设计以及实际应用,旨在帮助...
Spark SQL用于SQL查询和数据处理,Dataset提供类型安全的API,Spark Structured Streaming处理实时流数据,Spark Streaming处理连续数据流,MLlib支持机器学习,GraphX处理图计算,而SparkR则提供了R语言接口。...
在Spark中,数据通常以RDD(弹性分布式数据集)的形式存在,并通过两种类型的算子进行处理:Transformation(转换)算子和Action(行动)算子。 **Transformation算子**:这些算子用于创建一个新的RDD。它们的操作...
- **Transformation和Action**:Transformation定义了数据处理逻辑,不触发计算;Action则触发计算,并返回结果到Driver端。 3. **Java API使用** - **创建RDD**:通过SparkContext的 parallelize() 方法将Java...
Spark设计的主要目标是提供一个高效、易用且通用的大数据处理平台,与Hadoop MapReduce相比,Spark在处理迭代计算和交互式数据挖掘方面表现更优。SparkCore通过弹性分布式数据集(RDD)这一核心概念实现了这一点。 ...
Spark 是一种高性能、易用且灵活的大数据处理框架,尤其适合迭代计算和交互式查询。通过内存计算和高效的容错机制,Spark 相比传统 MapReduce 框架如 Hadoop 显示出显著的性能优势。此外,Spark 支持多种计算模型和...
Kettle是一款开源的数据集成工具,它允许用户通过图形化界面设计数据处理流程,而Apache Spark则是一个用于大数据处理的快速、通用和可扩展的计算框架。将两者结合使用,可以在Kettle中构建和调度Spark作业,充分...
总的来说,Spark的RDD模型提供了一种高效、灵活的分布式内存计算方式,尤其适合于迭代计算和需要频繁数据重用的应用。通过RDD的转换和动作,用户可以轻松构建复杂的并行计算流程,同时享受到内存计算的高速和容错...
Spark 提供了 Spark Shell 作为交互式环境,方便开发者测试和调试代码。此外,Spark 支持 Scala、Java、Python 和 R 四种编程语言,让开发人员可以根据自己的偏好选择合适的工具。 Spark 还引入了动态资源调度,...
Driver负责任务调度和计算逻辑,Executor在工作节点上运行任务,Cluster Manager协调资源分配,而RDD是Spark的基本数据抽象,是不可变的、分区的记录集合。 3. **RDD操作**:RDD支持两种基本操作——转换...
RDD 支持两种类型的操作:转换(Transformation)和行动(Action)。转换会返回一个新的 RDD,而行动则触发实际的计算,并返回结果。 #### 二、Spark 生态有哪些? Spark 不仅仅是一个单一的框架,它还拥有一个...
RDD 支持转换(Transformation)和动作(Action)两种操作,转换创建新的 RDD,动作触发实际计算。 2. **Spark 核心组件**:Spark 包含多个组件,如 Spark Core、Spark SQL、Spark Streaming、MLlib 和 GraphX。...
RDD提供了两种操作类型:转换(Transformation)和动作(Action)。转换创建新的RDD,而动作则触发计算并可能返回结果到驱动程序。 2. **Spark架构** Spark采用了主-从架构,由Driver Program、SparkContext、...
RDDs支持两种主要操作:转换(Transformation)和动作(Action)。转换创建新的RDD,而动作触发实际的计算并可能返回结果到驱动程序或存储数据。 **2. Spark组件** - **Spark Core**:Spark的基础架构,提供了任务...
Spark Scala API 是一个用于大数据处理的强大工具,它结合了Apache Spark的高性能计算框架与Scala编程语言的简洁性和表达力。这个zip压缩包很可能是包含了Spark的Scala开发接口及相关示例,便于开发者在Scala环境中...
RDDs支持多种操作,如转换(Transformation)和动作(Action),转换不立即执行,而是生成一个新的RDD,而动作则触发实际的计算。 文档中会详细介绍Spark的架构,包括其主节点(Driver)如何协调工作节点(Executor...