`
bit1129
  • 浏览: 1072924 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark四十三】RDD算子逻辑执行图第三部分

 
阅读更多

1.interSection

2.join

 

1.interSection

1.示例代码

 

package spark.examples

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object SparkRDDIntersection {

  def main(args : Array[String]) {
    val conf = new SparkConf().setAppName("SparkRDDDistinct").setMaster("local");
    val sc = new SparkContext(conf);
    val rdd1 = sc.parallelize(List(1,8,2,1,4,2,7,6,2,3,3,1), 3)
    val rdd2 = sc.parallelize(List(1,8,7,9,6,2,1), 2)
    val pairs = rdd1.intersection(rdd2);

    pairs.saveAsTextFile("file:///D:/intersection" + System.currentTimeMillis());

    println(pairs.toDebugString)
  }

}

 

1.1 RDD的依赖关系:

 

(3) MappedRDD[7] at intersection at SparkRDDIntersection.scala:13 []
 |  FilteredRDD[6] at intersection at SparkRDDIntersection.scala:13 []
 |  MappedValuesRDD[5] at intersection at SparkRDDIntersection.scala:13 []
 |  CoGroupedRDD[4] at intersection at SparkRDDIntersection.scala:13 []
 +-(3) MappedRDD[2] at intersection at SparkRDDIntersection.scala:13 []
 |  |  ParallelCollectionRDD[0] at parallelize at SparkRDDIntersection.scala:11 []
 +-(2) MappedRDD[3] at intersection at SparkRDDIntersection.scala:13 []
    |  ParallelCollectionRDD[1] at parallelize at SparkRDDIntersection.scala:12 []

 

1.2 运行结果:

part-000000: 6

part-000001: 1 7

part-000002: 8 2

 

 

 

 

2.RDD依赖图


 

3.intersection的源代码

  /**
   * Return the intersection of this RDD and another one. The output will not contain any duplicate
   * elements, even if the input RDDs did.
   *
   * Note that this method performs a shuffle internally.
   */
  def intersection(other: RDD[T]): RDD[T] = {
    this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
        .keys
  }
 

 

3.1 RDD的取交集算子是使用cogroup,首先将Key相同的Value聚合到一个数组中,然后进行过滤

3.2 即使RDD内部有重复的元素,也会过滤掉

 

2.join

1. 示例源代码:

package spark.examples

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object SparkRDDJoin {

  def main(args : Array[String]) {
    val conf = new SparkConf().setAppName("SparkRDDJoin").setMaster("local");
    val sc = new SparkContext(conf);

    //第一个参数是集合,第二个参数是分区数
    val rdd1 = sc.parallelize(List((1,2),(2,3), (3,4),(4,5),(5,6)), 3)
    val rdd2 = sc.parallelize(List((3,6),(2,8)), 2);

     //join操作的RDD的元素类型必须是K/V类型
    val pairs = rdd1.join(rdd2);

    pairs.saveAsTextFile("file:///D:/join" + System.currentTimeMillis());

    println(pairs.toDebugString)
  }

}

 

1.1 RDD依赖图

(3) FlatMappedValuesRDD[4] at join at SparkRDDJoin.scala:17 []
 |  MappedValuesRDD[3] at join at SparkRDDJoin.scala:17 []
 |  CoGroupedRDD[2] at join at SparkRDDJoin.scala:17 []
 +-(3) ParallelCollectionRDD[0] at parallelize at SparkRDDJoin.scala:13 []
 +-(2) ParallelCollectionRDD[1] at parallelize at SparkRDDJoin.scala:14 []

1.2 计算结果

part-00000: (3,(4,6))

part-00001:空

part-00002:(2,(3,8))

 

2. RDD依赖图

 


 3.join的源代码

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1; w <- pair._2) yield (v, w)
    )
  }

 

1. 从源代码中可以看到,图中所描绘的过程是正确的,对于一个给定的Key,假如RDD1中有m个(K,V),RDD2中有n个(K,V‘),那么结果中将由m*n个(K,(V,V'))

2.

 

 

 

 

 

  • 大小: 268.5 KB
  • 大小: 246.6 KB
  • 大小: 170.4 KB
分享到:
评论

相关推荐

    Spark1.4.1 RDD算子详解

    结合代码详细描述RDD算子的执行流程,并配上执行流程图

    Spark常用的算子以及Scala函数总结.pdf

    Transformation 算子用于对 RDD 进行变换操作,例如 map、filter 等,这些操作是延迟计算的,只有在触发 Action 算子时才真正执行。Action 算子会触发 SparkContext 提交作业并返回结果给驱动程序或写入外部存储系统...

    25个经典Spark算子的JAVA实现

    根据给定文件的信息,本文将详细介绍25个经典Spark算子的Java实现,并结合详细的注释及JUnit测试结果,帮助读者更好地理解Spark算子的工作原理及其应用方式。 ### Spark算子简介 在Apache Spark框架中,算子是用于...

    【SparkCore篇03】RDD行动算子1

    `first`算子返回RDD中的第一个元素。这在需要快速获取数据集的代表值时非常方便。 5. `take`算子: `take`算子返回RDD中的前n个元素组成的数组。这对于获取数据样本或者有限的输出很有帮助。 6. `takeOrdered`...

    spark基本算子操作

    - **示例**:若`rdd1 = sc.parallelize([1, 2, 3])`,`rdd2 = sc.parallelize([3, 4, 5])`,执行`rdd1.union(rdd2)`后,得到的新RDD为`[1, 2, 3, 3, 4, 5]`。 8. **`intersection(otherDataset)`** - **功能**:...

    【SparkCore篇02】RDD转换算子1

    SparkCore篇02主要介绍了RDD的一些基础转换算子,这些算子是Spark处理数据的核心工具。以下是关于这些算子的详细说明: 1. **map()**:map算子用于对RDD(Resilient Distributed Dataset)中的每个元素进行操作。它...

    sparkRDD函数大全

    spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。

    Spark算子.pdf

    Spark对于大数据行业的实时处理数据来说,有着举足轻重的位置,特此学习整理了RDD 算子的各个含义,希望各位读者能够喜欢。谢谢

    spark算子基础讲义1

    三、 Spark 算子分类 Spark 算子可以分为以下几类: 1. 转换算子(Transformation):将一个RDD 转换为另一个RDD,例如 map、filter、flatmap 等。 2. 动作算子(Action):将RDD 转换为非RDD 的结果,例如 count...

    Spark学习--RDD编码

    3. 告诉Spark对需要被重用的中间结果RDD执行persist()操作; 4. 使用诸如first()等这样的行动操作来触发一次并行计算,Spark会对计算进行优化后再执行。 一. 创建RDD Spark提供了两种创建RDD方式: 1. 读取外部...

    大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc

    大数据实验报告 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 本实验报告主要介绍了在 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 的步骤和过程。实验中首先安装了 ...

    Spark算子实例maven版

    在使用Maven作为构建工具时,我们可以方便地管理项目依赖,确保所有必要的Spark库和其他第三方库都能正确导入。Maven的pom.xml文件中,我们需要指定Spark相关的依赖,例如`org.apache.spark:spark-core_2.11`和`org....

    Spark思维导图之Spark RDD.png

    Spark思维导图之Spark RDD.png

    Spark算子的详细使用方法

    Spark 算子是 Spark 框架中最基本的组成部分,它们是 Spark 程序的主要构建块。Spark 算子可以分为两类:Transformation 变换/转换算子和 Action 行动算子。 Transformation 变换/转换算子并不触发提交作业,完成...

    spark RDD操作详解

    ### Spark RDD操作详解 #### 一、RDD概念与特性 **RDD(弹性分布式数据集)**是Apache Spark的核心抽象,代表一种不可变的、可分区的、能够进行并行操作的数据集合。它提供了丰富的API来支持高效的大规模数据处理...

    Spark-Transformation和Action算子.md

    ### Spark Transformation和Action算子详解 #### 一、Transformation **Transformation** 在 Spark 中是指对 RDD(弹性分布式数据集)进行的各种转换操作。这些操作并不会立即执行,而是延迟执行,直到遇到 Action...

    spark算子.docx

    Spark 算子详解 Spark 是一个基于内存的分布式计算框架,提供了多种算子来实现数据的处理和转换。本文将详细介绍 Spark 中常用的 Transformations 算子,包括 map、mapPartitions、mapPartitionsWithIndex、flatMap...

    10 实战解析spark运行原理和RDD解密

    "Spark 运行原理和 RDD 解密" Spark 是一个分布式计算框架,基于内存和磁盘,特别适合于迭代计算。Spark 的运行原理可以分为两大部分:Driver 端和 Executor 端。Driver 端负责提交任务,Executor 端负责执行任务...

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    在大数据处理框架Apache Spark中,RDD(弹性分布式数据集)是基础的数据处理抽象,它提供了容错、分布式数据操作的能力。而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。...

    spark RDD 论文 中文版

    ### Spark RDD论文中文版知识点详述 #### 一、引言 Spark RDD(Resilient Distributed Datasets)作为Apache Spark的核心组件之一,在大数据处理领域扮演着至关重要的角色。本论文旨在探讨Spark RDD的设计理念及其在...

Global site tag (gtag.js) - Google Analytics