Now we will dive into Spark internals using the simple WordCount example below (later articles will reference this example by default).
sparkConf.setMaster("local[2]")                  //-local[*] by default
sparkConf.set("spark.logLineage", "true")        //-output all the RDD dependency (lineage) logs
val sc = new org.apache.spark.SparkContext(sparkConf)
// (1) ShuffledRDD[4] at reduceByKey at ScalaWordCount.scala:44 []
//  +-(1) MapPartitionsRDD[3] at map at ScalaWordCount.scala:42 []
//     |  MapPartitionsRDD[2] at flatMap at ScalaWordCount.scala:41 []
//     |  MapPartitionsRDD[1] at textFile at ScalaWordCount.scala:34 []
//     |  ../spark-1.4.1/examples/src/main/resources/CHANGES.txt HadoopRDD[0] at textFile at ScalaWordCount.scala:34 []
val rdd = sc.textFile(file)                      //-MapPartitionsRDD[1] --> HadoopRDD[0]; examples/src/....Changes.txt
val fmrdd = rdd.flatMap(_.split(" "))            //-MapPartitionsRDD[2]
val maprdd = fmrdd.map((_, 1))                   //-MapPartitionsRDD[3]; this file is 584k
val rstrdd = maprdd.reduceByKey((x, y) => x + y) //-ShuffledRDD[4]; reduceByKey() does not trigger a real computation
val arr = rstrdd.collect()                       //-but the collect() action does
val max = 10                                     //-Integer.MAX_VALUE
var count = 0
if (arr.length > max) {                          //-to limit the result size, rdd.take(n) or rdd.top(n) is better
  println(s"*reduce output to limit ${max},found ${arr.length}")
}
//-iterate, without any particular order
for ((ele, num) <- arr if count < max) {
  count += 1
  println(ele + "," + num)
}
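The lineage tree in the comments above is what spark.logLineage makes SparkContext log when the job actually runs; the same tree can also be printed on demand with RDD.toDebugString. A minimal sketch, reusing the rstrdd value from the example (RDD ids and the file path depend on the run):

//-print the dependency tree of the final RDD without triggering a job
println(rstrdd.toDebugString)
//-expected shape:
// (1) ShuffledRDD[4] at reduceByKey ...
//  +-(1) MapPartitionsRDD[3] at map ...
//     |  MapPartitionsRDD[2] at flatMap ...
//     |  MapPartitionsRDD[1] at textFile ...
//     |  HadoopRDD[0] at textFile ...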
Overview of the job submitting flow
Communication figure of a job
Sequence figure of the map-side computation
Pseudo-code view corresponding to the Spark source
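For orientation, the call chain behind an action such as collect() in Spark 1.4.1 is roughly the following (a simplified sketch; intermediate helpers are omitted):

// RDD.collect()
//   -> SparkContext.runJob(rdd, func)
//   -> DAGScheduler.runJob() -> submitJob()     //-posts a JobSubmitted event to the event loop
//   -> DAGScheduler.handleJobSubmitted()        //-builds the ResultStage (+ parent ShuffleMapStages)
//   -> DAGScheduler.submitStage()               //-recursively submits missing parent stages first
//   -> DAGScheduler.submitMissingTasks()        //-turns a stage into ShuffleMapTasks / ResultTasks
//   -> TaskSchedulerImpl.submitTasks(TaskSet)   //-the scheduler backend launches tasks on executors
//   -> Executor: ShuffleMapTask.runTask() / ResultTask.runTask()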
======
Several core code paths
-Job submitting method in DAGScheduler
/**-handle a job-submit event */
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _, //-case 1: Spark-internal collective func, i.e. it.toArray()
    partitions: Array[Int],                //-from the most recent rdd's partitions, see SparkContext#runJob()-L1979
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted. -->>the parent ShuffleMapStage is created here<<
    finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) //-one job per action
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite.shortForm, partitions.length, allowLocal))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + "),rdd " + finalRDD) //-ShuffledRDD for ScalaWordCount
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val shouldRunLocally =
      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
    val jobSubmissionTime = clock.getTimeMillis()
    if (shouldRunLocally) {
      // Compute very short actions like first() or take() with no parent stages locally.
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
      runLocally(job) //-run in the action's own process on the local host; a mini job runner with no job/task scheduling
    } else { //-e.g. reduceByKey()
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) //-one element in fact
      listenerBus.post( //-below is similar to runLocally()
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      submitStage(finalStage) //-how is the ResultStage submitted? see inside; how are a job's map operations completed? RDD#compute() chains them via iterator()
    }
  }
  submitWaitingStages() //-check whether any waiting stages are ready to submit
}
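The SparkListenerJobStart / SparkListenerStageSubmitted events posted above (and in submitMissingTasks below) can be observed from user code by registering a listener before the action runs. A minimal sketch against the public listener API of Spark 1.4.x; the println bodies are just for illustration:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageSubmitted}

//-register before calling collect() so the job/stage submission events are visible
sc.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stage(s)")
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
    println(s"stage ${stageSubmitted.stageInfo.stageId} submitted: ${stageSubmitted.stageInfo.name}")
})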
/** Submits stage, but first *recursively* submits any missing parents. -i.e. from earlier to later in execution order */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logInfo("*submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id) //-from old steps to new; tasks belong to this stage
      logInfo("-*missing: " + missing)
      if (missing.isEmpty) { //-only submit the root/first stages here
        logInfo("--*Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get) //-this is a first stage, so submit all its tasks without dependencies
      } else {
        for (parent <- missing) { //-not a leaf node, recurse
          submitStage(parent)
        } //-the ResultStage is set here
        //-a parent stage exists, so add this stage to the waiting queue; it will be scheduled after the prior stages finish,
        //-see this.onReceive() > CompletionEvent
        waitingStages += stage //-submit stage by stage, so keep the child stage here
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id)
  }
}
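What makes a parent stage "missing" is a shuffle boundary: roughly, getMissingParentStages walks the dependency graph of the stage's RDD, keeps NarrowDependencies inside the current stage, and starts a parent ShuffleMapStage at every ShuffleDependency (the real method also skips parents whose shuffle output is already available). A simplified, self-contained sketch of that traversal; shuffleParents is a hypothetical helper, not the Spark method itself:

import org.apache.spark.rdd.RDD
import org.apache.spark.{NarrowDependency, ShuffleDependency}
import scala.collection.mutable

//-hypothetical helper: collect the RDDs that sit just behind a shuffle boundary,
//-i.e. the ones whose output a parent ShuffleMapStage would have to produce
def shuffleParents(rdd: RDD[_]): Seq[RDD[_]] = {
  val found = mutable.ArrayBuffer[RDD[_]]()
  val visited = mutable.HashSet[RDD[_]]()
  def visit(r: RDD[_]): Unit = if (visited.add(r)) {
    r.dependencies.foreach {
      case shuffle: ShuffleDependency[_, _, _] => found += shuffle.rdd //-a new stage starts here
      case narrow: NarrowDependency[_]         => visit(narrow.rdd)    //-same stage, keep walking
    }
  }
  visit(rdd)
  found
}

//-for the WordCount lineage this returns the MapPartitionsRDD[3] produced by map()
println(shuffleParents(rstrdd))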
Generating the DAG, splitting a stage into tasks, and finally submitting the tasks:
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingTasks.clear()

  // First figure out the indexes of partition ids to compute. -empty or unfinished partitions
  val partitionsToCompute: Seq[Int] = { //-controls how many tasks will be generated
    stage match {
      case stage: ShuffleMapStage =>
        (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty)
      case stage: ResultStage =>
        val job = stage.resultOfJob.get
        (0 until job.numPartitions).filter(id => !job.finished(id))
    }
  }

  val properties = jobIdToActiveJob.get(stage.jobId).map(_.properties).orNull

  //-mark the current running stage
  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  // event.
  stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
  outputCommitCoordinator.stageStart(stage.id)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array() //-use the stage's rdd as the task's rdd
      case stage: ResultStage =>
        closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString)
      runningStages -= stage
      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
      runningStages -= stage
      return
  }

  //-note: the task generation policy
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage => //-intermediate stage, see below
        partitionsToCompute.map { id => //-partitioned by the dependent rdd's partitions (which may trace back to the root rdd's partitions)
          val locs = getPreferredLocs(stage.rdd, id) //-the locations of this partition, similar to hdfs blocks
          val part = stage.rdd.partitions(id)        //-access the partition with index id
          new ShuffleMapTask(stage.id, taskBinary, part, locs) //-the corresponding task
        }
      case stage: ResultStage => //-final stage
        val job = stage.resultOfJob.get
        partitionsToCompute.map { id =>
          val p: Int = job.partitions(id) //-a map from the job partition to the stage.rdd's partition
          val part = stage.rdd.partitions(p)
          val locs = getPreferredLocs(stage.rdd, p) //-the replica hosts of the same block? yes
          //-trace info
          for (par <- stage.rdd.partitions) logInfo("-part/rdd:" + par + "/" + stage.rdd)
          for (loc <- locs) logInfo("-loc %s".format(loc))
          //-the number of ResultTasks depends on the number of partitions
          new ResultTask(stage.id, taskBinary, part, locs, id) //-the corresponding task
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
      runningStages -= stage
      return
  }

  if (tasks.size > 0) { //-1 in the reduceByKey() case
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingTasks ++= tasks
    logDebug("New pending tasks: " + stage.pendingTasks)
    taskScheduler.submitTasks( //-hand task scheduling over to TaskSchedulerImpl
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)
    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage: ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)
  }
}
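For the WordCount job this means the ShuffleMapStage produces one ShuffleMapTask per partition of the map-side RDD (the input splits of the text file), and the ResultStage produces one ResultTask per post-shuffle partition. A quick way to see both counts from the driver, reusing the RDDs of the example above (a sketch; the exact numbers depend on the input file and configuration):

//-map-side stage: one ShuffleMapTask per partition of the RDD before the shuffle
println("map-side partitions   = " + maprdd.partitions.length)
//-result stage: one ResultTask per shuffle output partition
println("reduce-side partitions = " + rstrdd.partitions.length)
//-the reduce side can be set explicitly, e.g. 4 ResultTasks for a later collect():
val rst4 = maprdd.reduceByKey((x, y) => x + y, 4)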
-ShuffleMapTask core method
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  //-for the task serialization see DAGScheduler#submitMissingTasks(); the rdd is the last rdd in this stage (ShuffleMapStage)
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( //-restore the most recent parent rdd
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager //-SortShuffleManager by default
    //-'SortShuffleWriter'
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) //-'BaseShuffleHandle'
    //-first, compute the user's latest job logic (e.g. reduceByKey()) via rdd.iterator(..); then write out the result
    logInfo("precomputing,task:" + toString + ",dep:" + dep + ",handle:" + dep.shuffleHandle +
      ",part:" + partitionId + ",rdd " + rdd)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) //-invokes iterator() recursively up the lineage
    return writer.stop(success = true).get //-release resources, then return a MapStatus; see SortShuffleWriter#write()
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
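The rdd.iterator(partition, context) call above is what evaluates the whole narrow-dependency chain of the stage: iterator() serves the partition from cache/checkpoint if possible, otherwise it falls through to compute(), and compute() of a MapPartitionsRDD asks its parent for an iterator in turn, so textFile/flatMap/map in WordCount all run lazily inside this single call. A simplified sketch of that recursion pattern (not the actual Spark classes):

//-simplified shape of the recursion; the real classes also handle caching, checkpointing and metrics
abstract class SimpleRDD[T] {
  def compute(split: Int): Iterator[T]
  final def iterator(split: Int): Iterator[T] = compute(split) //-real Spark consults the cache/checkpoint first
}

//-a map()/flatMap()-style RDD just wraps its parent's iterator with a function
class SimpleMappedRDD[T, U](parent: SimpleRDD[T], f: Iterator[T] => Iterator[U]) extends SimpleRDD[U] {
  override def compute(split: Int): Iterator[U] = f(parent.iterator(split)) //-the recursion walks up the lineage
}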