本节主要分析Master 的主要功能。
Master 的功能主要分为两块:1. Master leader 的选举;2. Master 对 worker、application、driver 的管理。
Master 启动时,main 函数通过 startSystemAndActor 调用 actorSystem.actorOf() 创建 Master Actor 对象:
/**
 * Entry point of the standalone Master process.
 *
 * Parses the command-line arguments, starts the actor system together with the
 * Master actor, and then waits on `actorSystem.awaitTermination()`.
 *
 * @param argStrings raw command-line arguments, handed to `MasterArguments`
 */
def main(argStrings: Array[String]): Unit = {
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  // Only the actor system is needed here; the two bound ports are discarded.
  val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
  actorSystem.awaitTermination()
}

/**
 * Creates the actor system, creates the Master actor inside it, and asks that
 * actor which port its web UI is bound to.
 *
 * @param host      host passed to the actor system and to the Master actor
 * @param port      requested port for the actor system
 * @param webUiPort requested web UI port, passed to the Master actor
 * @param conf      configuration used for the actor system and the ask timeout
 * @return a tuple of (actor system, port the actor system bound to,
 *         web UI port reported by the Master actor)
 */
def startSystemAndActor(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (ActorSystem, Int, Int) = {
  val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf)
  // The Master is constructed with the port that was actually bound, not the requested one.
  val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
  val timeout = AkkaUtils.askTimeout(conf)
  // Synchronously wait (up to `timeout`) for the actor to report its web UI port.
  val respFuture = actor.ask(RequestWebUIPort)(timeout)
  val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
  (actorSystem, boundPort, resp.webUIBoundPort)
}
接下来 Master 运行 preStart 函数。在这个函数中,Master 启动了 web UI 服务和 metrics 系统,调度了周期性的 CheckForWorkerTimeOut 任务,并创建了 persistenceEngine 和 leaderElectionAgent。
persistenceEngine 用于 Master 的故障恢复(recovery),leaderElectionAgent 用于选举 Master 的 leader。
/**
 * Actor start-up hook of the Master.
 *
 * In order, it: subscribes to remoting lifecycle events, starts the web UI,
 * schedules the recurring `CheckForWorkerTimeOut` message to itself, starts the
 * metrics systems, and finally picks the persistence engine and the leader
 * election agent according to `RECOVERY_MODE`.
 */
override def preStart(): Unit = {
  logInfo("Starting Spark master at " + masterUrl)
  // Listen for remote client disconnection events, since they don't go through Akka's watch()
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

  // The web UI must be started before its bound port can be read.
  webUi.start()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get

  // Send CheckForWorkerTimeOut to ourselves immediately and then every WORKER_TIMEOUT ms.
  context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()

  // Choose where recovery state is persisted; any unrecognised mode falls through
  // to BlackHolePersistenceEngine.
  persistenceEngine = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
    case "FILESYSTEM" =>
      logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
      new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
    case _ =>
      new BlackHolePersistenceEngine()
  }

  // Only ZOOKEEPER mode uses the ZooKeeper-based election agent; every other mode
  // (including FILESYSTEM) uses MonarchyLeaderAgent.
  leaderElectionAgent = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
    case _ =>
      context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
  }
}
SparkZooKeeperSession 用来与 ZooKeeper 交互,例如创建路径(create path)、读取数据(getData)等。
checkLeader 函数先拿到 master 的列表,排序后以第一个作为 leader,然后添加一个 watcher 来监控 leaderFile:当 leaderFile 被删除的时候,将会重新选举 leader。最后调用 updateLeadershipStatus 通知 master,leader 已经选举出来。
/**
 * Determines the current leader from the election files under `WORKING_DIR`
 * and reports the result via `updateLeadershipStatus`.
 *
 * The child with the smallest name (after sorting) is treated as the leader.
 * A watch is registered on that file before its content is read into
 * `leaderUrl`. If the file vanishes in between, the whole check is retried.
 * If the leader file belongs to someone else but points at this master's URL,
 * the stale file is deleted instead of updating the leadership status.
 */
