1.interSection
2.join
1.interSection
1.示例代码
package spark.examples
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
object SparkRDDIntersection {
def main(args : Array[String]) {
val conf = new SparkConf().setAppName("SparkRDDDistinct").setMaster("local");
val sc = new SparkContext(conf);
val rdd1 = sc.parallelize(List(1,8,2,1,4,2,7,6,2,3,3,1), 3)
val rdd2 = sc.parallelize(List(1,8,7,9,6,2,1), 2)
val pairs = rdd1.intersection(rdd2);
pairs.saveAsTextFile("file:///D:/intersection" + System.currentTimeMillis());
println(pairs.toDebugString)
}
}
1.1 RDD的依赖关系:
(3) MappedRDD[7] at intersection at SparkRDDIntersection.scala:13 []
| FilteredRDD[6] at intersection at SparkRDDIntersection.scala:13 []
| MappedValuesRDD[5] at intersection at SparkRDDIntersection.scala:13 []
| CoGroupedRDD[4] at intersection at SparkRDDIntersection.scala:13 []
+-(3) MappedRDD[2] at intersection at SparkRDDIntersection.scala:13 []
| | ParallelCollectionRDD[0] at parallelize at SparkRDDIntersection.scala:11 []
+-(2) MappedRDD[3] at intersection at SparkRDDIntersection.scala:13 []
| ParallelCollectionRDD[1] at parallelize at SparkRDDIntersection.scala:12 []
1.2 运行结果:
part-000000: 6
part-000001: 1 7
part-000002: 8 2
2.RDD依赖图

3.intersection的源代码
/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
* elements, even if the input RDDs did.
*
* Note that this method performs a shuffle internally.
*/
def intersection(other: RDD[T]): RDD[T] = {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}
3.1 RDD的取交集算子是使用cogroup,首先将Key相同的Value聚合到一个数组中,然后进行过滤
3.2 即使RDD内部有重复的元素,也会过滤掉
2.join
1. 示例源代码:
package spark.examples
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
object SparkRDDJoin {
def main(args : Array[String]) {
val conf = new SparkConf().setAppName("SparkRDDJoin").setMaster("local");
val sc = new SparkContext(conf);
//第一个参数是集合,第二个参数是分区数
val rdd1 = sc.parallelize(List((1,2),(2,3), (3,4),(4,5),(5,6)), 3)
val rdd2 = sc.parallelize(List((3,6),(2,8)), 2);
//join操作的RDD的元素类型必须是K/V类型
val pairs = rdd1.join(rdd2);
pairs.saveAsTextFile("file:///D:/join" + System.currentTimeMillis());
println(pairs.toDebugString)
}
}
1.1 RDD依赖图
(3) FlatMappedValuesRDD[4] at join at SparkRDDJoin.scala:17 []
| MappedValuesRDD[3] at join at SparkRDDJoin.scala:17 []
| CoGroupedRDD[2] at join at SparkRDDJoin.scala:17 []
+-(3) ParallelCollectionRDD[0] at parallelize at SparkRDDJoin.scala:13 []
+-(2) ParallelCollectionRDD[1] at parallelize at SparkRDDJoin.scala:14 []
1.2 计算结果
part-00000: (3,(4,6))
part-00001:空
part-00002:(2,(3,8))
2. RDD依赖图


