- 浏览: 74034 次
之前看了Spark Streaming和Spark SQL, 自己还花了一些时间去玩了些machine learning的算法, 像 线性回归, kmeans, 协同过滤等。
现在回过头来, 打算看一下spark core部分代码, 就先找了下saveAsTextFile这个方法作为入口, 看一下是怎么保存文档到hadoop中,并且怎么切分stage以及提交Task。 中间也会触碰到DAGScheduler, 也能明白为什么大家都说DAGScheduler是作业调度的核心了
看一下saveAsTextFile代码:
这里很简单, 就是定义了一些输出类, 给每个partition设置了一下, 然后通过执行saveAsHadoopFile 来创建hadoop的txt文件, 看一下saveAsHadoopFile:
直接调用了saveAsHadoopFile, 那么我们继续跟进去:
这个里面其实主要就是设置了hadoopconf的属性,然后设置到了FileOutputFormat里面, 再通过saveAsHadoopDataset继续执行saveAsTextFile:
这里主要做了几件事:
1.创建了一个writer: val writer = new SparkHadoopWriter(hadoopConf)
2. 创建了writeToFile这个function, 这个function会被作为一个Job提交: self.context.runJob(self, writeToFile)
writeToFile其实就是从Job中获取数据写到对应的partition里面去
主要还是要看runJob里面做了什么:
继续:
再继续, 这里有一个回调函数(index, res) => results(index) = res, 就是把计算的result存到results里面:
注意这里的参数func一直就是在最开始定义的 将数据写到partition里面的writeToFile。
看到这里有调用到dagScheduler, 在初始化SparkContext之前, dagScheduler已经被构造了: (回头会写一下SparkContext的初始化)
private[spark] def dagScheduler: DAGScheduler = _dagScheduler
_dagScheduler = new DAGScheduler(this)
我们看一下dagScheduler里面的runJob干了什么:
里面调用了submitJob, 所以我们说Job是通过DAGScheduler去提交的, 可以看到Job提交后会有waiter一直awaitResult(), 将结果打印到日志里面, Job结束的时候writeToFile也执行完成了, txt文件也存到hadoop里面了。
那么接下来看一下DAGScheduler怎么提交Job的, 进入submitJob:
里面先确定patition的数量是正常范围内, 然后创建JobId, 如果partions是0 代表最终没有task, 所以直接返回JobWaiter, 如果定partition大于0, 则创建JobWaiter用来返回去执行 awaitResult, 然后通过eventProcessLoop 把JobSubmitted的event加入进去, 那么 eventProcessLoop 是什么呢:
看到eventProcessLoop 其实是DAGSchedulerEventProcessLoop,(继承自EventLoop) 那么问题来了, 放进去的event是怎么被调用的呢, 那么我们要回到DAGScheduler的构造过程中, 看到创建DAGScheduler里面执行了eventProcessLoop.start()
这个start直接调用的是EventLoop的start方法:
在DAGSchedulerEventProcessLoop没有定义onStart方法, 所以其实有用的是eventThread.start()方法, 这个方法如下:
他就是一个thread, 然后start方法回去跑run里面的东西, 所以调用了onReceive(event)方法, 如下: (DAGSchedulerEventProcessLoop的onReceive)
继续调用doOnReceive(event) :
好啦, 看到了case JobSubmitted。。 这个就是我们event加进去后, 执行的时候就会走到这个case里面, 去执行
dagScheduler.handleJobSubmitted
那么在handleJobSubmitted里面做了什么呢:
先拿到
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
在newResultStage里面的代码:
他会通过getParentStagesAndId拿到parents和stage ID然后根据这两个参数创建一个resultStage返回, 那么getParentStagesAndId里面是怎么做的呢:
stageID是从一个increment里面创建的, parentStages是从getParentStages方法里面拿的:
这里可以看到实际上spark是根据rdd的dependence, 如果是ShuffleDependency那么就分割出来, 如果不是那么放到waitingForVisit的列表中继续查找他的父rdd, 直到循环结束, 或者父rdd的dependence是ShuffleDependency为止, 然后getShuffleMapStage返回到parents里面再返回到前面调用的方法. 所以我们可以看到stage的划分其实是根据rdd的dependence是不是ShuffleDependency来分的。
接下来看一下getShuffleMapStage里面做了什么:
其实就是创建一个shuffleStage及返回。 顺便再通过getAncestorShuffleDependencies 把所有和当前stage相关联的ShuffleDependency全部加到shuffleToMapStage, 以备后用。 好了现在知道了之前创建的那个resultStage其实是根据一堆ShuffleDependency的stage创建出来的, 那么我们回到handleJobSubmitted方法里面, 在拿到了finalStage (一个resultStage)后会根据其创建一个ActiveJob:
再通过finalStage.setActiveJob(job) 和finalStage关联起来, 最后通过submitStage(finalStage)提交。
submitStage里面:
这里面其实做的就是先去查祖先stage是不是都active了, 如果不是active的话就放到missing里面, 先提交所有的inactive的stage,并且把当前stage放入waitingStages里面 把所有当前stage的祖先stage都submit后才有可能submit当前的stage。 所以stage都是有关联顺序的, 只有所有祖先stage都提交了, 才会去执行当前stage。 执行当前stage的时候其实是调用submitMissingTasks这个方法是根据stage提交task, 后面有机会说一下。 waitingStages 会通过submitWaitingStages方法去执行:
可以看到里面其实也是调用submitStage去对所有的waitingstage做处理, 最后以task提交。 当task提交后我们的writeToFile就会被执行, 数据就会写到指定的hadoop路径中, 整个过程大概就是这个样子, 哪里不对的麻烦指正一下
现在回过头来, 打算看一下spark core部分代码, 就先找了下saveAsTextFile这个方法作为入口, 看一下是怎么保存文档到hadoop中,并且怎么切分stage以及提交Task。 中间也会触碰到DAGScheduler, 也能明白为什么大家都说DAGScheduler是作业调度的核心了
看一下saveAsTextFile代码:
def saveAsTextFile(path: String): Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 // // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]` // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an // Ordering for `NullWritable`. That's why the compiler will generate different anonymous // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. // // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate // same bytecodes for `saveAsTextFile`. val nullWritableClassTag = implicitly[ClassTag[NullWritable]] val textClassTag = implicitly[ClassTag[Text]] val r = this.mapPartitions { iter => val text = new Text() iter.map { x => text.set(x.toString) (NullWritable.get(), text) } } RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) }
这里很简单, 就是定义了一些输出类, 给每个partition设置了一下, 然后通过执行saveAsHadoopFile 来创建hadoop的txt文件, 看一下saveAsHadoopFile:
def saveAsHadoopFile[F <: OutputFormat[K, V]]( path: String)(implicit fm: ClassTag[F]): Unit = self.withScope { saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) }
直接调用了saveAsHadoopFile, 那么我们继续跟进去:
def saveAsHadoopFile( path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf hadoopConf.setOutputKeyClass(keyClass) hadoopConf.setOutputValueClass(valueClass) conf.setOutputFormat(outputFormatClass) for (c <- codec) { hadoopConf.setCompressMapOutput(true) hadoopConf.set("mapred.output.compress", "true") hadoopConf.setMapOutputCompressorClass(c) hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName) hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) } // Use configured output committer if already set if (conf.getOutputCommitter == null) { hadoopConf.setOutputCommitter(classOf[FileOutputCommitter]) } // When speculation is on and output committer class name contains "Direct", we should warn // users that they may loss data if they are using a direct output committer. val speculationEnabled = self.conf.getBoolean("spark.speculation", false) val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "") if (speculationEnabled && outputCommitterClass.contains("Direct")) { val warningMessage = s"$outputCommitterClass may be an output committer that writes data directly to " + "the final location. Because speculation is enabled, this output committer may " + "cause data loss (see the case in SPARK-10063). If possible, please use a output " + "committer that does not have this behavior (e.g. FileOutputCommitter)." logWarning(warningMessage) } FileOutputFormat.setOutputPath(hadoopConf, SparkHadoopWriter.createPathFromString(path, hadoopConf)) saveAsHadoopDataset(hadoopConf) }
这个里面其实主要就是设置了hadoopconf的属性,然后设置到了FileOutputFormat里面, 再通过saveAsHadoopDataset继续执行saveAsTextFile:
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf val outputFormatInstance = hadoopConf.getOutputFormat val keyClass = hadoopConf.getOutputKeyClass val valueClass = hadoopConf.getOutputValueClass if (outputFormatInstance == null) { throw new SparkException("Output format class not set") } if (keyClass == null) { throw new SparkException("Output key class not set") } if (valueClass == null) { throw new SparkException("Output value class not set") } SparkHadoopUtil.get.addCredentials(hadoopConf) logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") if (isOutputSpecValidationEnabled) { // FileOutputFormat ignores the filesystem parameter val ignoredFs = FileSystem.get(hadoopConf) hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf) } val writer = new SparkHadoopWriter(hadoopConf) writer.preSetup() val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => { // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context) writer.setup(context.stageId, context.partitionId, taskAttemptId) writer.open() var recordsWritten = 0L Utils.tryWithSafeFinallyAndFailureCallbacks { while (iter.hasNext) { val record = iter.next() writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) // Update bytes written metric every few records maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten) recordsWritten += 1 } }(finallyBlock = writer.close()) writer.commit() bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } outputMetrics.setRecordsWritten(recordsWritten) } self.context.runJob(self, writeToFile) writer.commitJob() }
这里主要做了几件事:
1.创建了一个writer: val writer = new SparkHadoopWriter(hadoopConf)
2. 创建了writeToFile这个function, 这个function会被作为一个Job提交: self.context.runJob(self, writeToFile)
writeToFile其实就是从Job中获取数据写到对应的partition里面去
主要还是要看runJob里面做了什么:
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = { runJob(rdd, func, 0 until rdd.partitions.length) }
继续:
def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]): Array[U] = { val results = new Array[U](partitions.size) runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res) results }
再继续, 这里有一个回调函数(index, res) => results(index) = res, 就是把计算的result存到results里面:
def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) } dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) rdd.doCheckpoint() }
注意这里的参数func一直就是在最开始定义的 将数据写到partition里面的writeToFile。
看到这里有调用到dagScheduler, 在初始化SparkContext之前, dagScheduler已经被构造了: (回头会写一下SparkContext的初始化)
private[spark] def dagScheduler: DAGScheduler = _dagScheduler
_dagScheduler = new DAGScheduler(this)
我们看一下dagScheduler里面的runJob干了什么:
def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = { val start = System.nanoTime val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) waiter.awaitResult() match { case JobSucceeded => logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) case JobFailed(exception: Exception) => logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) throw exception } }
里面调用了submitJob, 所以我们说Job是通过DAGScheduler去提交的, 可以看到Job提交后会有waiter一直awaitResult(), 将结果打印到日志里面, Job结束的时候writeToFile也执行完成了, txt文件也存到hadoop里面了。
那么接下来看一下DAGScheduler怎么提交Job的, 进入submitJob:
def submitJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = { // Check to make sure we are not launching a task on a partition that does not exist. val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p < 0).foreach { p => throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions) } val jobId = nextJobId.getAndIncrement() if (partitions.size == 0) { // Return immediately if the job is running 0 tasks return new JobWaiter[U](this, jobId, 0, resultHandler) } assert(partitions.size > 0) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) waiter }
里面先确定patition的数量是正常范围内, 然后创建JobId, 如果partions是0 代表最终没有task, 所以直接返回JobWaiter, 如果定partition大于0, 则创建JobWaiter用来返回去执行 awaitResult, 然后通过eventProcessLoop 把JobSubmitted的event加入进去, 那么 eventProcessLoop 是什么呢:
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
看到eventProcessLoop 其实是DAGSchedulerEventProcessLoop,(继承自EventLoop) 那么问题来了, 放进去的event是怎么被调用的呢, 那么我们要回到DAGScheduler的构造过程中, 看到创建DAGScheduler里面执行了eventProcessLoop.start()
这个start直接调用的是EventLoop的start方法:
def start(): Unit = { if (stopped.get) { throw new IllegalStateException(name + " has already been stopped") } // Call onStart before starting the event thread to make sure it happens before onReceive onStart() eventThread.start() }
在DAGSchedulerEventProcessLoop没有定义onStart方法, 所以其实有用的是eventThread.start()方法, 这个方法如下:
private val eventThread = new Thread(name) { setDaemon(true) override def run(): Unit = { try { while (!stopped.get) { val event = eventQueue.take() try { onReceive(event) } catch { case NonFatal(e) => { try { onError(e) } catch { case NonFatal(e) => logError("Unexpected error in " + name, e) } } } } } catch { case ie: InterruptedException => // exit even if eventQueue is not empty case NonFatal(e) => logError("Unexpected error in " + name, e) } } }
他就是一个thread, 然后start方法回去跑run里面的东西, 所以调用了onReceive(event)方法, 如下: (DAGSchedulerEventProcessLoop的onReceive)
override def onReceive(event: DAGSchedulerEvent): Unit = { val timerContext = timer.time() try { doOnReceive(event) } finally { timerContext.stop() } }
继续调用doOnReceive(event) :
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) case MapStageSubmitted(jobId, dependency, callSite, listener, properties) => dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties) case StageCancelled(stageId) => dagScheduler.handleStageCancellation(stageId) case JobCancelled(jobId) => dagScheduler.handleJobCancellation(jobId) case JobGroupCancelled(groupId) => dagScheduler.handleJobGroupCancelled(groupId) case AllJobsCancelled => dagScheduler.doCancelAllJobs() case ExecutorAdded(execId, host) => dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId) => dagScheduler.handleExecutorLost(execId, fetchFailed = false) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => dagScheduler.handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason, exception) => dagScheduler.handleTaskSetFailed(taskSet, reason, exception) case ResubmitFailedStages => dagScheduler.resubmitFailedStages() }
好啦, 看到了case JobSubmitted。。 这个就是我们event加进去后, 执行的时候就会走到这个case里面, 去执行
dagScheduler.handleJobSubmitted
那么在handleJobSubmitted里面做了什么呢:
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) submitWaitingStages() }
先拿到
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
在newResultStage里面的代码:
private def newResultStage( rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId) val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage }
他会通过getParentStagesAndId拿到parents和stage ID然后根据这两个参数创建一个resultStage返回, 那么getParentStagesAndId里面是怎么做的呢:
private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = { val parentStages = getParentStages(rdd, firstJobId) val id = nextStageId.getAndIncrement() (parentStages, id) }
stageID是从一个increment里面创建的, parentStages是从getParentStages方法里面拿的:
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { val parents = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(r: RDD[_]) { if (!visited(r)) { visited += r // Kind of ugly: need to register RDDs with the cache here since // we can't do it in its constructor because # of partitions is unknown for (dep <- r.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => parents += getShuffleMapStage(shufDep, firstJobId) case _ => waitingForVisit.push(dep.rdd) } } } } waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) } parents.toList }
这里可以看到实际上spark是根据rdd的dependence, 如果是ShuffleDependency那么就分割出来, 如果不是那么放到waitingForVisit的列表中继续查找他的父rdd, 直到循环结束, 或者父rdd的dependence是ShuffleDependency为止, 然后getShuffleMapStage返回到parents里面再返回到前面调用的方法. 所以我们可以看到stage的划分其实是根据rdd的dependence是不是ShuffleDependency来分的。
接下来看一下getShuffleMapStage里面做了什么:
private def getShuffleMapStage( shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int): ShuffleMapStage = { shuffleToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => // We are going to register ancestor shuffle dependencies getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) } // Then register current shuffleDep val stage = newOrUsedShuffleStage(shuffleDep, firstJobId) shuffleToMapStage(shuffleDep.shuffleId) = stage stage } }
其实就是创建一个shuffleStage及返回。 顺便再通过getAncestorShuffleDependencies 把所有和当前stage相关联的ShuffleDependency全部加到shuffleToMapStage, 以备后用。 好了现在知道了之前创建的那个resultStage其实是根据一堆ShuffleDependency的stage创建出来的, 那么我们回到handleJobSubmitted方法里面, 在拿到了finalStage (一个resultStage)后会根据其创建一个ActiveJob:
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
再通过finalStage.setActiveJob(job) 和finalStage关联起来, 最后通过submitStage(finalStage)提交。
submitStage里面:
private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) } }
这里面其实做的就是先去查祖先stage是不是都active了, 如果不是active的话就放到missing里面, 先提交所有的inactive的stage,并且把当前stage放入waitingStages里面 把所有当前stage的祖先stage都submit后才有可能submit当前的stage。 所以stage都是有关联顺序的, 只有所有祖先stage都提交了, 才会去执行当前stage。 执行当前stage的时候其实是调用submitMissingTasks这个方法是根据stage提交task, 后面有机会说一下。 waitingStages 会通过submitWaitingStages方法去执行:
private def submitWaitingStages() { // TODO: We might want to run this less often, when we are sure that something has become // runnable that wasn't before. logTrace("Checking for newly runnable parent stages") logTrace("running: " + runningStages) logTrace("waiting: " + waitingStages) logTrace("failed: " + failedStages) val waitingStagesCopy = waitingStages.toArray waitingStages.clear() for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) { submitStage(stage) } }
可以看到里面其实也是调用submitStage去对所有的waitingstage做处理, 最后以task提交。 当task提交后我们的writeToFile就会被执行, 数据就会写到指定的hadoop路径中, 整个过程大概就是这个样子, 哪里不对的麻烦指正一下
发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1108最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
源码跟踪executor如何写数据到blockmanager, 以及如何从blockmanager读数据
2016-08-10 19:41 1426之前看了Job怎么submit 以 ... -
Spark中Blockmanager相关代码解析
2016-08-04 19:47 1848前一段时间看了如何划分stage以及如何提交Job, 最后把结 ... -
Spark在submitStage后如何通过clustermanager调度执行task到Driver接收计算结果的代码解析
2016-08-01 14:08 1481前文: http://humingminghz.iteye.c ... -
SparkSQL DF.agg 执行过程解析
2016-07-19 10:21 4123在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或 ... -
SparkSQL SQL语句解析过程源代码浅析
2016-07-15 19:34 6648前两天一直在忙本职工 ... -
SparkSQL SQL语句解析过程浅析
2016-07-15 19:06 0前两天一直在忙本职工 ... -
SparkStreaming从启动Receiver到收取数据生成RDD的代码浅析
2016-07-08 17:54 2237前面一片文章介绍了SocketTextStream 是如何从b ... -
Sparkstreaming是如何获取数据组成Dstream的源码浅析
2016-07-08 11:23 1478前面一篇文章介绍了SparkStreaming是如何不停的循环 ... -
SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)
2016-07-06 11:24 10160前两天开始研究SparkSQL, 其主要分为HiveConte ... -
SparkStreaming是如何完成不停的循环处理的代码浅析
2016-07-02 12:26 4660一直很好奇Sparkstreaming的ssc.start是怎 ... -
SparkStreaming 对Window的reduce的方法解析
2016-06-30 11:57 4733在sparkstreaming中对窗口 ... -
Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析
2016-06-29 19:50 8796最近在玩spark streaming, 感觉到了他的强大。 ... -
关于Eclipse开发环境下 Spark+Kafka 获取topic的时候连接出错
2016-06-28 17:20 7411林林总总玩了Spark快一个月了, 打算试一下kafka的消息 ...
相关推荐
Spark是Apache软件基金会下的一个开源大数据处理框架,它以其高效、灵活和易用性而闻名...通过上述分析,你可以深入了解Spark的工作原理,这将有助于你在实际项目中更好地优化和调试Spark应用,提升大数据处理的效率。
Spark提供了Web UI来监控应用程序的运行状态,包括Job、Stage、Task等详细信息。同时,日志管理也非常重要,可以使用Spark的log4j配置来定制日志输出。 通过以上步骤,你可以理解并掌握Spark的基本安装、配置和使用...
在 Spark 中,Job、Stage 和 Task 是执行计算的基本单元。Job 是由一个或多个动作触发的计算序列,Stage 是 Job 中的一个分阶段,Task 则是在 Stage 内执行的具体工作单元。Spark 通过 DAG(有向无环图)来表示 Job ...
- **Job、Stage、Task**:Job是用户提交的完整操作,被分解为多个Stage(基于Shuffle划分),每个Stage又包含多个并行执行的Task。 2. **Spark编程模型** - **Spark API**:提供了Scala、Java、Python和R等多种...
在不同集群中的运行演示部分,通常会展示如何在Standalone和YARN模式下启动Spark-Shell,如何提交Spark应用程序,并通过具体案例来分析运行结果。同时,在问题解决部分,会针对可能遇到的问题,如YARN-CLIENT启动...
Java实现可能包括了Job、Stage和Task的定义,以及DAGScheduler和TaskScheduler接口的实现,以确保任务的正确调度和执行。 - **内存管理**:Spark通过内存管理和缓存策略优化性能。Java实现中,BlockManager和...
在任务调度方面,Spark采用了阶段(Stage)和任务(Task)的概念,其中作业(Job)由一个或多个RDD组成,而作业会划分为多个阶段,每个阶段由一组任务组成。Spark的任务调度主要通过将作业拆分成具有依赖关系的多个...
在这个项目中,我们可以看到各种Spark示例的Java源代码。 7. **Spark编程模型**: Spark的编程模型强调交互式计算和批处理,通过SparkContext与集群建立连接,然后创建RDD或DataFrame,执行各种转换和动作操作。 ...
02_尚硅谷大数据技术之SparkCore.docx详细讲解了Spark Core的基本概念,包括RDD(弹性分布式数据集)、作业(Job)、任务(Task)和Stage的生命周期,以及如何使用Spark Shell进行交互式编程。 二、Spark SQL与数据...
- Spark作业(Job)是由用户代码触发的一系列任务(Task)的集合。 - 作业被分解成Stage,每个Stage是一组任务,共享同一份数据分区。 - Stage之间可能存在依赖,形成DAG(有向无环图)。 8. **容错机制与弹性**...
这份压缩包包含的PDF文档,很可能是对Spark的核心组件、设计模式以及源代码进行了详尽的剖析。 Spark的核心思想主要包括以下几个方面: 1. **弹性分布式数据集(RDD)**:RDD是Spark的基础数据抽象,它是一个只读...
- **Spark的Job、Stage和Task是什么?** Job由一个或多个Action操作触发,Stage是DAG(有向无环图)划分的任务阶段,Task是运行在Worker节点上的最小执行单元。 3. **Spark SQL** - **Spark SQL如何与传统SQL区别...
同时,Spark能访问多种数据源,增强了其灵活性和适用性。 Spark的生态系统,即BDAS(Berkeley Data Analytics Stack),包括了多个关键组件: - **Spark Core**:提供了基础的内存计算框架,是其他所有组件的基础...
每个Spark作业(Job)由一系列阶段(Stage)组成,这些阶段由任务(Task)构成,任务会在集群中的工作节点(Executor)上并行执行。理解DAG和Stage的概念有助于优化Spark作业,避免不必要的数据shuffle和提高性能。 ...
Job 是用户提交的作业,可能包含一个或多个Task,这些Task会被划分为Stage,并按照Stage进行并行执行。 总的来说,Spark 是一个功能强大的大数据处理平台,它通过内存计算、灵活的API、全面的技术栈和多样的运行...
Spark的工作流程包括Job、Stage和Task三个层次。Job被分解为多个Stage,每个Stage进一步划分为Task,这些Task在Executor之间并行执行。 四、Spark组件解析 1. Spark Core:Spark的基础组件,提供了RDD和基本的调度...
Sparklens的源代码位于`sparklens-master`目录下,包含了项目的核心组件和示例。如果你是Scala或Java开发者,可以直接编译源码并根据需求进行定制。同时,Qubole提供了详细的文档和示例来指导用户如何使用Sparklens...
Job、Stage和Task是Spark作业的执行单元,Job被分解为多个Stage,每个Stage由一系列Task组成,Task在Worker Nodes上并行运行。 Python是Spark支持的多种编程语言之一,通过PySpark库,Python开发者可以轻松地利用...
8. **Spark Job和Stage**:每个Spark应用由一个或多个job组成,job被划分为多个stage,stage之间存在依赖关系。理解这种执行模型有助于优化代码性能。 9. **Caching and Persistence**:Spark支持将数据缓存到内存...