`
bit1129
  • 浏览: 1069523 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark三十六】Spark On Yarn之yarn-client方式部署

 
阅读更多

按照Spark的部署设置,对于Spark运行于Yarn之上,有如下四种选择方式(本质上是两种),

  • yarn-client+client
  • yarn-cluster+cluster
  • yarn-client(部署方式默认为client)
  • yarn-cluster(部署方式默认为cluster)

yarn-client+cluster组合以及yarn-cluster+client是不正确的组合,Spark报错退出。

本文首先探讨Spark On Yarn之yarn-client+client方式部署下的代码执行流程

 

程序提交给Yarn运行时环境

 

  • 对于部署方式是Client的情况,SparkSubmit的main函数中通过反射执行应用程序的main方法
  • 在应用程序的main方法中,创建SparkContext实例
  • 在创建SparkContext的实例过程中,通过如下语句创建Scheduler和Backend实例

 

  private[spark] var (schedulerBackend, taskScheduler) =  SparkContext.createTaskScheduler(this, master)

 

  • 由于当前是yarn-client和client组合部署模式,

        1.代码执行逻辑是: taskScheduler是org.apache.spark.scheduler.cluster.YarnClientClusterScheduler实例,它是TaskSchedulerImpl的子类,它的文档说明为

 

 

/**
 * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
 */
 
      2.schedulerBackend是org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend实例,它是CoarseGrainedSchedulerBackend的子类,它是文档说明为无

 

 

  • 继续SparkContext实例的创建过程,调用taskScheduler的start方法,也即YarnClientClusterScheduler的start方法,因为YarnClientClusterScheduler并没有覆盖TaskSchedulerImpl的start方法,所以执行逻辑进入到TaskSchedulerImpl的start方法中
  • 在TaskSchedulerImpl的start方法中,调用backend的start方法,由于此处的backend是YarnClientSchedulerBackend,所以代码逻辑进入到YarnClientSchedulerBackend的start方法中
  • 在YarnClientSchedulerBackend的start方法中,创建YarnClient(将用户编写的应用程序提交给Yarn的ResourceManager)
  • 在YarnClient创建yarn.Client对象,然后调用submitApplication,等待Application执行完,如下代码所示
    client = new Client(args, conf) //yarn.client
    appId = client.submitApplication()
    waitForApplication() ///阻塞等待Application进入Running状态
    asyncMonitorApplication() ///异步监控Application的运行状态,If the application has exited for any reason, stop the SparkContext.

 

  • 程序逻辑进入了yarn.Client调用submitApplication的逻辑,执行代码:

1.submitApplication的代码(Spark)

  /**
   * Submit an application running our ApplicationMaster to the ResourceManager.
   *
   * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
   * creating applications and setting up the application submission context. This was not
   * available in the alpha API.
   */
  ////这里借助Hadoop Yarn提供的API提交应用程序,这里的API是Hadoop Yarn的YarnClient
  override def submitApplication(): ApplicationId = {
    yarnClient.init(yarnConf) ////yarnClient是通过YarnClient.createYarnClient创建,而YarnClient是Hadoop API,所以yarnClient也是Hadoop的API
    yarnClient.start() ///启动Yarn

    logInfo("Requesting a new application from cluster with %d NodeManagers" ///NodeManager是什么概念?
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()///代码运行到此处,还没有真正的把程序代码提交给Yarn去运行;这里使用YarnClient创建一个Application,类型为YarnClientApplication
    val newAppResponse = newApp.getNewApplicationResponse() ///返回GetNewApplicationResponse类型
    val appId = newAppResponse.getApplicationId() ///获取applicationId

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext(newAppResponse) ///创建启动ApplicationMaster容器的上下文环境
    val appContext = createApplicationSubmissionContext(newApp, containerContext)///依据创建的newApp和containerContext,创建应用上下文环境

    // Finally, submit and monitor the application
    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
    yarnClient.submitApplication(appContext) ///根据创建的applicationContext,由Hadoop Yarn的yarnClient提交作业
    appId
  }

2. YarnClient的createApplication代码(Hadoop Yarn)

 

 

  @Override
  public YarnClientApplication createApplication()
      throws YarnException, IOException {
    ApplicationSubmissionContext context = Records.newRecord
        (ApplicationSubmissionContext.class);
    GetNewApplicationResponse newApp = getNewApplication();
    ApplicationId appId = newApp.getApplicationId();
    context.setApplicationId(appId);
    return new YarnClientApplication(newApp, context);
  }

 

3. YarnClientApplication的getNewApplicationResponsed代码(Hadoop Yarn)

 

 public GetNewApplicationResponse getNewApplicationResponse() {
    return newAppResponse;//类型为GetNewApplicationResponse 
  }

 

4. createContainerLaunchContext代码(Spark)

 /**
   * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
   * This sets up the launch environment, java options, and the command for launching the AM.
   */
  ///创建启动ApplicationMaster容器的上下文环境
  protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
      : ContainerLaunchContext = {
    logInfo("Setting up container launch context for our AM")

    val appId = newAppResponse.getApplicationId
    val appStagingDir = getAppStagingDir(appId)
    val localResources = prepareLocalResources(appStagingDir) ///准备本地资源
    val launchEnv = setupLaunchEnv(appStagingDir)
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) ///创建ApplicationMaster容器,类型为ContainerLaunchContext
    amContainer.setLocalResources(localResources)
    amContainer.setEnvironment(launchEnv)

    val javaOpts = ListBuffer[String]()

    // Set the environment variable through a command prefix
    // to append to the existing value of the variable
    var prefixEnv: Option[String] = None

    // Add Xmx for AM memory
    javaOpts += "-Xmx" + args.amMemory + "m"

    val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
    javaOpts += "-Djava.io.tmpdir=" + tmpDir

    // TODO: Remove once cpuset version is pushed out.
    // The context is, default gc for server class machines ends up using all cores to do gc -
    // hence if there are multiple containers in same node, Spark GC affects all other containers'
    // performance (which can be that of other Spark containers)
    // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
    // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
    // of cores on a node.
    val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
    if (useConcurrentAndIncrementalGC) {
      // In our expts, using (default) throughput collector has severe perf ramifications in
      // multi-tenant machines
      javaOpts += "-XX:+UseConcMarkSweepGC"
      javaOpts += "-XX:+CMSIncrementalMode"
      javaOpts += "-XX:+CMSIncrementalPacing"
      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
    }

    // Forward the Spark configuration to the application master / executors.
    // TODO: it might be nicer to pass these as an internal environment variable rather than
    // as Java options, due to complications with string parsing of nested quotes.
    for ((k, v) <- sparkConf.getAll) {
      javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
    }

    // Include driver-specific java options if we are launching a driver
    if (isLaunchingDriver) {  //什么情况是启动Driver??
      sparkConf.getOption("spark.driver.extraJavaOptions")
        .orElse(sys.env.get("SPARK_JAVA_OPTS"))
        .foreach(opts => javaOpts += opts)
      val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
        sys.props.get("spark.driver.libraryPath")).flatten
      if (libraryPaths.nonEmpty) { ///此处对prefixEnv进行唯一的赋值,
        prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))
      }
    }

    // For log4j configuration to reference
    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    val userClass = ///只有启动Driver的情况下才会设置--class
      if (isLaunchingDriver) {
        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
      } else {
        Nil
      }
    val userJar = ////应用程序的jar文件
      if (args.userJar != null) {
        Seq("--jar", args.userJar)
      } else {
        Nil
      }
    val amClass = ///如果是Driver,则是ApplicationMaster,否则是ExecutorLauncher
      if (isLaunchingDriver) {
        Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }
    val userArgs = args.userArgs.flatMap { arg =>
      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
    }
    val amArgs = ///ApplicationMaster --class --jar
      Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
      Seq(
        "--executor-memory", args.executorMemory.toString + "m",
        "--executor-cores", args.executorCores.toString,
        "--num-executors ", args.numExecutors.toString)

    // Command for the ApplicationMaster
    ///封装要执行的命令,使用java -server javaOpts armArgs
    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    //amContainer包含的命令
    amContainer.setCommands(printableCommands)

    logDebug("===============================================================================")
    logDebug("Yarn AM launch context:")
    logDebug(s"    user class: ${Option(args.userClass).getOrElse("N/A")}")
    logDebug("    env:")
    launchEnv.foreach { case (k, v) => logDebug(s"        $k -> $v") }
    logDebug("    resources:")
    localResources.foreach { case (k, v) => logDebug(s"        $k -> $v")}
    logDebug("    command:")
    logDebug(s"        ${printableCommands.mkString(" ")}")
    logDebug("===============================================================================")

    // send the acl settings into YARN to control who has access via YARN interfaces
    val securityManager = new SecurityManager(sparkConf)
    amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
    setupSecurityToken(amContainer)
    UserGroupInformation.getCurrentUser().addCredentials(credentials)

    amContainer
  }

 

 

5. createApplicationSubmissionContext的代码(Spark)

  /**
   * Set up the context for submitting our ApplicationMaster.
   * This uses the YarnClientApplication not available in the Yarn alpha API.
   */
  ///提交ApplicationMaster的上下文环境
  def createApplicationSubmissionContext(
      newApp: YarnClientApplication,
      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName(args.appName) ///应用程序的名字
    appContext.setQueue(args.amQueue)
    appContext.setAMContainerSpec(containerContext) ///将创建的ContainerLaunchContext包装到appContext中
    appContext.setApplicationType("SPARK") ////UI上当应用程序执行完成后,显示的应用程序的类型
    val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(args.amMemory + amMemoryOverhead)
    appContext.setResource(capability)
    appContext
  }

 

  • 上面分析了yarn.Client调用submitApplication详细逻辑,程序回到的start方法中,继续下面的逻辑
    waitForApplication() ///阻塞等待程序进入Running状态
    asyncMonitorApplication() ///异步监控程序的运行状态

 

  • 当作业提交到Yarn中之后,Yarn创建一个进程。如果是Driver则运行ApplicationMaster,否则运行ExecutorLauncher。是否是Driver通过yarn.client的isLaunchingDriver变量决定。isLaunchingDriver取值依赖于args.userClass是否存在,用户的指令中提供了--class参数,则args.userClass中的值就是用户提供的--class参数。此逻辑在.yarn.ClientArguments类中的parseArgs中
  • 我们的spark-submit提供了--class参数,所以,Yarn将启动ApplicationMaster进程。也就是说,在Yarn的运行时环境中,启动了Spark的ApplicationMaster进程

在Yarn运行时环境中运行Spark的ApplicationMaster进程的执行逻辑

  • 代码逻辑进入到ApplicationMaster的main方法中

 

  def main(args: Array[String]) = {
    SignalLogger.register(log)
    val amArgs = new ApplicationMasterArguments(args) ///args是什么内容?
    SparkHadoopUtil.get.runAsSparkUser { () =>
      master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs)) ///因为ApplicationMaster需要跟Yarn的ResourceManager交互,所以这里需要访问RM的YarnRMClientImpl实例
      System.exit(master.run()) ///执行ApplicationMaster的run方法
    }
  }

 

  •  在ApplicationMaster的run方法中调用ApplicationMaster的runDriver或者runExecutorLauncher方法,究竟运行哪一个??
  • 此处的逻辑先暂时不表,因为还没有清楚ApplicationMaster的runDriver或者runExecutorLauncher方法,究竟运行哪一个,不过最终的逻辑都会走进ApplicationMaster的registerAM中,先继续吧!!!
  • 在ApplicationMaster的registerAM方法中,调用YarnRMClient的register方法,如下是代码:

 

  override def register(
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      preferredNodeLocations: Map[String, Set[SplitInfo]],
      uiAddress: String,
      uiHistoryAddress: String,
      securityMgr: SecurityManager) = {
    amClient = AMRMClient.createAMRMClient()
    amClient.init(conf)
    amClient.start()
    this.uiHistoryAddress = uiHistoryAddress

    logInfo("Registering the ApplicationMaster") ///注册APPlicationMaster到Yarn
    synchronized {
      amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)///调用AMRMClient的registerApplicationMaster方法
      registered = true
    }
    new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
      preferredNodeLocations, securityMgr) ///创建了一个YarnAllocationHandler,register方法不需要返回值
  }

 

  • 在ApplicationMaster的registerRM方法中,继续调用YarnAllocator的allocateResources方法。这个方法将近300行,主要的功能如其方法说明文档所说,主要是分配container,按照本机优先,本机架次之,其它机架最后的次序进行container分配
  /**
   * Allocate missing containers based on the number of executors currently pending and running.
   *
   * This method prioritizes the allocated container responses from the RM based on node and
   * rack locality. Additionally, it releases any extra containers allocated for this application
   * but are not needed. This must be synchronized because variables read in this block are
   * mutated by other methods.
   */

 

  • 在allocateResources的过程,会执行提交ExecutorRunnable实例到线程池的操作,(这里是对每个分配到的container交给线程池来处理使用)
          val executorRunnable = new ExecutorRunnable( ///实现了Runnable接口的任务
            container,
            conf,
            sparkConf,
            driverUrl,
            executorId,
            executorHostname,
            executorMemory,
            executorCores,
            appAttemptId.getApplicationId.toString,
            securityMgr)
          launcherPool.execute(executorRunnable) ///提交给线程池
        }

 

  • ExecutorRunnable在container中执行run方法中,封装启动org.apache.spark.executor.CoarseGrainedExecutorBackend进程的指令,这个逻辑是在prepareCommand中完成的,代码片段如下:
 val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", ///java命令所在的位置
      "-server",
      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
      // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
      // an inconsistent state.
      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
      "-XX:OnOutOfMemoryError='kill %p'") ++
      javaOpts ++
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", ///启动CoarseGrainedExecutorBackend进程
      masterAddress.toString,
      slaveId.toString,
      hostname.toString,
      executorCores.toString,
      appId,
      "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

 

  • 调用NMClient.startContainer启动Container
    // Send the start request to the ContainerManager
    nmClient.startContainer(container, ctx) //nmClient是NodeManager的实例,输入Hadoop Yarn的API

 

  • 当CoarseGrainedExecutorBackend启动时,代码逻辑回到我们熟悉的轨道上来。调用preStart方法启动Executor
  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorSelection(driverUrl)
    driver ! RegisterExecutor(executorId, hostPort, cores) ///给Driver发消息RegisterExecutor
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

 

  • DriverActor收到RegisterExecutor的消息后,调用makeOffer方法,在makeOffer方法中,调用launchTasks方法给ExecutorActor发消息LaunchTask
  • ExecutorBackEnd收到LaunchTask的消息后,调用Executor的launchTask方法,然后通过Executor里面的线程池提交任务到线程池执行

 

总结

 囫囵吞枣似的的将yarn-client模式的执行流程走了一遍,毕竟是第一次接触到这里,同时对Yarn也没有很好的理解,所以这中间还有很多不明白的东西,还得不断的思考,细化。

以两幅图片作为结束,

 

 

 

 

 

 

  • 大小: 8.5 KB
  • 大小: 100.2 KB
分享到:
评论

相关推荐

    spark初始化源码阅读sparkonyarn的client和cluster区别

    Spark 初始化源码阅读 Spark on YARN 的 Client 和 Cluster 区别 Spark 是一个大数据处理的开源框架,它可以在多种集群管理器上运行,如 YARN、Mesos 和 Standalone。Spark on YARN 是 Spark 在 YARN 集群管理器上...

    (word完整版)windows下非submit-方式运行spark-on-yarn(CDH集群).doc

    【标题】: "Windows环境下非Submit方式运行Spark on YARN(CDH集群)" 【描述】: "本文档详细介绍了在Windows系统下,不通过`submit`命令而是采用其他方式运行Spark应用程序在YARN上的方法,适用于CDH集群环境。" ...

    spark-3.2.2-bin-3.0.0-cdh6.3.2

    内容概要:由于cdh6.3.2的spark版本为2.4.0,并且spark-sql被阉割,现基于cdh6.3.2,scala2.12.0,java1.8,maven3.6.3,,对spark-3.2.2源码进行编译 应用:该资源可用于cdh6.3.2集群配置spark客户端,用于spark-sql

    java提交spark任务到yarn平台的配置讲解共9页.pdf.zip

    在提交Spark任务前,需要配置Spark的相关属性,如`spark.master`设置为`yarn-client`或`yarn-cluster`,前者用于客户端模式,后者用于集群模式。此外,还需指定Hadoop的配置目录,例如`spark.yarn.conf.archive`。 ...

    Spark&Yarn手动安装指南

    三、Spark on Yarn的配置 Spark on Yarn是指在Yarn集群中运行Spark应用程序。下面是Spark on Yarn的配置步骤: 1. 修改配置文件:修改spark-defaults.conf和spark-env.sh文件,配置Spark的参数。 2. 提交Spark任务...

    spark-2.1.0-bin-without-hadoop.tgz.7z

    此外,还需要设置SPARK_HOME环境变量,并在启动时指定master节点,例如本地模式(`--master local[n]`)、standalone模式(`--master spark://&lt;master_ip&gt;:&lt;port&gt;`)或YARN模式(`--master yarn-client`或`--master ...

    Spark的Yarn模式

    Spark的Yarn模式是将Spark应用部署在Hadoop的YARN资源管理系统上的方式,这种方式无需单独搭建Spark集群,而是利用YARN的资源管理和调度能力。YARN(Yet Another Resource Negotiator)是Hadoop 2.x版本引入的一个...

    Spark环境搭建——on yarn集群模式

    http://spark.apache.org/docs/latest/running-on-yarn.html 准备工作 安装启动Hadoop(需要使用HDFS和YARN,已经ok) 安装单机版Spark(已经ok) 注意:不需要集群,因为把Spark程序提交给YARN运行本质上是把字节码给...

    Spark简介以及其生态圈

    Spark可以部署在各种集群管理器之上,如Standalone、YARN(Hadoop Yet Another Resource Negotiator)和Mesos等。其生态圈还包含一系列处理不同数据源和数据类型的应用程序库,如Spark SQL用于处理结构化数据,Spark...

    java提交spark任务到yarn平台的配置讲解共9页

    SparkSession spark = SparkSession.builder().appName("Java Spark on YARN").getOrCreate(); // 添加业务逻辑... jsc.stop(); } } ``` 编译Java项目,生成jar文件,例如`my-spark-job.jar`。然后使用`...

    Spark开发指南.pdf

    3. **Spark on YARN** (Hadoop YARN): - **特点**: 基于 Hadoop 的资源管理系统 YARN 运行。 - **应用场景**: 大型企业级应用,可以与现有的 Hadoop 生态系统无缝集成。 - **模式分类**: - **yarn-cluster**: ...

    spark集群部署.docx

    在部署Spark on YARN时,不需要单独部署Spark集群,而是将Spark程序提交到Hadoop YARN中运行。这里假设已经有一个高可用的Hadoop集群,只需在Master节点上部署Spark。首先,从Spark官方网站下载与Hadoop版本兼容的...

    spark-3.2.1 不集成hadoop安装包

    3. **YARN集成**:如果打算在Hadoop的YARN资源管理器上运行Spark作业,需要配置Spark以使用YARN作为其资源调度器。 4. **测试兼容性**:确保所使用的Hadoop版本与Spark 3.2.1兼容,因为不同版本间可能存在API或行为...

    spark安装包+spark实验安装软件

    这时,需要配置Spark的`yarn-client`或`yarn-cluster`模式,并设置相关的Hadoop配置。 **8. Spark的性能调优** 性能调优是Spark应用的关键环节,包括调整executor数量、内存分配、shuffle行为、数据序列化策略等。...

    Hive on Spark安装配置详解.pdf

    在Spark on YARN模式下,又分为Cluster和Client两种部署模式。Cluster模式下,Driver程序在ResourceManager上运行,而Client模式下,Driver在提交应用程序的客户端上运行。 接下来,我们将按照以下步骤进行配置: ...

    Apache Spark常见面试题

    Apache Spark支持多种部署方式,包括local模式、standalone模式、Spark on YARN模式以及Spark on Mesos模式。 - **Local模式**:主要用于本地开发和测试场景。在这种模式下,所有Spark任务都在单台机器上运行,无需...

    spark相关jar包

    6. **Hadoop相关库**:Spark通常运行在Hadoop YARN或HDFS之上,因此需要Hadoop的相关jar包,如`hadoop-client.jar`、`hadoop-common.jar`、`hadoop-hdfs.jar`等。 7. **其他依赖**:根据项目需求,可能还需要包括...

    SparkYARN.pdf

    总结来看,对于在安全的 YARN 上部署和运行 Spark 应用程序,需要考虑认证机制、委托令牌的生命周期和安全通信等多个方面。通过优化现有的设计和引入新的机制,可以解决现有系统在处理长时间运行的应用程序时所面临...

    spark基础,关于spark的安装和几种模式的部署

    3. **Yarn模式**:Spark on Yarn模式下,Spark作为一个客户端,提交任务给Yarn进行资源管理和调度。Yarn模式有两种提交模式:Yarn Cluster模式和Yarn Client模式。Cluster模式下,Driver运行在Application Master...

Global site tag (gtag.js) - Google Analytics