
Spark Source Code Analysis 5 - Master

 

This post analyzes the main functionality of the Master.

The Master has two main responsibilities: 1. electing the leader among Masters; 2. managing workers, applications, and drivers.

 

First, let's look at how the Master starts.

main() parses the command line into MasterArguments, and startSystemAndActor calls actorSystem.actorOf() to create the Master actor:

  def main(argStrings: Array[String]) {
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
    actorSystem.awaitTermination()
  }

  def startSystemAndActor(host: String, port: Int, webUiPort: Int, conf: SparkConf)
      : (ActorSystem, Int, Int) =
  {
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf)
    val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
    val timeout = AkkaUtils.askTimeout(conf)
    val respFuture = actor.ask(RequestWebUIPort)(timeout)
    val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
    (actorSystem, boundPort, resp.webUIBoundPort)
  }

Next, the Master runs its preStart function. There it starts the web UI service, schedules the recurring CheckForWorkerTimeOut task, and creates the persistenceEngine and the leaderElectionAgent.

 

The persistenceEngine is used for Master recovery; the leaderElectionAgent is used to elect the leader among the Masters.

 override def preStart() {
    logInfo("Starting Spark master at " + masterUrl)
    // Listen for remote client disconnection events, since they don't go through Akka's watch()
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    webUi.start()
    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

    masterMetricsSystem.registerSource(masterSource)
    masterMetricsSystem.start()
    applicationMetricsSystem.start()

    persistenceEngine = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
      case "FILESYSTEM" =>
        logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
        new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
      case _ =>
        new BlackHolePersistenceEngine()
    }
    leaderElectionAgent = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
      case _ =>
        context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
    }
  }

Next, let's take ZooKeeperLeaderElectionAgent and ZooKeeperPersistenceEngine as examples.

SparkZooKeeperSession handles the interaction with ZooKeeper, such as creating paths and reading node data (create, getData, and so on).
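As a rough sketch of the handful of operations such a session wraps, here is an in-memory stand-in. The method names mirror the wrapper's shape, but the class itself is invented for illustration; the real SparkZooKeeperSession delegates to org.apache.zookeeper.ZooKeeper and additionally handles session expiry and reconnects:

```scala
import scala.collection.mutable

// In-memory stand-in for the handful of ZooKeeper calls the Master needs.
// A real session talks to a live ZooKeeper ensemble instead of a local map.
class FakeZkSession {
  private val znodes = mutable.Map.empty[String, Array[Byte]]

  def create(path: String, data: Array[Byte]): Unit = znodes(path) = data
  def exists(path: String): Boolean = znodes.contains(path)
  def getData(path: String): Array[Byte] =
    znodes.getOrElse(path, throw new NoSuchElementException(s"no node at $path"))
  def delete(path: String): Unit = znodes -= path
  def getChildren(dir: String): Seq[String] =
    znodes.keys.filter(_.startsWith(dir + "/")).map(_.stripPrefix(dir + "/")).toSeq
}

val zk = new FakeZkSession
zk.create("/spark/leader_election/master_0000000001", "spark://host:7077".getBytes)
println(new String(zk.getData("/spark/leader_election/master_0000000001"))) // prints spark://host:7077
```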


In checkLeader, the agent fetches the list of Master znodes and takes the first one in sorted order as the leader. It then registers a watcher on the leaderFile; when the leaderFile is deleted, leader election runs again. Finally, updateLeadershipStatus is called to notify the Master that a leader has been elected.

 

 def checkLeader() {
    val masters = zk.getChildren(WORKING_DIR).toList
    val leader = masters.sorted.head
    val leaderFile = WORKING_DIR + "/" + leader

    // Setup a watch for the current leader.
    zk.exists(leaderFile, watcher)

    try {
      leaderUrl = new String(zk.getData(leaderFile))
    } catch {
      // A NoNodeException may be thrown if old leader died since the start of this method call.
      // This is fine -- just check again, since we're guaranteed to see the new values.
      case e: KeeperException.NoNodeException =>
        logInfo("Leader disappeared while reading it -- finding next leader")
        checkLeader()
        return
    }

    // Synchronization used to ensure no interleaving between the creation of a new session and the
    // checking of a leader, which could cause us to delete our real leader file erroneously.
    synchronized {
      val isLeader = myLeaderFile == leaderFile
      if (!isLeader && leaderUrl == masterUrl) {
        // We found a different master file pointing to this process.
        // This can happen in the following two cases:
        // (1) The master process was restarted on the same node.
        // (2) The ZK server died between creating the file and returning the name of the file.
        //     For this case, we will end up creating a second file, and MUST explicitly delete the
        //     first one, since our ZK session is still open.
        // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
        // leader changes.
        assert(leaderFile < myLeaderFile)
        logWarning("Cleaning up old ZK master election file that points to this master.")
        zk.delete(leaderFile)
      } else {
        updateLeadershipStatus(isLeader)
      }
    }
  }
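The selection rule above, sort the candidate znode names and take the first, can be isolated as a pure function. The node names below are made up for illustration; real ephemeral-sequential znodes carry an increasing counter suffix assigned by ZooKeeper:

```scala
// Each Master contending for leadership creates an ephemeral-sequential
// znode under the election directory; ZooKeeper appends an increasing
// counter to the name. The leader is simply the node that sorts first.
def pickLeader(candidateNodes: Seq[String]): Option[String] =
  candidateNodes.sorted.headOption

// Hypothetical node names for illustration.
val masters = Seq("master_0000000002", "master_0000000001", "master_0000000003")
println(pickLeader(masters)) // prints Some(master_0000000001)
```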

 

ZooKeeperPersistenceEngine records Application, Driver, and Worker information in ZooKeeper, so that when the leading Master dies the state can be recovered.
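The persistence contract itself is small: write each entity's serialized state when it changes, and read everything back when a new leader is elected. Here is an in-memory sketch of that idea; the class and the string "serialization" are stand-ins for illustration, whereas the real engine serializes with Akka's SerializationExtension into znodes (or files, for FileSystemPersistenceEngine):

```scala
import scala.collection.mutable

// Minimal sketch of the persistence contract: each entity is written under
// a prefixed key when it changes, and all entries under a prefix are read
// back wholesale during recovery.
class InMemoryPersistenceEngine {
  private val store = mutable.Map.empty[String, String]

  def persist(name: String, value: String): Unit = store(name) = value
  def unpersist(name: String): Unit = store -= name
  def read(prefix: String): Seq[String] =
    store.collect { case (k, v) if k.startsWith(prefix) => v }.toSeq

  // Convenience wrappers mirroring addWorker / readPersistedData in spirit.
  def addWorker(id: String, info: String): Unit = persist("worker_" + id, info)
  def readWorkers(): Seq[String] = read("worker_")
}

val engine = new InMemoryPersistenceEngine
engine.addWorker("1", "worker-1 @ host-a:7078")
engine.addWorker("2", "worker-2 @ host-b:7078")
println(engine.readWorkers().size) // prints 2
```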

 

Below are the messages the Master handles, mainly leader election messages and the management of workers, applications, and drivers:

 

 override def receive = {
    case ElectedLeader => {
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty)
        RecoveryState.ALIVE
      else
        RecoveryState.RECOVERING
      logInfo("I have been elected leader! New state: " + state)
      if (state == RecoveryState.RECOVERING) {
        beginRecovery(storedApps, storedDrivers, storedWorkers)
        context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
      }
    }

    case RevokedLeadership => {
      logError("Leadership has been revoked -- master shutting down.")
      System.exit(0)
    }

    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) => {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          sender, workerWebUiPort, publicAddress)
        registerWorker(worker)
        persistenceEngine.addWorker(worker)
        sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
        schedule()
      }
    }

    case RequestSubmitDriver(description) => {
      if (state != RecoveryState.ALIVE) {
        val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
        sender ! SubmitDriverResponse(false, None, msg)
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".

        sender ! SubmitDriverResponse(true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}")
      }
    }

    case RequestKillDriver(driverId) => {
      if (state != RecoveryState.ALIVE) {
        val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
        sender ! KillDriverResponse(driverId, success = false, msg)
      } else {
        logInfo("Asked to kill driver " + driverId)
        val driver = drivers.find(_.id == driverId)
        driver match {
          case Some(d) =>
            if (waitingDrivers.contains(d)) {
              waitingDrivers -= d
              self ! DriverStateChanged(driverId, DriverState.KILLED, None)
            }
            else {
              // We just notify the worker to kill the driver here. The final bookkeeping occurs
              // on the return path when the worker submits a state change back to the master
              // to notify it that the driver was successfully killed.
              d.worker.foreach { w =>
                w.actor ! KillDriver(driverId)
              }
            }
            // TODO: It would be nice for this to be a synchronous response
            val msg = s"Kill request for $driverId submitted"
            logInfo(msg)
            sender ! KillDriverResponse(driverId, success = true, msg)
          case None =>
            val msg = s"Driver $driverId has already finished or does not exist"
            logWarning(msg)
            sender ! KillDriverResponse(driverId, success = false, msg)
        }
      }
    }

    case RequestDriverStatus(driverId) => {
      (drivers ++ completedDrivers).find(_.id == driverId) match {
        case Some(driver) =>
          sender ! DriverStatusResponse(found = true, Some(driver.state),
            driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
        case None =>
          sender ! DriverStatusResponse(found = false, None, None, None, None)
      }
    }

    case RegisterApplication(description) => {
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, sender)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        sender ! RegisteredApplication(app.id, masterUrl)
        schedule()
      }
    }

    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) => {
          exec.state = state
          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
          if (ExecutorState.isFinished(state)) {
            val appInfo = idToApp(appId)
            // Remove this executor from the worker and app
            logInfo("Removing executor " + exec.fullId + " because it is " + state)
            appInfo.removeExecutor(exec)
            exec.worker.removeExecutor(exec)

            // Only retry certain number of times so we don't go into an infinite loop.
            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
              schedule()
            } else {
              logError("Application %s with ID %s failed %d times, removing it".format(
                appInfo.desc.name, appInfo.id, appInfo.retryCount))
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
        case None =>
          logWarning("Got status update for unknown executor " + appId + "/" + execId)
      }
    }

    case DriverStateChanged(driverId, state, exception) => {
      state match {
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }
    }

    case Heartbeat(workerId) => {
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          logWarning("Got heartbeat from unregistered worker " + workerId)
      }
    }

    case MasterChangeAcknowledged(appId) => {
      idToApp.get(appId) match {
        case Some(app) =>
          logInfo("Application has been re-registered: " + appId)
          app.state = ApplicationState.WAITING
        case None =>
          logWarning("Master change ack from unknown app: " + appId)
      }

      if (canCompleteRecovery) { completeRecovery() }
    }

    case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
      idToWorker.get(workerId) match {
        case Some(worker) =>
          logInfo("Worker has been re-registered: " + workerId)
          worker.state = WorkerState.ALIVE

          val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
          for (exec <- validExecutors) {
            val app = idToApp.get(exec.appId).get
            val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
            worker.addExecutor(execInfo)
            execInfo.copyState(exec)
          }

          for (driverId <- driverIds) {
            drivers.find(_.id == driverId).foreach { driver =>
              driver.worker = Some(worker)
              driver.state = DriverState.RUNNING
              worker.drivers(driverId) = driver
            }
          }
        case None =>
          logWarning("Scheduler state from unknown worker: " + workerId)
      }

      if (canCompleteRecovery) { completeRecovery() }
    }

    // ... other cases omitted ...
  }
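The retry guard in the ExecutorStateChanged case reduces to a counter check: every executor that finishes abnormally bumps the application's retry count, and once the count reaches the limit the application is removed instead of rescheduled. A sketch, where the limit of 10 is an assumption standing in for ApplicationState.MAX_NUM_RETRY, and AppRetries is an illustrative stand-in for ApplicationInfo:

```scala
// MaxNumRetry mirrors ApplicationState.MAX_NUM_RETRY (assumed value).
val MaxNumRetry = 10

final class AppRetries {
  private var retryCount = 0
  def incrementRetryCount(): Int = { retryCount += 1; retryCount }
}

// Same shape as the handler: below the limit, schedule() runs again;
// at the limit, removeApplication(..., FAILED) is invoked.
def onExecutorFailed(app: AppRetries): String =
  if (app.incrementRetryCount() < MaxNumRetry) "reschedule"
  else "remove application"

val app = new AppRetries
val outcomes = (1 to 10).map(_ => onExecutorFailed(app))
println(outcomes.last) // the 10th failure crosses the limit
```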

 

 

