原创,转载请注明出处 http://baishuo491.iteye.com/blog/2019510 ,作者邮箱:vc_java@hotmail.com,新浪微博:爱看历史的码农--白硕 作者单位:亚信联创大数据平台部
从一个简单的例子,来看rdd的演化,和stage是如何生成的(过程灰常之复杂和抽象,请参考附件的图来理解)
object BaiWordCount2 {
def main(args: Array[String]) {
.....
// Create the context
val ssc = new SparkContext(args(0), "BaiWordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val lines = ssc.textFile(args(1))//基于hadoopRdd,创建了一个MapRdd[String]
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x =>{ println("x:"+ x);(x, 1) } )//这回返回的是一个元组了
val red = wordCounts.reduceByKey( (a,b)=>{a + b} )
red.saveAsTextFile("/root/Desktop/out")
}
}
我们首先总结出每个操作生成的rdd,这是通过源码追踪得到的:
textFile:生成一个
HadoopRDD 和 一个
MapPedRDD[String]
flatMap:前面生成的MapPedRDD[String],调用flatMap函数,生成了
FlatMaPPedRDD[String]
map:FlatMaPPedRDD[String],调用map函数,创建了一个创建了一个
MapPartitionsRDD,再通过隐式转换,生成了
PairRDD[String,Int](因为map操作,产生了一对值key和value)
reduceByKey:生成三个Rdd,首先根据之前的PairRDD,生成一个
MapPartitionsRDD(这个RDD起到类似map-reduce里面,combine()的作用),再生成一个
shuffledRdd(这个rdd是分割stage的重要依据之一),这之后再生成一个
MapPartitionsRDD[String](这个rdd起到hadoop里reducer的作用)
saveAsTextFile先生成一个
MapPedRDD,然后调用runJob函数,将之前生成的rdd链表,提交到spark集群上,spark根局rdd的类型,划分成一个或多个stage(只有shuffledRdd这类的rdd,才会成为stage和stage之间的边界),然后将各个stage,按照依赖的先后顺序,将stage先后提交集群进行计算
下边通过textFile来详细说明rdd链表的生成过程和主要数据结构,主要注意deps和几种dependency:
从rdd生成的方式来说可以分成四类:通过外部数据生成rdd,通过transformations函数生成,缓存操作,actions操作
textFile函数无疑属于第一种,通过外部数据,生成rdd,输入的文件路径,可以是hdfs://开头的hdfs数据,也可以本地文件路径,例如"/root/Desktop/word.text"
textFile函数调用hadoopFile 函数,生成一个hadoopRdd[K,V],默认情况下,泛型参数K和V,对应HadoopRDD的构造函数里的keyClass和valueClass。
也就是一个Rdd[LongWritable,Text],通过外部数据生成rdd的第一个rdd的特点是,deps是一个空的list,
原因是它是从外部文件生成的,没有父rdd。
生成了Rdd[LongWritable,Text]后,还要调用transformations函数map:map(pair => pair._2.toString),来生成一个MappedRDD
MappedRDD(this, sc.clean(f)),这里this,就是之前生成的HadoopRDD,MappedRDD的构造函数,
会调用父类的构造函数RDD[U](prev),
这个this(也就是hadoopRdd),会被赋值给prev,然后调用RDD.scala中,下面的构造函数
def this(@transient oneParent: RDD[_]) =
this(oneParent.context , List(new OneToOneDependency(oneParent)))
这个函数的作用,是把父RDD的SparkContext(oneParent.context),和一个列表List(new OneToOneDependency(oneParent))),传入了另一个RDD的构造函数,
RDD[T: ClassManifest](
@transient private var sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
)
这样我们可以看到,在所有有父子关系的RDD,共享的是同一个SparkContext。而子RDD的deps变量,也被赋值为一个List,里面包含一个OneToOneDependency实例,表明父RDD和子RDD之间的关系
其实大多数的父子关系,包含的都是OneToOneDependency.比较例外的几个,比如join,这个很明显,他的数据不是来自同一个父RDD。而shuffledRdd的Dependency是
ShuffledDependency
父Rdd会在子rdd的构造函数中被传入,然后放入子rdd实例的deps里面,被记录下来。这样,当我们得到一个Rdd之后,就可以向后回溯它的祖先,再结合传入的函数变量f,完整的得到它的构造过程。
flatMap,map,reduceByKey,saveAsTextFile则按顺序创建各自的rdd,然后在deps中记录父rdd,同时根据rdd的类型,生成各自的不同类型的dependency。
在saveAsTextFile函数把整个计算任务提交到集群之前,所有的函数进行的操作,仅仅就是生成rdd链表而已。saveAsTextFile是action类型的操作,action的共同特点是,会调用RunJob一类的函数,调用Dagscheduler.runJob,将最后一个rdd(在我们这个例子里,就是saveAsTextFile生成的那个MappedRdd),提交到集群上。集群会以这个rdd为参数之一,生成一个stage,名叫finalStage(故名思意,这是最终的一个stage)。然后调用submitStage,将刚刚生成的finalStage提交到集群上。
这个stage是否会被马上执行呢?不一定,因为程序会调用getMissingParentStages,进行寻找,是否有需要先进行提交的stage---这个过程可以这样类比,一个查询操作,在提交之后,要先检查是否有子查询,如果有,先执行子查询,然后在执行父查询,这里的原因很简单,父查询依赖于子查询的数据。同理,在stage执行的过程中,也要先查询,它是否需要其他stage的数据(其实之后一种数据,就是通过shuffle传过来的数据),如果有,那么这些stage,就是它的missingParentStage,它要等他的missingParentStage执行成功,然后通过shuffle机制把数据传给它,才能开始执行。这个过程的执行过程如下:从最后一个rdd起,查看它的dependency的类型,如果是
shuffledDependency,则创建一个ShuffleMapStage,否则,就向前遍历,依次递归,知道最前面的rdd为止
private def getMissingParentStages(stage: Stage): List[Stage] = {
.....
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {/
visited += rdd
if (getCacheLocs(rdd).contains(Nil)) {
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
val mapStage = getShuffleMapStage(shufDep, stage.jobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
visit(narrowDep.rdd)
}
}
}
}
}
visit(stage.rdd)
missing.toList
}
当getMissingParentStage(stage)的结果为空的时候,表明这个stage没有missingParentStage,或者它的missingParentStage已经都执行完了,则当前这个stage才能被成功的提交到集群去执行,否则,它就要等待,并重复调用getMissingParentStage,直到它的结果为空,才可以被提交。
提交执行的过程会另开一篇
- 大小: 142 KB
分享到:
相关推荐
文章中提到的两个.vsdx文件"spark_client_generateJob.vsd"和"spark_client_runJob.vsd"可能是流程图或架构图,详细描绘了Spark客户端如何生成Jobs和运行Tasks的逻辑流程。这类图表对于理解Spark的工作原理非常有...
### Apache Spark源码走读之3 -- Task运行期之函数调用关系分析 #### 概述 Apache Spark作为一款高效的大数据处理框架,在其内部有着复杂的任务调度与执行机制。本文将深入探讨Spark中Task执行期间的具体流程以及...
10. **源码分析**:书中会对Spark的关键组件如Scheduler、Executor、RDD的实现进行源码级别的解析,帮助读者深入理解其内部工作原理,提升调试和优化能力。 通过学习《深入理解Spark核心思想与源码分析》,读者不仅...
本文旨在通过对Apache Spark源码的初步解读,帮助读者建立起对Spark核心概念和技术细节的理解。 #### 二、基本概念 ##### 1. RDD(Resilient Distributed Dataset) - **定义**:弹性分布式数据集(Resilient ...
六、Spark源码分析 1. RDD创建:深入源码,我们可以看到如何通过`sparkContext.parallelize`或`sparkContext.textFile`创建RDD。 2. Action与Transformation:研究`count`、`map`等操作的实现,理解它们如何触发...
在Spark源码中,`org.apache.spark.rdd.RDD`类是所有RDD的基类,我们可以从这个类开始探索RDD的实现细节。 其次,Spark的工作流程主要由Driver和Executor两部分构成。Driver负责任务的调度和管理,Executor则是在...
在分析Stage划分原理前,需要了解一些前置知识,包括RDD的特性、转换操作(Transformation)和行动操作(Action)。 首先,RDD是Spark的核心概念,它是一种在节点间高度容错的、并行的数据结构,可以包含任何类型的...
Spark是Apache软件基金会下的一个大数据处理框架,以其高效、易用和可扩展性著称。在深入学习Spark源码的过程中,了解一些小贴士能够...通过阅读和分析Spark源码,我们可以发现潜在的优化点,提升大数据处理的效率。
源码分析可以展示如何利用这些机制来恢复失败的任务,并理解RDD的 lineage图是如何构建和使用的。 6. **资源管理** Spark与YARN、Mesos或Kubernetes等资源管理系统集成,源码中可以看到如何申请、释放资源,以及...
理解Spark的内核原理和源码分析对于开发者来说至关重要,有助于优化应用性能,定位并解决问题。通过深入学习Spark的Master、Worker、Driver和Executor的工作方式,以及不同集群部署模式,可以更好地理解和利用Spark...
6. Spark源码阅读,理解其内部实现机制,如任务调度、容错机制等。 7. Spark与其他工具的集成,如Hadoop、Hive、Kafka等。 对于开发者而言,深入理解这些知识点有助于提升他们在大数据领域的专业技能。
在这里,Java实现可能会包括DAG(有向无环图)构建、Stage划分、Task生成等过程。此外,Spark的存储系统,如BlockManager和MemoryManager,也是核心部分,它们负责数据的缓存和存储策略。 2. **模块详解** - **...
《Spark源码学习指南——深度探索Spark核心机制》 Spark作为一个强大的分布式计算框架,其高效、易用的特点深受广大开发者喜爱。源代码是理解任何软件系统最直接的途径,通过深入学习Spark源码,我们可以更好地掌握...