Spark Runtime里的主要层次分析,梳理Runtime组件和执行流程,
DAGScheduler
Job=多个stage,Stage=多个同种task, Task分为ShuffleMapTask和ResultTask,Dependency分为ShuffleDependency和NarrowDependency
面向stage的切分,切分依据为宽依赖
维护waiting jobs和active jobs,维护waiting stages、active stages和failed stages,以及与jobs的映射关系
主要职能
- 接收提交Job的主入口,
submitJob(rdd, ...)
或runJob(rdd, ...)
。在SparkContext
里会调用这两个方法。- 生成一个Stage并提交,接着判断Stage是否有父Stage未完成,若有,提交并等待父Stage,以此类推。结果是:DAGScheduler里增加了一些waiting stage和一个running stage。
- running stage提交后,分析stage里Task的类型,生成一个Task描述,即TaskSet。
- 调用
TaskScheduler.submitTask(taskSet, ...)
方法,把Task描述提交给TaskScheduler。TaskScheduler依据资源量和触发分配条件,会为这个TaskSet分配资源并触发执行。 -
DAGScheduler
提交job后,异步返回JobWaiter
对象,能够返回job运行状态,能够cancel job,执行成功后会处理并返回结果
- 处理
TaskCompletionEvent
- 如果task执行成功,对应的stage里减去这个task,做一些计数工作:
- 如果task是ResultTask,计数器
Accumulator
加一,在job里为该task置true,job finish总数加一。加完后如果finish数目与partition数目相等,说明这个stage完成了,标记stage完成,从running stages里减去这个stage,做一些stage移除的清理工作 - 如果task是ShuffleMapTask,计数器
Accumulator
加一,在stage里加上一个output location,里面是一个MapStatus
类。MapStatus
是ShuffleMapTask
执行完成的返回,包含location信息和block size(可以选择压缩或未压缩)。同时检查该stage完成,向MapOutputTracker
注册本stage里的shuffleId和location信息。然后检查stage的output location里是否存在空,若存在空,说明一些task失败了,整个stage重新提交;否则,继续从waiting stages里提交下一个需要做的stage
- 如果task是ResultTask,计数器
- 如果task是重提交,对应的stage里增加这个task
- 如果task是fetch失败,马上标记对应的stage完成,从running stages里减去。如果不允许retry,abort整个stage;否则,重新提交整个stage。另外,把这个fetch相关的location和map任务信息,从stage里剔除,从
MapOutputTracker
注销掉。最后,如果这次fetch的blockManagerId对象不为空,做一次ExecutorLost
处理,下次shuffle会换在另一个executor上去执行。 - 其他task状态会由
TaskScheduler
处理,如Exception, TaskResultLost, commitDenied等。
- 如果task执行成功,对应的stage里减去这个task,做一些计数工作:
- 其他与job相关的操作还包括:cancel job, cancel stage, resubmit failed stage等
其他职能
1. cacheLocations 和 preferLocation
privateval cacheLocs =newHashMap[Int,Array[Seq[TaskLocation]]]
TaskScheduler
维护task和executor对应关系,executor和物理资源对应关系,在排队的task和正在跑的task。
内部维护一个任务队列,根据FIFO或Fair策略,调度任务。
TaskScheduler
本身是个接口,spark里只实现了一个TaskSchedulerImpl
,理论上任务调度可以定制。下面是TaskScheduler
的主要接口:
def start():Unitdef postStartHook(){}def stop():Unitdef submitTasks(taskSet:TaskSet):Unitdef cancelTasks(stageId:Int, interruptThread:Boolean)def setDAGScheduler(dagScheduler:DAGScheduler):Unitdef executorHeartbeatReceived(execId:String, taskMetrics:Array[(Long,TaskMetrics)],
blockManagerId:BlockManagerId):Boolean
主要职能
-
submitTasks(taskSet)
,接收DAGScheduler
提交来的tasks- 为tasks创建一个
TaskSetManager
,添加到任务队列里。TaskSetManager
跟踪每个task的执行状况,维护了task的许多具体信息。 - 触发一次资源的索要。
- 首先,
TaskScheduler
对照手头的可用资源和Task队列,进行executor分配(考虑优先级、本地化等策略),符合条件的executor会被分配给TaskSetManager
。 - 然后,得到的Task描述交给
SchedulerBackend
,调用launchTask(tasks)
,触发executor上task的执行。task描述被序列化后发给executor,executor提取task信息,调用task的run()
方法执行计算。
- 首先,
- 为tasks创建一个
-
cancelTasks(stageId)
,取消一个stage的tasks- 调用
SchedulerBackend
的killTask(taskId, executorId, ...)
方法。taskId和executorId在TaskScheduler
里一直维护着。
- 调用
-
resourceOffer(offers: Seq[Workers])
,这是非常重要的一个方法,调用者是SchedulerBacnend
,用途是底层资源SchedulerBackend
把空余的workers资源交给TaskScheduler
,让其根据调度策略为排队的任务分配合理的cpu和内存资源,然后把任务描述列表传回给SchedulerBackend
- 从worker offers里,搜集executor和host的对应关系、active executors、机架信息等等
- worker offers资源列表进行随机洗牌,任务队列里的任务列表依据调度策略进行一次排序
- 遍历每个taskSet,按照进程本地化、worker本地化、机器本地化、机架本地化的优先级顺序,为每个taskSet提供可用的cpu核数,看是否满足
- 默认一个task需要一个cpu,设置参数为
"spark.task.cpus=1"
- 为taskSet分配资源,校验是否满足的逻辑,最终在
TaskSetManager
的resourceOffer(execId, host, maxLocality)
方法里 - 满足的话,会生成最终的任务描述,并且调用
DAGScheduler
的taskStarted(task, info)
方法,通知DAGScheduler
,这时候每次会触发DAGScheduler
做一次submitMissingStage
的尝试,即stage的tasks都分配到了资源的话,马上会被提交执行
- 默认一个task需要一个cpu,设置参数为
-
statusUpdate(taskId, taskState, data)
,另一个非常重要的方法,调用者是SchedulerBacnend
,用途是SchedulerBacnend
会将task执行的状态汇报给TaskScheduler
做一些决定- 若
TaskLost
,找到该task对应的executor,从active executor里移除,避免这个executor被分配到其他task继续失败下去。 - task finish包括四种状态:finished, killed, failed, lost。只有finished是成功执行完成了。其他三种是失败。
- task成功执行完,调用
TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data)
,否则调用TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)
。TaskResultGetter
内部维护了一个线程池,负责异步fetch task执行结果并反序列化。默认开四个线程做这件事,可配参数"spark.resultGetter.threads"=4
。
- 若
TaskResultGetter取task result的逻辑
- 对于success task,如果taskResult里的数据是直接结果数据,直接把data反序列出来得到结果;如果不是,会调用
blockManager.getRemoteBytes(blockId)
从远程获取。如果远程取回的数据是空的,那么会调用TaskScheduler.handleFailedTask
,告诉它这个任务是完成了的但是数据是丢失的。否则,取到数据之后会通知BlockManagerMaster
移除这个block信息,调用TaskScheduler.handleSuccessfulTask
,告诉它这个任务是执行成功的,并且把result data传回去。 - 对于failed task,从data里解析出fail的理由,调用
TaskScheduler.handleFailedTask
,告诉它这个任务失败了,理由是什么。
SchedulerBackend
在TaskScheduler
下层,用于对接不同的资源管理系统,SchedulerBackend
是个接口,需要实现的主要方法如下:
def start():Unitdef stop():Unitdef reviveOffers():Unit// 重要方法:SchedulerBackend把自己手头上的可用资源交给TaskScheduler,TaskScheduler根据调度策略分配给排队的任务吗,返回一批可执行的任务描述,SchedulerBackend负责launchTask,即最终把task塞到了executor模型上,executor里的线程池会执行task的run()def killTask(taskId:Long, executorId:String, interruptThread:Boolean):Unit=thrownewUnsupportedOperationException
粗粒度:进程常驻的模式,典型代表是standalone模式,mesos粗粒度模式,yarn
细粒度:mesos细粒度模式
这里讨论粗粒度模式,更好理解:CoarseGrainedSchedulerBackend
。
维护executor相关信息(包括executor的地址、通信端口、host、总核数,剩余核数),手头上executor有多少被注册使用了,有多少剩余,总共还有多少核是空的等等。
主要职能
- Driver端主要通过actor监听和处理下面这些事件:
-
RegisterExecutor(executorId, hostPort, cores, logUrls)
。这是executor添加的来源,通常worker拉起、重启会触发executor的注册。CoarseGrainedSchedulerBackend
把这些executor维护起来,更新内部的资源信息,比如总核数增加。最后调用一次makeOffer()
,即把手头资源丢给TaskScheduler
去分配一次,返回任务描述回来,把任务launch起来。这个makeOffer()
的调用会出现在任何与资源变化相关的事件中,下面会看到。 -
StatusUpdate(executorId, taskId, state, data)
。task的状态回调。首先,调用TaskScheduler.statusUpdate
上报上去。然后,判断这个task是否执行结束了,结束了的话把executor上的freeCore加回去,调用一次makeOffer()
。 -
ReviveOffers
。这个事件就是别人直接向SchedulerBackend
请求资源,直接调用makeOffer()
。 -
KillTask(taskId, executorId, interruptThread)
。这个killTask的事件,会被发送给executor的actor,executor会处理KillTask
这个事件。 -
StopExecutors
。通知每一个executor,处理StopExecutor
事件。 -
RemoveExecutor(executorId, reason)
。从维护信息中,那这堆executor涉及的资源数减掉,然后调用TaskScheduler.executorLost()
方法,通知上层我这边有一批资源不能用了,你处理下吧。TaskScheduler
会继续把executorLost
的事件上报给DAGScheduler
,原因是DAGScheduler
关心shuffle任务的output location。DAGScheduler
会告诉BlockManager
这个executor不可用了,移走它,然后把所有的stage的shuffleOutput信息都遍历一遍,移走这个executor,并且把更新后的shuffleOutput信息注册到MapOutputTracker
上,最后清理下本地的CachedLocations
Map。
-
-
reviveOffers()
方法的实现。直接调用了makeOffers()
方法,得到一批可执行的任务描述,调用launchTasks
。 -
launchTasks(tasks: Seq[Seq[TaskDescription]])
方法。- 遍历每个task描述,序列化成二进制,然后发送给每个对应的executor这个任务信息
- 如果这个二进制信息太大,超过了9.2M(默认的akkaFrameSize 10M 减去 默认 为akka留空的200K),会出错,abort整个taskSet,并打印提醒增大akka frame size
- 如果二进制数据大小可接受,发送给executor的actor,处理
LaunchTask(serializedTask)
事件。
- 遍历每个task描述,序列化成二进制,然后发送给每个对应的executor这个任务信息
Executor
Executor是spark里的进程模型,可以套用到不同的资源管理系统上,与SchedulerBackend
配合使用。
内部有个线程池,有个running tasks map,有个actor,接收上面提到的由SchedulerBackend
发来的事件。
事件处理
-
launchTask
。根据task描述,生成一个TaskRunner
线程,丢尽running tasks map里,用线程池执行这个TaskRunner
-
killTask
。从running tasks map里拿出线程对象,调它的kill方法。
http://www.mamicode.com/info-detail-530178.html
相关推荐
《Spark软件组件架构图及Task Scheduler架构解析》 Spark,作为大数据处理领域的明星框架,以其高效、易用和可扩展性赢得了广泛的认可。本文将深入探讨Spark的核心组件架构及其Task Scheduler的设计原理,帮助读者...
此外,Spark的调度器(包括DAGScheduler和TaskScheduler)负责任务的分解和执行。 3. **Spark组件**: - **Spark Core**:Spark的基础模块,提供RDD操作和任务调度。 - **Spark SQL**:用于结构化数据处理,允许...
GPU Task Scheduler GPU Task Scheduler是一个Python库,用于并行调度GPU作业。 在设计和运行基于神经网络的算法时,我们经常需要在大量参数组合上测试代码。 为了提高效率,我们可能还希望将任务并行分布在多个...
1. **Task的调度**:DAGScheduler根据DAG划分出Stage,并将Stage提交给TaskScheduler。 2. **Task的分配**:TaskScheduler决定每个Stage中的Task应该在哪个Executor上运行。 3. **Task的执行**:Executor上的...
在.NET框架中,微软提供了一个名为`System.Threading.Tasks.TaskScheduler`的组件,它允许开发者以编程方式创建、管理和控制计划任务。这个组件是.NET Framework的一部分,适用于多种.NET应用程序,包括Windows服务...
- **Spark Core**:Spark的基础模块,提供了分布式任务调度和资源管理。 2. **Spark运行机制**: - **DAG(有向无环图)**:Spark作业转换为DAG,任务分解为Stage,Stage进一步划分为Task。 - **Task Scheduler*...
Arduino-TaskScheduler.zip,Arduino、ESPX和STM32微控制器任务调度器的协同多任务处理,Arduino是一家开源软硬件公司和制造商社区。Arduino始于21世纪初,深受电子制造商的欢迎,Arduino通过开源系统提供了很多灵活性...
Win32 Perl扩展,可以完全管理通过Perl中的Scheduling Agent计划的任务。
例如,DAGScheduler如何将作业拆分为任务,以及TaskScheduler如何将任务分配给Executor。 7. **优化技巧**: Spark的性能优化是重要一环,包括配置调优、内存管理优化、数据序列化、减少shuffle操作等。理解这些...
TaskScheduler_适用于Arduino+ESPx+STM32+nRF+其他微控制器的协作多任务处理器_优质嵌入式项目分享
8. **DAGScheduler和TaskScheduler**:Spark的调度系统,DAGScheduler将用户作业转化为Stage,而TaskScheduler负责将Stage分解为任务并在Executor上执行。 9. **YARN和Mesos集成**:Spark可以运行在Hadoop的YARN或...
它旨在与Windows Task Scheduler或cron一起用于家庭自动化。 *在天黑之前,用它打开灯! 该程序可以从调用开始等待,直到在命令行上指定的事件发生为止,或者可以立即返回以指示它是白天还是晚上。 最好将Sunwait...
C#利用Interop.TaskScheduler.dll添加删除计划任务,可实现程序随Windows系统自动启动; 项目用VS2017打开,需要.net 2.0支持,需要管理员权限;支持win7 win10;不支持xp。
在 Spark 调度中最重要的是 DAGScheduler 和 TaskScheduler 两个调度器,其中,DAGScheduler 负责任务的逻辑调度, 将作业拆分为不同阶段的具有依赖关系的任务集。TaskScheduler 则负责具体任务的调度执行...
4. **执行与分析**:在spark-shell中运行程序,观察执行过程,理解Spark如何处理数据。 **五、数据集说明** 本实践使用的数据集是一个小规模的文本数据,包含多行文本,如"How nice I love Spark I love Hadoop How...
3. 调度系统:分析DAGScheduler和TaskScheduler的工作流程,理解如何优化任务执行。 4. 内存管理:讨论Spark如何在内存和磁盘之间进行数据交换,以及如何配置内存参数以提高性能。 5. Shuffle过程:深入理解数据重排...
**Android TaskScheduler:高效异步任务处理库** 在Android应用开发中,为了提供良好的用户体验,开发者经常需要在后台执行耗时操作,如网络请求、数据处理等,这就涉及到了异步处理。`TaskScheduler`是一个专为...
2. **调度机制**:DAGScheduler和TaskScheduler的工作流程,如何调度任务到Executor。 3. **存储系统**:BlockManager如何管理内存和磁盘上的数据,以及如何与Shuffle服务交互。 4. **网络通信**:Akka在Spark中的...
TaskScheduler 解析:TaskScheduler 原理剖析、TaskScheduler 源码解析;SchedulerBackend 解析:SchedulerBackend 原理剖析、SchedulerBackend 源码解析、Spark 程序的注册机制、Spark 程序对计算资源Executor 的...
.archMicrosoft.Win32.TaskScheduler.dll