
[spark-src-core] 2.2 job submission flow for local mode - part I

 

  now we will dive into Spark internals using the simple WordCount example below (later articles will reference this example by default)

//-setup (assumed for a runnable snippet; the original excerpt starts from an existing sparkConf)
val sparkConf = new org.apache.spark.SparkConf().setAppName("ScalaWordCount")
sparkConf.setMaster("local[2]") //-local[*] by default
//-leib-confs: output all the dependency (lineage) logs
sparkConf.set("spark.logLineage","true")

val sc = new org.apache.spark.SparkContext(sparkConf)
//    (1) ShuffledRDD[4] at reduceByKey at ScalaWordCount.scala:44 []
//    +-(1) MapPartitionsRDD[3] at map at ScalaWordCount.scala:42 []
//    |  MapPartitionsRDD[2] at flatMap at ScalaWordCount.scala:41 []
//    |  MapPartitionsRDD[1] at textFile at ScalaWordCount.scala:34 []
//    |  ../spark-1.4.1/examples/src/main/resources/CHANGES.txt HadoopRDD[0] at textFile at ScalaWordCount.scala:34 []
val file = "../spark-1.4.1/examples/src/main/resources/CHANGES.txt" //-this file is 584k
val rdd = sc.textFile(file) //-MapPartitionsRDD[1] --> HadoopRDD[0]

val fmrdd = rdd.flatMap(_.split(" ")) //-MapPartitionsRDD[2]
val maprdd = fmrdd.map((_,1)) //-MapPartitionsRDD[3]
val rstrdd = maprdd.reduceByKey((x,y) => x + y) //-ShuffledRDD[4]; reduceByKey() by itself will not spawn a real computation

val arr = rstrdd.collect()  //-collect() is the action: only now is a job actually submitted
val max = 10 //-Integer.MAX_VALUE
var count = 0
if (arr.length > max) { //-if you want to limit the result size, rdd.take(n) or rdd.top(n) is a better choice
  println(s"*reduce output to limit ${max},found ${arr.length}")
}
//-iterate without any particular order
for ((ele, num) <- arr if count < max) {
  count += 1
  println(ele + "," + num)
}
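
  how does collect() reach the DAGScheduler? Roughly (a paraphrased sketch of the 1.4.x source, not a verbatim copy; some plumbing omitted), RDD#collect() wraps each partition's iterator into an array and hands that function to SparkContext#runJob(), which posts a JobSubmitted event that ends up in DAGScheduler#handleJobSubmitted() shown further below:

//-paraphrased sketch of RDD#collect() in spark 1.4.x
def collect(): Array[T] = withScope {
  //-(iter: Iterator[T]) => iter.toArray is the spark-internal collective `func`
  //-that later appears as the `func` parameter of handleJobSubmitted()
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}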

 

 

  (figure) overview of the job submission flow

  (figure) communication diagram of a job

  (figure) sequence diagram of the map-side computation

  (figure) pseudo-code view corresponding to the Spark source

 ======

 several core code paths

-job submission method in DAGScheduler

/**-handle job submit event */
  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,  //-case 1: spark-internal collective func, i.e. iter.toArray (see RDD#collect())
      partitions: Array[Int], //-from most recent rdd's partitions,see SparkContext#runJob()-L1979
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.-->>parent ShuffleMapStage is created here<<
      finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) //-one job per action
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
        job.jobId, callSite.shortForm, partitions.length, allowLocal))
      logInfo("Final stage: " + finalStage + "(" + finalStage.name + "),rdd " + finalRDD)//-ShuffleRDD for ScalaWordCount
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))
      val shouldRunLocally =
        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
      val jobSubmissionTime = clock.getTimeMillis()
      if (shouldRunLocally) {
        // Compute very short actions like first() or take() with no parent stages locally.
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
        runLocally(job) //-run in the same process as the action, on the local host; a mini spark job runner with no job/task scheduling
      } else {  //-eg. reduceByKey()
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.resultOfJob = Some(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) //-one element in fact
        listenerBus.post( //-below is similar to runLocally()
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage) //-how is the ResultStage submitted? see inside; how are a job's map operations completed? RDD#compute() iterates via iterator()
      }
    }
    submitWaitingStages() //-check whether any waiting stages to submit
  }
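
 the comment above notes that the parent ShuffleMapStage is created inside newResultStage(): stage boundaries are drawn wherever an RDD has a ShuffleDependency on its parent, while narrow dependencies stay inside one stage. A quick check from the driver, reusing the rstrdd/maprdd of the example (a sketch, not part of the original program):

//-a ShuffleDependency marks exactly where newResultStage() will hang a parent ShuffleMapStage
println(rstrdd.toDebugString)              //-the indented lineage printed at the top of this article
println(rstrdd.dependencies.head)          //-a ShuffleDependency  -> stage boundary
println(maprdd.dependencies.head.getClass) //-OneToOneDependency   -> narrow dep, stays in the same stage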

 

/** Submits stage, but first *recursively* submits any missing parents. -i.e. from earlier to later in execution order */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logInfo("*submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)  //-from old steps to new; tasks belong to this stage
        logInfo("-*missing: " + missing)
        if (missing.isEmpty) {  //-submit the root/first stages only
          logInfo("--*Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get) //-this is first stage,so submit all tasks w/o dependencies
        } else {
          for (parent <- missing) { //-not leaf node,recursively calling self
            submitStage(parent)
          }
          //-the ResultStage ends up here: a parent stage exists, so add this stage to the waiting queue;
          //-it will be scheduled after the prior stages finish
          waitingStages += stage //-stages are submitted one by one, so keep the child stage here; see this.onReceive() > CompletionEvent
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id)
    }
  }

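 for the WordCount job this recursion is shallow: the ResultStage has a single missing parent (the ShuffleMapStage covering RDDs [0..3]), which itself has none. A tiny standalone toy model of the resulting order (hypothetical SketchStage type, not Spark code):

//-toy model of submitStage()'s recursion for the two-stage WordCount DAG
case class SketchStage(id: Int, parents: List[SketchStage])

def submit(stage: SketchStage, waiting: scala.collection.mutable.Set[SketchStage]): Unit = {
  val missing = stage.parents //-pretend no parent output is available yet
  if (missing.isEmpty) {
    println(s"submitMissingTasks(stage ${stage.id})") //-leaf stage: its tasks run first
  } else {
    missing.foreach(p => submit(p, waiting))          //-recurse into the parents first
    waiting += stage                                  //-park the child until the parents finish
  }
}

val shuffleMapStage = SketchStage(0, Nil)
val resultStage     = SketchStage(1, List(shuffleMapStage))
submit(resultStage, scala.collection.mutable.Set.empty[SketchStage])
//-prints "submitMissingTasks(stage 0)"; stage 1 sits in `waiting` until stage 0 completes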
   with the DAG in place, the method below splits the stage into tasks and submits them at last

/** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    // Get our pending tasks and remember them in our pendingTasks entry
    stage.pendingTasks.clear()

    // First figure out the indexes of partition ids to compute.-empty or unfinished partitions
    val partitionsToCompute: Seq[Int] = { //-control how many tasks will be generated
      stage match {
        case stage: ShuffleMapStage =>
          (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty)
        case stage: ResultStage =>
          val job = stage.resultOfJob.get
          (0 until job.numPartitions).filter(id => !job.finished(id))
      }
    }

    val properties = jobIdToActiveJob.get(stage.jobId).map(_.properties).orNull
    //-mark current running stage
    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
    outputCommitCoordinator.stageStart(stage.id)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array() //-use stage's rdd as task's rdd
        case stage: ResultStage =>
          closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
      }

      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString)
        runningStages -= stage

        // Abort execution
        return
      case NonFatal(e) =>
        abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
        runningStages -= stage
        return
    }
    //-note:tasks generation policy
    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>  //-intermediate stage,see below
          partitionsToCompute.map { id => //-one task per partition of this stage's rdd (which may trace back to the root rdd's partitions)
            val locs = getPreferredLocs(stage.rdd, id)//-several locations belong to the partition,similar to hdfs blocks
            val part = stage.rdd.partitions(id) //-access the indexed id partition
            new ShuffleMapTask(stage.id, taskBinary, part, locs)  //-corresponding task
          }

        case stage: ResultStage =>  //-final stage
          val job = stage.resultOfJob.get
          partitionsToCompute.map { id =>
            val p: Int = job.partitions(id) //-a map from job partition to stage.rdd's one
            val part = stage.rdd.partitions(p)
            val locs = getPreferredLocs(stage.rdd, p) //-the replica hosts of the same block? yes
            //-track info
            for(par <- stage.rdd.partitions)
              logInfo("-part/rdd:" + par + "/" + stage.rdd)
            for(loc <- locs)
              logInfo("-loc %s".format(loc))
            //-the number of ResultTasks depends on the number of partitions
            new ResultTask(stage.id, taskBinary, part, locs, id)  //-corresponding task
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
        runningStages -= stage
        return
    }

    if (tasks.size > 0) { //-for the reduceByKey() example this is 1
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingTasks ++= tasks
      logDebug("New pending tasks: " + stage.pendingTasks)
      taskScheduler.submitTasks(  //-deliver tasks schedule to TaskSchedulerImpl
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)
    }
  }
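
 for the example above the counts are small: one task is created per partition still to compute, so one ShuffleMapTask for the map stage and one ResultTask for the final stage. A quick way to predict the task counts from the driver (assuming the same sc and file as in the example):

//-one task per partition that still needs computing
val words   = sc.textFile(file).flatMap(_.split(" ")).map((_, 1))
val reduced = words.reduceByKey(_ + _)
println(s"map-side partitions = ${words.partitions.length}")   //-=> number of ShuffleMapTasks
println(s"result partitions   = ${reduced.partitions.length}") //-=> number of ResultTasks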

 

-ShuffleMapTask core method

override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    //-task (de)serialization, see DAGScheduler#submitMissingTasks() for the serialization side; the rdd is the last rdd in this stage (ShuffleMapStage)
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( //-restore the direct-recent parent rdd
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager //-SortShuffleManager by default
      //'SortShuffleWriter'
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) //-'BaseShuffleHandler'
      //-first, compute the user's job logic for this stage (e.g. the map side of reduceByKey()) via rdd.iterator(...); then output the result
      logInfo("precomputing,task:" + toString + ",dep:" + dep
            + ",handle:" + dep.shuffleHandle +",part:" +partitionId + ",rdd " + rdd)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])//-iterator() is invoked recursively up the rdd chain
      return writer.stop(success = true).get //-release resources then return MapStatus,see SortShuffleWriter#write()
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }
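
 the "recursively" comment on writer.write() refers to the pull model of RDD evaluation: each RDD's compute() asks its parent for an iterator, all the way back to the HadoopRDD. Paraphrased from the 1.4.x source (caching and checkpoint branches simplified):

//-paraphrased sketch, not verbatim: RDD#iterator() plus MapPartitionsRDD#compute()
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) //-cached branch
  } else {
    computeOrReadCheckpoint(split, context) //-eventually calls compute(split, context)
  }
}

//-MapPartitionsRDD (the flatMap/map steps of the example) just wraps the parent's iterator
override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context)) //-pulls from the parent, which pulls from its own parent...

 so for the example's ShuffleMapTask the pull chain is MapPartitionsRDD[3] -> MapPartitionsRDD[2] -> MapPartitionsRDD[1] -> HadoopRDD[0], after which the SortShuffleWriter writes out the map output that the ResultStage will later fetch.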

 
