According to Spark's deployment settings, there are four ways to specify the mode when running Spark on YARN (essentially only two):
- yarn-client + client
- yarn-cluster + cluster
- yarn-client (deploy mode defaults to client)
- yarn-cluster (deploy mode defaults to cluster)
The combinations yarn-client + cluster and yarn-cluster + client are invalid; Spark reports an error and exits.
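To make the combinations concrete, the same choice is usually expressed as --master yarn-client or --master yarn-cluster on the spark-submit command line, or directly in code. Below is a minimal, made-up driver (class name and job logic are purely illustrative) pinned to the first combination, yarn-client + client, which is the one traced in this article:
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver used only to illustrate the yarn-client + client combination:
// the master URL "yarn-client" already implies that the driver runs in this client process.
object WordCountOnYarn {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WordCountOnYarn")
      .setMaster("yarn-client")       // "yarn-cluster" would be the cluster-mode counterpart
    val sc = new SparkContext(conf)   // this is where the walkthrough below begins
    val counts = sc.textFile(args(0))
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.take(10).foreach(println)
    sc.stop()
  }
}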
This article first walks through the code execution flow of Spark on YARN under the yarn-client + client deployment.
Submitting the program to the YARN runtime
- When the deploy mode is client, SparkSubmit's main function invokes the application's main method via reflection
- In the application's main method, a SparkContext instance is created
- While constructing the SparkContext, the scheduler and backend instances are created by the following statement:
private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)
- Since the current combination is the yarn-client + client deployment mode (a sketch of how the two classes below are chosen follows these items),
1. taskScheduler is an instance of org.apache.spark.scheduler.cluster.YarnClientClusterScheduler, a subclass of TaskSchedulerImpl, whose doc comment reads:
/**
* This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
*/
2. schedulerBackend is an instance of org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend, a subclass of CoarseGrainedSchedulerBackend; it carries no doc comment.
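How these two classes get picked: SparkContext.createTaskScheduler matches on the master string and loads the YARN classes reflectively, since they live in the separate yarn module. A simplified sketch of the yarn-client branch (error handling and all other master patterns omitted; based on the Spark 1.x sources):
// Simplified sketch of the "yarn-client" branch inside SparkContext.createTaskScheduler.
master match {
  case "yarn-client" =>
    val scheduler = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
      .getConstructor(classOf[SparkContext])
      .newInstance(sc)
      .asInstanceOf[TaskSchedulerImpl]
    val backend = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
      .getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
      .newInstance(scheduler, sc)
      .asInstanceOf[CoarseGrainedSchedulerBackend]
    scheduler.initialize(backend)   // wire the backend into the scheduler
    (backend, scheduler)            // returned as (schedulerBackend, taskScheduler)
}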
- Continuing the SparkContext construction, taskScheduler's start method is called, i.e. YarnClientClusterScheduler.start; since YarnClientClusterScheduler does not override TaskSchedulerImpl's start method, execution enters TaskSchedulerImpl.start
- TaskSchedulerImpl.start calls the backend's start method; since the backend here is a YarnClientSchedulerBackend, execution enters YarnClientSchedulerBackend.start
- YarnClientSchedulerBackend.start creates the client that submits the user's application to YARN's ResourceManager
- It constructs a yarn.Client object, calls submitApplication, and then waits for the application, as shown below:
client = new Client(args, conf) // Spark's org.apache.spark.deploy.yarn.Client
appId = client.submitApplication()
waitForApplication() /// block until the application enters the RUNNING state
asyncMonitorApplication() /// asynchronously monitor the application's state; if the application has exited for any reason, stop the SparkContext
- Execution now enters yarn.Client's submitApplication logic; the relevant code:
1. submitApplication (Spark)
/**
* Submit an application running our ApplicationMaster to the ResourceManager.
*
* The stable Yarn API provides a convenience method (YarnClient#createApplication) for
* creating applications and setting up the application submission context. This was not
* available in the alpha API.
*/
//// the application is submitted here with the API that Hadoop YARN provides, namely Hadoop's YarnClient
override def submitApplication(): ApplicationId = {
yarnClient.init(yarnConf) //// yarnClient was created via YarnClient.createYarnClient; YarnClient is a Hadoop API, so yarnClient is a Hadoop object as well
yarnClient.start() /// start the YarnClient service (this does not start YARN itself)
logInfo("Requesting a new application from cluster with %d NodeManagers" /// a NodeManager is YARN's per-node agent that launches and monitors containers
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
// Get a new application from our RM
val newApp = yarnClient.createApplication() /// nothing has actually been submitted to YARN for execution yet; YarnClient merely creates an application handle of type YarnClientApplication
val newAppResponse = newApp.getNewApplicationResponse() /// returns a GetNewApplicationResponse
val appId = newAppResponse.getApplicationId() /// obtain the ApplicationId
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse) /// create the launch context for the ApplicationMaster container
val appContext = createApplicationSubmissionContext(newApp, containerContext) /// build the application submission context from newApp and containerContext
// Finally, submit and monitor the application
logInfo(s"Submitting application ${appId.getId} to ResourceManager")
yarnClient.submitApplication(appContext) /// submit the application to YARN via Hadoop's yarnClient, using the prepared appContext
appId
}
2. YarnClient.createApplication (Hadoop YARN)
@Override
public YarnClientApplication createApplication()
throws YarnException, IOException {
ApplicationSubmissionContext context = Records.newRecord
(ApplicationSubmissionContext.class);
GetNewApplicationResponse newApp = getNewApplication();
ApplicationId appId = newApp.getApplicationId();
context.setApplicationId(appId);
return new YarnClientApplication(newApp, context);
}
3. YarnClientApplication.getNewApplicationResponse (Hadoop YARN)
public GetNewApplicationResponse getNewApplicationResponse() {
return newAppResponse; // of type GetNewApplicationResponse
}
4. createContainerLaunchContext (Spark)
/**
* Set up a ContainerLaunchContext to launch our ApplicationMaster container.
* This sets up the launch environment, java options, and the command for launching the AM.
*/
/// create the launch context for the ApplicationMaster container
protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
logInfo("Setting up container launch context for our AM")
val appId = newAppResponse.getApplicationId
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir) /// prepare local resources
val launchEnv = setupLaunchEnv(appStagingDir)
val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) /// create the ApplicationMaster container record, of type ContainerLaunchContext
amContainer.setLocalResources(localResources)
amContainer.setEnvironment(launchEnv)
val javaOpts = ListBuffer[String]()
// Set the environment variable through a command prefix
// to append to the existing value of the variable
var prefixEnv: Option[String] = None
// Add Xmx for AM memory
javaOpts += "-Xmx" + args.amMemory + "m"
val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
javaOpts += "-Djava.io.tmpdir=" + tmpDir
// TODO: Remove once cpuset version is pushed out.
// The context is, default gc for server class machines ends up using all cores to do gc -
// hence if there are multiple containers in same node, Spark GC affects all other containers'
// performance (which can be that of other Spark containers)
// Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
// multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
// of cores on a node.
val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
if (useConcurrentAndIncrementalGC) {
// In our expts, using (default) throughput collector has severe perf ramifications in
// multi-tenant machines
javaOpts += "-XX:+UseConcMarkSweepGC"
javaOpts += "-XX:+CMSIncrementalMode"
javaOpts += "-XX:+CMSIncrementalPacing"
javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
javaOpts += "-XX:CMSIncrementalDutyCycle=10"
}
// Forward the Spark configuration to the application master / executors.
// TODO: it might be nicer to pass these as an internal environment variable rather than
// as Java options, due to complications with string parsing of nested quotes.
for ((k, v) <- sparkConf.getAll) {
javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
}
// Include driver-specific java options if we are launching a driver
if (isLaunchingDriver) { // true when args.userClass is set, i.e. a cluster-mode submission; see the discussion below
sparkConf.getOption("spark.driver.extraJavaOptions")
.orElse(sys.env.get("SPARK_JAVA_OPTS"))
.foreach(opts => javaOpts += opts)
val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
sys.props.get("spark.driver.libraryPath")).flatten
if (libraryPaths.nonEmpty) { /// the only place where prefixEnv is assigned
prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))
}
}
// For log4j configuration to reference
javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
val userClass = /// --class is set only when launching the driver
if (isLaunchingDriver) {
Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
} else {
Nil
}
val userJar = //// the application's jar file
if (args.userJar != null) {
Seq("--jar", args.userJar)
} else {
Nil
}
val amClass = /// ApplicationMaster when launching the driver, ExecutorLauncher otherwise
if (isLaunchingDriver) {
Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
val userArgs = args.userArgs.flatMap { arg =>
Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs = /// <amClass> --class ... --jar ... --arg ... plus the executor settings
Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
Seq(
"--executor-memory", args.executorMemory.toString + "m",
"--executor-cores", args.executorCores.toString,
"--num-executors ", args.numExecutors.toString)
// Command for the ApplicationMaster
/// assemble the command to execute: java -server <javaOpts> <amArgs>
val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
// the commands carried by amContainer
amContainer.setCommands(printableCommands)
logDebug("===============================================================================")
logDebug("Yarn AM launch context:")
logDebug(s" user class: ${Option(args.userClass).getOrElse("N/A")}")
logDebug(" env:")
launchEnv.foreach { case (k, v) => logDebug(s" $k -> $v") }
logDebug(" resources:")
localResources.foreach { case (k, v) => logDebug(s" $k -> $v")}
logDebug(" command:")
logDebug(s" ${printableCommands.mkString(" ")}")
logDebug("===============================================================================")
// send the acl settings into YARN to control who has access via YARN interfaces
val securityManager = new SecurityManager(sparkConf)
amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
setupSecurityToken(amContainer)
UserGroupInformation.getCurrentUser().addCredentials(credentials)
amContainer
}
5. createApplicationSubmissionContext (Spark)
/**
* Set up the context for submitting our ApplicationMaster.
* This uses the YarnClientApplication not available in the Yarn alpha API.
*/
/// the context for submitting the ApplicationMaster
def createApplicationSubmissionContext(
newApp: YarnClientApplication,
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
val appContext = newApp.getApplicationSubmissionContext
appContext.setApplicationName(args.appName) /// the application name
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(containerContext) /// wrap the ContainerLaunchContext created above into appContext
appContext.setApplicationType("SPARK") //// the application type shown in the UI once the application has completed
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
appContext.setResource(capability)
appContext
}
- Having examined yarn.Client's submitApplication in detail, control returns to YarnClientSchedulerBackend's start method, which continues with:
waitForApplication() /// block until the application enters the RUNNING state
asyncMonitorApplication() /// asynchronously monitor the application's state
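A rough idea of what the blocking wait amounts to (a simplified sketch, assuming a getApplicationReport helper on Spark's Client that delegates to the Hadoop YarnClient; the real method also honours a configurable poll interval and further terminal states):
// Sketch of the blocking wait in YarnClientSchedulerBackend: poll the ResourceManager
// until the application leaves the submission states.
private def waitForApplication(): Unit = {
  var state = client.getApplicationReport(appId).getYarnApplicationState
  while (state == YarnApplicationState.NEW ||
         state == YarnApplicationState.NEW_SAVING ||
         state == YarnApplicationState.SUBMITTED ||
         state == YarnApplicationState.ACCEPTED) {
    Thread.sleep(1000)   // illustrative poll interval
    state = client.getApplicationReport(appId).getYarnApplicationState
  }
  if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
    throw new SparkException(s"Yarn application $appId exited with state $state")
  }
}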
- Once the job has been submitted to YARN, YARN launches a new process for the AM: ApplicationMaster if it is to run the driver, ExecutorLauncher otherwise. Whether the driver is launched there is decided by yarn.Client's isLaunchingDriver flag, which is true only when args.userClass is set; userClass is filled from a --class argument by parseArgs in yarn.ClientArguments (see the sketch below)
- In the yarn-client mode traced here, the driver already runs in the submitting client process and the arguments that YarnClientSchedulerBackend hands to yarn.Client contain no --class, so the process YARN launches is ExecutorLauncher; its main method simply delegates to ApplicationMaster's main, so the ApplicationMaster code examined next still applies. In other words, Spark's AM process is now running inside the YARN runtime
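The flag itself is tiny; roughly, in org.apache.spark.deploy.yarn.Client (a sketch based on the Spark 1.x sources):
// Whether this submission carries a user main class decides whether the AM will run the driver.
// yarn-cluster: spark-submit's --class ends up here, so isLaunchingDriver is true.
// yarn-client : YarnClientSchedulerBackend passes no --class, so isLaunchingDriver is false.
private val isLaunchingDriver: Boolean = args.userClass != null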
Execution logic of Spark's ApplicationMaster code in the YARN runtime
- Execution enters ApplicationMaster's main method:
def main(args: Array[String]) = {
SignalLogger.register(log)
val amArgs = new ApplicationMasterArguments(args) /// args are the AM arguments assembled in createContainerLaunchContext: --jar, --arg and the executor settings (plus --class in cluster mode)
SparkHadoopUtil.get.runAsSparkUser { () =>
master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs)) /// the ApplicationMaster needs to talk to YARN's ResourceManager, hence the YarnRMClientImpl instance
System.exit(master.run()) /// execute the ApplicationMaster's run method
}
}
- ApplicationMaster's run method then calls either runDriver or runExecutorLauncher; which one runs?
- Following the discussion above, in this yarn-client walkthrough the AM was started without a user class, so run should take the runExecutorLauncher branch (runDriver is the yarn-cluster case; see the sketch below). Either way, both paths eventually reach ApplicationMaster's registerAM, so let us continue from there
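A simplified sketch of the branch inside ApplicationMaster.run (shutdown hooks, security setup and error handling omitted; the flag is named isDriver or isClusterMode depending on the Spark version, and it is true only when the AM was started with a user class):
// Simplified sketch of the branch inside ApplicationMaster.run (Spark 1.x).
if (isDriver) {
  runDriver()                        // yarn-cluster: run the user's main inside the AM
} else {
  runExecutorLauncher(securityMgr)   // yarn-client: only register the AM and allocate executors
}
// Both paths call registerAM, which is where the walkthrough continues.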
- In ApplicationMaster's registerAM method, YarnRMClient's register method is called; here is the code:
override def register(
conf: YarnConfiguration,
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
uiAddress: String,
uiHistoryAddress: String,
securityMgr: SecurityManager) = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
this.uiHistoryAddress = uiHistoryAddress
logInfo("Registering the ApplicationMaster") ///注册APPlicationMaster到Yarn
synchronized {
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) /// call AMRMClient's registerApplicationMaster method
registered = true
}
new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
preferredNodeLocations, securityMgr) /// this new YarnAllocationHandler is the last expression, i.e. the value returned by register (a YarnAllocator)
}
- Back in ApplicationMaster's registerAM method, YarnAllocator's allocateResources method is then called. The method is nearly 300 lines long; as its doc comment says, its main job is to allocate containers, preferring the local node first, then the local rack, and finally any other rack:
/**
* Allocate missing containers based on the number of executors currently pending and running.
*
* This method prioritizes the allocated container responses from the RM based on node and
* rack locality. Additionally, it releases any extra containers allocated for this application
* but are not needed. This must be synchronized because variables read in this block are
* mutated by other methods.
*/
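Stripped of its locality bookkeeping, the heart of allocateResources is the AMRMClient request/allocate cycle. A rough sketch (container counts, priorities and the node/rack preference lists are simplified; amClient, maxExecutors and the executor settings stand for the surrounding fields):
// Rough sketch of the allocation cycle in YarnAllocationHandler (Spark 1.x).
val missing = maxExecutors - numPendingAllocate - numExecutorsRunning
if (missing > 0) {
  val capability = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
  val priority = Priority.newInstance(1)
  for (_ <- 1 to missing) {
    // the real code fills in preferred nodes and racks here instead of null
    amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority))
  }
}
val allocateResponse = amClient.allocate(0.1f)            // progress indicator in [0, 1]
val allocatedContainers = allocateResponse.getAllocatedContainers()
// Each allocated container is then matched host-first, rack-second, any-host-last,
// and handed to the launcher thread pool as an ExecutorRunnable (shown below).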
- During allocateResources, an ExecutorRunnable is submitted to a thread pool for each allocated container (each container obtained is handed to the pool for processing):
val executorRunnable = new ExecutorRunnable( /// a task implementing the Runnable interface
container,
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr)
launcherPool.execute(executorRunnable) /// hand it to the thread pool
}
- The run method that ExecutorRunnable executes for its container assembles the command that launches the org.apache.spark.executor.CoarseGrainedExecutorBackend process; this is done in prepareCommand. Code excerpt:
val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", /// location of the java command
"-server",
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
// Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
// an inconsistent state.
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do
// 'something' to fail job ... akin to blacklisting trackers in mapred ?
"-XX:OnOutOfMemoryError='kill %p'") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", ///启动CoarseGrainedExecutorBackend进程
masterAddress.toString,
slaveId.toString,
hostname.toString,
executorCores.toString,
appId,
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
- NMClient.startContainer is then called to start the container:
// Send the start request to the ContainerManager
nmClient.startContainer(container, ctx) // nmClient is a Hadoop YARN NMClient, the client API for talking to the NodeManager
- Once CoarseGrainedExecutorBackend starts, the code is back on familiar ground: its preStart method connects to the driver and registers the executor
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
driver ! RegisterExecutor(executorId, hostPort, cores) /// send the RegisterExecutor message to the driver
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
- When the DriverActor receives the RegisterExecutor message, it calls makeOffers, which in turn calls launchTasks to send LaunchTask messages to the executor actors (see the sketch below)
- When the ExecutorBackend receives a LaunchTask message, it calls Executor's launchTask method, which submits the task to the executor's internal thread pool for execution
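For completeness, a condensed sketch of that driver-side handling in CoarseGrainedSchedulerBackend's DriverActor (Spark 1.x, Akka era; executor bookkeeping, serialization details and error paths are trimmed):
// Condensed sketch of the RegisterExecutor handling on the driver side.
case RegisterExecutor(executorId, hostPort, cores) =>
  sender ! RegisteredExecutor          // the executor creates its Executor instance on this ack
  // record the new executor's host and free cores, then try to schedule work
  makeOffers()

// makeOffers builds one WorkerOffer(executorId, host, freeCores) per registered executor,
// asks TaskSchedulerImpl.resourceOffers for matching TaskDescriptions, and launchTasks
// serializes each TaskDescription and sends LaunchTask(serializedTask) to the executor actor.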
Summary
This has been a rather cursory, whirlwind pass over the yarn-client execution flow. It is my first contact with this part of the code, and my understanding of YARN itself is still limited, so quite a few things remain unclear and will need further thought and refinement.