`
bit1129
  • 浏览: 1070561 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark四十五】RDD算子逻辑执行图第五部分

 
阅读更多

1. coalesce(联合,合并,接合,发音cola-les)

 2. repartition

 

 

 

1.coalesce

1. 示例代码

package spark.examples

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object SparkRDDCoalesce {

  def main(args : Array[String]) {
    val conf = new SparkConf().setAppName("SparkRDDDistinct").setMaster("local");
    val sc = new SparkContext(conf);
    val rdd1 = sc.parallelize(List(1,8,2,1,4,2,7,6,2,3,1,19,21, 66,74,22,21,72,78,102), 8)
    val pairs = rdd1.coalesce(3, true);
    pairs.saveAsTextFile("file:///D:/coalesce-0-" + System.currentTimeMillis());
    val pairs2 = rdd1.coalesce(3, false);
    pairs2.saveAsTextFile("file:///D:/coalesce-1-" + System.currentTimeMillis());


    println(pairs.toDebugString)
  }

}

 

 

1.1 依赖关系

(3) MappedRDD[4] at coalesce at SparkRDDCoalesce.scala:12 []
 |  CoalescedRDD[3] at coalesce at SparkRDDCoalesce.scala:12 []
 |  ShuffledRDD[2] at coalesce at SparkRDDCoalesce.scala:12 []
 +-(8) MapPartitionsRDD[1] at coalesce at SparkRDDCoalesce.scala:12 []
    |  ParallelCollectionRDD[0] at parallelize at SparkRDDCoalesce.scala:11 []

 1.2 计算结果

1.2.1 shuffle为true

 part-00000

4
7
6
1
21
21
78

part-00001

1
2
2
19
66
102

part-00002

8
1
2
3
74
22
72

 

1.2.2 shuffle为false

part-00000

1
8
2
1
4
part-00001

2
7
6
2
3
1
19

part-00002

21
66
74
22
21
72
78
102

 

 

 

2. RDD依赖图



 

 

 

3.源代码

 /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = {
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t => ///将items转换为(递增的Key,item)形式
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1 ///整数的hashCode为其本身?是的,参见Java的Integer#hashCode方法
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions).values
    } else { ///如果shuffle,则直接构造CoalescedRDD
      new CoalescedRDD(this, numPartitions)
    }
  }

 

 2. repartition

  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = {
    coalesce(numPartitions, shuffle = true)
  }

 可见repartition使用了shuffle为true的coalesce,主要用于对partition进行扩容(扩大partition),如果是窄化partition,考虑使用coalesce以避免使用shuffle(言外之意,是使用shuffle为false版本的coalesce)

 

  • 大小: 194.9 KB
  • 大小: 184.3 KB
分享到:
评论

相关推荐

    Spark1.4.1 RDD算子详解

    结合代码详细描述RDD算子的执行流程,并配上执行流程图

    Spark常用的算子以及Scala函数总结.pdf

    Transformation 算子用于对 RDD 进行变换操作,例如 map、filter 等,这些操作是延迟计算的,只有在触发 Action 算子时才真正执行。Action 算子会触发 SparkContext 提交作业并返回结果给驱动程序或写入外部存储系统...

    25个经典Spark算子的JAVA实现

    根据给定文件的信息,本文将详细介绍25个经典Spark算子的Java实现,并结合详细的注释及JUnit测试结果,帮助读者更好地理解Spark算子的工作原理及其应用方式。 ### Spark算子简介 在Apache Spark框架中,算子是用于...

    【SparkCore篇03】RDD行动算子1

    `first`算子返回RDD中的第一个元素。这在需要快速获取数据集的代表值时非常方便。 5. `take`算子: `take`算子返回RDD中的前n个元素组成的数组。这对于获取数据样本或者有限的输出很有帮助。 6. `takeOrdered`...

    spark基本算子操作

    - **示例**:若`rdd1 = sc.parallelize([1, 2, 3])`,`rdd2 = sc.parallelize([3, 4, 5])`,执行`rdd1.union(rdd2)`后,得到的新RDD为`[1, 2, 3, 3, 4, 5]`。 8. **`intersection(otherDataset)`** - **功能**:...

    【SparkCore篇02】RDD转换算子1

    SparkCore篇02主要介绍了RDD的一些基础转换算子,这些算子是Spark处理数据的核心工具。以下是关于这些算子的详细说明: 1. **map()**:map算子用于对RDD(Resilient Distributed Dataset)中的每个元素进行操作。它...

    sparkRDD函数大全

    spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。

    Spark算子.pdf

    Spark对于大数据行业的实时处理数据来说,有着举足轻重的位置,特此学习整理了RDD 算子的各个含义,希望各位读者能够喜欢。谢谢

    spark算子基础讲义1

    四、Spark 算子实践操作 下面是一个使用 Spark 算子的实践操作: 1. 首先,我们需要将数据放入 Hadoop 集群中。 ``` hdfs dfs -mkdir /spark hdfs dfs -put word.txt /spark/1.log hdfs dfs -put word.txt /spark/...

    Spark学习--RDD编码

    当Spark对数据操作和转换时,会自动将RDD中的数据分发到集群,并将操作并行化执行。 Spark中的RDD是一个不可变的分布式对象集合。每个RDD都倍分为多个分区,这些分区运行在集群中的不同节点。RDD可以包含Python、...

    Spark思维导图之Spark RDD.png

    Spark思维导图之Spark RDD.png

    大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc

    大数据实验报告 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 本实验报告主要介绍了在 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 的步骤和过程。实验中首先安装了 ...

    Spark算子实例maven版

    Spark算子实例maven版是基于Apache Spark框架的开发示例,主要针对的是使用Maven构建项目的开发者。Apache Spark是一个用于大规模数据处理的快速、通用且可扩展的开源框架,它提供了一种分布式、内存计算的编程模型...

    Spark算子的详细使用方法

    Spark 算子是 Spark 框架中最基本的组成部分,它们是 Spark 程序的主要构建块。Spark 算子可以分为两类:Transformation 变换/转换算子和 Action 行动算子。 Transformation 变换/转换算子并不触发提交作业,完成...

    Spark-Transformation和Action算子.md

    ### Spark Transformation和Action算子详解 #### 一、Transformation **Transformation** 在 Spark 中是指对 RDD(弹性分布式数据集)进行的各种转换操作。这些操作并不会立即执行,而是延迟执行,直到遇到 Action...

    spark算子.docx

    Spark 算子详解 Spark 是一个基于内存的分布式计算框架,提供了多种算子来实现数据的处理和转换。本文将详细介绍 Spark 中常用的 Transformations 算子,包括 map、mapPartitions、mapPartitionsWithIndex、flatMap...

    10 实战解析spark运行原理和RDD解密

    "Spark 运行原理和 RDD 解密" Spark 是一个分布式计算框架,基于内存和磁盘,特别适合于迭代计算。Spark 的运行原理可以分为两大部分:Driver 端和 Executor 端。Driver 端负责提交任务,Executor 端负责执行任务...

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    在大数据处理框架Apache Spark中,RDD(弹性分布式数据集)是基础的数据处理抽象,它提供了容错、分布式数据操作的能力。而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。...

    spark RDD 论文 中文版

    ### Spark RDD论文中文版知识点详述 #### 一、引言 Spark RDD(Resilient Distributed Datasets)作为Apache Spark的核心组件之一,在大数据处理领域扮演着至关重要的角色。本论文旨在探讨Spark RDD的设计理念及其在...

Global site tag (gtag.js) - Google Analytics