`
bit1129
  • 浏览: 1069653 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark三十四】Standalone集群+Cluster部署模式下用户提交任务的执行流程

 
阅读更多

standalone-Cluster模式下application提交到执行的流程

  • SparkSubmit提交程序
  • 通过sparkSubmit命令提交执行SparkSubmit的main函数,
  • 在SparkSubmit的main函数中调用createLaunchEnv方法,这个方法用于解析当前用户作业提交命令中包含的集群管理器和Driver部署模式,以及命令参数,对环境进行解析
  • 环境解析完成后,在main函数中根据用户提交作业的环境执行launch函数
  • 因为standalone-cluster模式,master以spark开头并且deploy-mode为cluster,因此,mainClass是org.apache.spark.deploy.Client 也就是最终的逻辑进入到org.apache.spark.deploy.Client的main函数。Client类用于在Standalone-Cluster模式下启动和停止Driver
  • 在org.apache.spark.deploy.Client的main函数中创建ClientActor,这是一个Akka的Actor,定义于org.apache.spark.deploy.Client类中
  • ClientActor执行它的preStart方法,主要工作是封装Driver信息,给Master发送RequestSubmitDriver请求,请求参数是DriverDescription

 

  override def preStart() = {
    masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))

    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

    println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")

    driverArgs.cmd match {
      case "launch" => ///在ClientActor的preStart方法中,启动Driver
        // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
        //       truncate filesystem paths similar to what YARN does. For now, we just require
        //       people call `addJar` assuming the jar is in the same directory.
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

        val classPathConf = "spark.driver.extraClassPath"
        val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val libraryPathConf = "spark.driver.extraLibraryPath"
        val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val extraJavaOptsConf = "spark.driver.extraJavaOptions"
        val extraJavaOpts = sys.props.get(extraJavaOptsConf)
          .map(Utils.splitCommandString).getOrElse(Seq.empty)
        val sparkJavaOpts = Utils.sparkJavaOpts(conf)
        val javaOpts = sparkJavaOpts ++ extraJavaOpts
        //进程启动命令参数:主类是DriverWrapper
        val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
          driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)

        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)
        ///给Master发送提交创建Driver的请求,Driver的各种信息是在Client端完成的
        masterActor ! RequestSubmitDriver(driverDescription)

      case "kill" =>
        val driverId = driverArgs.driverId
        masterActor ! RequestKillDriver(driverId)
    }
  }
 

 

 

  • 在Master中处理RequestSubmitDriver请求,是通过createDriver创建DriverInfo对象(此时Driver进程尚未建立)和调用schedule方法。schedule是资源调度的主要方法(schedule the currently available resources among the waiting apps. This method will be called every time a new app joins or resource availability changes.)
  • 在Master的schedule中的launchDriver方法中,给Worker发送LauchDriver请求(Master如何确定在哪个Worker上启动Driver?Master使用round-robin方式,依次在Workers上创建多个app的Driver,同时要兼顾Worker上空闲的资源是否满足Driver需要的资源,worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores)
  • 在Worker中处理LaunchDriver请求,创建DriverRunner,调用DriverRunner.start方法。同时记录下如下信息:a.将本Worker的可用内存和CPU核数减掉Driver使用的内存数和CPU核数 b.使用drivers集合变量记录下本Worker处理了这个Driver
  • 在DriverRunner的start方法中,调用DriverRunner的launchDriver
  • 在launchDriver中调用runCommandWithRetry创建Driver进程

下面对创建Driver进程(JVM进程)的代码梳理一下:

1.

private[deploy] def runCommandWithRetry(command: ProcessBuilderLike, initialize: Process => Unit,
    supervise: Boolean) {
    // Time to wait between submission retries.
    var waitSeconds = 1
    // A run of this many seconds resets the exponential back-off.
    val successfulRunDuration = 5

    var keepTrying = !killed

    while (keepTrying) {
      logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

      synchronized {
        if (killed) { return }
        process = Some(command.start()) ///command的start方法返回Java的Process对象
        initialize(process.get)
      }

      val processStart = clock.currentTimeMillis()
      val exitCode = process.get.waitFor()
      if (clock.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
        waitSeconds = 1
      }

      if (supervise && exitCode != 0 && !killed) {
        logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
        sleeper.sleep(waitSeconds)
        waitSeconds = waitSeconds * 2 // exponential back-off
      }

      keepTrying = supervise && exitCode != 0 && !killed
      finalExitCode = Some(exitCode)
    }
  }

 

2. 传入的command是ProcessBuilderLike类型的对象

private[deploy] object ProcessBuilderLike {
  def apply(processBuilder: ProcessBuilder) = new ProcessBuilderLike {
    def start() = processBuilder.start() ///调用Java的ProcessBuilder的start方法
    def command = processBuilder.command()
  }
}

 

3. 传入的commandProcessbuilderLike是基于ProcessBuilder而构建,command = new ProcessBuilderLike(ProcessBuilder)

          val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
            sparkHome.getAbsolutePath, substituteVariables, Seq(localJarFilename)) ///返回ProcessBuilder对象

 

4.CommandUtils.buildProcessBuilder的代码如下所示:

  def buildProcessBuilder(
      command: Command,
      memory: Int,
      sparkHome: String,
      substituteArguments: String => String,
      classPaths: Seq[String] = Seq[String](),
      env: Map[String, String] = sys.env): ProcessBuilder = {
    val localCommand = buildLocalCommand(command, substituteArguments, classPaths, env)
    val commandSeq = buildCommandSeq(localCommand, memory, sparkHome) ///构造【${JAVA_HOME}/bin/java options mainClass 参数】 命令
    val builder = new ProcessBuilder(commandSeq: _*)
    val environment = builder.environment()
    for ((key, value) <- localCommand.environment) {
      environment.put(key, value)
    }
    builder
  }

 

5. buildLocalCommand方法

读取Java进程运行所在的机器的环境信息,比如系统变量等

  /**
   * Build a command based on the given one, taking into account the local environment
   * of where this command is expected to run, substitute any placeholders, and append
   * any extra class paths.
   */
  private def buildLocalCommand(
      command: Command,
      substituteArguments: String => String,
      classPath: Seq[String] = Seq[String](),
      env: Map[String, String]): Command = {
    val libraryPathName = Utils.libraryPathEnvName
    val libraryPathEntries = command.libraryPathEntries
    val cmdLibraryPath = command.environment.get(libraryPathName)

    val newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
      val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++ env.get(libraryPathName)
      command.environment + ((libraryPathName, libraryPaths.mkString(File.pathSeparator)))
    } else {
      command.environment
    }

 

 

6. buildCommandSeq方法

  private def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
    val runner = sys.env.get("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")

    // SPARK-698: do not call the run.cmd script, as process.destroy()
    // fails to kill a process tree on Windows
    
    ///构造【${JAVA_HOME}/bin/java options mainClass 参数】 命令  
    Seq(runner) ++ buildJavaOpts(command, memory, sparkHome) ++ Seq(command.mainClass) ++
    command.arguments
  }

 

  • Driver进程启动后,即运行我们定义的application的main函数
  • 创建SparkContext对象,

                 1. 创建SparkDeploySchedulerBackend 和 TaskScheduler, private[spark] var (schedulerBackend, taskScheduler) =   SparkContext.createTaskScheduler(this, master)

               2. 创建 dagScheduler = new DAGScheduler(this)

 

  • 在createTaskScheduler方法中,创建SparkDeploySchedulerBackend对象,SparkDeploySchedulerBackend继承自SparkDeploySchedulerBackend
  • 在SparkContext的构造方法中,调用TaskScheduler的start方法,在start方法内部调用SparkDeploySchedulerBackend的start方法
  • 在SparkDeploySchedulerBackend的start方法中,构造AppClient对象,并调用AppClient的start方法。AppClient类的含义

 

 * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
 * an app description, and a listener for cluster events, and calls back the listener when various
 * events occur.

 

  • 在AppClient中,执行preStart方法以调用 registerWithMaster()方法,将Driver注册给Master,实际上此时给Master发送的是RegisterApplication消息,
  • 在Master的RegisterApplication消息处理中,调用Master的schedule方法,在scheduler方法中,Master将为Application分配计算资源,默认情况下,计算资源的分配策略是尽可能的占用少的Worker数目,即一个Worker满足要求,就不会放到放到两个上面
  • 在Master的schedule方法中调用launchExecutor方法
  • 在Master的launchExecutor方法中,给Worker发送LaunchExecutor消息,同时给Driver发送ExecutorAdded消息,包括在哪个Worker上,分配了多少cores以及多少memory
  • 在Workder的LaunchExecutor消息处理器中,创建ExecutorRunner对象,而ExecutorRunner则通过反射的方式创建一个Java进程,这个进程就是启动一个CoarseGrainedExecutorBackend进程
  • 调用ExecutorRunner对象的start方法,start方法调用fetchAndRunExecutor方法
  • 如下是fetchAndRunExecutor方法的一部分逻辑

 

      val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
        sparkHome.getAbsolutePath, substituteVariables)

 问题:appDesc.command是在哪里定义的?这是在SparkDeploySchedulerBackend的start方法定义的

 

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", ///封装到ApplicationDescription中,后面会启动一个进程
      args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir)

    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()

 

 

  • CoarseGrainedExecutorBackend是一个Actor,首先运行它的preStart方法,在它的preStart方法中,给Driver发送RegisterExecutor消息
  • 此处的Driver是在CoarseGrainedSchedulerBackend中定义的,当它收到RegisterExecutor时,调用CoarseGrainedSchedulerBackend的makeOffers方法
  • 在makeOffers中,调用launchTasks方法启动任务
  • 在launchTasks中,循环提交所有的Task(这本来是一个TaskSet任务集),每次循环给CoarseGrainedExecutorBackend发送LaunchTask消息
  • CoarseGrainedExecutorBackend处理LaunchTask时,调用Executor的launchTask方法
  • 在Executor的launchTask方法中,提交给Executor中的线程池执行
  def launchTask(
      context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId, taskName, serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

 

 

 

总结

如下图是网上流传甚广的一幅图片,经常上面的流程分析,可知这幅图是错的,第二步不是RegisterDriver,而是RequestSubmitDriver, 第五步才是RegisterDriver(Driver给Master发送的消息是RegisterApplication)

 

 

 

 

 

  • 大小: 40.8 KB
分享到:
评论

相关推荐

    spark 分布式集群搭建

    Spark Standalone 是 Apache Spark 提供的一种自带的集群管理模式,主要用于管理 Spark 应用程序的执行环境。这种模式简单易用,适合于开发测试以及中小型生产环境。 #### Spark Standalone 部署配置 ##### ...

    spark1.2.1常用模式部署运行

    本文档详细介绍了 Spark 1.2.1 在 standalone 集群模式和 on yarn 集群模式下的部署与运行方式。 Spark 版本和环境配置 Spark 1.2.1 的版本号为 spark-1.2.1-bin-hadoop2.4,已经做了相应的环境配置,例如 linux ...

    spark集群部署.docx

    Spark在YARN模式下有两种提交模式:YARN Client和YARN Cluster。前者中,Driver运行在提交应用程序的工作站上,而后者中,Driver运行在Application Master进程中,由YARN管理。 在具体部署Spark集群的过程中,首先...

    Spark Standalone架构设计.docx

    Standalone 模式下,默认使用的是 FIFO 这种简单的调度策略,在进行调度的过程中,大概流程如下图所示:从用户提交 Spark 程序,最终生成 TaskSet,而在调度时,通过 TaskSetManager 来管理一个 TaskSet(包含一组可...

    spark基础,关于spark的安装和几种模式的部署

    在规划Spark Standalone集群时,需要在每台机器上安装Spark,并配置Master和Worker的相关参数。例如,可以将Master节点设置为node01,两个Slave节点为node02和node03。安装步骤包括将Spark安装包上传至服务器,解压...

    spark初始化源码阅读sparkonyarn的client和cluster区别

    Client 负责提交任务,Cluster 负责任务的执行。下面我们将对这两个组件进行详细的分析。 Client Client 是 Spark 的入口点,负责提交任务到集群上。Client 的主要组件有 SparkContext、DriverActor 和 ...

    flink-spark-submiter:从本地IDEA提交FlinkSpark任务到Yarnk8s集群

    Flink任务、Spark任务提交到集群,通常需要将可执行Jar上传到集群,手动执行任务提交指令,如果有配套的大数据平台则需要上传Jar,由调度系统进行任务提交。 对开发者来说,本地IDEA调试Flink、Spark任务不涉及对象...

    23:Spark2.3.x分布式集群安装部署.zip

    Driver程序负责整个应用的逻辑,Executor是在集群中运行任务的工作进程,而Cluster Manager则协调资源分配,如YARN、Mesos或Spark自身的Standalone模式。 安装部署Spark之前,你需要准备一个Hadoop环境,因为Spark...

    【Spark内核篇02】Spark模式运行机制1

    在Spark内核篇02中,我们主要讨论了Spark在三种不同模式下的运行机制:Yarn模式、Yarn-Client模式和Standalone模式,包括这两种运行模式下的Cluster和Client模式。 首先,我们来看Yarn模式的运行流程: 1. 用户通过...

    Spark简介以及其生态圈

    在不同集群中的运行演示部分,通常会展示如何在Standalone和YARN模式下启动Spark-Shell,如何提交Spark应用程序,并通过具体案例来分析运行结果。同时,在问题解决部分,会针对可能遇到的问题,如YARN-CLIENT启动...

    Patrick Wendell:Administering Spark

    通过一个与Spark捆绑的启动器来设置集群,用户可以指定实例类型、密钥名称、私钥文件路径、从节点数量、EC2区域和备用价格(spot-price),从而在5分钟内创建出一个Spark Standalone集群,一个HDFS集群和一个...

    Spark原理及源码剖析1

    本文将深入探讨Spark的原理及源码分析,首先从Spark运行时的通用流程入手,然后介绍核心组件的角色与职责,以及Spark支持的不同集群部署模式。 在Spark的运行流程中,用户通过`spark-submit`提交应用程序。这个过程...

    04_尚硅谷大数据技术之Spark内核1

    YARN模式下,Spark运行机制分为client和cluster模式,其中client模式中Driver运行在客户端,而cluster模式中Driver运行在集群内部。 理解Spark内核和部署模式对于优化Spark应用性能和解决运行中遇到的问题具有重要...

    Spark内核解析.docx

    - Standalone:Spark自带的简单集群管理器,适合独立部署; - Apache Mesos:分布式资源管理框架,支持多种计算框架; - Hadoop YARN:统一资源管理,支持mapreduce、storm等多种计算框架,有yarn client和yarn ...

    spark_deploy

    在"spark_deploy"这个主题中,我们主要关注的是如何部署和配置Spark集群,以实现大规模数据处理任务的高效执行。 首先,理解Spark的部署模式至关重要。Spark提供了多种部署方式,包括本地模式(Local)、独立模式...

    Apache Spark常见面试题

    - **特点**:在Standalone模式下,Spark应用可以独立部署在一个集群中,通过ZooKeeper解决了单点故障问题。Standalone模式下的资源管理较为简单,每个Worker节点上的资源被抽象成若干个slot,根据任务需求进行分配...

    Spark Cluster Computing with Data Set

    Spark的集群计算模式包括本地模式、standalone模式、Mesos模式和YARN模式。在这些模式下,Spark可以跨多台机器并行处理数据,提升计算效率。Cluster Manager负责分配资源,Executor负责执行任务,Driver负责协调...

    spark最新集群搭建指南2017

    - **Standalone模式**:Spark自带的分布式集群管理模式。通过修改`spark-env.sh`配置Master和Worker,并将Spark安装包复制到所有节点,然后启动集群。`start-all.sh`命令启动所有服务,可以通过`...

    大数据组件 Spark 面试题 + Spark 高频面试题

    - Spark架构包括Driver、Executor和Scheduler等组件,作业提交流程涉及Driver向资源管理器(如YARN的ResourceManager)申请资源,启动Executor,然后Executor执行任务。 - 在YARN的client模式下,Driver运行在...

    46488-Spark大数据技术与应用(1-3).pdf

    Spark的核心架构由三部分组成:Driver Program(驱动程序)、Cluster Manager(集群管理器)和Worker Node(工作节点)。 Driver Program是运行应用程序的主节点,它负责创建SparkContext对象,该对象是与Spark集群...

Global site tag (gtag.js) - Google Analytics