- /*
- *schedule()解决了spark资源调度的问题
- */
- rivate def schedule() {
- //首先判断,master状态不是ALIVE的话,直接返回
- //也就是说,stanby master是不会进行application等资源调度的
- if (state != RecoveryState.ALIVE) { return }
- // First schedule drivers, they take strict precedence over applications
- // Randomization helps balance drivers
- //Random.shuffle的原理,大家要清楚,就是对传入的集合的元素进行随机的打乱
- //取出了workers中的所有之前注册上来的worker,进行过滤,必须是状态为ALIVE的worker
- //对状态为ALIVE的worker,调用Random的shuffle方法进行随机的打乱
- val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
- val numWorkersAlive = shuffledAliveWorkers.size
- var curPos = 0
- //首先,调度driver
- //为什么要调度driver,大家想一下,什么情况下,会注册driver,并且会导致driver被调度
- //其实 ,只有用yarn-cluster模式提交的时候,才会注册driver;因为standalone和yarn-client模式,都会在本地直接
- //启动driver,而不会来注册driver,就更不可能让master调度driver了
- //driver调度机制
- //遍历waittingDrivers ArrayBuffer
- for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
- // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
- // start from the last worker that was assigned a driver, and continue onwards until we have
- // explored all alive workers.
- var launched = false
- var numWorkersVisited = 0
- //while的条件,numWorkersVisited小于numWorkersAlive
- //什么意思?就是说,只要还有活着的worker没有遍历到,那么就继续进行遍历
- //而且,当前这个driver还没有被启动,也就是launched为false
- while (numWorkersVisited < numWorkersAlive && !launched) {
- val worker = shuffledAliveWorkers(curPos)
- numWorkersVisited += 1
- //如果当前这个worker的空闲内存量大于等于,driver需要的内存
- //并且worker的空闲cpu数量,大于等于driver需要的cpu数量
- if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
- //启动driver
- launchDriver(worker, driver)
- //并且将driver从waitingDrivers队列中移除
- waitingDrivers -= driver
- launched = true
- }
- //将指针指向下一个worker
- curPos = (curPos + 1) % numWorkersAlive
- }
- }
- // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
- // in the queue, then the second app, etc.
- // Application的调度机制(核心之核心,重中之重)
- // 首先, application的调度算法有两种,一种是spreadOutApps,另一种是非spreadOutApps
- if (spreadOutApps) {
- // Try to spread out each app among all the nodes, until it has all its cores
- //首先,遍历waitingApps中的ApplicationInfo,并且过滤出application还需要高度的cores的application
- for (app <- waitingApps if app.coresLeft > 0) {
- //从workers中,过滤状态为ALIVE的,再次过滤可以被Application使用的Worker,然后按照剩余cpu数量倒序排序
- val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
- .filter(canUse(app, _)).sortBy(_.coresFree).reverse
- val numUsable = usableWorkers.length
- //创建一个空数组,存储了要分配给每个worker的cpu数量
- val assigned = new Array[Int](numUsable) // Number of cores to give on each node
- //获取到底要分配多少cpu,取app剩余要分配的cpu的数量和worker总共可用cpu数量的最小值
- var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
- //通过这种算法,其实会将每个application,要启动的executor,都平均分布到各个worker上去
- //比如有20个cpu core要分配,那么实际会循环两遍worker,每次循环,给每个worker分配1个core
- //最后每个worker分配了2个core
- //while条件,只要要分配的cpu,还没有分配完,就继续循环
- var pos = 0
- while (toAssign > 0) {
- //每一个worker,如果空闲的cpu数量大于,已经分配出去的cpu数量
- //也就是说,worker还有可分配的cpu
- if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
- //将总共要分配的cpu数量-1,因为这里已经决定在这个worker上分配一个cpu了
- toAssign -= 1
- //给这个worker分配的cpu数量,加1
- assigned(pos) += 1
- }
- //指针移动到下一下worker
- pos = (pos + 1) % numUsable
- }
- // Now that we've decided how many cores to give on each node, let's actually give them
- // 给每个worker分配完application要求的cpu core之后
- // 遍历worker
- for (pos <- 0 until numUsable) {
- //只要判断之前给这个worker分配到了core
- if (assigned(pos) > 0) {
- //首先,在application内部缓存结构中,添加executor
- //并且创建ExecutorDesc对象,其中封装了,给这个executor分配多少个cpu core
- //在spark-submit脚本中,可以指定要多少executor,每个execuor多少个cpu,多少内存
- //那么基于源码机制,实际上,executor的实际数量,以及每个executor的cpu,可能与配置是不一样的
- //因为,我人帝里基于总的cpu来分配的,就是比如,要求3个executor,每个要3个cpu,那么比如,有9个workers,每个有1个cpu
- //那么其实总其知道,要分配9个core,其实根据这种算法,会给每个worker分配一个core,然后给每个worker启动一个executor
- //最后会启动,9个executor,每个executor有1个cpu core
- val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
- //那么就在worker上启动executor
- launchExecutor(usableWorkers(pos), exec)
- //将application状态设置为running
- app.state = ApplicationState.RUNNING
- }
- }
- }
- } else {
- // Pack each app into as few nodes as possible until we've assigned all its cores
- //非spreadOutApps调度算法
- //这种算法与spreadOutApps算法正好相反,1、spreadOutApp尽量平均分配到每个executor上;2、非spreadOutApp尽量在使用单个executor的资源。
- //每个application,都尽可能分配到尽量少的worker上去,比如总其有10个worker,每个有10个core
- //app总共要分配 20个core,那么其实,只会分配到两个worker上,每个worker都占满10个core
- //那么,其余的app,就只能 分配到下一个worker了
- //比如,spark-submit里,配置的是要10个executor,每个要2个core,那么总共是20个croe
- //只会启动2个executor,每个有10个cores
- //将每个Application,尽可能少的分配到worker上去
- //首先,遍历worker,并且是状态为ALIVE,还有空闲cpu的worker
- for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
- //遍历application,并且是还有城朵分配的core的application
- for (app <- waitingApps if app.coresLeft > 0) {
- //判断,如果当前这个worker可以被 application使用
- if (canUse(app, worker)) {
- //取worker剩余cpu数量,与app要分配的cpu数量的最小值
- val coresToUse = math.min(worker.coresFree, app.coresLeft)
- //如果Worker剩余cpu为0了,就不分配了
- if (coresToUse > 0) {
- // 给app添加一个executor
- val exec = app.addExecutor(worker, coresToUse)
- //在worker上启动executor
- launchExecutor(worker, exec)
- //将application状态设置为running
- app.state = ApplicationState.RUNNING
- }
- }
- }
- }
- }
相关推荐
总之,《Spark源码分析》这本书是学习和掌握Spark核心技术和实现细节的重要资源,对于想要深入大数据领域的开发者来说,是一本不可多得的参考书籍。通过阅读和实践,读者能够更好地理解Spark的工作原理,从而在实际...
通过对Spark SQL的源码分析,我们可以理解其内部的工作机制,这对于优化查询性能、调试问题和开发高效的数据处理应用程序具有重要意义。在实践中,理解这些核心概念和流程能够帮助我们更好地设计和实现大数据处理...
7. 实验设计:为了加深对Spark源码分析的理解,课程可能会设置一系列实验。这些实验可能会涉及修改Spark源码以观察特定行为的变化,或者是基于源码分析来优化Spark程序的性能。 8. 最佳实践:最后,课程可能会分享...
深入理解Spark源码,有助于开发者优化应用程序性能、调试问题,甚至为Spark贡献代码。Spark的源码是用Java和Scala编写的,因此熟悉这两种语言对于理解源码至关重要。同时,理解Scala的Actor模型和Akka框架也是解析...
学习Spark源码对于大数据开发人员来说极其有价值,因为它可以帮助你优化代码,理解Spark的执行效率,甚至参与到Spark的贡献和开发中去。对于初学者,建议从核心概念如RDD、DAG、TaskScheduler入手,逐步深入到各个...
Spark 2.2.0与资源调度框架YARN和Mesos的集成更加紧密,优化了资源申请和释放的过程,提升了在分布式环境下的运行效率。 总结,Spark 2.2.0源码的分析,不仅可以帮助我们理解Spark的内在机制,还能指导我们在实际...
本篇文章将深入探讨Spark Driver的工作机制,以及如何生成Jobs并启动Tasks。 首先,我们来理解Spark作业(Job)与任务(Task)的概念。一个Spark作业是由一个或多个RDD操作(如map、filter、reduceByKey等)组成的...
《Spark源码分析3——驱动器、主节点与执行器之间的连接》 在Spark的分布式计算框架中,驱动器(Driver)、主节点(Master)和执行器(Executor)是核心组件,它们之间的通信和协作构成了Spark作业执行的基础。本文将深入...
6. **Spark源码分析**: 书中可能会深入到Spark源码,解析其任务调度、内存管理和数据交换等关键机制,帮助读者理解Spark是如何高效运行的。例如,DAGScheduler如何将作业拆分为任务,以及TaskScheduler如何将任务...
源码分析: 1. **Spark Core**:Spark的核心组件,负责任务调度、内存管理、故障恢复和与存储系统的交互。在源码中,你可以看到DAGScheduler如何将任务分解为Stage,以及TaskScheduler如何将这些Stage转化为具体的...
7. **动态资源调度**:Spark 2.2.0引入了动态资源分配,允许作业根据可用资源自动调整执行器的数量,提高集群资源利用率。 8. **SQL和DataFrame优化**:Spark SQL的Catalyst优化器对DataFrame查询进行分析和优化,...
Spark源码的学习对于深入理解其内部机制,提升开发效率,以及解决实际问题具有至关重要的作用。本文将从Spark的核心组件、架构设计、任务调度、数据存储与计算等多个角度,对Spark源码进行详尽的解读。 一、Spark...
### Master的资源调度算法 - Spark的Master节点负责整个集群的资源管理和调度。 - 它根据Worker节点的状态和资源使用情况,决定哪些任务分配到哪个Worker节点上执行。 ### Executor启动UML总结 - Executor是Spark...
这种模式下,YARN作为资源调度器负责分配资源,而Spark则负责任务的具体执行。这种方式使得Spark能够更好地利用Hadoop集群中的资源,提高资源利用率和任务执行效率。 ### 二、Yarn架构本质与自定义开发 #### Yarn...
《Spark源码解读迷你书》是一本专注于深入理解Apache Spark核心源码的书籍,适合对大数据处理技术有深厚兴趣并且想要探索Spark内部机制的读者。在阅读这本书之前,建议先搭建好Spark开发环境,比如使用Intelij IDEA...
1. **Spark Core**:这是Spark的基础,提供分布式任务调度、内存管理以及错误恢复机制。 2. **Spark SQL**:将SQL查询与DataFrame API相结合,允许用户使用SQL语句处理结构化数据,并与其他Spark模块无缝集成。 3. *...
8. Spark源码分析:书中可能涵盖了Spark源码的深度分析,帮助读者理解其内部工作机制,如调度系统、存储层次、容错机制等,这对于优化Spark应用和解决性能问题非常有价值。 9. 性能优化:Spark的性能优化是学习的...
在源码分析部分,读者会了解到Spark如何通过`SparkContext`初始化,如何调度任务,`Executor`如何执行任务,以及`RDD`的创建、转换和行动操作的实现细节。此外,还会深入到`Shuffle`过程、错误恢复机制、存储策略...
Spark on Mesos 提供了更高级别的资源管理和调度机制,适合需要高度自定义和灵活性的应用场景;而 Spark on YARN 则能够更好地融入 Hadoop 生态系统,适合大规模生产环境。根据实际需求选择合适的部署方式,可以充分...