def checkLeader(): Unit = {
  val masters = zk.getChildren(WORKING_DIR).toList
  // NOTE(review): `.head` throws if WORKING_DIR has no children; presumably this master's
  // own election file always exists by the time this runs -- confirm against the caller.
  val leader = masters.sorted.head
  val leaderFile = WORKING_DIR + "/" + leader

  // Setup a watch for the current leader.
  zk.exists(leaderFile, watcher)

  try {
    leaderUrl = new String(zk.getData(leaderFile))
  } catch {
    // A NoNodeException may be thrown if old leader died since the start of this method call.
    // This is fine -- just check again, since we're guaranteed to see the new values.
    case e: KeeperException.NoNodeException =>
      logInfo("Leader disappeared while reading it -- finding next leader")
      checkLeader()
      return
  }

  // Synchronization used to ensure no interleaving between the creation of a new session and the
  // checking of a leader, which could cause us to delete our real leader file erroneously.
  synchronized {
    val isLeader = myLeaderFile == leaderFile
    if (!isLeader && leaderUrl == masterUrl) {
      // We found a different master file pointing to this process.
      // This can happen in the following two cases:
      // (1) The master process was restarted on the same node.
      // (2) The ZK server died between creating the file and returning the name of the file.
      //     For this case, we will end up creating a second file, and MUST explicitly delete the
      //     first one, since our ZK session is still open.
      // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
      // leader changes.
      assert(leaderFile < myLeaderFile)
      logWarning("Cleaning up old ZK master election file that points to this master.")
      zk.delete(leaderFile)
    } else {
      updateLeadershipStatus(isLeader)
    }
  }
}
override def receive = { case ElectedLeader => { val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData() state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) RecoveryState.ALIVE else RecoveryState.RECOVERING logInfo("I have been elected leader! New state: " + state) if (state == RecoveryState.RECOVERING) { beginRecovery(storedApps, storedDrivers, storedWorkers) context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() } } } case RevokedLeadership => { logError("Leadership has been revoked -- master shutting down.") System.exit(0) } case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { // ignore, don't send response } else if (idToWorker.contains(id)) { sender ! RegisterWorkerFailed("Duplicate worker ID") } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, sender, workerWebUiPort, publicAddress) registerWorker(worker) persistenceEngine.addWorker(worker) sender ! RegisteredWorker(masterUrl, masterWebUiUrl) schedule() } } case RequestSubmitDriver(description) => { if (state != RecoveryState.ALIVE) { val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state." sender ! SubmitDriverResponse(false, None, msg) } else { logInfo("Driver submitted " + description.command.mainClass) val driver = createDriver(description) persistenceEngine.addDriver(driver) waitingDrivers += driver drivers.add(driver) schedule() // TODO: It might be good to instead have the submission client poll the master to determine // the current status of the driver. For now it's simply "fire and forget". sender ! 
SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}") } } case RequestKillDriver(driverId) => { if (state != RecoveryState.ALIVE) { val msg = s"Can only kill drivers in ALIVE state. Current state: $state." sender ! KillDriverResponse(driverId, success = false, msg) } else { logInfo("Asked to kill driver " + driverId) val driver = drivers.find(_.id == driverId) driver match { case Some(d) => if (waitingDrivers.contains(d)) { waitingDrivers -= d self ! DriverStateChanged(driverId, DriverState.KILLED, None) } else { // We just notify the worker to kill the driver here. The final bookkeeping occurs // on the return path when the worker submits a state change back to the master // to notify it that the driver was successfully killed. d.worker.foreach { w => w.actor ! KillDriver(driverId) } } // TODO: It would be nice for this to be a synchronous response val msg = s"Kill request for $driverId submitted" logInfo(msg) sender ! KillDriverResponse(driverId, success = true, msg) case None => val msg = s"Driver $driverId has already finished or does not exist" logWarning(msg) sender ! KillDriverResponse(driverId, success = false, msg) } } } case RequestDriverStatus(driverId) => { (drivers ++ completedDrivers).find(_.id == driverId) match { case Some(driver) => sender ! DriverStatusResponse(found = true, Some(driver.state), driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception) case None => sender ! DriverStatusResponse(found = false, None, None, None, None) } } case RegisterApplication(description) => { if (state == RecoveryState.STANDBY) { // ignore, don't send response } else { logInfo("Registering app " + description.name) val app = createApplication(description, sender) registerApplication(app) logInfo("Registered app " + description.name + " with ID " + app.id) persistenceEngine.addApplication(app) sender ! 
RegisteredApplication(app.id, masterUrl) schedule() } } case ExecutorStateChanged(appId, execId, state, message, exitStatus) => { val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId)) execOption match { case Some(exec) => { exec.state = state exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus) if (ExecutorState.isFinished(state)) { val appInfo = idToApp(appId) // Remove this executor from the worker and app logInfo("Removing executor " + exec.fullId + " because it is " + state) appInfo.removeExecutor(exec) exec.worker.removeExecutor(exec) // Only retry certain number of times so we don't go into an infinite loop. if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) { schedule() } else { logError("Application %s with ID %s failed %d times, removing it".format( appInfo.desc.name, appInfo.id, appInfo.retryCount)) removeApplication(appInfo, ApplicationState.FAILED) } } } case None => logWarning("Got status update for unknown executor " + appId + "/" + execId) } } case DriverStateChanged(driverId, state, exception) => { state match { case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED => removeDriver(driverId, state, exception) case _ => throw new Exception(s"Received unexpected state update for driver $driverId: $state") } } case Heartbeat(workerId) => { idToWorker.get(workerId) match { case Some(workerInfo) => workerInfo.lastHeartbeat = System.currentTimeMillis() case None => logWarning("Got heartbeat from unregistered worker " + workerId) } } case MasterChangeAcknowledged(appId) => { idToApp.get(appId) match { case Some(app) => logInfo("Application has been re-registered: " + appId) app.state = ApplicationState.WAITING case None => logWarning("Master change ack from unknown app: " + appId) } if (canCompleteRecovery) { completeRecovery() } } case WorkerSchedulerStateResponse(workerId, executors, driverIds) => { idToWorker.get(workerId) match { case Some(worker) => 
logInfo("Worker has been re-registered: " + workerId) worker.state = WorkerState.ALIVE val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined) for (exec <- validExecutors) { val app = idToApp.get(exec.appId).get val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId)) worker.addExecutor(execInfo) execInfo.copyState(exec) } for (driverId <- driverIds) { drivers.find(_.id == driverId).foreach { driver => driver.worker = Some(worker) driver.state = DriverState.RUNNING worker.drivers(driverId) = driver } } case None => logWarning("Scheduler state from unknown worker: " + workerId) } if (canCompleteRecovery) { completeRecovery() } }
