This section analyzes the main functionality of the Master.
The Master's work falls into two parts: 1. election of the Master leader; 2. the Master's management of workers, applications, and drivers.
Let's first look at how the Master starts up.
The main method calls actorSystem.actorOf() to create the Master actor:
def main(argStrings: Array[String]) {
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
  actorSystem.awaitTermination()
}

def startSystemAndActor(host: String, port: Int, webUiPort: Int, conf: SparkConf)
  : (ActorSystem, Int, Int) = {
  val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf)
  val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
  val timeout = AkkaUtils.askTimeout(conf)
  val respFuture = actor.ask(RequestWebUIPort)(timeout)
  val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
  (actorSystem, boundPort, resp.webUIBoundPort)
}
Next, the Master runs its preStart method. In preStart it starts the web UI service, schedules the periodic CheckForWorkerTimeOut task, and creates the persistenceEngine and the leaderElectionAgent.
The persistenceEngine is used for master recovery, and the leaderElectionAgent is used to elect the master leader.
override def preStart() {
  logInfo("Starting Spark master at " + masterUrl)
  // Listen for remote client disconnection events, since they don't go through Akka's watch()
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  webUi.start()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
  context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()

  persistenceEngine = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
    case "FILESYSTEM" =>
      logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
      new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
    case _ =>
      new BlackHolePersistenceEngine()
  }

  leaderElectionAgent = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
    case _ =>
      context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
  }
}
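When RECOVERY_MODE is not ZOOKEEPER, leader election is trivial: there is only one Master, so MonarchyLeaderAgent can declare it leader as soon as it starts. A minimal sketch of that behavior (treating the agent as a plain Akka actor; the constructor and field names here are assumptions, not verbatim source):

import akka.actor.{Actor, ActorRef}

// Sketch: with a single Master and no external coordination there is nothing to
// elect, so the agent notifies the Master immediately in preStart.
class MonarchyLeaderAgent(masterActor: ActorRef) extends Actor {
  override def preStart() {
    masterActor ! ElectedLeader
  }

  override def receive = {
    case _ => // no further election events to handle
  }
}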
Next, let's take ZooKeeperLeaderElectionAgent and ZooKeeperPersistenceEngine as examples.
SparkZooKeeperSession is the helper used to interact with ZooKeeper, e.g. creating paths, reading node data (getData), and so on.
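To make the calls in checkLeader below easier to follow, here is a minimal sketch of the kind of operations SparkZooKeeperSession wraps, written directly against the ZooKeeper Java client. The real class additionally handles session expiration and retries; the class and method names below are illustrative assumptions.

import org.apache.zookeeper.{CreateMode, Watcher, ZooDefs, ZooKeeper}
import scala.collection.JavaConverters._

// Sketch of a thin ZooKeeper wrapper; not the actual SparkZooKeeperSession API.
class SimpleZkSession(connectString: String, watcher: Watcher) {
  private val zk = new ZooKeeper(connectString, 30000, watcher)

  // Create a znode holding arbitrary bytes (e.g. a serialized WorkerInfo).
  def create(path: String, data: Array[Byte], mode: CreateMode): String =
    zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode)

  // Read the payload of a znode.
  def getData(path: String): Array[Byte] = zk.getData(path, false, null)

  // List children, e.g. the candidate election files under WORKING_DIR.
  def getChildren(path: String): List[String] = zk.getChildren(path, false).asScala.toList

  // Register a watcher on a znode; fires NodeDeleted when the leader file disappears.
  def exists(path: String, w: Watcher): Boolean = zk.exists(path, w) != null

  // Delete a znode regardless of its version.
  def delete(path: String): Unit = zk.delete(path, -1)
}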
The checkLeader method fetches the list of master election files, takes the first (smallest) one as the leader, and registers a watcher on that leaderFile; when the leaderFile is deleted, leader election runs again. It then calls updateLeadershipStatus to notify the Master whether it has been elected leader.
def checkLeader() {
  val masters = zk.getChildren(WORKING_DIR).toList
  val leader = masters.sorted.head
  val leaderFile = WORKING_DIR + "/" + leader

  // Setup a watch for the current leader.
  zk.exists(leaderFile, watcher)

  try {
    leaderUrl = new String(zk.getData(leaderFile))
  } catch {
    // A NoNodeException may be thrown if old leader died since the start of this method call.
    // This is fine -- just check again, since we're guaranteed to see the new values.
    case e: KeeperException.NoNodeException =>
      logInfo("Leader disappeared while reading it -- finding next leader")
      checkLeader()
      return
  }

  // Synchronization used to ensure no interleaving between the creation of a new session and the
  // checking of a leader, which could cause us to delete our real leader file erroneously.
  synchronized {
    val isLeader = myLeaderFile == leaderFile
    if (!isLeader && leaderUrl == masterUrl) {
      // We found a different master file pointing to this process.
      // This can happen in the following two cases:
      // (1) The master process was restarted on the same node.
      // (2) The ZK server died between creating the file and returning the name of the file.
      //     For this case, we will end up creating a second file, and MUST explicitly delete the
      //     first one, since our ZK session is still open.
      // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
      // leader changes.
      assert(leaderFile < myLeaderFile)
      logWarning("Cleaning up old ZK master election file that points to this master.")
      zk.delete(leaderFile)
    } else {
      updateLeadershipStatus(isLeader)
    }
  }
}
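The updateLeadershipStatus call at the end is what actually notifies the Master. A sketch of how that notification plausibly works, assuming the agent keeps a simple LEADER / NOT_LEADER status field and only reports transitions (field names are assumptions; the messages ElectedLeader and RevokedLeadership match the cases handled in Master.receive below):

// Sketch, not verbatim source: notify the Master only when leadership actually changes.
object LeadershipStatus extends Enumeration {
  val LEADER, NOT_LEADER = Value
}

private var status = LeadershipStatus.NOT_LEADER

def updateLeadershipStatus(isLeader: Boolean) {
  if (isLeader && status == LeadershipStatus.NOT_LEADER) {
    status = LeadershipStatus.LEADER
    masterActor ! ElectedLeader        // handled by Master.receive (see below)
  } else if (!isLeader && status == LeadershipStatus.LEADER) {
    status = LeadershipStatus.NOT_LEADER
    masterActor ! RevokedLeadership    // Master shuts down on RevokedLeadership
  }
}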
ZooKeeperPersistenceEngine records Application, Driver and Worker information in ZooKeeper, so that state can be recovered when the leading master dies.
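From the calls that appear in Master.receive below (readPersistedData, addWorker, addDriver, addApplication), the persistence interface can be sketched roughly as follows; this is an inferred outline, not the exact trait (the real one also has matching remove* methods). ZooKeeperPersistenceEngine implements it by serializing each object (with Akka's SerializationExtension, as constructed in preStart) into a znode under the recovery directory.

// Sketch of the persistence interface implied by the call sites in Master.receive;
// the return shape of readPersistedData matches the destructuring in the
// ElectedLeader handler below.
trait PersistenceEngine {
  def addApplication(app: ApplicationInfo): Unit
  def addWorker(worker: WorkerInfo): Unit
  def addDriver(driver: DriverInfo): Unit

  // Read back everything a newly elected leader needs in order to recover.
  def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])
}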
Below are the messages the Master handles: chiefly the leader-election messages, plus the management of workers, applications and drivers.
override def receive = {
  case ElectedLeader => {
    val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
    state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty)
      RecoveryState.ALIVE
    else
      RecoveryState.RECOVERING
    logInfo("I have been elected leader! New state: " + state)
    if (state == RecoveryState.RECOVERING) {
      beginRecovery(storedApps, storedDrivers, storedWorkers)
      context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
    }
  }

  case RevokedLeadership => {
    logError("Leadership has been revoked -- master shutting down.")
    System.exit(0)
  }

  case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) => {
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else if (idToWorker.contains(id)) {
      sender ! RegisterWorkerFailed("Duplicate worker ID")
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        sender, workerWebUiPort, publicAddress)
      registerWorker(worker)
      persistenceEngine.addWorker(worker)
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule()
    }
  }

  case RequestSubmitDriver(description) => {
    if (state != RecoveryState.ALIVE) {
      val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
      sender ! SubmitDriverResponse(false, None, msg)
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      val driver = createDriver(description)
      persistenceEngine.addDriver(driver)
      waitingDrivers += driver
      drivers.add(driver)
      schedule()

      // TODO: It might be good to instead have the submission client poll the master to determine
      //       the current status of the driver. For now it's simply "fire and forget".

      sender ! SubmitDriverResponse(true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}")
    }
  }

  case RequestKillDriver(driverId) => {
    if (state != RecoveryState.ALIVE) {
      val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
      sender ! KillDriverResponse(driverId, success = false, msg)
    } else {
      logInfo("Asked to kill driver " + driverId)
      val driver = drivers.find(_.id == driverId)
      driver match {
        case Some(d) =>
          if (waitingDrivers.contains(d)) {
            waitingDrivers -= d
            self ! DriverStateChanged(driverId, DriverState.KILLED, None)
          } else {
            // We just notify the worker to kill the driver here. The final bookkeeping occurs
            // on the return path when the worker submits a state change back to the master
            // to notify it that the driver was successfully killed.
            d.worker.foreach { w =>
              w.actor ! KillDriver(driverId)
            }
          }
          // TODO: It would be nice for this to be a synchronous response
          val msg = s"Kill request for $driverId submitted"
          logInfo(msg)
          sender ! KillDriverResponse(driverId, success = true, msg)
        case None =>
          val msg = s"Driver $driverId has already finished or does not exist"
          logWarning(msg)
          sender ! KillDriverResponse(driverId, success = false, msg)
      }
    }
  }

  case RequestDriverStatus(driverId) => {
    (drivers ++ completedDrivers).find(_.id == driverId) match {
      case Some(driver) =>
        sender ! DriverStatusResponse(found = true, Some(driver.state),
          driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
      case None =>
        sender ! DriverStatusResponse(found = false, None, None, None, None)
    }
  }

  case RegisterApplication(description) => {
    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else {
      logInfo("Registering app " + description.name)
      val app = createApplication(description, sender)
      registerApplication(app)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      persistenceEngine.addApplication(app)
      sender ! RegisteredApplication(app.id, masterUrl)
      schedule()
    }
  }

  case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
    val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
    execOption match {
      case Some(exec) => {
        exec.state = state
        exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
        if (ExecutorState.isFinished(state)) {
          val appInfo = idToApp(appId)
          // Remove this executor from the worker and app
          logInfo("Removing executor " + exec.fullId + " because it is " + state)
          appInfo.removeExecutor(exec)
          exec.worker.removeExecutor(exec)

          // Only retry certain number of times so we don't go into an infinite loop.
          if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
            schedule()
          } else {
            logError("Application %s with ID %s failed %d times, removing it".format(
              appInfo.desc.name, appInfo.id, appInfo.retryCount))
            removeApplication(appInfo, ApplicationState.FAILED)
          }
        }
      }
      case None =>
        logWarning("Got status update for unknown executor " + appId + "/" + execId)
    }
  }

  case DriverStateChanged(driverId, state, exception) => {
    state match {
      case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
        removeDriver(driverId, state, exception)
      case _ =>
        throw new Exception(s"Received unexpected state update for driver $driverId: $state")
    }
  }

  case Heartbeat(workerId) => {
    idToWorker.get(workerId) match {
      case Some(workerInfo) =>
        workerInfo.lastHeartbeat = System.currentTimeMillis()
      case None =>
        logWarning("Got heartbeat from unregistered worker " + workerId)
    }
  }

  case MasterChangeAcknowledged(appId) => {
    idToApp.get(appId) match {
      case Some(app) =>
        logInfo("Application has been re-registered: " + appId)
        app.state = ApplicationState.WAITING
      case None =>
        logWarning("Master change ack from unknown app: " + appId)
    }

    if (canCompleteRecovery) { completeRecovery() }
  }

  case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
    idToWorker.get(workerId) match {
      case Some(worker) =>
        logInfo("Worker has been re-registered: " + workerId)
        worker.state = WorkerState.ALIVE

        val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
        for (exec <- validExecutors) {
          val app = idToApp.get(exec.appId).get
          val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
          worker.addExecutor(execInfo)
          execInfo.copyState(exec)
        }

        for (driverId <- driverIds) {
          drivers.find(_.id == driverId).foreach { driver =>
            driver.worker = Some(worker)
            driver.state = DriverState.RUNNING
            worker.drivers(driverId) = driver
          }
        }
      case None =>
        logWarning("Scheduler state from unknown worker: " + workerId)
    }

    if (canCompleteRecovery) { completeRecovery() }
  }
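The ElectedLeader, MasterChangeAcknowledged and WorkerSchedulerStateResponse handlers above all feed into the recovery logic. A hedged sketch of the completion check they rely on: recovery can finish once nothing read back from the persistence engine is still in the UNKNOWN state, i.e. every stored worker and application has either re-registered with the new leader or will be dropped when the timeout fires.

// Sketch (assumed to use the Master's workers/apps collections referenced above):
// recovery completes once no recovered worker or application is still waiting
// to re-register.
def canCompleteRecovery =
  workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
  apps.count(_.state == ApplicationState.UNKNOWN) == 0

completeRecovery then, roughly, removes anything still UNKNOWN, marks the master's state ALIVE, and re-runs schedule() so recovered applications and drivers are assigned resources again.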