`
wbj0110
  • 浏览: 1598211 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Spark Core源码分析: Spark任务执行模型

阅读更多

DAGScheduler

 

面向stage的调度层,为job生成以stage组成的DAG,提交TaskSet给TaskScheduler执行。

每一个Stage内,都是独立的tasks,他们共同执行同一个compute function,享有相同的shuffledependencies。DAG在切分stage的时候是依照出现shuffle为界限的。

Java代码  收藏代码
  1. private[spark]  
  2. class DAGScheduler(  
  3.     taskScheduler: TaskScheduler,  
  4.     listenerBus: LiveListenerBus,  
  5.     mapOutputTracker: MapOutputTrackerMaster,  
  6.     blockManagerMaster: BlockManagerMaster,  
  7.     env: SparkEnv)  
  8.   extends Logging {  

 

Java代码  收藏代码
  1. // Actor模式收取发来的DAGSchedulerEvent,并进行processEvent  
  2. private var eventProcessActor: ActorRef = _  
  3.   
  4.   private[scheduler] val nextJobId = new AtomicInteger(0)  
  5.   private[scheduler] def numTotalJobs: Int = nextJobId.get()  
  6.   private val nextStageId = new AtomicInteger(0)  
  7.   
  8.   // 一系列信息维护,很清晰  
  9.   private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]  
  10.   private[scheduler] val stageIdToJobIds = new HashMap[Int, HashSet[Int]]  
  11.   private[scheduler] val stageIdToStage = new HashMap[Int, Stage]  
  12.   private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]  
  13.   private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]  
  14.   private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]  
  15.   private[scheduler] val stageToInfos = new HashMap[Stage, StageInfo]  
  16.   
  17.   // 不同状态stages的维护,很清晰  
  18.   // Stages we need to run whose parents aren't done  
  19.   private[scheduler] val waitingStages = new HashSet[Stage]  
  20.   
  21.   // Stages we are running right now  
  22.   private[scheduler] val runningStages = new HashSet[Stage]  
  23.   
  24.   // Stages that must be resubmitted due to fetch failures  
  25.   private[scheduler] val failedStages = new HashSet[Stage]  
  26.   
  27.   // Missing tasks from each stage  
  28.   private[scheduler] val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]  
  29.   
  30.   private[scheduler] val activeJobs = new HashSet[ActiveJob]  
  31.   
  32.   // Contains the locations that each RDD's partitions are cached on  
  33.   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]  

 

在start()方法中会初始化Actor,然后接收DAGSchedulerEvent处理。Scheduler会在SparkContext里start起来。

 

Event处理

 

源码的阅读入口:可以根据processEvent(event:DAGSchedulerEvent)方法展开。

处理的事件包括这么一些:

 

Submit Job

 

JobSubmitted事件:

 

 

提交任务的事件传入参数如下

Java代码  收藏代码
  1. case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)  

 

处理过程可以拆成三步看,每一步里面涉及的具体逻辑在下面补充展开

 

 

Java代码  收藏代码
  1. finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))  

 

本次newStage()操作可以对应新的result stage或者shuffle stage。返回Stage类(里面记录一些信息)。Stage类会传入Option[ShuffleDependency[_,_]]参数,内部有一个isShuffleMap变量,以标识该Stage是shuffle or result。

 

 

Java代码  收藏代码
  1. val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)  

 

ActiveJob类也是记录一些信息的类,可以当作是一个VO类

 

 

Java代码  收藏代码
  1. if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {  
  2. // Compute very short actions like first() or take()   
  3. // with no parent stages locally.  
  4. listenerBus.post(SparkListenerJobStart(  
  5. job.jobId, Array[Int](), properties))  
  6. runLocally(job)  
  7. else {  
  8. jobIdToActiveJob(jobId) = job  
  9. activeJobs += job  
  10. resultStageToJob(finalStage) = job  
  11. listenerBus.post(  
  12. SparkListenerJobStart(  
  13. job.jobId, jobIdToStageIds(jobId).toArray, properties))  
  14. submitStage(finalStage)  
  15. }  

 

首先判断stage没有父亲依赖,且partition为1的话,就执行本地任务。否则,submitStage。

 

submitStage的逻辑为,首先寻找本次stage的parents。如果没有missing的parent stage,那么就submitMissingTask,即提交本次stage的tasks了。如果有,会对parent stage进行递归submitStage,而且getMissingParentStages得到的结果集是按id降序排的,也就是说递归submitStage的时候会按parent stage的id顺序进行。

 

submitMissingTask处理的是stage的parent已经available的stage。主要逻辑如下:

第一步:通过stage.isShuffleMap来决定生成ShuffleMapTask还是ResultTask,生成的ShuffleMapTask数目和partition数目相等。

第二步:把生成的tasks组建成一个TaskSet,提交给TaskScheduler的submitTasks方法。

 

TaskScheduler

 

DAGScheduler以stage为单位,提tasks给TaskScheduer,实现类为TaskSchedulerImpl。

 

TaskSchedulerImpl几个内部部件:

SchedulerBackend

SchedulableBuilder

DAGScheduler

TaskSetManager

TaskResultGetter

Tasks信息(taskIdToTaskSetId,taskIdToExecutorId,activeExecutorIds)

别的信息(SchedulerMode)

 

TaskScheduler做接收task、接收分到的资源和executor、维护信息、与backend打交道、把任务分配好等事情。

 

start(),stop()的时候,backend的start(),stop()

 

submitTasks(TaskSet)逻辑:

为这批Task生成新的TaskSetManager,把TaskSetManager加到SchedulerBuilder里,然后向backend进行一次reviveOffer()操作。

 

SchedulerBuilder

 

SchedulableBuilder有FIFO和Fair两种实现, addTaskSetManager会把TaskSetManager加到pool里。FIFO的话只有一个pool。Fair有多个pool,Pool也分FIFO和Fair两种模式。

SchedulableBuilder的rootPool里面可以新增pool或者TaskSetManager,两者都是Scheduable的继承类,所以SchedulableBuilder用于维护rootPool这棵Scheduable 树结构。Pool是树上的非叶子节点,而TaskSetManager就是叶子节点。

在TaskScheduler初始化的时候会buildDafaultPool。

 

 

TaskSetManager

 

TaskSetManager负责这批Tasks的启动,失败重试,感知本地化等事情。每次reourseOffer方法会寻找合适(符合条件execId, host, locality)的Task并启动它。

 

reourseOffer方法,

Java代码  收藏代码
  1. def resourceOffer(  
  2.     execId: String,  
  3.     host: String,  
  4.     maxLocality: TaskLocality.TaskLocality)  

寻找符合execId, host和locality的task,找到的话就启动这个Task。启动的时候,把task加到runningTask的HashSet里,然后调DAGScheduler的taskStarted方法,taskStarted方法向eventProcessorActor发出BeginEvent的DAGSchedulerEvent。

 

TaskResultGetter

 

维护一个线程池,用来反序列化和从远端获取task结果。

 

 

Java代码  收藏代码
  1. def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer)  

把序列化的数据反序列化解析出来之后,有两种情况:直接可读的result和间接task result。

 

前者是DirectTaskResult[T]类:

Java代码  收藏代码
  1. class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)  

 

后者是IndirectTaskResult[T]类:

Java代码  收藏代码
  1. case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Serializable  

在反解析出IndirectTaskResult后,可以得到BlockId这个类,他的实现有这么些:

 

在TaskResultGetter里,会通过blockManager的getRemoteBytes(BlockId)方法来获得序列化的task result,对这个task result进行反解析后得到DirectTaskResult类,从而获得反序列化后的真正结果数据。

这是大致的一个过程,具体还有一些细节在之中,比如会向scheduler发送不同的event、blockManager会调用BlockManagerMaster把该Block remove掉。

 

BlockId类有这么些关键变量:

Java代码  收藏代码
  1. private[spark] sealed abstract class BlockId {  
  2.   /** A globally unique identifier for this Block. Can be used for ser/de. */  
  3.   def name: String  
  4.   
  5.   // convenience methods  
  6.   def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None  
  7.   def isRDD = isInstanceOf[RDDBlockId]  
  8.   def isShuffle = isInstanceOf[ShuffleBlockId]  
  9.   def isBroadcast = isInstanceOf[BroadcastBlockId]  

 

下面看BlockManager如何通过BlockId获得数据:

调用的是BlockManager的内部方法

Java代码  收藏代码
  1. private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {  
  2. require(blockId != null"BlockId is null")  
  3. // 通过BlockManagerMaster获得这个blockId的locations  
  4.     val locations = Random.shuffle(master.getLocations(blockId))  
  5.     for (loc <- locations) {  
  6.       logDebug("Getting remote block " + blockId + " from " + loc)  
  7.       // 使用BlockManagerWorker来获得block的数据  
  8.       val data = BlockManagerWorker.syncGetBlock(  
  9.         GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))  
  10.       if (data != null) {  
  11.         if (asValues) {  
  12.           // 取到就返回  
  13.           return Some(dataDeserialize(blockId, data))  
  14.         } else {  
  15.           return Some(data)  
  16.         }  
  17.       }  
  18.       logDebug("The value of block " + blockId + " is null")  
  19.     }  
  20.     logDebug("Block " + blockId + " not found")  
  21.     None  
  22.   }  

 

思路是通过BlockManagerMaster来获得block的位置信息,得到的集合打乱后,遍历位置信息,通过BlockManagerWorker去获得数据,只要得到了,就反序列化之后返回。

 

在TaskResultGetter处理的时候,成功和失败分别向Scheduler调用handleSuccessfulTask和handleFailedTask方法。

handleSuccessfulTask在DAGScheduler里,会发出CompletionEvent事件,这一步结尾工作会有很多细节处理,这里先不阅读了。

handleFailedTask的话,只要TaskSetManager不是zombie,task没有被kill,那么会继续调用backend.reviveOffers()来re-run。

http://jgsj.iteye.com/blog/2050689

分享到:
评论

相关推荐

    深入理解Spark:核心思想及源码分析.pdf

    《深入理解Spark:核心思想及源码分析》这本书旨在帮助读者深入掌握Apache Spark这一大数据处理框架的核心原理与实现细节。Spark作为一个快速、通用且可扩展的数据处理系统,已经在大数据领域得到了广泛应用。它提供...

    Spark源码分析.pdf

    3. 调度系统:分析DAGScheduler和TaskScheduler的工作流程,理解如何优化任务执行。 4. 内存管理:讨论Spark如何在内存和磁盘之间进行数据交换,以及如何配置内存参数以提高性能。 5. Shuffle过程:深入理解数据重排...

    spark-2.4.0源码

    8. **DAGScheduler和TaskScheduler**:Spark的调度系统,DAGScheduler将用户作业转化为Stage,而TaskScheduler负责将Stage分解为任务并在Executor上执行。 9. **YARN和Mesos集成**:Spark可以运行在Hadoop的YARN或...

    spark1.3.1源码下载

    - `core/`:包含Spark核心组件的源码,如`spark-core`、`spark-network`等子模块。 - `sql/`:Spark SQL的相关代码,包括DataFrame API和Hive的集成。 - `streaming/`:Spark Streaming的实现,包括DStream...

    spark源码分析

    总结,Spark源码分析是一个涵盖广泛的主题,涉及Spark的设计理念、核心组件、内存管理、任务调度等多个方面。深入学习Spark源码,不仅能提升我们对大数据处理的理解,还能为日常开发工作提供有力的支持。

    spark2.3源码下载

    Spark Core是基础,提供分布式任务调度和内存管理;Spark SQL负责处理SQL查询和数据处理,支持Hive等传统SQL系统;Spark Streaming处理实时流数据,提供DStream抽象;MLlib是机器学习库,包含多种算法和工具。 二、...

    Apache Spark源码走读:如何进行代码跟读

    在进行源码分析的过程中,往往需要对代码进行一些修改来辅助理解和测试。如果不想提交这些临时修改到版本控制系统,可以使用git的`stash`功能暂时保存这些更改,以便随时恢复而不影响其他工作。 **示例**:假设你...

    Spark-2.4.5官网下载源码包

    8. Spark Streaming的微批处理模型:将实时流数据转化为一系列小的批处理作业,实现低延迟处理。 9. MLlib:Spark的机器学习库,包括分类、回归、聚类、协同过滤等多种算法,以及模型选择和评估工具。 10. GraphX:...

    Spark github源码 例子很有价值

    "Spark github源码 例子很有价值"这个标题表明,通过分析这些源码,我们可以学习到Spark如何实现其核心功能,如弹性分布式数据集(RDD)、DataFrame、Dataset以及Spark SQL等。 描述中提到"github无法下载的可以直接...

    spark-2.2.0.tgz源码

    4. **DAG执行模型**:Spark通过将任务拆分为一系列任务阶段,并构建有向无环图(DAG)来执行任务。每个任务阶段由一系列可以并行运行的任务组成,这些任务之间通过RDD的依赖关系连接。 5. **存储系统集成**:Spark...

    深入理解Spark 核心思想与源码分析

    源码分析方面,Spark的设计遵循模块化的架构,主要包括以下几个部分: 1. **Spark Core**:提供基本的分布式计算框架,包括DAG调度、任务调度、内存管理、错误恢复和集群资源管理。 2. **Spark SQL**:Spark的SQL...

    spark2.2.0源码包(含分析文档),包含机器学习mlib 及ml

    7. **源码解析**:源码分析文档可能详细讲解了Spark的各个组件是如何协同工作的,包括RDD的生命周期、调度器的工作方式、DataFrame/Dataset的优化策略、机器学习算法的实现细节等。 对于机器学习开发者而言,MLlib...

    spark(svn自动下载)源码

    Spark源码分析首先需要了解其核心组件和架构。Spark主要由以下几个部分组成: 1. **Spark Core**:这是Spark的基础,提供分布式任务调度、内存管理、错误恢复以及与其他存储系统交互的能力。它定义了RDD(弹性...

    spark 3.2.1 源码下载

    通过阅读源码,你可以了解Spark如何处理数据、如何执行任务调度以及如何实现分布式计算。 对于开发者来说,理解源码可以帮助优化应用性能,解决遇到的问题,甚至贡献自己的代码到Spark项目中。此外,通过编译源码,...

    Spark2.6.3源码

    源码分析对于深入理解Spark的工作原理和进行二次开发至关重要。 首先,Java目录是Spark源码的核心部分,它包含了Spark的大部分核心组件和功能的实现。以下是一些主要的子目录及其相关的知识点: 1. **core**:这是...

    Spark 核心思想与源码分析.7z

    《Spark核心思想与源码分析》是一份深入探讨Apache Spark技术的资料,旨在帮助读者理解Spark的内在工作原理,从而更好地应用和优化这个大数据处理框架。Spark作为一个分布式计算框架,以其高效、易用和可扩展性赢得...

    spark高级数据分析源代码

    1. Spark Core:Spark的基础组件,提供了分布式任务调度、内存管理、错误恢复等基础功能。 2. Spark SQL:Spark的SQL接口,可以处理结构化和半结构化数据,与Hive兼容,支持DataFrame和Dataset API。 3. Spark ...

    spark-2.2.1.tar.gz 源码

    - **Spark Core**:负责任务调度,使用DAG(有向无环图)来表示任务,通过TaskScheduler接口实现任务调度。内存管理采用Tachyon或HDFS作为持久化存储,容错机制基于检查点和事件日志。 - **Spark SQL**:提供了...

    spark-2.2.0源码

    总结,Spark 2.2.0源码的分析,不仅可以帮助我们理解Spark的内在机制,还能指导我们在实际项目中更好地利用Spark进行大数据处理。通过对源码的深入学习,我们可以发现并解决性能瓶颈,实现更高效的数据处理和分析。...

    spark-1.6.3.zip

    在源码中,你可以找到DAGScheduler和TaskScheduler的实现,理解Spark如何将作业拆分成任务并分配到集群中的执行节点。 2. **Spark SQL**:Spark SQL提供了DataFrame API,使得用户可以用SQL语句处理数据。它通过 ...

Global site tag (gtag.js) - Google Analytics