`

Spark 源码解析 : DAGScheduler中的DAG划分与提交

阅读更多

一、Spark 运行架构

 
Spark 运行架构如下图:
各个RDD之间存在着依赖关系,这些依赖关系形成有向无环图DAG,DAGScheduler对这些依赖关系形成的DAG,进行Stage划分,划分的规则很简单,从后往前回溯,遇到窄依赖加入本stage,遇见宽依赖进行Stage切分。完成了Stage的划分,DAGScheduler基于每个Stage生成TaskSet,并将TaskSet提交给TaskScheduler。TaskScheduler 负责具体的task调度,在Worker节点上启动task。
 
 
二、源码解析:DAGScheduler中的DAG划分
    当RDD触发一个Action操作(如:colllect)后,导致SparkContext.runJob的执行。而在SparkContext的run方法中会调用DAGScheduler的run方法最终调用了DAGScheduler的submit方法:
def submitJob[T, U](
rdd: RDD[T],
func:(TaskContext,Iterator[T])=> U,
partitions:Seq[Int],
callSite:CallSite,
resultHandler:(Int, U)=>Unit,
properties:Properties):JobWaiter[U]={
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p <0).foreach { p =>
thrownewIllegalArgumentException(
"Attempting to access a non-existent partition: "+ p +". "+
"Total number of partitions: "+ maxPartitions)
}
 
val jobId = nextJobId.getAndIncrement()
if(partitions.size ==0){
// Return immediately if the job is running 0 tasks
returnnewJobWaiter[U](this, jobId,0, resultHandler)
}
 
assert(partitions.size >0)
val func2 = func.asInstanceOf[(TaskContext,Iterator[_])=> _]
val waiter =newJobWaiter(this, jobId, partitions.size, resultHandler)
//给eventProcessLoop发送JobSubmitted消息
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
 
 
DAGScheduler的submit方法中,像eventProcessLoop对象发送了JobSubmitted消息。eventProcessLoop是DAGSchedulerEventProcessLoop类的对象
 
private[scheduler] val eventProcessLoop =newDAGSchedulerEventProcessLoop(this)
 
 
DAGSchedulerEventProcessLoop,接收各种消息并进行处理,处理的逻辑在其doOnReceive方法中:
 
private def doOnReceive(event:DAGSchedulerEvent):Unit= event match {
   //Job提交
caseJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)=>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
 
caseMapStageSubmitted(jobId, dependency, callSite, listener, properties)=>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
 
caseStageCancelled(stageId)=>
dagScheduler.handleStageCancellation(stageId)
 
caseJobCancelled(jobId)=>
dagScheduler.handleJobCancellation(jobId)
 
caseJobGroupCancelled(groupId)=>
dagScheduler.handleJobGroupCancelled(groupId)
 
caseAllJobsCancelled=>
dagScheduler.doCancelAllJobs()
 
caseExecutorAdded(execId, host)=>
dagScheduler.handleExecutorAdded(execId, host)
 
caseExecutorLost(execId)=>
dagScheduler.handleExecutorLost(execId, fetchFailed =false)
 
caseBeginEvent(task, taskInfo)=>
dagScheduler.handleBeginEvent(task, taskInfo)
 
caseGettingResultEvent(taskInfo)=>
dagScheduler.handleGetTaskResult(taskInfo)
 
case completion:CompletionEvent=>
dagScheduler.handleTaskCompletion(completion)
 
caseTaskSetFailed(taskSet, reason, exception)=>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
 
caseResubmitFailedStages=>
dagScheduler.resubmitFailedStages()
}
 
 
可以把DAGSchedulerEventProcessLoop理解成DAGScheduler的对外的功能接口。它对外隐藏了自己内部实现的细节。无论是内部还是外部消息,DAGScheduler可以共用同一消息处理代码,逻辑清晰,处理方式统一。
 
接下来分析DAGScheduler的Stage划分,handleJobSubmitted方法首先创建ResultStage
 
try{
//创建新stage可能出现异常,比如job运行依赖hdfs文文件被删除
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
}catch{
case e:Exception=>
logWarning("Creating new stage failed due to exception - job: "+ jobId, e)
listener.jobFailed(e)
return
}
  
然后调用submitStage方法,进行stage的划分。
 
 
首先由finalRDD获取它的父RDD依赖,判断依赖类型,如果是窄依赖,则将父RDD压入栈中,如果是宽依赖,则作为父Stage。
 
看一下源码的具体过程:
 
private def getMissingParentStages(stage:Stage):List[Stage]={
val missing =newHashSet[Stage] //存储需要返回的父Stage
val visited =newHashSet[RDD[_]] //存储访问过的RDD
//自己建立栈,以免函数的递归调用导致
val waitingForVisit =newStack[RDD[_]]
 
def visit(rdd: RDD[_]){
if(!visited(rdd)){
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if(rddHasUncachedPartitions){
for(dep <- rdd.dependencies){
dep match {
case shufDep:ShuffleDependency[_, _, _]=>
val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
if(!mapStage.isAvailable){
missing += mapStage //遇到宽依赖,加入父stage
}
case narrowDep:NarrowDependency[_]=>
waitingForVisit.push(narrowDep.rdd) //窄依赖入栈,
}
}
}
}
}
 
   //回溯的起始RDD入栈
waitingForVisit.push(stage.rdd)
while(waitingForVisit.nonEmpty){
visit(waitingForVisit.pop())
}
missing.toList
}
 
 
getMissingParentStages方法是由当前stage,返回他的父stage,父stage的创建由getShuffleMapStage返回,最终会调用newOrUsedShuffleStage方法返回ShuffleMapStage
 
private def newOrUsedShuffleStage(
shuffleDep:ShuffleDependency[_, _, _],
firstJobId:Int):ShuffleMapStage={
val rdd = shuffleDep.rdd
val numTasks = rdd.partitions.length
val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
if(mapOutputTracker.containsShuffle(shuffleDep.shuffleId)){
//Stage已经被计算过,从MapOutputTracker中获取计算结果
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs =MapOutputTracker.deserializeMapStatuses(serLocs)
(0 until locs.length).foreach { i =>
if(locs(i) ne null){
// locs(i) will be null if missing
stage.addOutputLoc(i, locs(i))
}
}
}else{
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD "+ rdd.id +" ("+ rdd.getCreationSite +")")
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
}
stage
}
 
 
现在父Stage已经划分好,下面看看你Stage的提交逻辑
 
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage:Stage){
val jobId = activeJobForStage(stage)
if(jobId.isDefined){
logDebug("submitStage("+ stage +")")
if(!waitingStages(stage)&&!runningStages(stage)&&!failedStages(stage)){
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: "+ missing)
if(missing.isEmpty){
logInfo("Submitting "+ stage +" ("+ stage.rdd +"), which has no missing parents")
//如果没有父stage,则提交当前stage
submitMissingTasks(stage, jobId.get)
}else{
for(parent <- missing){
//如果有父stage,则递归提交父stage
submitStage(parent)
}
waitingStages += stage
}
}
}else{
abortStage(stage,"No active job for stage "+ stage.id,None)
}
}
 
 提交的过程很简单,首先当前stage获取父stage,如果父stage为空,则当前Stage为起始stage,交给submitMissingTasks处理,如果当前stage不为空,则递归调用submitStage进行提交。
 
到这里,DAGScheduler中的DAG划分与提交就讲完了,下次解析这些stage是如果封装成TaskSet交给TaskScheduler以及TaskSchedule的调度过程。
 
 
 
 
 
 
 
 
 
 
 
 

 

分享到:
评论

相关推荐

    spark源码分析

    六、Spark源码解析 Spark源码分析主要涉及以下几个关键模块: 1. DAGScheduler:负责构建DAG,并将其拆分为任务阶段。 2. TaskScheduler:调度和分配任务到工作节点。 3. Executor:执行实际任务,管理内存和线程。 ...

    大数据Spark源码

    《深入解析Spark源码:大数据处理的基石》 Spark,作为大数据处理领域的明星框架,以其高效、易用和可扩展性赢得了广泛的认可。Spark源码的学习对于深入理解其内部机制,提升开发效率,以及解决实际问题具有至关...

    Spark-内核源码解析.docx

    Spark 内核源码解析 Spark 是一个大规模数据处理框架,它的核心原理和架构组件对开发者和研究人员来说非常重要。为了深入了解 Spark 的内部机制,我们需要对其内核源码进行深入分析。 Application/App 在 Spark ...

    Spark 3.0.0 Driver 启动内幕

    DAGScheduler 解析:DAG 的实例化 、DAGScheduler 划分Stage 的原理、DAGScheduler 划分Stage 的具体算法、Stage 内部Task 获取位置的算法;TaskScheduler 解析:TaskScheduler 原理剖析、TaskScheduler 源码解析;...

    Apache Spark源码走读之3 -- Task运行期之函数调用关系分析

    ### Apache Spark源码走读之3 -- Task运行期之函数调用关系分析 #### 概述 Apache Spark作为一款高效的大数据处理框架,在其内部有着复杂的任务调度与执行机制。本文将深入探讨Spark中Task执行期间的具体流程以及...

    深入理解Spark核心思想与源码分析

    5. **DAGScheduler与TaskScheduler**:Spark将转换链构建为有向无环图(DAG),然后DAGScheduler将其切分成Stage,每个Stage是一组可以并行执行的任务集。TaskScheduler负责将任务分发到Executor上执行。 6. **内存...

    spark-java:java实现spark核心源代码

    **Spark Java核心源码解析** Spark作为一个分布式计算框架,以其高效、灵活的特点在大数据处理领域广泛应用。本项目“spark-java”旨在用Java语言模仿Spark的核心功能,帮助开发者更深入地理解Spark的工作机制。...

    Spark-source-code-description-spark source code

    《Spark源码解析:探索系统开源的奥秘》 Spark,作为大数据处理领域的明星框架,以其高效的计算性能、丰富的生态组件以及易用的API深受广大开发者喜爱。Spark源码不仅揭示了其内在的工作机制,也为深入理解分布式...

    SparkSourceCodeLearning-spark source code

    源代码是理解任何软件系统最直接的途径,通过深入学习Spark源码,我们可以更好地掌握其工作原理,优化应用程序,并解决实际开发中的问题。本文将围绕"SparkSourceCodeLearning"这一主题,对Spark的核心组件和关键...

    SparkSourceCode-spark source code

    《Spark源码解析——探索系统开源的奥秘》 Spark,作为大数据处理领域的重要框架,以其高效的计算性能、易用的API以及丰富的生态系统赢得了全球开发者的喜爱。Spark源码的研读,对于深入理解其工作原理,提升开发...

Global site tag (gtag.js) - Google Analytics