3.join的源代码
/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1; w <- pair._2) yield (v, w)
)
}
1. 从源代码中可以看到,图中所描绘的过程是正确的,对于一个给定的Key,假如RDD1中有m个(K,V),RDD2中有n个(K,V‘),那么结果中将由m*n个(K,(V,V'))
2.

- 大小: 268.5 KB

- 大小: 246.6 KB

- 大小: 170.4 KB
分享到:
相关推荐
结合代码详细描述RDD算子的执行流程,并配上执行流程图
Transformation 算子用于对 RDD 进行变换操作,例如 map、filter 等,这些操作是延迟计算的,只有在触发 Action 算子时才真正执行。Action 算子会触发 SparkContext 提交作业并返回结果给驱动程序或写入外部存储系统...
根据给定文件的信息,本文将详细介绍25个经典Spark算子的Java实现,并结合详细的注释及JUnit测试结果,帮助读者更好地理解Spark算子的工作原理及其应用方式。 ### Spark算子简介 在Apache Spark框架中,算子是用于...
`first`算子返回RDD中的第一个元素。这在需要快速获取数据集的代表值时非常方便。 5. `take`算子: `take`算子返回RDD中的前n个元素组成的数组。这对于获取数据样本或者有限的输出很有帮助。 6. `takeOrdered`...
- **示例**:若`rdd1 = sc.parallelize([1, 2, 3])`,`rdd2 = sc.parallelize([3, 4, 5])`,执行`rdd1.union(rdd2)`后,得到的新RDD为`[1, 2, 3, 3, 4, 5]`。 8. **`intersection(otherDataset)`** - **功能**:...
SparkCore篇02主要介绍了RDD的一些基础转换算子,这些算子是Spark处理数据的核心工具。以下是关于这些算子的详细说明: 1. **map()**:map算子用于对RDD(Resilient Distributed Dataset)中的每个元素进行操作。它...
spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。
Spark对于大数据行业的实时处理数据来说,有着举足轻重的位置,特此学习整理了RDD 算子的各个含义,希望各位读者能够喜欢。谢谢
三、 Spark 算子分类 Spark 算子可以分为以下几类: 1. 转换算子(Transformation):将一个RDD 转换为另一个RDD,例如 map、filter、flatmap 等。 2. 动作算子(Action):将RDD 转换为非RDD 的结果,例如 count...
3. 告诉Spark对需要被重用的中间结果RDD执行persist()操作; 4. 使用诸如first()等这样的行动操作来触发一次并行计算,Spark会对计算进行优化后再执行。 一. 创建RDD Spark提供了两种创建RDD方式: 1. 读取外部...
大数据实验报告 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 本实验报告主要介绍了在 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 的步骤和过程。实验中首先安装了 ...
在使用Maven作为构建工具时,我们可以方便地管理项目依赖,确保所有必要的Spark库和其他第三方库都能正确导入。Maven的pom.xml文件中,我们需要指定Spark相关的依赖,例如`org.apache.spark:spark-core_2.11`和`org....
Spark思维导图之Spark RDD.png
Spark 算子是 Spark 框架中最基本的组成部分,它们是 Spark 程序的主要构建块。Spark 算子可以分为两类:Transformation 变换/转换算子和 Action 行动算子。 Transformation 变换/转换算子并不触发提交作业,完成...
### Spark RDD操作详解 #### 一、RDD概念与特性 **RDD(弹性分布式数据集)**是Apache Spark的核心抽象,代表一种不可变的、可分区的、能够进行并行操作的数据集合。它提供了丰富的API来支持高效的大规模数据处理...
### Spark Transformation和Action算子详解 #### 一、Transformation **Transformation** 在 Spark 中是指对 RDD(弹性分布式数据集)进行的各种转换操作。这些操作并不会立即执行,而是延迟执行,直到遇到 Action...
Spark 算子详解 Spark 是一个基于内存的分布式计算框架,提供了多种算子来实现数据的处理和转换。本文将详细介绍 Spark 中常用的 Transformations 算子,包括 map、mapPartitions、mapPartitionsWithIndex、flatMap...
"Spark 运行原理和 RDD 解密" Spark 是一个分布式计算框架,基于内存和磁盘,特别适合于迭代计算。Spark 的运行原理可以分为两大部分:Driver 端和 Executor 端。Driver 端负责提交任务,Executor 端负责执行任务...
在大数据处理框架Apache Spark中,RDD(弹性分布式数据集)是基础的数据处理抽象,它提供了容错、分布式数据操作的能力。而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。...
### Spark RDD论文中文版知识点详述 #### 一、引言 Spark RDD(Resilient Distributed Datasets)作为Apache Spark的核心组件之一,在大数据处理领域扮演着至关重要的角色。本论文旨在探讨Spark RDD的设计理念及其在...