
Spark Source Code Analysis 10 - Scheduler

 

A very important part of Spark is the scheduling of tasks. The flow chart below shows the overall process.

[Flow chart: Spark task scheduling]

There are two kinds of SchedulableBuilder: FairSchedulableBuilder and FIFOSchedulableBuilder. The key difference is which taskSetSchedulingAlgorithm a Pool uses in its getSortedTaskSetQueue method to sort the schedulableQueue:

 override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
    // Sort the direct children of this Pool with the configured scheduling
    // algorithm, then recursively collect their sorted TaskSetManagers.
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
    }
    sortedTaskSetQueue
  }
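
For comparison, this is roughly what the FIFO comparator looks like (a paraphrased sketch, not an exact copy of any particular Spark version; FairSchedulingAlgorithm instead compares runningTasks against minShare and the weight-adjusted share):

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  // A lower priority value (i.e. an earlier job) wins; ties are broken by stageId.
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    var res = math.signum(s1.priority - s2.priority)
    if (res == 0) {
      res = math.signum(s1.stageId - s2.stageId)
    }
    res < 0
  }
}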

Another important class in the scheduler is SchedulerBackend. It has four subclasses: MesosSchedulerBackend, CoarseMesosSchedulerBackend, SimrSchedulerBackend, and SparkDeploySchedulerBackend. MesosSchedulerBackend and CoarseMesosSchedulerBackend are used for Mesos deployments, SimrSchedulerBackend for the Hadoop (Spark-in-MapReduce) deployment, and SparkDeploySchedulerBackend for the standalone Spark deployment.
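
For orientation, the SchedulerBackend contract itself is small. Roughly (a paraphrased sketch; the exact method set may differ between versions):

private[spark] trait SchedulerBackend {
  def start(): Unit
  def stop(): Unit
  // Ask the backend to offer its resources to the TaskScheduler again.
  def reviveOffers(): Unit
  def defaultParallelism(): Int
  def killTask(taskId: Long, executorId: String): Unit =
    throw new UnsupportedOperationException
}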

 

 

SparkDeploySchedulerBackend is a subclass of CoarseGrainedSchedulerBackend. Its start method first calls the parent's start method and then creates an AppClient instance; calling client.start() registers the application with the Master. The Master then tells a Worker to launch a CoarseGrainedExecutorBackend using the given command.

override def start() {
    super.start()

    // The endpoint for executors to talk to us
    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
      conf.get("spark.driver.host"),  conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
    val command = Command(
      "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
    val sparkHome = sc.getSparkHome().getOrElse(null)
    val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome,
        "http://" + sc.ui.appUIAddress)

    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()
  }
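
The registration itself happens inside AppClient's internal actor: on startup it contacts every configured master and sends it a RegisterApplication message carrying the ApplicationDescription built above. A simplified sketch of that step (paraphrased, not an exact copy of the source):

// Inside AppClient's ClientActor (simplified): contact every known master and
// send it the application description; the master answers with
// RegisteredApplication, which is handled in the receive block below.
def tryRegisterAllMasters() {
  for (masterUrl <- masterUrls) {
    logInfo("Connecting to master " + masterUrl + "...")
    val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
    actor ! RegisterApplication(appDescription)
  }
}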

Below are the messages handled by the client actor:

override def receive = {
      case RegisteredApplication(appId_, masterUrl) =>
        appId = appId_
        registered = true
        changeMaster(masterUrl)
        listener.connected(appId)

      case ApplicationRemoved(message) =>
        logError("Master removed our application: %s; stopping client".format(message))
        markDisconnected()
        context.stop(self)

      case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
        val fullId = appId + "/" + id
        logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
        listener.executorAdded(fullId, workerId, hostPort, cores, memory)

      case ExecutorUpdated(id, state, message, exitStatus) =>
        val fullId = appId + "/" + id
        val messageText = message.map(s => " (" + s + ")").getOrElse("")
        logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
        if (ExecutorState.isFinished(state)) {
          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
        }

      case MasterChanged(masterUrl, masterWebUiUrl) =>
        logInfo("Master has changed, new master is at " + masterUrl)
        changeMaster(masterUrl)
        alreadyDisconnected = false
        sender ! MasterChangeAcknowledged(appId)

      case DisassociatedEvent(_, address, _) if address == masterAddress =>
        logWarning(s"Connection to $address failed; waiting for master to reconnect...")
        markDisconnected()

      case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
        logWarning(s"Could not connect to $address: $cause")

      case StopAppClient =>
        markDead()
        sender ! true
        context.stop(self)
}

The start method of CoarseGrainedSchedulerBackend creates a DriverActor. In the actor's preStart method, a timer is scheduled that sends the actor a ReviveOffers message every reviveInterval milliseconds; handling ReviveOffers is what ultimately launches tasks.

   override def preStart() {
      // Listen for remote client disconnection events, since they don't go through Akka's watch()
      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

      // Periodically revive offers to allow delay scheduling to work
      val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
      import context.dispatcher
      context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
}
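
Handling ReviveOffers boils down to makeOffers(), which turns each registered executor's free cores into a WorkerOffer, hands the offers to TaskSchedulerImpl.resourceOffers, and launches whatever tasks come back. A rough sketch of the helper (paraphrased; the field names match the receive handler shown below):

// Build a WorkerOffer per registered executor and launch the tasks the
// scheduler assigns to them.
def makeOffers() {
  launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) }))
}

// Same, but offer the resources of a single executor (used after a task finishes).
def makeOffers(executorId: String) {
  launchTasks(scheduler.resourceOffers(
    Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
}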

The events handled by DriverActor:

def receive = {
      case RegisterExecutor(executorId, hostPort, cores) =>
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorActor.contains(executorId)) {
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
        } else {
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          sender ! RegisteredExecutor(sparkProperties)
          executorActor(executorId) = sender
          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
          freeCores(executorId) = cores
          executorAddress(executorId) = sender.path.address
          addressToExecutorId(sender.path.address) = executorId
          totalCoreCount.addAndGet(cores)
          makeOffers()
        }

      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          if (executorActor.contains(executorId)) {
            freeCores(executorId) += 1
            makeOffers(executorId)
          } else {
            // Ignoring the update since we don't know about the executor.
            val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
            logWarning(msg.format(taskId, state, sender, executorId))
          }
        }

      case ReviveOffers =>
        makeOffers()

      case KillTask(taskId, executorId) =>
        executorActor(executorId) ! KillTask(taskId, executorId)

      case StopDriver =>
        sender ! true
        context.stop(self)

      case StopExecutors =>
        logInfo("Asking each executor to shut down")
        for (executor <- executorActor.values) {
          executor ! StopExecutor
        }
        sender ! true

      case RemoveExecutor(executorId, reason) =>
        removeExecutor(executorId, reason)
        sender ! true

      case DisassociatedEvent(_, address, _) =>
        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))

    }
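
launchTasks then pushes each TaskDescription to the actor of the executor it was assigned to and decrements that executor's free core count. A minimal sketch (paraphrased):

// Send each assigned task to its executor's actor as a LaunchTask message.
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    freeCores(task.executorId) -= 1
    executorActor(task.executorId) ! LaunchTask(task)
  }
}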

MesosSchedulerBackend

Its start() method launches the MesosSchedulerDriver in a background daemon thread:

override def start() {
    synchronized {
      classLoader = Thread.currentThread.getContextClassLoader

      new Thread("MesosSchedulerBackend driver") {
        setDaemon(true)
        override def run() {
          val scheduler = MesosSchedulerBackend.this
          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
          try {
            val ret = driver.run()
            logInfo("driver.run() returned with code " + ret)
          } catch {
            case e: Exception => logError("driver.run() failed", e)
          }
        }
      }.start()

      waitForRegister()
    }
  }
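
waitForRegister() simply blocks the calling thread until Mesos has invoked the registered() callback on this scheduler. A minimal sketch of the idea (paraphrased; registeredLock and isRegistered are fields of the backend):

// Block until the registered() callback from Mesos flips isRegistered to true.
def waitForRegister() {
  registeredLock.synchronized {
    while (!isRegistered) {
      registeredLock.wait()
    }
  }
}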

When TaskSchedulerImpl.submitTasks() is called, it invokes MesosSchedulerBackend's reviveOffers(), which in turn tells the MesosSchedulerDriver to revive offers. Mesos then calls back into the resourceOffers method, which builds the Mesos tasks and calls launchTasks to submit them to Mesos. Below is the resourceOffers function:

 override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
    val oldClassLoader = setClassLoader()
    try {
      synchronized {
        // Build a big list of the offerable workers, and remember their indices so that we can
        // figure out which Offer to reply to for each worker
        val offerableIndices = new ArrayBuffer[Int]
        val offerableWorkers = new ArrayBuffer[WorkerOffer]

        def enoughMemory(o: Offer) = {
          val mem = getResource(o.getResourcesList, "mem")
          val slaveId = o.getSlaveId.getValue
          mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
        }

        for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
          offerableIndices += index
          offerableWorkers += new WorkerOffer(
            offer.getSlaveId.getValue,
            offer.getHostname,
            getResource(offer.getResourcesList, "cpus").toInt)
        }

        // Call into the ClusterScheduler
        val taskLists = scheduler.resourceOffers(offerableWorkers)

        // Build a list of Mesos tasks for each slave
        val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
        for ((taskList, index) <- taskLists.zipWithIndex) {
          if (!taskList.isEmpty) {
            val offerNum = offerableIndices(index)
            val slaveId = offers(offerNum).getSlaveId.getValue
            slaveIdsWithExecutors += slaveId
            mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
            for (taskDesc <- taskList) {
              taskIdToSlaveId(taskDesc.taskId) = slaveId
              mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
            }
          }
        }

        // Reply to the offers
        val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
        for (i <- 0 until offers.size) {
          d.launchTasks(offers(i).getId, mesosTasks(i), filters)
        }
      }
    } finally {
      restoreClassLoader(oldClassLoader)
    }
  }
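
createMesosTask wraps a Spark TaskDescription into a Mesos TaskInfo: it attaches the executor info, a one-CPU resource, and the serialized task bytes. Roughly (a paraphrased sketch; exact fields may differ by version):

// Turn a Spark TaskDescription into a Mesos TaskInfo carrying the serialized task.
def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
  val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
  val cpuResource = Resource.newBuilder()
    .setName("cpus")
    .setType(Value.Type.SCALAR)
    .setScalar(Value.Scalar.newBuilder().setValue(1).build())
    .build()
  MesosTaskInfo.newBuilder()
    .setTaskId(taskId)
    .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
    .setExecutor(createExecutorInfo(slaveId))
    .setName(task.name)
    .addResources(cpuResource)
    .setData(ByteString.copyFrom(task.serializedTask))
    .build()
}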

CoarseMesosSchedulerBackend 

CoarseMesosSchedulerBackend is the other Mesos deployment mode. The difference from MesosSchedulerBackend is that CoarseMesosSchedulerBackend launches a long-running CoarseGrainedExecutorBackend on each Mesos slave and then submits Spark tasks to that backend for execution, whereas MesosSchedulerBackend converts each Spark task directly into a Mesos task and submits it to Mesos. Below is the createCommand code; the rest is similar to MesosSchedulerBackend.

 def createCommand(offer: Offer, numCores: Int): CommandInfo = {
    val environment = Environment.newBuilder()
    sc.executorEnvs.foreach { case (key, value) =>
      environment.addVariables(Environment.Variable.newBuilder()
        .setName(key)
        .setValue(value)
        .build())
    }
    val command = CommandInfo.newBuilder()
      .setEnvironment(environment)
    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    val uri = conf.get("spark.executor.uri", null)
    if (uri == null) {
      val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
      command.setValue(
        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
    } else {
      // Grab everything to the first '.'. We'll use that and '*' to
      // glob the directory "correctly".
      val basename = uri.split('/').last.split('.').head
      command.setValue(
        "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
          .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
    }
    command.build()
  }
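
To make the coarse-grained versus fine-grained distinction concrete, here is a condensed sketch of how CoarseMesosSchedulerBackend's resourceOffers might react to an offer: instead of one Mesos task per Spark task, it launches a single Mesos task per slave whose command is the CoarseGrainedExecutorBackend built by createCommand above. This is a simplification under the assumption that helpers such as newMesosTaskId and createResource exist on the backend; the real method also tracks acquired cores and slave failures:

// Condensed sketch: launch one long-running executor per acceptable slave.
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
  synchronized {
    val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
    for (offer <- offers) {
      val slaveId = offer.getSlaveId.getValue
      val mem = getResource(offer.getResourcesList, "mem")
      val cpus = getResource(offer.getResourcesList, "cpus").toInt
      if (mem >= sc.executorMemory && cpus >= 1 && !slaveIdsWithExecutors.contains(slaveId)) {
        slaveIdsWithExecutors += slaveId
        val taskId = newMesosTaskId()                        // assumed helper
        val task = MesosTaskInfo.newBuilder()
          .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
          .setSlaveId(offer.getSlaveId)
          .setCommand(createCommand(offer, cpus))            // the command shown above
          .setName("Task " + taskId)
          .addResources(createResource("cpus", cpus))        // assumed helper
          .addResources(createResource("mem", sc.executorMemory))
          .build()
        d.launchTasks(offer.getId, Collections.singletonList(task), filters)
      } else {
        // Decline offers we cannot use (launching an empty task list has the same effect).
        d.declineOffer(offer.getId)
      }
    }
  }
}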

 SimrSchedulerBackend

I have not fully figured this one out yet.

 

 

 

 
