
A walkthrough of how Spark schedules tasks through the cluster manager after submitStage, up to the Driver receiving the computed results

Previous post: http://humingminghz.iteye.com/blog/2314269

The previous post followed the code from the action entry point to how stages are split and then submitted. Now that a stage has been submitted, the next step is for the cluster manager to dispatch its tasks to executors at their preferred locations.

The submitStage method ends up submitting every ancestor stage the current stage depends on, while the current stage itself is parked in waitingStages; only after all of its parent stages have finished does it run. Every stage records its parent/child relationships, which is why, when any stage fails, Spark only needs to re-run the failed stage rather than all stages.

Now let's see what submitStage does:
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }


getMissingParentStages checks whether all parent stages have already been computed. If some are missing, the current stage is added to waitingStages, and submitStage(parent) is called recursively to submit every parent stage that has not run yet.

So Spark finds all ancestor stages and executes the ones the current stage depends on first; once the current stage has no missing parents, its tasks are submitted via submitMissingTasks. A minimal job that produces such a parent/child pair of stages is sketched below.
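As a concrete user-level illustration of this stage splitting (a sketch only, not scheduler code), the following job yields exactly two stages: reduceByKey introduces a shuffle, so the DAGScheduler builds a ShuffleMapStage as the missing parent plus a ResultStage for the action.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: a two-stage job. The ShuffleMapStage (map side of the shuffle)
// is the "missing parent" that submitStage submits first; the ResultStage for
// collect() waits in waitingStages until the parent finishes.
val sc = new SparkContext(new SparkConf().setAppName("two-stages").setMaster("local[2]"))
sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .reduceByKey(_ + _)   // wide dependency -> stage boundary
  .collect()            // action -> ResultStage
sc.stop()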

submitMissingTasks is quite long, so only the most important parts are shown here:

  private def submitMissingTasks(stage: Stage, jobId: Int) {

    ...

    runningStages += stage

    ...

    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          val job = s.activeJob.get
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
        runningStages -= stage
        return
    }

    ...

    var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
        case stage: ResultStage =>
          closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
      }

      taskBinary = sc.broadcast(taskBinaryBytes)
    }

    ...

    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.internalAccumulators)
          }

        case stage: ResultStage =>
          val job = stage.activeJob.get
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, stage.internalAccumulators)
          }
      }
    }

    ...

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingPartitions ++= tasks.map(_.partitionId)
      logDebug("New pending partitions: " + stage.pendingPartitions)
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    }

    ...

  }


First, the current stage is added to runningStages so it will not be submitted twice.

Then the preferred locations are computed and stored in taskIdToLocations, mainly via getPreferredLocs(stage.rdd, p).

Once we know which worker nodes each task can run on, taskBinary is created. It is the serialized payload of the task; after serialization it can be shipped to every machine, deserialized there and executed. It mainly contains the stage's RDD and the function applied to that RDD, e.g. the writeToFile function discussed in the earlier post.

Next the tasks collection is created: these are the actual Task objects. On our code path the stage is a ResultStage, so a batch of ResultTasks is created, one per partition.

Finally a TaskSet is built from these tasks and submitted via submitTasks:
taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

Let's take it step by step, starting with how the preferred locations are obtained:
  private[spark]
  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
    getPreferredLocsInternal(rdd, partition, new HashSet)
  }


Now getPreferredLocsInternal:
private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // If the partition has already been visited, no need to re-visit.
    // This avoids exponential path exploration.  SPARK-695
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }

    Nil
  }


First, if the partition has been cached, its cached locations are returned, so the cached partition can be reused and recomputation avoided. The locations come from getCacheLocs:
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
    // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
    if (!cacheLocs.contains(rdd.id)) {
      // Note: if the storage level is NONE, we don't need to get locations from block manager.
      val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
        IndexedSeq.fill(rdd.partitions.length)(Nil)
      } else {
        val blockIds =
          rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
        blockManagerMaster.getLocations(blockIds).map { bms =>
          bms.map(bm => TaskLocation(bm.host, bm.executorId))
        }
      }
      cacheLocs(rdd.id) = locs
    }
    cacheLocs(rdd.id)
  }


If the storage level is StorageLevel.NONE, no locations are fetched from the block manager at all; only when the RDD was persisted to memory and/or disk does the else branch run. There, a block id is computed for every partition of the RDD, and the TaskLocations are fetched back from the BlockManagerMaster.
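For intuition, here is a user-level sketch (assuming an existing SparkContext named sc, which is not part of the code above) of how a storage level other than NONE comes about in the first place:

import org.apache.spark.storage.StorageLevel

// Only a persisted RDD has a storage level other than NONE, so only then does
// getCacheLocs ask the BlockManagerMaster where the cached blocks live.
val cached = sc.parallelize(1 to 1000, 4).map(_ * 2).persist(StorageLevel.MEMORY_ONLY)
cached.count()   // materializes and registers the RDD blocks
cached.count()   // later stages over this RDD can be scheduled next to those blocks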

If the partition has not been cached, rdd.preferredLocations is consulted to see whether the current RDD has preferred locations of its own. If it does, they are returned right away; otherwise the search continues through rdd.dependencies, checking whether any narrow-dependency parent RDD has preferred locations. If nothing is found, Nil is returned.


So the lookup order for preferred locations is:
1. the cache;
2. the current RDD's own preferredLocations;
3. preferredLocations found through narrow-dependency parent RDDs.

A sketch of how an RDD exposes its own preferred locations (point 2) follows.
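This is a hypothetical custom RDD, shown only to illustrate where preferredLocations come from; the class name and host list are made up. Built-in RDDs such as HadoopRDD override the same hook with real block locations.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Illustrative only: one partition per hint, each reporting its host as the
// preferred location. The DAGScheduler reads this via rdd.preferredLocations.
class LocalityHintRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[Int](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](hosts.length)(i => new Partition { override def index: Int = i })

  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(hosts(split.index))

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(split.index)
}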

With the preferred locations in hand, stage.rdd and stage.func are serialized and turned into a broadcast variable, taskBinary, which every Spark node can fetch.
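Here is a rough user-level analogue of that step, just a sketch that assumes a SparkContext named sc already exists; the serialize/deserialize helpers below are our own, standing in for the closure serializer:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Serialize a payload to bytes, broadcast the bytes, and let each executor
// deserialize its own copy. In the scheduler the payload is (stage.rdd, stage.func).
def serialize(obj: AnyRef): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(obj)
  oos.close()
  bos.toByteArray
}

def deserialize[T](bytes: Array[Byte]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  try ois.readObject().asInstanceOf[T] finally ois.close()
}

val payload: Array[Byte] = serialize(Seq(1, 2, 3))   // stands in for (rdd, func)
val taskBinaryLike = sc.broadcast(payload)           // Broadcast[Array[Byte]]
// inside a task: deserialize[Seq[Int]](taskBinaryLike.value)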

After that, the stage, taskBinary and the preferred locations are used to create the collection of tasks; on our code path that is a collection of ResultTasks.

The task collection is then wrapped into a TaskSet and submitted by the taskScheduler (TaskSchedulerImpl):

taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

Here is the submitTasks method in TaskSchedulerImpl:
  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }



The code looks busy, but it essentially creates a TaskSetManager, registers it with the schedulableBuilder, and finally calls the backend's reviveOffers method.
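As a side note, which schedulableBuilder gets used depends on a standard Spark setting; a minimal configuration sketch (values are just examples):

import org.apache.spark.SparkConf

// FIFO scheduling is the default; setting spark.scheduler.mode to FAIR makes
// the scheduler place TaskSetManagers into fair-scheduling pools instead.
val conf = new SparkConf()
  .setAppName("scheduler-mode-demo")
  .setMaster("local[2]")
  .set("spark.scheduler.mode", "FAIR")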

The backend is created in SparkContext; in local mode it is a LocalBackend:
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)


Now look at LocalBackend's start and reviveOffers methods:
override def start() {
    val rpcEnv = SparkEnv.get.rpcEnv
    val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
    localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint)
    listenerBus.post(SparkListenerExecutorAdded(
      System.currentTimeMillis,
      executorEndpoint.localExecutorId,
      new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
    launcherBackend.setAppId(appId)
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

  override def reviveOffers() {
    localEndpoint.send(ReviveOffers)
  }




So reviveOffers simply sends a message through localEndpoint. A quick word on the messaging model (I am not deeply familiar with Akka, but the idea is the actor pattern): an endpoint processes messages, and senders hold a reference to it. In LocalBackend's start method, localEndpoint = rpcEnv.setupEndpoint(...) registers a LocalEndpoint under the name "LocalBackendEndpoint" and returns the reference; sending ReviveOffers through that reference causes LocalEndpoint's receive method to be invoked. Let's look at that receive method (a small Akka-style analogy of the send/receive pattern is sketched right after it):

private val executor = new Executor(
    localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)

override def receive: PartialFunction[Any, Unit] = {
    case ReviveOffers =>
      reviveOffers()

    case StatusUpdate(taskId, state, serializedData) =>
      scheduler.statusUpdate(taskId, state, serializedData)
      if (TaskState.isFinished(state)) {
        freeCores += scheduler.CPUS_PER_TASK
        reviveOffers()
      }

    case KillTask(taskId, interruptThread) =>
      executor.killTask(taskId, interruptThread)
  }

  def reviveOffers() {
    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
    for (task <- scheduler.resourceOffers(offers).flatten) {
      freeCores -= scheduler.CPUS_PER_TASK
      executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
        task.name, task.serializedTask)
    }
  }
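Before moving on, here is the promised analogy, purely illustrative and assuming akka-actor is on the classpath; ReviveOffers and LocalEndpointLike below are made-up names, not Spark's classes:

import akka.actor.{Actor, ActorSystem, Props}

// One side holds a reference and sends a message; the actor's receive handles
// it, just like localEndpoint.send(ReviveOffers) ends up in LocalEndpoint.receive.
case object ReviveOffers

class LocalEndpointLike extends Actor {
  def receive: Receive = {
    case ReviveOffers => println("reviveOffers() would run here")
  }
}

val system = ActorSystem("demo")
val endpointRef = system.actorOf(Props[LocalEndpointLike], "LocalBackendEndpoint")
endpointRef ! ReviveOffers   // analogous to localEndpoint.send(ReviveOffers)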



Back to the real code: receive dispatches ReviveOffers to reviveOffers, which does two important things:
1. scheduler.resourceOffers
2. executor.launchTask

Let's look at the first one:

  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }


This is also fairly long, but the gist is:
1. Random.shuffle(offers) shuffles the offers so that no single worker keeps getting loaded with tasks.
2. resourceOfferSingleTaskSet does the actual work of assigning tasks to executors, walking down the locality levels.

By this point two things have been decided:
1. which machine each task will run on;
2. which executor on that machine will execute it.

With that, task scheduling is complete. A simplified, standalone sketch of this shuffle-then-assign idea follows.
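The sketch below is not Spark's code (Offer and assign are made-up names); it only illustrates the pattern of shuffling the offers and greedily placing tasks wherever CPUs remain:

import scala.util.Random

case class Offer(executorId: String, host: String, var freeCores: Int)

// Shuffle the offers so no worker is always tried first, then greedily place
// tasks on any offer that still has a free CPU; whatever cannot be placed now
// stays pending until more resources are offered.
def assign(offers: Seq[Offer], tasks: Seq[String], cpusPerTask: Int = 1): Map[String, String] = {
  val shuffled = Random.shuffle(offers)
  val placement = scala.collection.mutable.Map[String, String]()
  val pending = scala.collection.mutable.Queue(tasks: _*)
  var progressed = true
  while (pending.nonEmpty && progressed) {
    progressed = false
    for (o <- shuffled if pending.nonEmpty && o.freeCores >= cpusPerTask) {
      placement(pending.dequeue()) = o.executorId   // task -> executor
      o.freeCores -= cpusPerTask
      progressed = true
    }
  }
  placement.toMap
}

// e.g. assign(Seq(Offer("exec-0", "host-a", 1), Offer("exec-1", "host-b", 1)),
//             Seq("task-0", "task-1", "task-2"))  // task-2 stays unassigned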

Now the executor's launchTask code:
  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }


It simply submits a TaskRunner (tr) to the thread pool, which runs its run method on a worker thread. TaskRunner is long as well, so again only the important parts:
  class TaskRunner(
      execBackend: ExecutorBackend,
      val taskId: Long,
      val attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer)
    extends Runnable {

    ...

    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)

    ...

    val (value, accumUpdates) = try {
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    }

    ...

    val valueBytes = resultSer.serialize(value)

    ...

    val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
    val serializedDirectResult = ser.serialize(directResult)
    val resultSize = serializedDirectResult.limit

    ...

    val serializedResult: ByteBuffer = {
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
          s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
          s"dropping it.")
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        logInfo(
          s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      } else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult
      }
    }

    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

    ...

  }


First, execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) is executed,

which triggers the StatusUpdate case in LocalEndpoint:
    case StatusUpdate(taskId, state, serializedData) =>
      scheduler.statusUpdate(taskId, state, serializedData)
      if (TaskState.isFinished(state)) {
        freeCores += scheduler.CPUS_PER_TASK
        reviveOffers()
      }


which ultimately calls scheduler.statusUpdate; at this stage that mostly just prunes entries from the scheduler's internal bookkeeping maps.

Next, the run method calls task.run; let's see what that does:
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem)
  : (T, AccumulatorUpdates) = {
    context = new TaskContextImpl(
      stageId,
      partitionId,
      taskAttemptId,
      attemptNumber,
      taskMemoryManager,
      metricsSystem,
      internalAccumulators,
      runningLocally = false)
    TaskContext.setTaskContext(context)
    context.taskMetrics.setHostname(Utils.localHostName())
    context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
    taskThread = Thread.currentThread()
    if (_killed) {
      kill(interruptThread = false)
    }
    try {
      (runTask(context), context.collectAccumulators())
    } catch {
      case e: Throwable =>
        // Catch all errors; run task failure callbacks, and rethrow the exception.
        try {
          context.markTaskFailed(e)
        } catch {
          case t: Throwable =>
            e.addSuppressed(t)
        }
        throw e
    } finally {
      // Call the task completion callbacks.
      context.markTaskCompleted()
      try {
        Utils.tryLogNonFatalError {
          // Release memory used by this thread for unrolling blocks
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
          // Notify any tasks waiting for execution memory to be freed to wake up and try to
          // acquire memory again. This makes impossible the scenario where a task sleeps forever
          // because there are no other tasks left to notify it. Since this is safe to do but may
          // not be strictly necessary, we should revisit whether we can remove this in the future.
          val memoryManager = SparkEnv.get.memoryManager
          memoryManager.synchronized { memoryManager.notifyAll() }
        }
      } finally {
        TaskContext.unset()
      }
    }
  }


The essential part is what it executes and returns: (runTask(context), context.collectAccumulators()).

runTask is implemented in Task's subclasses, in our case ResultTask:
override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }


Here is the deserialization step: the RDD and the function we serialized earlier are recovered. We are now inside runTask, i.e. running inside an executor on the target Spark node; the broadcast taskBinary is fetched and deserialized back into the RDD and the func, so the task can execute locally. Finally func is invoked, producing the result of our writeToFile computation.

Next the value is serialized, in preparation for sending it back to the Driver:
val valueBytes = resultSer.serialize(value)

A DirectTaskResult is created from valueBytes and serialized into serializedDirectResult:
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)

val serializedDirectResult = ser.serialize(directResult)



Then, depending on the result's size, either an IndirectTaskResult is produced or the DirectTaskResult is returned directly:
val serializedResult: ByteBuffer = {
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
      s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
      s"dropping it.")
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
  } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(
      blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
    logInfo(
      s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
  } else {
    logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
    serializedDirectResult
  }
}
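The size thresholds in that decision come from ordinary Spark settings; a small configuration sketch (values are illustrative):

import org.apache.spark.SparkConf

// The first branch above fires when the serialized result exceeds
// spark.driver.maxResultSize (0 means no limit); the middle branch ships the
// result through the BlockManager when it is too large for a single RPC frame
// (in this Spark version derived from spark.akka.frameSize).
val conf = new SparkConf()
  .setAppName("result-size-demo")
  .set("spark.driver.maxResultSize", "1g")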


Then LocalBackend's statusUpdate method is called again:
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)


statusUpdate in LocalBackend:

  override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
    localEndpoint.send(StatusUpdate(taskId, state, serializedData))
  }



which in turn triggers the endpoint's receive method:
 override def receive: PartialFunction[Any, Unit] = {
    case ReviveOffers =>
      reviveOffers()

    case StatusUpdate(taskId, state, serializedData) =>
      scheduler.statusUpdate(taskId, state, serializedData)
      if (TaskState.isFinished(state)) {
        freeCores += scheduler.CPUS_PER_TASK
        reviveOffers()
      }

    case KillTask(taskId, interruptThread) =>
      executor.killTask(taskId, interruptThread)
  }


So in the end it calls TaskSchedulerImpl's statusUpdate with TaskState.FINISHED:
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  ...
  if (state == TaskState.FINISHED) {
    taskSet.removeRunningTask(tid)
    taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
  } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
    taskSet.removeRunningTask(tid)
    taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
  }
  ...
}


If the state is FINISHED, taskResultGetter fetches the computation result back from the Spark node (the executor) to the Driver. (The scheduler was created on the SparkContext, and the SparkContext lives on the Driver, so this code runs driver-side.)

Let's see what happens with the result once it is back:
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)

def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        try {
          val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
            case directResult: DirectTaskResult[_] =>
              if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
                return
              }
              // deserialize "value" without holding any lock so that it won't block other threads.
              // We should call it here, so that when it's called again in
              // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
              directResult.value()
              (directResult, serializedData.limit())
            case IndirectTaskResult(blockId, size) =>
              if (!taskSetManager.canFetchMoreResults(size)) {
                // dropped by executor if size is larger than maxResultSize
                sparkEnv.blockManager.master.removeBlock(blockId)
                return
              }
              logDebug("Fetching indirect task result for TID %s".format(tid))
              scheduler.handleTaskGettingResult(taskSetManager, tid)
              val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
              if (!serializedTaskResult.isDefined) {
                /* We won't be able to get the task result if the machine that ran the task failed
                 * between when the task ended and when we tried to fetch the result, or if the
                 * block manager had to flush the result. */
                scheduler.handleFailedTask(
                  taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
                return
              }
              val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                serializedTaskResult.get)
              sparkEnv.blockManager.master.removeBlock(blockId)
              (deserializedResult, size)
          }

          result.metrics.setResultSize(size)
          scheduler.handleSuccessfulTask(taskSetManager, tid, result)
        } catch {
          case cnf: ClassNotFoundException =>
            val loader = Thread.currentThread.getContextClassLoader
            taskSetManager.abort("ClassNotFound with classloader: " + loader)
          // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
          case NonFatal(ex) =>
            logError("Exception while getting task result", ex)
            taskSetManager.abort("Exception while getting task result: %s".format(ex))
        }
      }
    })
  }


A thread is started right on the Driver to fetch the data. If it is a DirectTaskResult, nothing else is needed: the value is simply returned. If it is an IndirectTaskResult, the serialized object has to be fetched remotely from the block manager:
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)


and then deserialized and returned:
 val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                serializedTaskResult.get)
              sparkEnv.blockManager.master.removeBlock(blockId)
              (deserializedResult, size)



So by now we can see how Spark, after splitting the stages, finds the right executors on the right Spark nodes to run the tasks, and how the results eventually make their way back to the Driver.

The other (non-local) deploy modes work in much the same way; essentially only the backend/endpoint implementations differ, so following this same path still holds, and I will not go through each of them.

That's it for now; corrections are welcome.