`
bit1129
  • 浏览: 1072861 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark四十四】RDD算子逻辑执行图第四部分

 
阅读更多

1.sortByKey

2.cartesian

 

1.sortByKey

1.示例代码

 

 

1.1 RDD依赖关系

(3) ShuffledRDD[3] at sortByKey at SparkRDDSortByKey.scala:15 []
 +-(3) ParallelCollectionRDD[0] at parallelize at SparkRDDSortByKey.scala:14 []

1.2 计算结果

part-00000:

(A,5)
(A,4)
(B,4)
(B,2)

part-00001:

(C,3)
(D,2)

part-00002:

(E,1)
(Z,3)

 

2.RDD依赖图



 

3.sortByKey源代码

 

 /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
   */
  // TODO: this currently doesn't work on P other than Tuple2!
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
      : RDD[(K, V)] =
  {
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  }

3.1 sortByKey是OrderedRDDFunctions的方法,而不是PairRDDFunctions类的方法。它其中使用了RangePartitioner,因此对于同一个Reducer来说,它得到的结果是有序的,即part-00000中的数据是有序的,part-00001中的数据也是有序的,同时part-00001中的数据较part-00000要么为大(升序),要么为小(降序).

3.2 如果Reducer的个数大于1个,那么这些reducer的排序不是全局有序的?不是这么理解的,在一个节点上的Reducer任务,得到的结果是有序的,但是不同节点上的数据不是全量有序的。

3.3 sortByKey只支持K,V类型的sort,即按照Key进行排序。然后通过隐式转换转换到OrderedRDDFunctions类上。

 

2. cartesian

1. 示例代码

package spark.examples

import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by yuzhitao on 2/6/2015.
 */
object SparkRDDCartesian {
  def main(args : Array[String]) {
    val conf = new SparkConf().setAppName("SparkRDDCartesian").setMaster("local");
    val sc = new SparkContext(conf);

    //第一个参数是集合,第二个参数是分区数
    val rdd1 = sc.parallelize(List((1,2),(2,3), (3,4),(4,5),(5,6)), 3)
    val rdd2 = sc.parallelize(List((3,6),(2,8)), 2);

    val pairs = rdd1.cartesian(rdd2);

    pairs.saveAsTextFile("file:///D:/cartesian" + System.currentTimeMillis());

    println(pairs.toDebugString)
  }
}

 

 

1.1 RDD依赖关系

(6) CartesianRDD[2] at cartesian at SparkRDDCartesian.scala:18 []
 |  ParallelCollectionRDD[0] at parallelize at SparkRDDCartesian.scala:14 []
 |  ParallelCollectionRDD[1] at parallelize at SparkRDDCartesian.scala:15 []

1.2 计算结果

part-00000: ((1,2),(3,6))

part-00001:((1,2),(2,8))

part-00002:((2,3),(3,6)) ((3,4),(3,6))

part-00003: ((2,3),(2,8)) ((3,4),(2,8))

part-00004: ((4,5),(3,6)) ((5,6),(3,6))

part-00005:((4,5),(2,8)) ((5,6),(2,8))

 

2. RDD依赖图

 

 

 

 

3. cartesian源代码

 

  /**
   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
   * elements (a, b) where a is in `this` and b is in `other`.
   */
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)

 

3.1 子RDD的每个partition完全依赖于父RDD的一个partion,又完全依赖于另一个父RDDb的partition,那么这个为什么称为窄依赖???对于窄依赖,NarrowDependency的文档如下。可见,不是说父RDD的partition只能被一个子RDD的partition使用才是窄依赖。一个父RDD的partition可以被多个子RDD依赖

/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */

3.2 CartesianRDD的源代码:

 

class CartesianRDD[T: ClassTag, U: ClassTag](
    sc: SparkContext,
    var rdd1 : RDD[T],
    var rdd2 : RDD[U])
  extends RDD[Pair[T, U]](sc, Nil)
  with Serializable {

  val numPartitionsInRdd2 = rdd2.partitions.size

  override def getPartitions: Array[Partition] = {
    // create the cross product split
    val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
  }

  override def compute(split: Partition, context: TaskContext) = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }
  ///都是窄依赖
  override def getDependencies: Seq[Dependency[_]] = List(
    new NarrowDependency(rdd1) {
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
    },
    new NarrowDependency(rdd2) {
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
    }
  )

  override def clearDependencies() {
    super.clearDependencies()
    rdd1 = null
    rdd2 = null
  }
}

 

再论宽窄依赖:

依赖是针对分区来说的,所以洗牌就是对分区的数据进行重新整理,重新分配等,所以需要重组分区数据之类的操作,在理论上才是一种Shuffle

有shuffle就是宽,没shuffle就是窄

shuffle的含义是数组重组,而有些情况下,数据需要重组,也有可能不需要重组,简单把整个数据传出去
  • 第一种 1:1 的情况被称为 OneToOneDependency。
  • 第二种 N:1 的情况被称为 N:1 NarrowDependency。
  • 第三种 N:N 的情况被称为 N:N NarrowDependency。不属于前两种情况的完全依赖都属于这个类别。
  • 第四种被称为 ShuffleDependency。

 

 

 

 

 

  • 大小: 79.2 KB
  • 大小: 160.5 KB
分享到:
评论

相关推荐

    Spark1.4.1 RDD算子详解

    结合代码详细描述RDD算子的执行流程,并配上执行流程图

    Spark常用的算子以及Scala函数总结.pdf

    Transformation 算子用于对 RDD 进行变换操作,例如 map、filter 等,这些操作是延迟计算的,只有在触发 Action 算子时才真正执行。Action 算子会触发 SparkContext 提交作业并返回结果给驱动程序或写入外部存储系统...

    【SparkCore篇03】RDD行动算子1

    `first`算子返回RDD中的第一个元素。这在需要快速获取数据集的代表值时非常方便。 5. `take`算子: `take`算子返回RDD中的前n个元素组成的数组。这对于获取数据样本或者有限的输出很有帮助。 6. `takeOrdered`...

    【SparkCore篇02】RDD转换算子1

    SparkCore篇02主要介绍了RDD的一些基础转换算子,这些算子是Spark处理数据的核心工具。以下是关于这些算子的详细说明: 1. **map()**:map算子用于对RDD(Resilient Distributed Dataset)中的每个元素进行操作。它...

    Spark思维导图之Spark RDD.png

    Spark思维导图之Spark RDD.png

    25个经典Spark算子的JAVA实现

    根据给定文件的信息,本文将详细介绍25个经典Spark算子的Java实现,并结合详细的注释及JUnit测试结果,帮助读者更好地理解Spark算子的工作原理及其应用方式。 ### Spark算子简介 在Apache Spark框架中,算子是用于...

    spark基本算子操作

    - **示例**:若`rdd1 = sc.parallelize([1, 2, 3])`,`rdd2 = sc.parallelize([3, 4, 5])`,执行`rdd1.union(rdd2)`后,得到的新RDD为`[1, 2, 3, 3, 4, 5]`。 8. **`intersection(otherDataset)`** - **功能**:...

    Spark算子.pdf

    Spark对于大数据行业的实时处理数据来说,有着举足轻重的位置,特此学习整理了RDD 算子的各个含义,希望各位读者能够喜欢。谢谢

    sparkRDD函数大全

    spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    在大数据处理框架Apache Spark中,RDD(弹性分布式数据集)是基础的数据处理抽象,它提供了容错、分布式数据操作的能力。而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。...

    Spark算子实例maven版

    Spark算子实例maven版是基于Apache Spark框架的开发示例,主要针对的是使用Maven构建项目的开发者。Apache Spark是一个用于大规模数据处理的快速、通用且可扩展的开源框架,它提供了一种分布式、内存计算的编程模型...

    spark算子基础讲义1

    四、Spark 算子实践操作 下面是一个使用 Spark 算子的实践操作: 1. 首先,我们需要将数据放入 Hadoop 集群中。 ``` hdfs dfs -mkdir /spark hdfs dfs -put word.txt /spark/1.log hdfs dfs -put word.txt /spark/...

    spark算子.docx

    Spark 算子详解 Spark 是一个基于内存的分布式计算框架,提供了多种算子来实现数据的处理和转换。本文将详细介绍 Spark 中常用的 Transformations 算子,包括 map、mapPartitions、mapPartitionsWithIndex、flatMap...

    Spark算子的详细使用方法

    Spark 算子是 Spark 框架中最基本的组成部分,它们是 Spark 程序的主要构建块。Spark 算子可以分为两类:Transformation 变换/转换算子和 Action 行动算子。 Transformation 变换/转换算子并不触发提交作业,完成...

    spark RDD操作详解

    - **RDD依赖**:RDD之间的依赖关系是Spark优化执行计划的重要依据。 #### 二、RDD的创建 RDD可以通过多种方式创建: - **并行化集合**:可以直接从驱动程序上的集合创建RDD。 - **外部数据源**:从文件系统、...

    大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc

    大数据实验报告 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 本实验报告主要介绍了在 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 的步骤和过程。实验中首先安装了 ...

    Spark学习--RDD编码

    当Spark对数据操作和转换时,会自动将RDD中的数据分发到集群,并将操作并行化执行。 Spark中的RDD是一个不可变的分布式对象集合。每个RDD都倍分为多个分区,这些分区运行在集群中的不同节点。RDD可以包含Python、...

    【SparkCore篇01】RDD编程入门1

    Spark中的RDD(Resilient Distributed Dataset)是核心的数据抽象,它是弹性分布式数据集,具备弹性、分区、只读和依赖这四个主要特性。弹性体现在RDD能够自动在内存和磁盘之间切换存储,同时具备容错机制,数据丢失...

    Java Spark算子:sample

    import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arrays; import java.util.List; /** * sample(withReplacement,fraction,seed) 算子 * 对RDD...

    PySpark_Day04:RDD Operations & Shared Variables.pdf

    PySpark_Day04:RDD Operations & Shared Variables 主要讲解了 RDD 算子、RDD 共享变量、综合实战案例及 Spark 内核调度。 知识点1:RDD 概念 RDD(Resilient Distributed Dataset)是 Spark Core 中的核心概念。...

Global site tag (gtag.js) - Google Analytics