
Spark Source Code Analysis 9 - Executor

 

The executor side consists of two parts: the ExecutorBackend and the Executor. The ExecutorBackend receives messages from the driver and invokes the Executor to run tasks. This post uses CoarseGrainedExecutorBackend as the example.

The worker launches CoarseGrainedExecutorBackend through a java command. Inside the run function, two actors are created: CoarseGrainedExecutorBackend itself and a WorkerWatcher. The WorkerWatcher monitors the state of the worker.

private[spark] object CoarseGrainedExecutorBackend {
  def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
          workerUrl: Option[String]) {
    // Debug code
    Utils.checkHost(hostname)

    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
    // before getting started with all our system properties, etc
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
      indestructible = true, conf = new SparkConf)
    // set it
    val sparkHostPort = hostname + ":" + boundPort
    actorSystem.actorOf(
      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
      name = "Executor")
    workerUrl.foreach { url =>
      actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
    }
    actorSystem.awaitTermination()
  }

  def main(args: Array[String]) {
    args.length match {
      case x if x < 4 =>
        System.err.println(
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
          "<cores> [<workerUrl>]")
        System.exit(1)
      case 4 =>
        run(args(0), args(1), args(2), args(3).toInt, None)
      case x if x > 4 =>
        run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
    }
  }
}

In CoarseGrainedExecutorBackend's preStart method, a handle to the driver is obtained via context.actorSelection(driverUrl), and a RegisterExecutor message is sent to the driver.

  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorSelection(driverUrl)
    driver ! RegisterExecutor(executorId, hostPort, cores)
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }
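
On the other end of this handshake, the driver's scheduler backend actor records the executor and replies with either RegisteredExecutor or RegisterExecutorFailed. Below is a stripped-down sketch of that logic, using stand-in message classes rather than Spark's real ones; the actual code lives in CoarseGrainedSchedulerBackend's driver actor, which additionally tracks cores and triggers resource offers.

  import scala.collection.mutable
  import akka.actor.{Actor, ActorRef}

  // Stand-in messages mirroring the ones exchanged above (simplified).
  case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
  case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
  case class RegisterExecutorFailed(message: String)

  class DriverRegistrationSketch(sparkProperties: Seq[(String, String)]) extends Actor {
    // Remembers which actor to send LaunchTask / KillTask to for each executor id.
    private val executorActor = mutable.HashMap[String, ActorRef]()

    def receive = {
      case RegisterExecutor(executorId, hostPort, cores) =>
        if (executorActor.contains(executorId)) {
          // Duplicate registration: the backend logs the error and exits.
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
        } else {
          executorActor(executorId) = sender
          // The properties let the backend construct its Executor (see RegisteredExecutor above).
          sender ! RegisteredExecutor(sparkProperties)
        }
    }
  }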

Below are the messages CoarseGrainedExecutorBackend handles, mainly RegisteredExecutor, LaunchTask and KillTask, together with the statusUpdate method it uses to report task state back to the driver.

 override def receive = {
    case RegisteredExecutor(sparkProperties) =>
      logInfo("Successfully registered with driver")
      // Make this host instead of hostPort ?
      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)

    case RegisterExecutorFailed(message) =>
      logError("Slave registration failed: " + message)
      System.exit(1)

    case LaunchTask(taskDesc) =>
      logInfo("Got assigned task " + taskDesc.taskId)
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
      }

    case KillTask(taskId, _) =>
      if (executor == null) {
        logError("Received KillTask command but executor was null")
        System.exit(1)
      } else {
        executor.killTask(taskId)
      }

    case x: DisassociatedEvent =>
      logError(s"Driver $x disassociated! Shutting down.")
      System.exit(1)

    case StopExecutor =>
      logInfo("Driver commanded a shutdown")
      context.stop(self)
      context.system.shutdown()
  }

  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    driver ! StatusUpdate(executorId, taskId, state, data)
  }
}
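
When LaunchTask arrives, executor.launchTask does not run the task on the actor thread; it wraps the serialized task in a Runnable (Spark's TaskRunner, whose run method is shown next), records it in runningTasks, and hands it to a thread pool. A simplified sketch of that hand-off, with stand-in names:

  import java.nio.ByteBuffer
  import java.util.concurrent.{ConcurrentHashMap, Executors}

  class ExecutorSketch {
    private val threadPool   = Executors.newCachedThreadPool()
    private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

    def launchTask(taskId: Long, serializedTask: ByteBuffer): Unit = {
      val runner: Runnable = new Runnable {
        def run(): Unit = {
          // In Spark this is TaskRunner.run (shown below): deserialize the task,
          // execute it, and report state via ExecutorBackend.statusUpdate.
        }
      }
      runningTasks.put(taskId, runner)   // tracked so KillTask can find it
      threadPool.execute(runner)         // runs concurrently with other tasks
    }

    def killTask(taskId: Long): Unit = {
      // Spark's TaskRunner sets a `killed` flag that the running task checks;
      // this sketch only drops the bookkeeping entry.
      runningTasks.remove(taskId)
    }
  }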

Below is the run method of the TaskRunner inside Executor. It first deserializes the task with ser.deserialize, then calls task.run to execute it (tasks come in two kinds, ResultTask and ShuffleMapTask). If the serialized result is larger than akkaFrameSize, the bytes are stored in the local BlockManager and only the block id is sent back; otherwise the result is sent back to the driver directly.

  override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
      val startTime = System.currentTimeMillis()
      SparkEnv.set(env)
      Thread.currentThread.setContextClassLoader(replClassLoader)
      val ser = SparkEnv.get.closureSerializer.newInstance()
      logInfo("Running task ID " + taskId)
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
      var attemptedTask: Option[Task[Any]] = None
      var taskStart: Long = 0
      def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
      val startGCTime = gcTime

      try {
        SparkEnv.set(env)
        Accumulators.clear()
        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
        updateDependencies(taskFiles, taskJars)
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

        // If this task has been killed before we deserialized it, let's quit now. Otherwise,
        // continue executing the task.
        if (killed) {
          // Throw an exception rather than returning, because returning within a try{} block
          // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
          // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
          // for the task.
          throw TaskKilledException
        }

        attemptedTask = Some(task)
        logDebug("Task " + taskId +"'s epoch is " + task.epoch)
        env.mapOutputTracker.updateEpoch(task.epoch)

        // Run the actual task and measure its runtime.
        taskStart = System.currentTimeMillis()
        val value = task.run(taskId.toInt)
        val taskFinish = System.currentTimeMillis()

        // If the task has been killed, let's fail it.
        if (task.killed) {
          throw TaskKilledException
        }

        val resultSer = SparkEnv.get.serializer.newInstance()
        val beforeSerialization = System.currentTimeMillis()
        val valueBytes = resultSer.serialize(value)
        val afterSerialization = System.currentTimeMillis()

        for (m <- task.metrics) {
          m.hostname = Utils.localHostName()
          m.executorDeserializeTime = (taskStart - startTime).toInt
          m.executorRunTime = (taskFinish - taskStart).toInt
          m.jvmGCTime = gcTime - startGCTime
          m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
        }

        val accumUpdates = Accumulators.values

        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
        val serializedDirectResult = ser.serialize(directResult)
        logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
        val serializedResult = {
          if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
            logInfo("Storing result for " + taskId + " in local BlockManager")
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(
              blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
            ser.serialize(new IndirectTaskResult[Any](blockId))
          } else {
            logInfo("Sending result for " + taskId + " directly to driver")
            serializedDirectResult
          }
        }

        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
        logInfo("Finished task ID " + taskId)
      } catch {
        case ffe: FetchFailedException => {
          val reason = ffe.toTaskEndReason
          execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
        }

        case TaskKilledException => {
          logInfo("Executor killed task " + taskId)
          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
        }

        case t: Throwable => {
          val serviceTime = (System.currentTimeMillis() - taskStart).toInt
          val metrics = attemptedTask.flatMap(t => t.metrics)
          for (m <- metrics) {
            m.executorRunTime = serviceTime
            m.jvmGCTime = gcTime - startGCTime
          }
          val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
          execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

          // TODO: Should we exit the whole executor here? On the one hand, the failed task may
          // have left some weird state around depending on when the exception was thrown, but on
          // the other hand, maybe we could detect that when future tasks fail and exit then.
          logError("Exception in task ID " + taskId, t)
          //System.exit(1)
        }
      } finally {
        // TODO: Unregister shuffle memory only for ResultTask
        val shuffleMemoryMap = env.shuffleMemoryMap
        shuffleMemoryMap.synchronized {
          shuffleMemoryMap.remove(Thread.currentThread().getId)
        }
        runningTasks.remove(taskId)
      }
    }
  }
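
On the driver side, the StatusUpdate eventually reaches Spark's TaskResultGetter, which mirrors the branch above: a DirectTaskResult already carries the value bytes, while an IndirectTaskResult only carries a block id and the real bytes must be fetched from the executor's BlockManager. A minimal sketch of that decision, with stand-in types:

  import java.nio.ByteBuffer

  // Stand-in result types mirroring DirectTaskResult / IndirectTaskResult.
  sealed trait TaskResultSketch
  case class DirectResult(valueBytes: ByteBuffer) extends TaskResultSketch
  case class IndirectResult(blockId: String) extends TaskResultSketch

  object ResultResolverSketch {
    // `fetchBlock` stands in for fetching the stored block from the remote BlockManager.
    def resolve(result: TaskResultSketch, fetchBlock: String => ByteBuffer): ByteBuffer =
      result match {
        case DirectResult(bytes) =>
          bytes                  // small result: it fit inside the Akka frame
        case IndirectResult(blockId) =>
          fetchBlock(blockId)    // large result: pull the bytes the executor stored
      }
  }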

 
