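Spark's deployment and application submission are classified along two dimensions: Cluster Manager and Deploy Mode. Until now I only knew that Spark cluster deployments come in three kinds: Standalone, YARN, and Mesos.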
The method createLaunchEnv in SparkSubmit.scala contains fairly detailed logic for how Cluster Manager and Deploy Mode combine.
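To make the combinations concrete, here is a small model of which main class createLaunchEnv ends up handing to the child JVM for each pair. This is not Spark's code: the sealed traits and the childMainClass helper are hypothetical. The class names and the Mesos error text are taken from the createLaunchEnv source quoted in this post (Spark 1.x era).

```scala
// Hypothetical model (not Spark's code) of the child main class chosen per combination.
object LaunchTarget {
  sealed trait ClusterManager
  case object Yarn extends ClusterManager
  case object Standalone extends ClusterManager
  case object Mesos extends ClusterManager
  case object Local extends ClusterManager

  sealed trait DeployMode
  case object Client extends DeployMode
  case object Cluster extends DeployMode

  /** Returns the class the child JVM would run, or an explanation when there is none. */
  def childMainClass(cm: ClusterManager, mode: DeployMode, userClass: String): Either[String, String] =
    (cm, mode) match {
      // Client mode: the user's main class is launched directly.
      case (_, Client) => Right(userClass)
      // Standalone cluster mode: org.apache.spark.deploy.Client wraps the user class.
      case (Standalone, Cluster) => Right("org.apache.spark.deploy.Client")
      // YARN cluster mode: the YARN-specific client wraps the user class.
      case (Yarn, Cluster) => Right("org.apache.spark.deploy.yarn.Client")
      // Rejected outright in createLaunchEnv.
      case (Mesos, Cluster) => Left("Cluster deploy mode is currently not supported for Mesos clusters.")
      // No branch in the quoted code handles this pair, so childMainClass stays "".
      case (Local, Cluster) => Left("no wrapper class: childMainClass stays empty")
    }
}
```

Note that client mode is uniform across all cluster managers; only cluster mode needs a manager-specific wrapper that launches the driver inside the cluster.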
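Cluster Manager essentially takes one of three values, Standalone, YARN, or Mesos (the code below also accepts LOCAL). This shows that Cluster Manager names the cluster's resource manager. In other words, whichever deploy mode is used, Client or Cluster (the two possible values of deployMode), one of these serves as the cluster manager. Does that mean Client is also a cluster deployment mode? It is, in the sense that executors run on the cluster in both modes. The deploy mode only decides where the driver process runs: on the submitting machine in client mode, or on a node inside the cluster in cluster mode.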
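The prefix match on the master URL and the deploy-mode default can be sketched as two small functions. This is a standalone approximation of the logic in createLaunchEnv, not the real implementation. The numeric values of YARN, STANDALONE, MESOS, LOCAL, CLIENT and CLUSTER are assumptions. They are shown here as powers of two so that they can be combined with | and tested with &, because their definitions are not part of the code quoted in this post.

```scala
// Sketch only: approximates how createLaunchEnv classifies a submission.
object SubmitModes {
  // Assumed flag values (powers of two); the real constants are not shown in this post.
  val YARN = 1
  val STANDALONE = 2
  val MESOS = 4
  val LOCAL = 8
  val CLIENT = 1
  val CLUSTER = 2

  /** Case-sensitive prefix match on the master URL, as in createLaunchEnv. */
  def clusterManager(master: String): Either[String, Int] = master match {
    case m if m.startsWith("yarn")  => Right(YARN)
    case m if m.startsWith("spark") => Right(STANDALONE)
    case m if m.startsWith("mesos") => Right(MESOS)
    case m if m.startsWith("local") => Right(LOCAL)
    case _ => Left("Master must start with yarn, spark, mesos, or local")
  }

  /** An unset (null) deploy mode falls back to client. */
  def deployMode(mode: String): Either[String, Int] = mode match {
    case "client" | null => Right(CLIENT)
    case "cluster"       => Right(CLUSTER)
    case _               => Left("Deploy mode must be either client or cluster")
  }

  /** Deploy mode only decides where the driver runs; executors are on the cluster either way. */
  def driverRunsOnSubmitter(mode: Int): Boolean = mode == CLIENT
}

// Example: a standalone master (hypothetical host) with no --deploy-mode given.
// SubmitModes.clusterManager("spark://host:7077")  // Right(2), i.e. STANDALONE
// SubmitModes.deployMode(null)                     // Right(1), i.e. CLIENT
```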
/**
 * @return a tuple containing
 *   (1) the arguments for the child process,
 *   (2) a list of classpath entries for the child,
 *   (3) a list of system properties and env vars, and
 *   (4) the main class for the child
 */
// Return value of createLaunchEnv:
// 1. Arguments for the child process: an array of strings, ArrayBuffer[String]
// 2. Classpath entries for the child JVM: an array of strings, ArrayBuffer[String]
// 3. System properties and environment variables for the child process: a HashMap
// 4. Main class of the child JVM
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
    : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
  // Values to return
  val childArgs = new ArrayBuffer[String]()
  val childClasspath = new ArrayBuffer[String]()
  val sysProps = new HashMap[String, String]()
  var childMainClass = ""

  // Set the cluster manager
  // Four cluster managers are recognized here: YARN, STANDALONE, MESOS and LOCAL.
  // Note: why is LOCAL also a cluster manager? What does "cluster" mean for it?
  // clusterManager is decided by the value of args.master (the match is case-sensitive).
  // Only the prefix (yarn, spark, mesos or local) is checked here. In practice a master
  // starting with yarn may be yarn-client, yarn-cluster or yarn-standalone, so the code
  // further down inspects master more closely.
  val clusterManager: Int = args.master match {
    case m if m.startsWith("yarn") => YARN
    case m if m.startsWith("spark") => STANDALONE
    case m if m.startsWith("mesos") => MESOS
    case m if m.startsWith("local") => LOCAL
    // If master starts with none of these four, exit with an error saying that
    // master must start with yarn, spark, mesos, or local
    case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
  }

  // Set the deploy mode; default is client mode
  // There are two deploy modes, client and cluster.
  // If deployMode is not set (null), client mode is assumed.
  var deployMode: Int = args.deployMode match {
    case "client" | null => CLIENT
    case "cluster" => CLUSTER
    case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
  }

  // Because "yarn-cluster" and "yarn-client" encapsulate both the master
  // and deploy mode, we have some logic to infer the master and deploy mode
  // from each other if only one is specified, or exit early if they are at odds.
  // yarn-cluster and yarn-client each encode both the master and the deployMode, so the
  // combinations of the YARN cluster manager with a deploy mode get special handling here:
  // knowing one of the two is enough to infer the other.
  // Entered when args.master starts with yarn (which makes clusterManager == YARN true)
  if (clusterManager == YARN) {
    if (args.master == "yarn-standalone") {
      // master "yarn-standalone" is deprecated: warn, then rewrite it
      printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
      args.master = "yarn-cluster"  // change master to yarn-cluster
    }
    // Check and normalize the combination of args.master and args.deployMode
    (args.master, args.deployMode) match {
      case ("yarn-cluster", null) =>
        // args.deployMode is null, so the logic above set deployMode to CLIENT;
        // since master is yarn-cluster, switch deployMode to CLUSTER
        deployMode = CLUSTER
      case ("yarn-cluster", "client") =>
        // yarn-cluster together with client is a contradiction: exit
        printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
      case ("yarn-client", "cluster") =>
        // yarn-client together with cluster is a contradiction: exit
        printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
      case (_, mode) =>
        // Otherwise rewrite args.master as below (we only get here if args.master starts with yarn).
        // After the checks above, mode can only be null, "cluster" or "client":
        //   mode == "client"  => args.master becomes yarn-client
        //   mode == null      => args.master becomes yarn-client
        //   mode == "cluster" => args.master becomes yarn-cluster
        //     (the (yarn-client, cluster) case has already been rejected above)
        args.master = "yarn-" + Option(mode).getOrElse("client")
    }

    // Make sure YARN is included in our build if we're trying to use it
    // With YARN as the cluster manager, check that this Spark build includes YARN support.
    // Why test for org.apache.spark.deploy.yarn.Client specifically? Because later on its
    // main method is invoked via reflection (it becomes childMainClass in yarn-cluster mode).
    // Note the yarn in this Client's package name: it is the YARN-specific client, not
    // org.apache.spark.deploy.Client, which is used for standalone cluster mode.
    if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
      printErrorAndExit(
        "Could not load YARN classes. " +
        "This copy of Spark may not have been compiled with YARN support.")
    }
  }

  // The following modes are not supported or applicable
  // Reject unsupported combinations of clusterManager and deployMode
  (clusterManager, deployMode) match {
    case (MESOS, CLUSTER) =>  // Mesos does not support cluster deploy mode
      printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
    case (_, CLUSTER) if args.isPython =>  // Python applications cannot use cluster deploy mode
      printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
    case (_, CLUSTER) if isShell(args.primaryResource) =>
      // Spark shells reject cluster deploy mode, so an application started from a
      // Spark shell always runs in client mode
      printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
    case _ =>
  }

  // If we're running a python app, set the main class to our specific python runner
  if (args.isPython) {  // Python applications are out of scope for this post; skipped
    if (args.primaryResource == PYSPARK_SHELL) {
      args.mainClass = "py4j.GatewayServer"
      args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
    } else {
      // If a python file is provided, add it to the child arguments and list of files to deploy.
      // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
      args.mainClass = "org.apache.spark.deploy.PythonRunner"
      args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
      args.files = mergeFileLists(args.files, args.primaryResource)
    }
    args.files = mergeFileLists(args.files, args.pyFiles)
    if (args.pyFiles != null) {
      sysProps("spark.submit.pyFiles") = args.pyFiles
    }
  }

  // Special flag to avoid deprecation warnings at the client
  // What is SPARK_SUBMIT for? Per the comment above, it marks the process so that
  // deprecation warnings are not printed on the client side. Note that sysProps is a
  // Scala (mutable) HashMap.
  sysProps("SPARK_SUBMIT") = "true"

  // A list of rules to map each argument to system properties or command-line options in
  // each deploy mode; we iterate through these below
  // In other words: for each cluster manager / deploy mode combination, these rules say
  // whether an argument becomes a system property or a command-line option.
  val options = List[OptionAssigner](

    // All cluster managers
    // sysProp is passed as a named argument; clOption keeps its default (null)
    OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
    OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
    // args.jars lists the additional jars submitted along with the application
    OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
    OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.driver.memory"),
    OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
      sysProp = "spark.driver.extraClassPath"),
    OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
      sysProp = "spark.driver.extraJavaOptions"),
    OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
      sysProp = "spark.driver.extraLibraryPath"),

    // Standalone cluster only
    // Standalone cluster mode only: spark.jars remains a system property, while driver
    // memory and cores are passed as command-line options (--memory, --cores)
    OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
    OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
    OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

    // Yarn client only
    // YARN client mode only: everything is passed as a spark.* system property
    OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
    OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
    OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
    OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
    OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),

    // Yarn cluster only
    // YARN cluster mode only: everything is passed as a command-line option
    OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
    OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
    OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
    OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
    OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
    OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
    OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
    OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
    OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),

    // Other options
    OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
      sysProp = "spark.executor.memory"),
    OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
      sysProp = "spark.cores.max"),
    OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
      sysProp = "spark.files")
  )

  // In CLIENT deploy mode, run the main class specified for the application directly
  // In client mode, launch the application main class directly
  // In addition, add the main application jar and any added jars (if any) to the classpath
  if (deployMode == CLIENT) {
    childMainClass = args.mainClass
    if (isUserJar(args.primaryResource)) {
      childClasspath += args.primaryResource
    }
    if (args.jars != null) { childClasspath ++= args.jars.split(",") }
    if (args.childArgs != null) { childArgs ++= args.childArgs }
  }

  // Map all arguments to command-line options or system properties for our chosen mode
  for (opt <- options) {
    if (opt.value != null &&
        (deployMode & opt.deployMode) != 0 &&
        (clusterManager & opt.clusterManager) != 0) {
      if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
      if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
    }
  }

  // Add the application jar automatically so the user doesn't have to call sc.addJar
  // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
  // For python files, the primary resource is already distributed as a regular file
  val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
  if (!isYarnCluster && !args.isPython) {
    var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
    if (isUserJar(args.primaryResource)) {
      jars = jars ++ Seq(args.primaryResource)
    }
    sysProps.put("spark.jars", jars.mkString(","))
  }

  // In standalone-cluster mode, use Client as a wrapper around the user class
  if (clusterManager == STANDALONE && deployMode == CLUSTER) {
    childMainClass = "org.apache.spark.deploy.Client"
    if (args.supervise) { childArgs += "--supervise" }
    childArgs += "launch"
    childArgs += (args.master, args.primaryResource, args.mainClass)
    if (args.childArgs != null) {
      childArgs ++= args.childArgs
    }
  }

  // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
  if (isYarnCluster) {
    childMainClass = "org.apache.spark.deploy.yarn.Client"
    if (args.primaryResource != SPARK_INTERNAL) {
      childArgs += ("--jar", args.primaryResource)
    }
    childArgs += ("--class", args.mainClass)
    if (args.childArgs != null) {
      args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
    }
  }

  // Load any properties specified through --conf and the default properties file
  for ((k, v) <- args.sparkProperties) {
    sysProps.getOrElseUpdate(k, v)
  }

  // Ignore invalid spark.driver.host in cluster modes.
  if (deployMode == CLUSTER) {
    sysProps -= ("spark.driver.host")
  }

  // Resolve paths in certain spark properties
  val pathConfigs = Seq(
    "spark.jars",
    "spark.files",
    "spark.yarn.jar",
    "spark.yarn.dist.files",
    "spark.yarn.dist.archives")
  pathConfigs.foreach { config =>
    // Replace old URIs with resolved URIs, if they exist
    sysProps.get(config).foreach { oldValue =>
      sysProps(config) = Utils.resolveURIs(oldValue)
    }
  }

  // Resolve and format python file paths properly before adding them to the PYTHONPATH.
  // The resolving part is redundant in the case of --py-files, but necessary if the user
  // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
  sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
    val resolvedPyFiles = Utils.resolveURIs(pyFiles)
    val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
    sysProps("spark.submit.pyFiles") = formattedPyFiles
  }

  (childArgs, childClasspath, sysProps, childMainClass)
}
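The YARN branch of createLaunchEnv is the trickiest part, so here it is as a pure function. It returns the normalized master and whether the deploy mode ends up as cluster, or an error message. YarnModes.reconcile is a hypothetical name, and the deploy mode is modeled as an Option instead of a nullable string. The function assumes master already starts with yarn, as it does inside the clusterManager == YARN branch.

```scala
// Sketch of the YARN master/deploy-mode reconciliation in createLaunchEnv (not Spark's code).
// Returns (normalized master, isClusterMode) or an error message.
object YarnModes {
  def reconcile(master: String, deployMode: Option[String]): Either[String, (String, Boolean)] = {
    // "yarn-standalone" is a deprecated alias that gets rewritten to "yarn-cluster".
    val m = if (master == "yarn-standalone") "yarn-cluster" else master
    (m, deployMode) match {
      // createLaunchEnv rejects other deploy-mode values earlier; mirror that here.
      case (_, Some(other)) if other != "client" && other != "cluster" =>
        Left("Deploy mode must be either client or cluster")
      // The master alone implies cluster mode.
      case ("yarn-cluster", None) => Right(("yarn-cluster", true))
      case ("yarn-cluster", Some("client")) =>
        Left("Client deploy mode is not compatible with master \"yarn-cluster\"")
      case ("yarn-client", Some("cluster")) =>
        Left("Cluster deploy mode is not compatible with master \"yarn-client\"")
      // Everything else: derive the master from the deploy mode, defaulting to client.
      case (_, mode) =>
        val resolved = mode.getOrElse("client")
        Right(("yarn-" + resolved, resolved == "cluster"))
    }
  }
}
```

For example, a plain master of yarn with --deploy-mode cluster normalizes to yarn-cluster, while yarn with no deploy mode becomes yarn-client.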
OptionAssigner
/**
 * Provides an indirection layer for passing arguments as system properties or flags to
 * the user's driver program or to downstream launcher tools.
 */
private[spark] case class OptionAssigner(
    value: String,
    clusterManager: Int,
    deployMode: Int,
    clOption: String = null,
    sysProp: String = null)
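In createLaunchEnv, the loop that consumes OptionAssigner entries keeps an option only when its value is non-null and both its cluster-manager mask and its deploy-mode mask overlap the current ones. The option then becomes a command-line flag (clOption), a system property (sysProp), or both. Below is a minimal, self-contained sketch of that routing. It assumes power-of-two flag values (their definitions are not shown in this post) and redeclares the case class so that it compiles on its own. The demo values "2g" and "4" are made up.

```scala
// Sketch of the OptionAssigner routing loop in createLaunchEnv (not Spark's code).
object OptionRouting {
  import scala.collection.mutable

  // Assumed power-of-two flag values; the real constants are not shown in this post.
  val YARN = 1
  val STANDALONE = 2
  val MESOS = 4
  val LOCAL = 8
  val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
  val CLIENT = 1
  val CLUSTER = 2
  val ALL_DEPLOY_MODES = CLIENT | CLUSTER

  // Same fields as Spark's OptionAssigner, redeclared so the sketch compiles on its own.
  case class OptionAssigner(
      value: String,
      clusterManager: Int,
      deployMode: Int,
      clOption: String = null,
      sysProp: String = null)

  /** Keeps options whose masks overlap (cm, mode) and routes them to flags or properties. */
  def route(options: Seq[OptionAssigner], cm: Int, mode: Int): (List[String], Map[String, String]) = {
    val childArgs = mutable.ArrayBuffer[String]()
    val sysProps = mutable.LinkedHashMap[String, String]()
    for (opt <- options) {
      if (opt.value != null && (mode & opt.deployMode) != 0 && (cm & opt.clusterManager) != 0) {
        if (opt.clOption != null) childArgs ++= Seq(opt.clOption, opt.value)
        if (opt.sysProp != null) sysProps(opt.sysProp) = opt.value
      }
    }
    (childArgs.toList, sysProps.toMap)
  }

  // Three rules shaped like entries in the options list of createLaunchEnv, with made-up values.
  val demoOptions = Seq(
    OptionAssigner("2g", ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.driver.memory"),
    OptionAssigner("2g", STANDALONE, CLUSTER, clOption = "--memory"),
    OptionAssigner("4", STANDALONE, CLUSTER, clOption = "--cores"))
}
```

With these three rules, a standalone-cluster submission yields the child arguments --memory 2g --cores 4 and no system properties, while a YARN-client submission yields only spark.driver.memory=2g. Bit masks let a single rule cover several cluster managers or deploy modes at once, for example STANDALONE | MESOS.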