Worker 主要负责管理excutor和driver,并向master报告excutor和driver的状态
Worker的启动
类似于master,创建了worker的actor
private[spark] object Worker { def main(argStrings: Array[String]) { val args = new WorkerArguments(argStrings) val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores, args.memory, args.masters, args.workDir) actorSystem.awaitTermination() } def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None) : (ActorSystem, Int) = { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems val conf = new SparkConf val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") val actorName = "Worker" val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf) actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, masterUrls, systemName, actorName, workDir, conf), name = actorName) (actorSystem, boundPort) }
worker actor 启动的时候调用PreStart函数,创建了webUI system,调用metricsSystem注册了workerSource,并向master注册worker
override def preStart() { assert(!registered) logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( host, port, cores, Utils.megabytesToString(memory))) sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) logInfo("Spark home: " + sparkHome) createWorkDir() webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi.start() registerWithMaster() metricsSystem.registerSource(workerSource) metricsSystem.start() }
worker需要处理的事件
override def receive = { case RegisteredWorker(masterUrl, masterWebUiUrl) => logInfo("Successfully registered with master " + masterUrl) registered = true changeMaster(masterUrl, masterWebUiUrl) context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat) case SendHeartbeat => masterLock.synchronized { if (connected) { master ! Heartbeat(workerId) } } case MasterChanged(masterUrl, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterUrl) changeMaster(masterUrl, masterWebUiUrl) val execs = executors.values. map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state)) sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq) case Heartbeat => logInfo(s"Received heartbeat from driver ${sender.path}") case RegisterWorkerFailed(message) => if (!registered) { logError("Worker registration failed: " + message) System.exit(1) } case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) => if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") } else { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) // TODO (pwendell): We shuld make sparkHome an Option[String] in // ApplicationDescription to be more explicit about this. val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath) val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ memoryUsed += memory_ masterLock.synchronized { master ! ExecutorStateChanged(appId, execId, manager.state, None, None) } } case ExecutorStateChanged(appId, execId, state, message, exitStatus) => masterLock.synchronized { master ! ExecutorStateChanged(appId, execId, state, message, exitStatus) } val fullId = appId + "/" + execId if (ExecutorState.isFinished(state)) { val executor = executors(fullId) logInfo("Executor " + fullId + " finished with state " + state + message.map(" message " + _).getOrElse("") + exitStatus.map(" exitStatus " + _).getOrElse("")) executors -= fullId finishedExecutors(fullId) = executor coresUsed -= executor.cores memoryUsed -= executor.memory } case KillExecutor(masterUrl, appId, execId) => if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId) } else { val fullId = appId + "/" + execId executors.get(fullId) match { case Some(executor) => logInfo("Asked to kill executor " + fullId) executor.kill() case None => logInfo("Asked to kill unknown executor " + fullId) } } case LaunchDriver(driverId, driverDesc) => { logInfo(s"Asked to launch driver $driverId") val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl) drivers(driverId) = driver driver.start() coresUsed += driverDesc.cores memoryUsed += driverDesc.mem } case KillDriver(driverId) => { logInfo(s"Asked to kill driver $driverId") drivers.get(driverId) match { case Some(runner) => runner.kill() case None => logError(s"Asked to kill unknown driver $driverId") } } case DriverStateChanged(driverId, state, exception) => { state match { case DriverState.ERROR => logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}") case DriverState.FINISHED => logInfo(s"Driver $driverId exited successfully") case DriverState.KILLED => logInfo(s"Driver $driverId was killed by user") } masterLock.synchronized { master ! DriverStateChanged(driverId, state, exception) } val driver = drivers.remove(driverId).get finishedDrivers(driverId) = driver memoryUsed -= driver.driverDesc.mem coresUsed -= driver.driverDesc.cores } case x: DisassociatedEvent if x.remoteAddress == masterAddress => logInfo(s"$x Disassociated !") masterDisconnected() case RequestWorkerState => { sender ! WorkerStateResponse(host, port, workerId, executors.values.toList, finishedExecutors.values.toList, drivers.values.toList, finishedDrivers.values.toList, activeMasterUrl, cores, memory, coresUsed, memoryUsed, activeMasterWebUiUrl) } }
相关推荐
《Spark源码分析3——驱动器、主节点与执行器之间的连接》 在Spark的分布式计算框架中,驱动器(Driver)、主节点(Master)和执行器(Executor)是核心组件,它们之间的通信和协作构成了Spark作业执行的基础。本文将深入...
在深入探讨Apache Spark源码之前,我们先了解一些基础知识。Apache Spark是一个用于大规模数据处理的开源集群计算系统,它提供了统一的框架来处理批处理、流处理以及机器学习等多种场景。Spark的核心组件是`...
源码分析可以了解DataFrame如何与RDD互操作,以及 Catalyst优化器的工作原理。 总结,Spark源码的学习是一个深入理解大数据处理流程和技术细节的过程。通过源码,我们可以了解到Spark如何高效地调度任务,如何处理...
8. Spark源码分析:书中可能涵盖了Spark源码的深度分析,帮助读者理解其内部工作机制,如调度系统、存储层次、容错机制等,这对于优化Spark应用和解决性能问题非常有价值。 9. 性能优化:Spark的性能优化是学习的...
《深入理解Spark核心思想与源码分析》这本书深入剖析了Apache Spark这一大数据处理框架的核心设计理念与实现机制。Spark作为分布式计算的重要工具,以其高效的内存计算、易用的API和广泛的应用场景深受业界青睐。...
《深入理解Spark核心思想与源码分析》是耿嘉安撰写的一本专著,全面而深入地探讨了Apache Spark这一大数据处理框架的核心理念和技术细节。这本书不仅覆盖了Spark的基础概念,还深入到了源码层面,为读者揭示了Spark...
### Spark Core 源码分析之部署方式 #### 前言 Apache Spark 是一个用于大规模数据处理的开源计算系统,其核心模块 Spark Core 提供了基础的并行计算框架和分布式部署方式的支持。本文主要关注 Spark 的部署方式,...
### Spark源码解析:Master与Worker机制 #### Spark概述及特点 Spark是一个高效的数据处理框架,它由加州大学伯克利分校的AMP实验室研发。该框架支持多种编程语言(包括Java、Scala、Python和R等),使开发者可以...
源码分析对于深入理解Spark的工作原理和进行二次开发至关重要。 首先,Java目录是Spark源码的核心部分,它包含了Spark的大部分核心组件和功能的实现。以下是一些主要的子目录及其相关的知识点: 1. **core**:这是...
源码分析可以帮助我们理解这些算法如何并行化和优化,以及如何利用Spark的内存计算优势。 GraphX提供了图处理API,允许用户定义图的顶点和边,以及对图进行各种操作,如PageRank和最短路径算法。源码分析将揭示...
本次源码分析基于**Spark 1.2版本**,并聚焦于**standalone模式**,即独立部署模式。此模式下,Spark服务完全自包含,无需依赖其他资源管理系统。它是Spark YARN和Mesos模式的基础。 #### 四、Master与Worker的启动...
为了深入了解 Spark 的内部机制,我们需要对其内核源码进行深入分析。 Application/App 在 Spark 中,Application 指的是用户编写的 Spark 应用程序/代码,包含了 Driver 功能代码和分布在集群中多个节点上运行的 ...
- 启动 Spark Master 和 Worker 节点。 - 验证集群状态。 **示例: 计算 Pi** 在 CentOS 上使用 Spark 进行 Pi 的计算是一个经典的入门示例。可以通过编写简单的程序来实现,该程序使用 Monte Carlo 方法估计 Pi ...
六、Spark源码分析 1. RDD创建:深入源码,我们可以看到如何通过`sparkContext.parallelize`或`sparkContext.textFile`创建RDD。 2. Action与Transformation:研究`count`、`map`等操作的实现,理解它们如何触发...
本文旨在通过对Apache Spark源码的初步解读,帮助读者建立起对Spark核心概念和技术细节的理解。 #### 二、基本概念 ##### 1. RDD(Resilient Distributed Dataset) - **定义**:弹性分布式数据集(Resilient ...
对于深入理解Spark on Yarn的工作机制,需要对其实现源码进行详细分析。这主要包括以下几个方面: - **ApplicationMaster的实现**:了解ApplicationMaster是如何与ResourceManager和NodeManager通信的。 - **资源...
《深入理解Spark:核心思想与源码分析》是一本针对Apache Spark进行深度解析的专业书籍,旨在帮助读者透彻掌握Spark的核心理念与实现机制。Spark作为大数据处理领域的重要框架,其高性能、易用性和弹性分布式计算的...
Spark源码分析 各个组件介绍 后面补充。。。。 StandAlone模式 在StandAlone模式的start-all的shell启动脚本下,在当前机器执行了JAVA_HOME/bin/java -cp ….Master和在配置的slave的机器中执行 JAVA_HOME/bin/java ...