spark的任务调度系统如下所示:
从上图中可以看出来由RDD Objects产生DAG,然后进入了DAGScheduler阶段,DAGScheduler是面向state的高层次的调度器,DAGScheduler把DAG拆分成很多的tasks,每组的tasks都是一个state,每当遇到shuffle就会产生新的state,可以看出上图一共有三个state;DAGScheduler需要记录那些RDD被存入磁盘等物化动作,同时需寻找task的最优化调度,例如数据本地性等;
在Spark中作业调度的相关类最重要的就是DAGScheduler,DAGScheduler顾名思义就是基于DAG图的Scheduler
DAG全称 Directed Acyclic Graph,有向无环图。简单的来说,就是一个由顶点和有方向性的边构成的图中,从任意一个顶点出发,没有任何一条路径会将其带回到出发的顶点。
在作业调度系统中,调度的基础就在于判断多个作业任务的依赖关系,这些任务之间可能存在多重的依赖关系,也就是说有些任务必须先获得执行,然后另外的相关依赖任务才能执行,但是任务之间显然不应该出现任何直接或间接的循环依赖关系,所以本质上这种关系适合用DAG有向无环图来表示。
概括地描述DAGScheduler和TaskScheduler(关于TaskScheduler的相关细节,在我之前的关于Spark运行模式的文章中有)的功能划分就是:TaskScheduler负责实际每个具体任务的物理调度,DAGScheduler负责将作业拆分成不同阶段的具有依赖关系的多批任务,可以理解为DAGScheduler负责任务的逻辑调度。
基本概念
Task任务 :单个分区数据集上的最小处理流程单元
TaskSet任务集:一组关联的,但是互相之间没有Shuffle依赖关系的任务所组成的任务集
Stage调度阶段:一个任务集所对应的调度阶段
Job作业:一次RDD Action生成的一个或多个Stage所组成的一次计算作业
运行方式
DAGScheduler在SparkContext初始化过程中实例化,一个SparkContext对应一个DAGScheduler,DAGScheduler的事件循环逻辑基于Akka Actor的消息传递机制来构建,在DAGScheduler的Start函数中创建了一个eventProcessActor用来处理各种DAGSchedulerEvent,这些事件包括作业的提交,任务状态的变化,监控等等
- private[scheduler]case class JobSubmitted(
- jobId: Int,
- finalRDD: RDD[_],
- func: (TaskContext, Iterator[_]) => _,
- partitions: Array[Int],
- allowLocal: Boolean,
- callSite: String,
- listener: JobListener,
- properties: Properties = null)
- extends DAGSchedulerEvent
- private[scheduler]case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
- private[scheduler]case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
- private[scheduler]case object AllJobsCancelled extends DAGSchedulerEvent
- private[scheduler]
- case classBeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
- private[scheduler]
- case classGettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
- private[scheduler]case class CompletionEvent(
- task: Task[_],
- reason: TaskEndReason,
- result: Any,
- accumUpdates: Map[Long, Any],
- taskInfo: TaskInfo,
- taskMetrics: TaskMetrics)
- extends DAGSchedulerEvent
- private[scheduler]case class ExecutorAdded(execId: String, host: String) extendsDAGSchedulerEvent
- private[scheduler]case class ExecutorLost(execId: String) extends DAGSchedulerEvent
- private[scheduler] caseclass TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
- private[scheduler]case object ResubmitFailedStages extends DAGSchedulerEvent
- private[scheduler]case object StopDAGScheduler extends DAGSchedulerEvent
不论是Client还是TaskScheduler与DAGScheduler的交互方式基本上都是通过DAGScheduler暴露的函数接口间接的给eventProcessActor发送相关消息
如前面所说,DAGScheduler最重要的任务之一就是计算作业和任务的依赖关系,制定调度逻辑
DAGScheduler作业调度的两个主要入口是submitJob 和 runJob,两者的区别在于前者返回一个Jobwaiter对象,可以用在异步调用中,用来判断作业完成或者取消作业,runJob在内部调用submitJob,阻塞等待直到作业完成(或失败)
具体往DAGScheduler提交作业的操作,基本都是封装在RDD的相关Action操作里面,不需要用户显式的提交作业
用户代码都是基于RDD的一系列计算操作,实际运行时,这些计算操作是Lazy执行的,并不是所有的RDD操作都会触发Spark往Cluster上提交实际作业,基本上只有一些需要返回数据或者向外部输出的操作才会触发实际计算工作,其它的变换操作基本上只是生成对应的RDD记录依赖关系。
DAGScheduler内部维护了各种 task / stage / job之间的映射关系表
工作流程
提交并运行一个Job的基本流程,包括以下步骤
划分Stage
当某个操作触发计算,向DAGScheduler提交作业时,DAGScheduler需要从RDD依赖链最末端的RDD出发,遍历整个RDD依赖链,划分Stage任务阶段,并决定各个Stage之间的依赖关系。Stage的划分是以ShuffleDependency为依据的,也就是说当某个RDD的运算需要将数据进行Shuffle时,这个包含了Shuffle依赖关系的RDD将被用来作为输入信息,构建一个新的Stage,由此为依据划分Stage,可以确保有依赖关系的数据能够按照正确的顺序得到处理和运算。
以GroupByKey操作为例,该操作返回的结果实际上是一个ShuffleRDD,当DAGScheduler遍历到这个ShuffleRDD的时候,因为其Dependency是一个ShuffleDependency,于是这个ShuffleRDD的父RDD以及shuffleDependency等对象就被用来构建一个新的Stage,这个Stage的输出结果的分区方式,则由ShuffleDependency中的Partitioner对象来决定。
可以看到,尽管划分和构建Stage的依据是ShuffleDependency,对应的RDD也就是这里的ShuffleRDD,但是这个Stage所处理的数据是从这个shuffleRDD的父RDD开始计算的,只是最终的输出结果的位置信息参考了ShuffleRDD返回的ShuffleDependency里所包含的内容。而shuffleRDD本身的运算操作(其实就是一个获取shuffle结果的过程),是在下一个Stage里进行的。
生成Job,提交Stage
上一个步骤得到一个或多个有依赖关系的Stage,其中直接触发Job的RDD所关联的Stage作为FinalStage生成一个Job实例,这两者的关系进一步存储在resultStageToJob映射表中,用于在该Stage全部完成时做一些后续处理,如报告状态,清理Job相关数据等。
具体提交一个Stage时,首先判断该Stage所依赖的父Stage的结果是否可用,如果所有父Stage的结果都可用,则提交该Stage,如果有任何一个父Stage的结果不可用,则迭代尝试提交父Stage。 所有迭代过程中由于所依赖Stage的结果不可用而没有提交成功的Stage都被放到waitingStages列表中等待将来被提交
什么时候waitingStages中的Stage会被重新提交呢,当一个属于中间过程Stage的任务(这种类型的任务所对应的类为ShuffleMapTask)完成以后,DAGScheduler会检查对应的Stage的所有任务是否都完成了,如果是都完成了,则DAGScheduler将重新扫描一次waitingStages中的所有Stage,检查他们是否还有任何依赖的Stage没有完成,如果没有就可以提交该Stage。
此外每当完成一次DAGScheduler的事件循环以后,也会触发一次从等待和失败列表中扫描并提交就绪Stage的调用过程
任务集的提交
每个Stage的提交,最终是转换成一个TaskSet任务集的提交,DAGScheduler通过TaskScheduler接口提交TaskSet,这个TaskSet最终会触发TaskScheduler构建一个TaskSetManager的实例来管理这个TaskSet的生命周期,对于DAGScheduler来说提交Stage的工作到此就完成了。而TaskScheduler的具体实现则会在得到计算资源的时候,进一步通过TaskSetManager调度具体的Task到对应的Executor节点上进行运算
任务作业完成状态的监控
要保证相互依赖的job/stage能够得到顺利的调度执行,DAGScheduler就必然需要监控当前Job / Stage乃至Task的完成情况。这是通过对外(主要是对TaskScheduler)暴露一系列的回调函数来实现的,对于TaskScheduler来说,这些回调函数主要包括任务的开始结束失败,任务集的失败,DAGScheduler根据这些Task的生命周期信息进一步维护Job和Stage的状态信息。
此外TaskScheduler还可以通过回调函数通知DAGScheduler具体的Executor的生命状态,如果某一个Executor崩溃了,或者由于任何原因与Driver失去联系了,则对应的Stage的shuffleMapTask的输出结果也将被标志为不可用,这也将导致对应Stage状态的变更,进而影响相关Job的状态,再进一步可能触发对应Stage的重新提交来重新计算获取相关的数据。
任务结果的获取
一个具体的任务在Executor中执行完毕以后,其结果需要以某种形式返回给DAGScheduler,根据任务类型的不同,任务的结果的返回方式也不同
对于FinalStage所对应的任务(对应的类为ResultTask)返回给DAGScheduler的是运算结果本身,而对于ShuffleMapTask,返回给DAGScheduler的是一个MapStatus对象,MapStatus对象管理了ShuffleMapTask的运算输出结果在BlockManager里的相关存储信息,而非结果本身,这些存储位置信息将作为下一个Stage的任务的获取输入数据的依据
而根据任务结果的大小的不同,ResultTask返回的结果又分为两类,如果结果足够小,则直接放在DirectTaskResult对象内,如果超过特定尺寸(默认约10MB)则在Executor端会将DirectTaskResult先序列化,再把序列化的结果作为一个Block存放在BlockManager里,而后将BlockManager返回的BlockID放在IndirectTaskResult对象中返回给TaskScheduler,TaskScheduler进而调用TaskResultGetter将IndirectTaskResult中的BlockID取出并通过BlockManager最终取得对应的DirectTaskResult。当然从DAGScheduler的角度来说,这些过程对它来说是透明的,它所获得的都是任务的实际运算结果。
TaskSetManager
前面提到DAGScheduler负责将一组任务提交给TaskScheduler以后,这组任务的调度工作对它来说就算完成了,接下来这组任务内部的调度逻辑,则是由TaskSetManager来完成的。
TaskSetManager的主要接口包括:
ResourceOffer:根据TaskScheduler所提供的单个Resource资源包括host,executor和locality的要求返回一个合适的Task。TaskSetManager内部会根据上一个任务成功提交的时间,自动调整自身的Locality匹配策略,如果上一次成功提交任务的时间间隔很长,则降低对Locality的要求(例如从最差要求Process Local降低为最差要求Node Local),反之则提高对Locality的要求。这一动态调整Locality策略基本可以理解为是为了提高任务在最佳Locality的情况下得到运行的机会,因为Resource资源可能是在短期内分批提供给TaskSetManager的,动态调整Locality门槛有助于改善整体的Locality分布情况。
举个例子,如果TaskSetManager内部有a/b两个任务等待调度,a/b两个任务Prefer的节点分别是Host A 和 Host B, 这时候先有一个Host C的资源以最差匹配为Rack Local的形式提供给TaskSetManager,如果没有内部动态Locality调整机制,那么比如a任务将被调度。接下来在很短的时间间隔内,一个Host A的资源来到,同样的b任务被调度。 而原本最佳的情况应该是任务b调度给Host C, 而任务a调度给Host A。
当然动态Locality也会带来一定的调度延迟,因此如何设置合适的调整策略也是需要针对实际情况来确定的。目前可以设置参数包括
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
即各个Locality级别中TaskSetManager等待分配下一个任务的时间,如果距离上一次成功分配资源的时间间隔超过对应的参数值,则降低匹配要求(即process -> node -> rack -> any), 而每当成功分配一个任务时,则重置时间间隔,并更新Locality级别为当前成功分配的任务的Locality级别
handleSuccessfulTask / handleFailedTask /handleTaskGettingResult :用于更新任务的运行状态,Taskset Manager在这些函数中除了更新自身维护的任务状态列表等信息,用于剩余的任务的调度以外,也会进一步调用DAGScheduler的函数接口将结果通知给它。
此外,TaskSetManager在调度任务时还可能进一步考虑Speculation的情况,亦即当某个任务的运行时间超过其它任务的运行完成时间的一个特定比例值时,该任务可能被重复调度。目的当然是为了防止某个运行中的Task由于某些特殊原因(例如所在节点CPU负载过高,IO带宽被占等等)运行特别缓慢拖延了整个Stage的完成时间,Speculation同样需要根据集群和作业的实际情况合理配置,否则可能反而降低集群性能。
Pool 调度池
前面我们说了,DAGScheduler负责构建具有依赖关系的任务集,TaskSetManager负责在具体的任务集的内部调度任务,而TaskScheduler负责将资源提供给TaskSetManager供其作为调度任务的依据。但是每个SparkContext可能同时存在多个可运行的任务集(没有依赖关系),这些任务集之间如何调度,则是由调度池(POOL)对象来决定的,Pool所管理的对象是下一级的Pool或者TaskSetManager对象
TaskSchedulerImpl在初始化过程中会根据用户设定的SchedulingMode(默认为FIFO)创建一个rootPool根调度池,之后根据具体的调度模式再进一步创建SchedulableBuilder对象,具体的SchedulableBuilder对象的BuildPools方法将在rootPool的基础上完成整个Pool的构建工作。
目前的实现有两种调度模式,对应了两种类型的Pool:
FIFO:先进先出型,FIFO Pool直接管理的是TaskSetManager,每个TaskSetManager创建时都存储了其对应的StageID,FIFO pool最终根据StageID的顺序来调度TaskSetManager
FAIR:公平调度,FAIR Pool管理的对象是下一级的POOL,或者TaskSetManager,公平调度的基本原则是根据所管理的Pool/TaskSetManager中正在运行的任务的数量来判断优先级,用户可以设置minShare最小任务数,weight任务权重来调整对应Pool里的任务集的优先程度。当采用公平调度模式时,目前所构建的调度池是两级的结构,即根调度池管理一组子调度池,子调度池进一步管理属于该调度池的TaskSetManager
公平调度模式的配置通过配置文件来管理,默认使用fairscheduler.xml文件,范例参见conf目录下的模板:
- <?xmlversionxmlversion="1.0"?>
- <allocations>
- <pool name="production">
- <schedulingMode>FAIR</schedulingMode>
- <weight>1</weight>
- <minShare>2</minShare>
- </pool>
- <pool name="test">
- <schedulingMode>FIFO</schedulingMode>
- <weight>2</weight>
- <minShare>3</minShare>
- </pool>
- </allocations>
由于这里的调度池是在SparkContext内部的调度,因此其调度范畴是一个基于该SparkContext的Spark应用程序,正常情况下,多个Spark应用程序之间在调度池层面是没有调度优先级关系的。那么这种调度模式的应用场合是怎样的呢? 举一个例子就是SparkServer或者SharkServer,作为一个长期运行的SparkContext,他们代理运行了其它连上Server的Spark应用的任务,这样你可以为每个链接按照用户名指定一个Pool运行,从而实现用户优先级和资源分配的合理调度等。
Spark应用之间的调度
前面提到调度池只是在SparkContxt内部调度资源,SparkContext之间的调度关系,按照Spark不同的运行模式,就不一定归Spark所管理的了。
在Mesos和YARN模式下,底层资源调度系统的调度策略由Mesos和YARN所决定,只有在Standalone模式下,Spark Master按照当前cluster资源是否满足等待列表中的Spark应用 对内存和CPU资源的需求,而决定是否创建一个SparkContext对应的Driver,进而完成Spark应用的启动过程,这可以粗略近似的认为是一种粗颗粒度的有条件的FIFO策略吧
相关推荐
### Spark作业调度详解 #### 一、Spark作业调度概述 在大数据处理领域,Apache Spark以其高效灵活的特点成为了处理大规模数据集的重要工具之一。Spark通过内存计算加速数据处理速度,并提供了丰富的API来支持多种...
YARN(Yet Another Resource Negotiator)是Apache Hadoop项目的一个子项目,其设计初衷是为了更好地管理大数据框架中的资源分配,以提升资源利用率和作业调度的效率。YARN通过引入资源管理器、节点管理器、应用程序...
将两者结合使用,可以在Kettle中构建和调度Spark作业,充分利用Spark的强大计算能力。 在Kettle中运行Spark作业主要涉及以下知识点: 1. **Kettle(PDI)基本概念**:Kettle是ETL(Extract, Transform, Load)工具...
- Spark作业调度:任务的切分、调度和执行流程。 - Spark性能调优策略:包括内存管理、持久化级别选择、广播变量使用、并行度设置、数据倾斜处理等。 - Spark容错机制:如RDD的不变性和分区概念,使得在节点失败时...
Oozie是基于Hadoop的大数据工作流调度框架,它提供了针对MapReduce、Java、Spark作业的调度功能。通过使用XML文件格式进行工作流设计,Oozie能够实现复杂的数据处理工作流。Oozie的一个主要特点是具有强大的容错能力...
同时,书中可能涵盖了Spark作业调度、数据分区策略、性能调优等方面的知识,这些都是提升Spark应用效率的关键。附录部分可能提供了额外的参考资料,例如常见问题解答、参考链接等。 《Apache Spark源码剖析》则针对...
面试时可能会讨论RDD(弹性分布式数据集)、DataFrame/Dataset、Spark SQL的SQL支持、Spark Streaming的DStream处理和Spark作业调度策略。 4. **Flink**:Flink的关键概念有DataStream、DataStream API、状态管理、...
当Spark作业开始运行时,首先初始化SparkContext,它是整个Spark作业的入口点。SparkContext创建DAGScheduler、TaskScheduler和SchedulerBackend等组件。一旦作业被提交,DAGScheduler计算出作业的DAG,并将stage ...
1、分布式作业管理、调度、监控; 2、基于spark streaming+Cassandra的实时分析和监控,包括性能分析、账号安全主动防御。 web部分采用spring boot开发,前端采用angularJS组织页面相关的各个部分,系统的技术和效果...
4. **Spark作业执行** - **Local模式**:在单机上运行,适合开发和测试。 - **YARN、Mesos或Standalone**:Spark可以运行在这些集群管理系统之上,实现资源调度。 5. **Java编程实践** - **创建SparkConf和...
Spark 生产优化是企业中 Spark 作业的调优的总结,涉及 Spark 任务的详细解释、调度、资源分配等多方面的内容。本文将从三个方向考虑 Spark 生产优化:磁盘存储、CPU cores 和内存。 磁盘存储优化 在 Spark 生产...
4. **Spark作业调度**:Spark使用Zookeeper进行作业调度和应用状态的跟踪,确保多任务并行执行的有序性。 对于无法在CDH官网上找到的Zookeeper-3.4.5-cdh5.16.2,可能是因为版本更新或者版权问题,导致部分旧版本...
3. **双跑验证**:在切换前,先并行运行Hive和Spark作业,确保两者的结果一致,这是为了验证Spark-SQL是否正确实现了业务逻辑。 4. **正式上线**:确认一切正常后,正式将生产环境中的Hive作业替换为Spark作业。 ##...
7. **Spark作业调度**:理解Spark的调度机制,包括Stage划分、Task生成以及如何调整并行度以优化性能。 8. **Spark Shell与Spark SQL**:探索Spark Shell作为交互式数据探索工具的使用,以及Spark SQL用于结构化...
3.1 作业调度简介 55 3.2 Application调度 56 3.3 Job调度 56 3.4 Tasks延时调度 56 第七章 Spark运行原理 57 1 Spark运行基本流程 57 2 Spark在不同集群中的运行架构 58 2.1 Spark on Standalone运行过程 59 2.2 ...
例如,可能有Hadoop集群管理、Spark作业调度、数据仓库服务、流处理引擎等,这些组件为企业提供了高效的数据处理能力。 3. **法律声明**:用户在使用该指南前需要同意一系列法律条款,包括但不限于只能通过官方渠道...
通过这些实验,学习者可以熟悉Spark API的使用,了解如何在实践中优化Spark作业性能,例如通过调整配置参数、分区策略等。 为了更好地学习和理解这些代码,建议按照以下步骤进行: 1. 阅读和理解代码逻辑,关注关键...
7. **Spark Shuffle**:在Spark作业中,数据的重新分区过程被称为shuffle,它是Spark性能优化的关键点,源码中会涉及shuffle的实现细节,包括HashShuffle和SortShuffle。 8. **DAGScheduler和TaskScheduler**:...
提交Spark作业后,Driver会将作业分解为Stage(基于shuffle划分),然后提交Task到Executor执行。Executor在内存中缓存数据,并在本地执行任务,提高整体性能。 7. Spark内存管理: Spark利用内存存储中间结果,...