The executor side consists of two parts: the ExecutorBackend and the Executor. The ExecutorBackend receives messages and calls the Executor to actually run tasks. We take CoarseGrainedExecutorBackend as the example to walk through the executor.
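For orientation, ExecutorBackend itself is just a tiny trait — essentially the channel the Executor uses to report task state back to the scheduler. Paraphrased from the source of that era (not verbatim):

import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState

// A pluggable interface used by the Executor to send status updates
// about its tasks back to the cluster scheduler.
private[spark] trait ExecutorBackend {
  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
}

CoarseGrainedExecutorBackend is one implementation of this trait; Mesos and local modes provide their own.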
The worker launches CoarseGrainedExecutorBackend with a java command. Its run function creates two actors, CoarseGrainedExecutorBackend and WorkerWatcher; the WorkerWatcher monitors the state of the worker.
private[spark] object CoarseGrainedExecutorBackend {
  def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
      workerUrl: Option[String]) {
    // Debug code
    Utils.checkHost(hostname)

    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
    // before getting started with all our system properties, etc
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
      indestructible = true, conf = new SparkConf)
    // set it
    val sparkHostPort = hostname + ":" + boundPort
    actorSystem.actorOf(
      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
      name = "Executor")
    workerUrl.foreach { url =>
      actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
    }
    actorSystem.awaitTermination()
  }

  def main(args: Array[String]) {
    args.length match {
      case x if x < 4 =>
        System.err.println(
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
          "<cores> [<workerUrl>]")
        System.exit(1)
      case 4 =>
        run(args(0), args(1), args(2), args(3).toInt, None)
      case x if x > 4 =>
        run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
    }
  }
}
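The comment above mentions fate-sharing: WorkerWatcher's job is to make sure an executor process never outlives its worker. A condensed sketch of its logic (simplified from the source of that era, not verbatim — in particular the real code initiates the connection with a SendHeartbeat deploy message rather than the placeholder string used here):

import akka.actor.{Actor, Address}
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

// Simplified sketch: watch the remote worker actor and kill this JVM if the
// connection to it is ever lost, enforcing fate-sharing with the worker.
private[spark] class WorkerWatcher(workerUrl: String) extends Actor {
  override def preStart() {
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    // Send any message to the worker to initiate the remote association.
    context.actorSelection(workerUrl) ! "connect" // placeholder; the real code sends SendHeartbeat
  }

  // True if the lifecycle event concerns the worker we are watching.
  private def isWorker(address: Address) = workerUrl.startsWith(address.toString)

  def receive = {
    case DisassociatedEvent(_, remoteAddress, _) if isWorker(remoteAddress) =>
      // Lost contact with the worker: shut down the whole executor JVM.
      System.exit(-1)
    case _ => // ignore other lifecycle events
  }
}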
In CoarseGrainedExecutorBackend's preStart method, a reference to the driver actor is obtained, and a RegisterExecutor message is sent to the driver:
override def preStart() {
  logInfo("Connecting to driver: " + driverUrl)
  driver = context.actorSelection(driverUrl)
  driver ! RegisterExecutor(executorId, hostPort, cores)
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
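The registration handshake uses a small set of messages defined in CoarseGrainedClusterMessages. Paraphrased from that era's source (field names inferred from the call sites shown in this article, not verbatim):

// backend -> driver: announce this executor and its resources
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
// driver -> backend: registration succeeded, here is the Spark configuration
case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
// driver -> backend: registration rejected
case class RegisterExecutorFailed(message: String)
// backend -> driver: a task changed state (RUNNING / FINISHED / FAILED / KILLED)
case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
    data: SerializableBuffer)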
Below are the messages that CoarseGrainedExecutorBackend handles — chiefly RegisteredExecutor, RegisterExecutorFailed, LaunchTask, and KillTask — plus its statusUpdate implementation, which forwards task state changes to the driver.
override def receive = {
  case RegisteredExecutor(sparkProperties) =>
    logInfo("Successfully registered with driver")
    // Make this host instead of hostPort ?
    executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)

  case RegisterExecutorFailed(message) =>
    logError("Slave registration failed: " + message)
    System.exit(1)

  case LaunchTask(taskDesc) =>
    logInfo("Got assigned task " + taskDesc.taskId)
    if (executor == null) {
      logError("Received LaunchTask command but executor was null")
      System.exit(1)
    } else {
      executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
    }

  case KillTask(taskId, _) =>
    if (executor == null) {
      logError("Received KillTask command but executor was null")
      System.exit(1)
    } else {
      executor.killTask(taskId)
    }

  case x: DisassociatedEvent =>
    logError(s"Driver $x disassociated! Shutting down.")
    System.exit(1)

  case StopExecutor =>
    logInfo("Driver commanded a shutdown")
    context.stop(self)
    context.system.shutdown()
}

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  driver ! StatusUpdate(executorId, taskId, state, data)
}
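On a LaunchTask message the backend delegates to Executor.launchTask, which simply wraps the serialized task in a TaskRunner and hands it to a thread pool — roughly the following (paraphrased from the source of that era):

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, serializedTask)
  runningTasks.put(taskId, tr) // tracked so that a later KillTask can find it
  threadPool.execute(tr)
}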
Below is the run function of TaskRunner inside Executor. It first deserializes the task with ser.deserialize, then calls task.run to execute it (tasks come in two kinds: ResultTask and ShuffleMapTask). If the serialized result is larger than akkaFrameSize, the result is stored in the BlockManager and only the blockId is sent back; otherwise the result itself is sent back to the driver directly.
override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
  val startTime = System.currentTimeMillis()
  SparkEnv.set(env)
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = SparkEnv.get.closureSerializer.newInstance()
  logInfo("Running task ID " + taskId)
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var attemptedTask: Option[Task[Any]] = None
  var taskStart: Long = 0
  def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
  val startGCTime = gcTime

  try {
    SparkEnv.set(env)
    Accumulators.clear()
    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
    updateDependencies(taskFiles, taskJars)
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    if (killed) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw TaskKilledException
    }

    attemptedTask = Some(task)
    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    env.mapOutputTracker.updateEpoch(task.epoch)

    // Run the actual task and measure its runtime.
    taskStart = System.currentTimeMillis()
    val value = task.run(taskId.toInt)
    val taskFinish = System.currentTimeMillis()

    // If the task has been killed, let's fail it.
    if (task.killed) {
      throw TaskKilledException
    }

    val resultSer = SparkEnv.get.serializer.newInstance()
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = resultSer.serialize(value)
    val afterSerialization = System.currentTimeMillis()

    for (m <- task.metrics) {
      m.hostname = Utils.localHostName()
      m.executorDeserializeTime = (taskStart - startTime).toInt
      m.executorRunTime = (taskFinish - taskStart).toInt
      m.jvmGCTime = gcTime - startGCTime
      m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
    }

    val accumUpdates = Accumulators.values
    val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
    val serializedDirectResult = ser.serialize(directResult)
    logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)

    val serializedResult = {
      if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
        logInfo("Storing result for " + taskId + " in local BlockManager")
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        ser.serialize(new IndirectTaskResult[Any](blockId))
      } else {
        logInfo("Sending result for " + taskId + " directly to driver")
        serializedDirectResult
      }
    }

    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    logInfo("Finished task ID " + taskId)
  } catch {
    case ffe: FetchFailedException => {
      val reason = ffe.toTaskEndReason
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    }

    case TaskKilledException => {
      logInfo("Executor killed task " + taskId)
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
    }

    case t: Throwable => {
      val serviceTime = (System.currentTimeMillis() - taskStart).toInt
      val metrics = attemptedTask.flatMap(t => t.metrics)
      for (m <- metrics) {
        m.executorRunTime = serviceTime
        m.jvmGCTime = gcTime - startGCTime
      }
      val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      // TODO: Should we exit the whole executor here? On the one hand, the failed task may
      // have left some weird state around depending on when the exception was thrown, but on
      // the other hand, maybe we could detect that when future tasks fail and exit then.
      logError("Exception in task ID " + taskId, t)
      //System.exit(1)
    }
  } finally {
    // TODO: Unregister shuffle memory only for ResultTask
    val shuffleMemoryMap = env.shuffleMemoryMap
    shuffleMemoryMap.synchronized {
      shuffleMemoryMap.remove(Thread.currentThread().getId)
    }
    runningTasks.remove(taskId)
  }
}
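On the driver side, the FINISHED StatusUpdate eventually reaches TaskResultGetter, which is where the two result types from the branch above are unpacked. A condensed sketch of that logic (simplified from the source of that era, not verbatim):

// Simplified sketch of how the driver unpacks a task result.
val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
  case directResult: DirectTaskResult[_] =>
    // Small result: the bytes arrived inside the Akka message itself.
    directResult
  case IndirectTaskResult(blockId) =>
    // Large result: only the blockId arrived; fetch the real bytes from the
    // executor's BlockManager, deserialize them, then delete the block.
    val blockBytes = sparkEnv.blockManager.getRemoteBytes(blockId).get
    val deserialized = serializer.get().deserialize[DirectTaskResult[_]](blockBytes)
    sparkEnv.blockManager.master.removeBlock(blockId)
    deserialized
}

The akkaFrameSize - 1024 threshold in TaskRunner exists precisely so that a result which would not fit in a single Akka frame takes this indirect BlockManager path instead of failing the send.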