`
bit1129
  • 浏览: 1072976 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark四十二】RDD算子逻辑执行图第二部分

 
阅读更多

1.distinct

2.cogroup

 

 

 

1.distinct

1.示例代码

package spark.examples

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object SparkRDDDistinct {

  def main(args : Array[String]) {
    val conf = new SparkConf().setAppName("SparkRDDDistinct").setMaster("local");
    val sc = new SparkContext(conf);
    val rdd1 = sc.parallelize(List(1,8,2,1,4,2,7,6,2,3,1), 3)
    val pairs = rdd1.distinct();

    pairs.saveAsTextFile("file:///D:/distinct" + System.currentTimeMillis());

    println(pairs.toDebugString)
  }

}

1.1 输出的RDD依赖

(3) MappedRDD[3] at distinct at SparkRDDDisctinct.scala:14 []
 |  ShuffledRDD[2] at distinct at SparkRDDDisctinct.scala:14 []
 +-(3) MappedRDD[1] at distinct at SparkRDDDisctinct.scala:14 []
    |  ParallelCollectionRDD[0] at parallelize at SparkRDDDisctinct.scala:13 []

1.2  作业结果

part-000000:   6 3

part-000001:   4 1 7

part-000002:   8 2

 

注意的是:结果并没有排序

 

2.distict的源代码

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) ///map得到元组的第一个元素

 

 

 

3.RDD依赖图



 

 2. cogroup

2.1 示例代码

package spark.examples

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object SparkRDDCogroup {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkRDDCogroup").setMaster("local");
    val sc = new SparkContext(conf);

    //第一个参数是集合,第二个参数是分区数
    val rdd1 = sc.parallelize(List((1, 2), (2, 3), (3, 4), (2,10),(4, 5), (5, 6)), 3)
    val rdd2 = sc.parallelize(List((3, 6), (2, 8), (9,11)), 2);

    //cogroup操作的RDD的元素类型必须是K/V类型
    val pairs = rdd1.cogroup(rdd2);
    pairs.saveAsTextFile("file:///D:/cogroup" + System.currentTimeMillis());

    println(pairs.toDebugString)
  }

}

 

2.2 RDD依赖关系

 

(3) MappedValuesRDD[3] at cogroup at SparkRDDCogroup.scala:17 []
 |  CoGroupedRDD[2] at cogroup at SparkRDDCogroup.scala:17 []
 +-(3) ParallelCollectionRDD[0] at parallelize at SparkRDDCogroup.scala:13 []
 +-(2) ParallelCollectionRDD[1] at parallelize at SparkRDDCogroup.scala:14 []

 

2.3 执行结果:

part-00000: (3,(CompactBuffer(4),CompactBuffer(6))) (9,(CompactBuffer(),CompactBuffer(11)))

part-00001: (4,(CompactBuffer(5),CompactBuffer())) (1,(CompactBuffer(2),CompactBuffer()))

part-00002: (5,(CompactBuffer(6),CompactBuffer())) (2,(CompactBuffer(3, 10),CompactBuffer(8)))

 

从结果中可以看到,

cogroup是对所有的Key进行聚合,不管这个Key在哪个RDD中出现,比如9,在rdd2中出现,那么也会出现在结果集中。

如果rdd中有两个Key一样的元素,比如(2,3),(2,10),那么跟rdd2的(2,8)聚合后得到什么结果?(2,(CompactBuffer(3, 10),CompactBuffer(8)))

 

2.4 RDD依赖图




 

 cogroup函数的的源代码

 

 /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
      other2: RDD[(K, W2)],
      other3: RDD[(K, W3)],
      partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
    cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
       (vs.asInstanceOf[Iterable[V]],
         w1s.asInstanceOf[Iterable[W1]],
         w2s.asInstanceOf[Iterable[W2]],
         w3s.asInstanceOf[Iterable[W3]])
    }
  }

 可见,cogroup最多对四个RDD同时做cogroup操作。cogroup操作的含义是,对在四个RDD中的每个Key进行操作,Key对应的Value是,每个RDD中这个Key对应的Value的集合所构成的元组

 

  • 大小: 194.1 KB
  • 大小: 316.7 KB
  • 大小: 69.5 KB
分享到:
评论

相关推荐

    Spark1.4.1 RDD算子详解

    结合代码详细描述RDD算子的执行流程,并配上执行流程图

    Spark常用的算子以及Scala函数总结.pdf

    Transformation 算子用于对 RDD 进行变换操作,例如 map、filter 等,这些操作是延迟计算的,只有在触发 Action 算子时才真正执行。Action 算子会触发 SparkContext 提交作业并返回结果给驱动程序或写入外部存储系统...

    25个经典Spark算子的JAVA实现

    根据给定文件的信息,本文将详细介绍25个经典Spark算子的Java实现,并结合详细的注释及JUnit测试结果,帮助读者更好地理解Spark算子的工作原理及其应用方式。 ### Spark算子简介 在Apache Spark框架中,算子是用于...

    【SparkCore篇03】RDD行动算子1

    `first`算子返回RDD中的第一个元素。这在需要快速获取数据集的代表值时非常方便。 5. `take`算子: `take`算子返回RDD中的前n个元素组成的数组。这对于获取数据样本或者有限的输出很有帮助。 6. `takeOrdered`...

    spark基本算子操作

    - **示例**:若`rdd1 = sc.parallelize([1, 2, 3])`,`rdd2 = sc.parallelize([3, 4, 5])`,执行`rdd1.union(rdd2)`后,得到的新RDD为`[1, 2, 3, 3, 4, 5]`。 8. **`intersection(otherDataset)`** - **功能**:...

    【SparkCore篇02】RDD转换算子1

    SparkCore篇02主要介绍了RDD的一些基础转换算子,这些算子是Spark处理数据的核心工具。以下是关于这些算子的详细说明: 1. **map()**:map算子用于对RDD(Resilient Distributed Dataset)中的每个元素进行操作。它...

    sparkRDD函数大全

    spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。

    Spark算子.pdf

    Spark对于大数据行业的实时处理数据来说,有着举足轻重的位置,特此学习整理了RDD 算子的各个含义,希望各位读者能够喜欢。谢谢

    Spark学习--RDD编码

    RDD:弹性分布式数据集(ResilientDistributed Dataset),是Spark对数据的...本节将通过示例的方式验证第二节中相关的转化操作和行动操作。 转化和行动计算结果 代码地址: 参考文献: 王道远 《Spark 快速大数据分析》

    spark算子基础讲义1

    四、Spark 算子实践操作 下面是一个使用 Spark 算子的实践操作: 1. 首先,我们需要将数据放入 Hadoop 集群中。 ``` hdfs dfs -mkdir /spark hdfs dfs -put word.txt /spark/1.log hdfs dfs -put word.txt /spark/...

    大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc

    大数据实验报告 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 本实验报告主要介绍了在 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 的步骤和过程。实验中首先安装了 ...

    Spark思维导图之Spark RDD.png

    Spark思维导图之Spark RDD.png

    Spark算子实例maven版

    2. **filter()**: 通过传入一个谓词函数,filter算子可以筛选出满足条件的元素,返回一个新的RDD。这个操作常用于数据预处理,例如删除无效或异常的数据。 3. **reduceByKey()**: 在键值对的RDD上操作,将具有相同...

    spark RDD操作详解

    ### Spark RDD操作详解 #### 一、RDD概念与特性 **RDD(弹性分布式数据集)**是Apache Spark的核心抽象,代表一种不可变的、可分区的、能够进行并行操作的数据集合。它提供了丰富的API来支持高效的大规模数据处理...

    Spark-Transformation和Action算子.md

    `cartesian` 操作会产生两个 RDD 的笛卡尔积,即第一个 RDD 中的每一个元素都会与第二个 RDD 中的所有元素组合。 **示例代码:** ```scala val numbers1 = sc.parallelize(Array(1, 2)) val numbers2 = sc....

    Spark算子的详细使用方法

    Spark 算子是 Spark 框架中最基本的组成部分,它们是 Spark 程序的主要构建块。Spark 算子可以分为两类:Transformation 变换/转换算子和 Action 行动算子。 Transformation 变换/转换算子并不触发提交作业,完成...

    spark算子.docx

    Spark 算子详解 Spark 是一个基于内存的分布式计算框架,提供了多种算子来实现数据的处理和转换。本文将详细介绍 Spark 中常用的 Transformations 算子,包括 map、mapPartitions、mapPartitionsWithIndex、flatMap...

    10 实战解析spark运行原理和RDD解密

    "Spark 运行原理和 RDD 解密" Spark 是一个分布式计算框架,基于内存和磁盘,特别适合于迭代计算。Spark 的运行原理可以分为两大部分:Driver 端和 Executor 端。Driver 端负责提交任务,Executor 端负责执行任务...

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    在大数据处理框架Apache Spark中,RDD(弹性分布式数据集)是基础的数据处理抽象,它提供了容错、分布式数据操作的能力。而DataFrame和Dataset是Spark SQL中更高级的数据抽象,提供了更多的优化和易于使用的特点。...

    spark RDD 论文 中文版

    ### Spark RDD论文中文版知识点详述 #### 一、引言 Spark RDD(Resilient Distributed Datasets)作为Apache Spark的核心组件之一,在大数据处理领域扮演着至关重要的角色。本论文旨在探讨Spark RDD的设计理念及其在...

Global site tag (gtag.js) - Google Analytics