从一个简单的例子,来看rdd的演化,和stage是如何生成的(过程灰常之复杂和抽象,请参考附件的图来理解)
- object BaiWordCount2 {
- def main(args: Array[String]) {
- .....
- // Create the context
- val ssc = new SparkContext(args(0), "BaiWordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
- val lines = ssc.textFile(args(1))//基于hadoopRdd,创建了一个MapRdd[String]
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x =>{ println("x:"+ x);(x, 1) } )//这回返回的是一个元组了
- val red = wordCounts.reduceByKey( (a,b)=>{a + b} )
- red.saveAsTextFile("/root/Desktop/out")
- }
- }
我们首先总结出每个操作生成的rdd,这是通过源码追踪得到的:
textFile:生成一个HadoopRDD 和 一个MapPedRDD[String]
flatMap:前面生成的MapPedRDD[String],调用flatMap函数,生成了FlatMaPPedRDD[String]
map:FlatMaPPedRDD[String],调用map函数,创建了一个创建了一个MapPartitionsRDD,再通过隐式转换,生成了PairRDD[String,Int](因为map操作,产生了一对值key和value)
reduceByKey:生成三个Rdd,首先根据之前的PairRDD,生成一个MapPartitionsRDD(这个RDD起到类似map-reduce里面,combine()的作用),再生成一个shuffledRdd(这个rdd是分割stage的重要依据之一),这之后再生成一个MapPartitionsRDD[String](这个rdd起到hadoop里reducer的作用)
saveAsTextFile先生成一个MapPedRDD,然后调用runJob函数,将之前生成的rdd链表,提交到spark集群上,spark根局rdd的类型,划分成一个或多个stage(只有shuffledRdd这类的rdd,才会成为stage和stage之间的边界),然后将各个stage,按照依赖的先后顺序,将stage先后提交集群进行计算
下边通过textFile来详细说明rdd链表的生成过程和主要数据结构,主要注意deps和几种dependency:
从rdd生成的方式来说可以分成四类:通过外部数据生成rdd,通过transformations函数生成,缓存操作,actions操作
textFile函数无疑属于第一种,通过外部数据,生成rdd,输入的文件路径,可以是hdfs://开头的hdfs数据,也可以本地文件路径,例如"/root/Desktop/word.text"
textFile函数调用hadoopFile 函数,生成一个hadoopRdd[K,V],默认情况下,泛型参数K和V,对应HadoopRDD的构造函数里的keyClass和valueClass。
也就是一个Rdd[LongWritable,Text],通过外部数据生成rdd的第一个rdd的特点是,deps是一个空的list,原因是它是从外部文件生成的,没有父rdd。
生成了Rdd[LongWritable,Text]后,还要调用transformations函数map:map(pair => pair._2.toString),来生成一个MappedRDD
MappedRDD(this, sc.clean(f)),这里this,就是之前生成的HadoopRDD,MappedRDD的构造函数,会调用父类的构造函数RDD[U](prev),
这个this(也就是hadoopRdd),会被赋值给prev,然后调用RDD.scala中,下面的构造函数
- def this(@transient oneParent: RDD[_]) =
- this(oneParent.context , List(new OneToOneDependency(oneParent)))
这个函数的作用,是把父RDD的SparkContext(oneParent.context),和一个列表List(new OneToOneDependency(oneParent))),传入了另一个RDD的构造函数,
- RDD[T: ClassManifest](
- @transient private var sc: SparkContext,
- @transient private var deps: Seq[Dependency[_]]
- )
这样我们可以看到,在所有有父子关系的RDD,共享的是同一个SparkContext。而子RDD的deps变量,也被赋值为一个List,里面包含一个OneToOneDependency实例,表明父RDD和子RDD之间的关系
其实大多数的父子关系,包含的都是OneToOneDependency.比较例外的几个,比如join,这个很明显,他的数据不是来自同一个父RDD。而shuffledRdd的Dependency是ShuffledDependency
父Rdd会在子rdd的构造函数中被传入,然后放入子rdd实例的deps里面,被记录下来。这样,当我们得到一个Rdd之后,就可以向后回溯它的祖先,再结合传入的函数变量f,完整的得到它的构造过程。
flatMap,map,reduceByKey,saveAsTextFile则按顺序创建各自的rdd,然后在deps中记录父rdd,同时根据rdd的类型,生成各自的不同类型的dependency。
在saveAsTextFile函数把整个计算任务提交到集群之前,所有的函数进行的操作,仅仅就是生成rdd链表而已。saveAsTextFile是action类型的操作,action的共同特点是,会调用RunJob一类的函数,调用Dagscheduler.runJob,将最后一个rdd(在我们这个例子里,就是saveAsTextFile生成的那个MappedRdd),提交到集群上。集群会以这个rdd为参数之一,生成一个stage,名叫finalStage(故名思意,这是最终的一个stage)。然后调用submitStage,将刚刚生成的finalStage提交到集群上。这个stage是否会被马上执行呢?不一定,因为程序会调用getMissingParentStages,进行寻找,是否有需要先进行提交的stage---这个过程可以这样类比,一个查询操作,在提交之后,要先检查是否有子查询,如果有,先执行子查询,然后在执行父查询,这里的原因很简单,父查询依赖于子查询的数据。同理,在stage执行的过程中,也要先查询,它是否需要其他stage的数据(其实之后一种数据,就是通过shuffle传过来的数据),如果有,那么这些stage,就是它的missingParentStage,它要等他的missingParentStage执行成功,然后通过shuffle机制把数据传给它,才能开始执行。这个过程的执行过程如下:从最后一个rdd起,查看它的dependency的类型,如果是
shuffledDependency,则创建一个ShuffleMapStage,否则,就向前遍历,依次递归,知道最前面的rdd为止
- private def getMissingParentStages(stage: Stage): List[Stage] = {
- .....
- def visit(rdd: RDD[_]) {
- if (!visited(rdd)) {/
- visited += rdd
- if (getCacheLocs(rdd).contains(Nil)) {
- for (dep <- rdd.dependencies) {
- dep match {
- case shufDep: ShuffleDependency[_,_] =>
- val mapStage = getShuffleMapStage(shufDep, stage.jobId)
- if (!mapStage.isAvailable) {
- missing += mapStage
- }
- case narrowDep: NarrowDependency[_] =>
- visit(narrowDep.rdd)
- }
- }
- }
- }
- }
- visit(stage.rdd)
- missing.toList
- }
当getMissingParentStage(stage)的结果为空的时候,表明这个stage没有missingParentStage,或者它的missingParentStage已经都执行完了,则当前这个stage才能被成功的提交到集群去执行,否则,它就要等待,并重复调用getMissingParentStage,直到它的结果为空,才可以被提交。
http://baishuo491.iteye.com/blog/2019510
相关推荐
在Spark中,可以使用toDF()将RDD转换为DataFrame,使用toDS()将RDD或DataFrame转换为Dataset。DataFrame和Dataset都可以很容易地转换为RDD,使用rdd属性即可。这种转换使得开发者可以根据需要在不同的数据抽象之间...
- **不可变性**:一旦创建,RDD的数据不能被修改,这有助于简化错误检测和调试过程。 - **容错性**:RDD能够自动从失败中恢复,无需用户干预。 - **缓存能力**:用户可以选择将RDD缓存在内存中,这对于需要频繁...
通过理解RDD的不可变性、血统信息、转换与动作、数据分区和持久化等概念,开发者能够更好地利用Spark解决大规模数据处理问题。而《spark rdd 论文翻译_中文_spark老汤》和原版英文论文为深入学习和理解这些概念提供...
### Spark RDD 基础论文知识点解析 #### 一、引言与背景 - **Spark RDD 的起源**:本文档介绍的是 Spark 中的核心抽象概念——弹性分布式数据集(Resilient Distributed Datasets, RDD),这一概念由加州大学...
课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 ...
本实验报告主要介绍了在 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 的步骤和过程。实验中首先安装了 Spark 和其依赖项,然后通过 RDD 编程实现了 wordcount 的功能。最后,通过 Spark 编程...
在 Spark 中,Driver 程序负责启动多个 Worker,Worker 从文件系统加载数据并将其转换为 RDD。 RDD 的概念 RDD(Resilient Distributed Dataset)是 Spark 中的核心概念。RDD 是一个只读、分区记录的集合,可以被...
Java 和 Scala 实现 Spark RDD 转换成 DataFrame 的两种方法小结 在本文中,我们将讨论如何使用 Java 和 Scala 将 Spark RDD 转换成 DataFrame,並且介绍两种实现方法。 准备数据源 在项目下新建一个 student.txt...
RDD 是一种不可变、容错的分布式数据集,具备弹性、分布式、基于内存的特性,允许在计算过程中高效地进行并行运算。 RDD 的特性包括: 1. **只读性**:RDD 一旦创建,就不能被修改。任何对 RDD 的改变都会生成一个...
### Spark RDD详解 #### Spark计算模型与RDD概念 在探讨Spark的弹性分布式数据集(RDD)之前,我们首先需要理解Spark的基本计算模型。Spark是一种基于内存的分布式计算框架,其核心设计思想在于通过缓存中间结果来...
标题:“Spark RDD API”说明了本文档将专注于Apache Spark中弹性分布式数据集(RDD)的API。RDD是Spark的核心概念,它是一个容错的、并行的数据结构,使得用户能够处理大数据集。本文档将基于Scala语言中的RDD实现...
在描述中提到的"Spark RDD以及其特性的流程图",可能是对RDD创建、转换、持久化和执行流程的可视化表示。Visio是一款常用的流程图绘制工具,用于打开和理解这个流程图,可以帮助我们更直观地掌握RDD的工作原理。 总...
在转换过程中,通常是从RDD转换到DataFrame,然后再转换回RDD。因为DataFrame的优势在于优化执行计划和对结构化数据的更好处理,这是由于Spark SQL引擎的引入。它允许SQL查询被应用于存储在DataFrame中的数据。而从...
当Spark对数据操作和转换时,会自动将RDD中的数据分发到集群,并将操作并行化执行。 Spark中的RDD是一个不可变的分布式对象集合。每个RDD都倍分为多个分区,这些分区运行在集群中的不同节点。RDD可以包含Python、...
Spark RDD(弹性分布式数据集)是Apache Spark的核心概念,它是一种可分区、容错并可以在集群上并行处理的数据集合。在本实例中,我们将学习如何使用Python接口PySpark来处理RDD,通过实现三个基本的运营案例:计算...
- **计算步骤而非数据集合**:RDD 关注的是数据的转换过程而不是数据本身。 #### 三、创建 RDD ##### 1. 并行化集合 通过调用 SparkContext 的 `parallelize` 方法,可以在驱动程序中现有的集合基础上创建并行化...