`
bit1129
  • 浏览: 1069537 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark九十八】Standalone Cluster Mode下的资源调度源代码分析

 
阅读更多

在分析源代码之前,首先对Standalone Cluster Mode的资源调度有一个基本的认识:

首先,运行一个Application需要Driver进程和一组Executor进程。在Standalone Cluster Mode下,Driver和Executor都是在Master的监护下给Worker发消息创建(Driver进程和Executor进程都需要分配内存和CPU,这就需要Master进行资源的调度)。

另外,在为Application的Executor进程s分配CPU内核时,需要考虑CPU内核是尽可能的分散到所有的Worker上分配,还是尽可能在尽量少的Worker上分配,这是有计算任务的特点决定的,是数据密集型还是计算密集型(比如求PI值)

 

源代码:

 

/**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule() {
    if (state != RecoveryState.ALIVE) { return }

    // First schedule drivers, they take strict precedence over applications
    // Randomization helps balance drivers
    //首先为已经提交Application,但是Application对应的Driver进程还没有启动的应用分配Driver资源
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0

    ///waitingDrivers列表保存了已经提交Application,但是Application对应的Driver进程还没有启动的DriverInfo信息,每个元素是DriverInfo类型
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      //对于给定的待申请资源的Driver,遍历所有可用的Worker,寻找满足条件的Worker(CPU和内存足够)
      while (numWorkersVisited < numWorkersAlive && !launched) {
        //获取第curPos号的可用Worker
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        ////如果该worker满足Driver的资源需求,则启动Driver进程,并且设置为已经动(launched=true)
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        ///判断下一个可用的Worker
        curPos = (curPos + 1) % numWorkersAlive
      }
    }

    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    //将Executor尽量分散到尽可能多的Worker上
    if (spreadOutApps) {
      // Try to spread out each app among all the nodes, until it has all its cores
      for (app <- waitingApps if app.coresLeft > 0) {
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        //assigned数组记录每个Worker上已经分配的CPU内核数
        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
     
   
        //toAssign表示为该app将要分配的CPU内核数,它是app申请的内核数和所有Worker的剩余CPU内核数的较小值
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
        var pos = 0
        while (toAssign > 0) {
          //遍历所有的可用Worker,一次分配一个CPU内核,记录在assigned数组中
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % numUsable
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
        //根据assigned数组中记录的每个Worker要分配的CPU内核数,进行实际的分配
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) {
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
            //启动给定指定内核数的Executor进程(实际上是CoarceGrainedExecutorBackend进程)
            launchExecutor(usableWorkers(pos), exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else {
      // Pack each app into as few nodes as possible until we've assigned all its cores
      //将Executor的分配集中在尽可能上的Worker上,外围循环是Worker,内层循环是Application
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        
        for (app <- waitingApps if app.coresLeft > 0) {
          if (canUse(app, worker)) {//针对该Application,如果可以分配资源,
            //分配的内核数是该Worker可用的内核数与App申请的内核数的最小值
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              //要么Worker的内核数全部分配,要么app需要的内核数得到满足
              val exec = app.addExecutor(worker, coresToUse)
              launchExecutor(worker, exec)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }

 

 

分享到:
评论

相关推荐

    Spark Standalone架构设计.docx

    Spark Standalone 架构设计 Spark 是一个开源的通用集群计算系统,提供了 High-...Spark 集群模式架构,如下图所示:Spark 集群 Cluster Manager 目前支持如下三种模式:Standalone 模式、YARN 模式和 Mesos 模式。

    Spark实验:Standalone模式安装部署(带答案)1

    在Standalone模式下,Spark可以独立于任何分布式资源管理系统运行,形成一个自包含的集群。以下是安装部署Spark Standalone模式的详细步骤和相关知识点。 **一、实验描述与环境** 1. 实验描述:本实验旨在搭建一个...

    SPARK源代码

    Spark源代码的深入理解对于任何希望在大数据领域深化技术理解的开发者来说都是宝贵的资源。下面,我们将从多个角度详细解析Spark源码中的关键知识点。 1. **RDD(弹性分布式数据集)**:Spark的核心数据抽象是RDD,...

    spark源代码部署及编译生成

    如果需要对Spark源代码进行修改,可以在源代码目录下进行编辑,然后重新执行`sbt compile`和`sbt package`命令。若要测试修改,可以创建一个新的Spark项目或者在`spark-shell`中加载自定义的Spark库。 7. **阅读源...

    springboot整合spark连接远程服务计算框架使用standAlone模式

    在Standalone模式下,每个节点都既是worker又是driver,可以处理任务并调度其他节点的任务。这种方式简单易用,适合小型或中型规模的部署。 集成Spring Boot和Spark的第一步是添加依赖。在Spring Boot的`pom.xml`...

    spark初始化源码阅读sparkonyarn的client和cluster区别

    Spark 初始化源码阅读 Spark on YARN 的 Client 和 Cluster 区别 Spark 是一个大数据处理的开源框架,它可以在多种集群管理器上运行,如 YARN、Mesos 和 Standalone。Spark on YARN 是 Spark 在 YARN 集群管理器上...

    Spark Standalone 单机版部署

    Spark standalone 单机版部署,看了网上很多方法,事实证明都是错误的,本人亲身经历,在导师的指导下,成功配置成功单机版。

    spark 分布式集群搭建

    ### Spark Standalone 分布式集群搭建详解 #### Spark Standalone 运行模式概述 Spark Standalone 是 Apache Spark 提供的一种自带的集群管理模式,主要用于管理 Spark 应用程序的执行环境。这种模式简单易用,适合...

    基于spark的外卖大数据平台分析系统.zip

    在系统架构层面,Spark运行在Hadoop YARN或独立的Standalone集群上,提供资源管理和任务调度。对于大规模数据处理,可以采用Spark的弹性分布式数据集(RDD)和DataFrame/Dataset API,确保高效的数据并行处理和容错...

    Spark简介以及其生态圈

    在不同集群中的运行演示部分,通常会展示如何在Standalone和YARN模式下启动Spark-Shell,如何提交Spark应用程序,并通过具体案例来分析运行结果。同时,在问题解决部分,会针对可能遇到的问题,如YARN-CLIENT启动...

    spark standalone的集群镜像,基于zk做ha的镜像+源代码+文档说明

    1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合...

    Spark环境搭建——standalone集群模式

    这篇博客,Alice为大家带来的是Spark集群环境搭建之——standalone集群模式。 文章目录集群角色介绍集群规划修改配置并分发启动和停止查看web界面测试 集群角色介绍  Spark是基于内存计算的大数据并行计算框架,...

    独立部署模式standalone下spark的配置

    独立部署模式standalone下spark配置,从乌班图到jak,scala,hadoop,spark的安装 部署

    Spark Cluster Computing with Data Set

    Driver负责任务调度,Executor在Worker节点上运行,执行实际计算任务,Cluster Manager负责资源管理。 3. **数据集(Dataset)**: 在Spark 2.0及以后版本中,引入了DataFrame和Dataset API,它们是对RDD的一种...

    spark1.2.1常用模式部署运行

    本文档详细介绍了 Spark 1.2.1 在 standalone 集群模式和 on yarn 集群模式下的部署与运行方式。 Spark 版本和环境配置 Spark 1.2.1 的版本号为 spark-1.2.1-bin-hadoop2.4,已经做了相应的环境配置,例如 linux ...

    Spark大数据分析与实战课后练习答案.rar

    《Spark大数据分析与实战》课程是一门深入探讨Apache Spark在大数据处理领域的应用和技术的课程,其课后练习答案集提供了对课程所讲授知识的巩固和实践。这是一份珍贵的配套教学资源,旨在帮助学生更好地理解和掌握...

    Flink-Standalone-Cluster.md

    Flink_Standalone_Cluster

    ApacheSpark设计与实现.pdf+ApacheSpark源码剖析.pdf+Spark原著中文版.pdf

    Apache Spark 是一个强大的分布式计算框架,它以高效、易用和通用性著称。这个压缩包包含三本书籍,分别从不同的角度深入探讨...对于想在大数据领域尤其是Spark平台下工作的开发者来说,这些资源是不可或缺的学习资料。

    内存计算框架Spark实战

    在这之中,独立模式(Standalone)是Spark自带的资源调度模式,它允许用户在没有任何外部依赖的情况下启动Spark集群。而在分布式模式节点中,资源调度掌握整个集群的资源,管理集群中的节点资源分配和任务调度。 ...

Global site tag (gtag.js) - Google Analytics