在Spark Standalone集群模式下,Driver运行在客户端,所谓的客户端就是提交代码的那台机器。在Standalone模式下,角色包括:
Driver(Client,这里的Client对应到Spark的代码中是AppClient吗?)如下图所示,Driver位于提交代码的那台机器(提交代码的机器是Client),
Master
Worker(Worker是一个进程,它其中会有多个Executor)
Executor
为什么说Driver是在提交代码的那台机器上呢?
SparkConf类中有个关于Driver的参数设置,如下代码在SparkContext的构造方法中
// Set Spark driver host and port system properties conf.setIfMissing("spark.driver.host", Utils.localHostName()) ////host是本地,意思是可以设置的?? conf.setIfMissing("spark.driver.port", "0")
时序:
1.Client(Driver)向Master提交Application----通过spark-sumbit提交,指定master=spark:///
2. Master收到Driver的Application请求,申请资源(实际上是Worker的Executor),启动StandaloneExecutorBackend,StandaloneExecutorBackend是Worker跟外界通信的代表
3.图中的第3步代码中是否有体现?
4.Executor启动后,Driver就可以分配Task(launchTask)
5.作业执行过程中,Worker向Driver汇报任务的执行情况
用户的程序分成两部分,一个是初始化SparkContext,定义针对数据的各种函数操作实现业务逻辑(对应不同的RDD),当SparkContext通过runJob提交后,接下来的工作由Driver完成?
Driver是作业的驱动器(或者主进程),负责Job的解析,生成Stage,并调度Task到Executor上执行,其核心和精髓是DAGScheduler和TaskScheduler,通过AKKA消息驱动的方式完成
不是很理解!!这些工作都是SparkContext来完成的,SparkContext中有DAGScheduler和TaskScheduler,为什么会分成两部分?
Driver分为两部分:
1是SparkContext以及围绕这SparkContext的SparkConf和SparkEnv
2是DAGScheduler,TaskScheduler以及部署模块(部署模块主要是TaskScheduler使用)
Driver通过launchTask发送任务给Executor?Executor内部以线程池多线程的方式并行的运行任务(实际顺序是SparkContext.runJob->DagScheduler.runJob->DAGScheduler.submitJob->TaskScheduler.runbJob->TaskSetManager给LocalActor或者CoarseGrainedActor发送lanchTask消息,CoarseGrainedActor受到消息后调用Executor的lauchTask方法)
SparkConf
SparkConf一旦传递给SparkContext后就不能再修改,因为SparkContext构造时使用了SparkConf的clone方法。
SparkEnv
1.LiveListenerBus
里面有个org.apache.spark.scheduler.LiveListenerBus用于广播SparkListenerEvents到SparkListeners,SparkListenerEvents都定义在SparkListener.scala中
/** * Asynchronously passes SparkListenerEvents to registered SparkListeners. * * Until start() is called, all posted events are only buffered. Only after this listener bus * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when it receives a SparkListenerShutdown event, which is posted using stop(). */
2. SparkEnv类似集群全局变量,在Driver中有,在Worker的Executors中也有,而Worker的Executors有多个,那么每个Executor的每个线程都会访问SparkEnv变量,Spark使用ThreadLocal来保存SparkEnv变量。因此,SparkEnv是一个重量级的东西。
CoarseGrainedSchedulerBackend
1. 在org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend其中创建了DriverActor
// TODO (prashant) send conf instead of properties driverActor = actorSystem.actorOf( Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
2.CoarseGrainedSchedulerBackend有一个子类org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
关注它的start方法,其中的一句:
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
这个命令用于在Standalone模式下,通过CoarseGrainedExecutorBackend的命令方式启动Executor?
override def start() { super.start() // The endpoint for executors to talk to us val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( SparkEnv.driverActorSystemName, conf.get("spark.driver.host"), conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}", "{{WORKER_URL}}") val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") .map(Utils.splitCommandString).getOrElse(Seq.empty) val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } // Start executors with a few necessary configs for registering with the scheduler val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) val javaOpts = sparkJavaOpts ++ extraJavaOpts ///用于启动Executor的指令? val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") ////将command封装到appDesc类中 val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) ////App的Client, ///启动ClientActor client.start() waitForRegistration() }
3.AppClient类
def start() { // Just launch an actor; it will call back into the listener. actor = actorSystem.actorOf(Props(new ClientActor)) }
Client
org.apache.spark.deploy.Client(是一个object) org.apache.spark.deploy.yarn.Client(是一个object) org.apache.spark.deploy.yarn.client(这是一个私有类) org.apache.spark.deploy.client.AppClient(这是一个私有类) 这几个类都在什么集群模式下起作用,用来做什么的?
未分类:
1.除了action触发Job提交,checkpoint也会触发job提交
2.提交Job时,首先计算Stage的依赖关系,从后面往前追溯,前面
相关推荐
spark官方版本的driver-class-path不支持hdfs路径,只支持本地路径。本资源解决了这个问题,driver-class-path在cluster模式时可以支持hdfs路径,解决了cluster模式driver有大量jar依赖的问题。
Spark驱动文件 Simba_Spark_JDBC.zip Apache Commons Copyright ?2001-2015 The Apache Software Foundation Apache Commons Codec Copyright ?2002-2014 The Apache Software Foundation Apache Hadoop Common ...
在Spark大数据处理框架中,Driver的角色至关重要,它负责协调整个计算过程,生成Jobs并调度Tasks。本篇文章将深入探讨Spark Driver的工作机制,以及如何生成Jobs并启动Tasks。 首先,我们来理解Spark作业(Job)与...
在CDH集群中提交Spark作业,大家也都知道Spark的Driver和Executor之间通讯端口是随机的,Spark会随选择1024和65535(含)之间的端口,因此在集群之间不建议启用防火墙。在前面Fayson介绍了《如何指定Spark2作业中...
提交Spark作业后,Driver会将作业分解为Stage(基于shuffle划分),然后提交Task到Executor执行。Executor在内存中缓存数据,并在本地执行任务,提高整体性能。 7. Spark内存管理: Spark利用内存存储中间结果,...
在实际应用中,Inceptor Driver 4.6 可以广泛应用于大数据分析平台,如Hadoop或Spark。它能与这些框架无缝集成,提升数据分析的速度和准确性。例如,在实时流处理场景下,Inceptor Driver 4.6 可以快速响应数据流,...
spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=/usr/bin/python spark.executorEnv.PYSPARK_PYTHON=/usr/bin/python spark.executorEnv.PYSPARK_DRIVER_PYTHON=/usr/bin/python spark.eventLog.enabled true ...
《Spark最新源码与二次开发详解》 Spark作为一款开源的大数据处理框架,因其高效、易用和灵活性而备受开发者青睐。在Openfire环境中,Spark更扮演着关键的角色,为实时通讯提供支持。本教程旨在深入解析Spark的最新...
在术语定义中,Application指的是用户编写的Spark应用程序,它由运行main()函数的Driver和分布在集群中多个节点上运行的Executor组成。Driver创建SparkContext以准备运行环境,SparkContext负责与ClusterManager通信...
- Driver:Spark应用的主控节点,它执行main方法,将用户程序转化为任务并负责调度。Driver在作业执行期间的主要职责包括: - 将用户代码转化为任务(job); - 在Executor之间调度任务(task); - 监控Executor...
- `spark.yarn.driver.memoryOverhead`设置为1GB,用于预留Driver端的额外内存。 ##### 1.3 动态分配Executor 为最大化集群资源利用率,推荐启用动态资源分配功能。这样可以在作业运行过程中根据实际需求动态调整...
3. **Spark架构**:Driver、Executor、Cluster Manager的角色,以及如何通过SparkContext启动计算任务。 4. **内存管理**:Spark如何利用内存进行快速计算,包括Tachyon和Spark Shuffle的过程。 5. **容错机制**:...
在大数据处理领域,Spark 和 HBase 以及 MySQL 都扮演着重要的角色。Spark 提供了高效的数据处理能力,HBase 是一个分布式、面向列的NoSQL数据库,而 MySQL 是广泛使用的的关系型数据库。本示例将详细介绍如何使用 ...
Spark的构成主要包括Driver、Executor、Cluster Manager等组件。Driver是Spark应用程序的入口点,负责将任务分配给Executor。Executor是负责执行具体任务的组件,可能会分布在多个节点上。Cluster Manager是负责管理...
1. Spark的核心组件包括Driver、Executor和Task。Driver是作业的主进程,负责作业的调度和解析;Executor是工作节点上的工作单元,负责执行Task;Task是Spark应用程序的基本执行单元。 2. Scala是Spark的主要编程...
一、实验目的 1. 理解Spark编程思想; 2. 学会在Spark Shell中编写Scala程序; 3. 学会在Spark Shell中运行Scala程序。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 ...(二)spark运行wordcount程序
1. **Spark架构**:理解Spark集群的工作原理,包括Driver、Executor和Worker节点的角色。 2. **RDD**:学习如何创建、转换和操作RDD,以及了解其容错机制。 3. **Spark SQL**:掌握DataFrame和DataSet API,以及如何...
在部署Spark之前,需要对`conf/spark-defaults.conf`进行配置,设置诸如`spark.master`(指定运行模式和地址)、`spark.executor.instances`(执行器数量)、`spark.driver.memory`(驱动程序内存)等参数。...
在核心组件方面,Spark应用程序包括驱动程序(Driver program)和执行器(Executors)。驱动程序是一个Java程序,它通过创建SparkContext来启动应用。而执行器则是执行任务和存储数据的工作进程。在集群管理器...
在进行Spark编程之前,通常需要对Spark环境进行配置,比如设置executor-memory(执行器内存)、driver-memory(驱动程序内存)、executor-cores(执行器核心数)等参数。spark-shell是Spark提供的交互式编程环境,...