1. Where is a ShuffleMapTask's shuffle output recorded in MapOutputTracker?
ShuffleMapTask's runTask method is responsible for writing the data to the shuffle map files. When the task completes successfully, the DAGScheduler is notified, and the registration with MapOutputTracker happens inside DAGScheduler's handleTaskCompletion method:
event.reason match {
  case Success =>
    listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
      event.reason, event.taskInfo, event.taskMetrics))
    stage.pendingTasks -= task
    task match {
      case rt: ResultTask[_, _] =>
        stage.resultOfJob match {
          case Some(job) =>
            if (!job.finished(rt.outputId)) {
              updateAccumulators(event)
              job.finished(rt.outputId) = true
              job.numFinished += 1
              // If the whole job has finished, remove it
              if (job.numFinished == job.numPartitions) {
                markStageAsFinished(stage)
                cleanupStateForJobAndIndependentStages(job)
                listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
              }
              // taskSucceeded runs some user code that might throw an exception. Make sure
              // we are resilient against that.
              try {
                job.listener.taskSucceeded(rt.outputId, event.result)
              } catch {
                case e: Exception =>
                  // TODO: Perhaps we want to mark the stage as failed?
                  job.listener.jobFailed(new SparkDriverExecutionException(e))
              }
            }
          case None =>
            logInfo("Ignoring result from " + rt + " because its job has finished")
        }

      case smt: ShuffleMapTask =>
        updateAccumulators(event)
        // Extract the MapStatus object from the task-completion event
        val status = event.result.asInstanceOf[MapStatus]
        // The executor that produced this map output
        val execId = status.location.executorId
        logDebug("ShuffleMapTask finished on " + execId)
        if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
          logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
        } else {
          stage.addOutputLoc(smt.partitionId, status)
        }
        if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) {
          markStageAsFinished(stage)
          logInfo("looking for newly runnable stages")
          logInfo("running: " + runningStages)
          logInfo("waiting: " + waitingStages)
          logInfo("failed: " + failedStages)
          if (stage.shuffleDep.isDefined) {
            // We supply true to increment the epoch number here in case this is a
            // recomputation of the map outputs. In that case, some nodes may have cached
            // locations with holes (from when we detected the error) and will need the
            // epoch incremented to refetch them.
            // TODO: Only increment the epoch number if this is not the first time
            // we registered these map outputs.
            // This is where the map outputs are registered with mapOutputTracker
            mapOutputTracker.registerMapOutputs(
              stage.shuffleDep.get.shuffleId,
              stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
              changeEpoch = true)
          }
          clearCacheLocs()
          if (stage.outputLocs.exists(_ == Nil)) {
            // Some tasks had failed; let's resubmit this stage
            // TODO: Lower-level scheduler should also deal with this
            logInfo("Resubmitting " + stage + " (" + stage.name +
              ") because some of its tasks had failed: " +
              stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
            submitStage(stage)
          } else {
            val newlyRunnable = new ArrayBuffer[Stage]
            for (stage <- waitingStages) {
              logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
            }
            for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
              newlyRunnable += stage
            }
            waitingStages --= newlyRunnable
            runningStages ++= newlyRunnable
            for {
              stage <- newlyRunnable.sortBy(_.id)
              jobId <- activeJobForStage(stage)
            } {
              logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
              submitMissingTasks(stage, jobId)
            }
          }
        }
    }
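The registerMapOutputs call above is what later lets reduce tasks find out which executor holds each map output and how large their own slice is. Below is a minimal, self-contained sketch of that register/lookup pattern; ToyMapStatus and ToyMapOutputTracker are hypothetical names used only for illustration, not Spark's actual classes.

import scala.collection.mutable

// One record per finished map task: where it ran and how many bytes it wrote per reduce partition.
case class ToyMapStatus(executorId: String, partitionLengths: Array[Long])

class ToyMapOutputTracker {
  // shuffleId -> one status per map partition
  private val outputs = mutable.Map.empty[Int, Array[ToyMapStatus]]

  def registerMapOutputs(shuffleId: Int, statuses: Array[ToyMapStatus]): Unit =
    outputs(shuffleId) = statuses

  // What a reducer asks for: the location of every map output and the size of its slice.
  def locationsFor(shuffleId: Int, reduceId: Int): Seq[(String, Long)] =
    outputs(shuffleId).toSeq.map(s => (s.executorId, s.partitionLengths(reduceId)))
}

object ToyMapOutputTrackerDemo extends App {
  val tracker = new ToyMapOutputTracker
  // Two map tasks, each reporting byte counts for three reduce partitions.
  tracker.registerMapOutputs(0, Array(
    ToyMapStatus("exec-1", Array(10L, 0L, 5L)),
    ToyMapStatus("exec-2", Array(3L, 7L, 0L))))
  // Prints the (executor, bytes) pairs for reduce partition 2: (exec-1,5), (exec-2,0)
  println(tracker.locationsFor(0, reduceId = 2))
}

In Spark itself the lookup side is driven by the shuffle reader on the reduce task, which asks MapOutputTracker for the statuses registered above before fetching the actual blocks.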
2. When a ShuffleMapTask writes its shuffle map data (via SortShuffleWriter.write), the data is first buffered in memory; when memory runs short, it is spilled to disk:
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
  if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    sorter = new ExternalSorter[K, V, C](
      dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    sorter.insertAll(records) // May spill to disk
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    sorter = new ExternalSorter[K, V, V](
      None, Some(dep.partitioner), None, dep.serializer)
    sorter.insertAll(records)
  }

  val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
  val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
  // Write the partitioned output to the disk file
  val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
  shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
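The essence of insertAll is "buffer in memory, spill to disk when a threshold is exceeded, read everything back for the final write". The toy class below illustrates only that pattern under simplified assumptions (a fixed record-count threshold instead of Spark's memory-usage estimation, plain text files instead of sorted, serialized runs); it is not ExternalSorter's implementation.

import java.io.{File, PrintWriter}
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

// Hypothetical buffer-then-spill illustration.
class ToySpillingBuffer(maxRecordsInMemory: Int) {
  private val inMemory = ArrayBuffer.empty[String]
  private val spillFiles = ArrayBuffer.empty[File]

  def insert(record: String): Unit = {
    inMemory += record
    if (inMemory.size >= maxRecordsInMemory) spill()
  }

  // Dump the in-memory buffer to a temp file and clear it.
  private def spill(): Unit = {
    val file = File.createTempFile("toy-spill-", ".txt")
    file.deleteOnExit()
    val out = new PrintWriter(file)
    try inMemory.foreach(r => out.println(r)) finally out.close()
    spillFiles += file
    inMemory.clear()
  }

  // Read back everything: spilled runs first, then whatever is still in memory.
  def iterator: Iterator[String] =
    spillFiles.iterator.flatMap(f => Source.fromFile(f).getLines()) ++ inMemory.iterator
}

ExternalSorter additionally keeps each spilled run ordered by partition (and by key, when an ordering or aggregator is supplied), so writePartitionedFile can merge the runs partition by partition into the single map output file indexed above.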
3. When a ResultTask reads the data of a ShuffledRDD (via HashShuffleReader), the data is first read into memory; when memory runs short, it is spilled to disk:
override def read(): Iterator[Product2[K, C]] = {
  val ser = Serializer.getSerializer(dep.serializer)
  // Turn the fetched shuffle data into a traversable Iterator
  val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    // Combine the data read from the map side;
    // combining may spill to disk
    if (dep.mapSideCombine) {
      new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
    } else {
      new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
    }
  } else {
    require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    // Convert the Product2s to pairs since this is what downstream RDDs currently expect
    iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
  }

  // Sort the output if there is a sort ordering defined.
  // Sorting the output may also spill to disk
  dep.keyOrdering match {
    case Some(keyOrd: Ordering[K]) =>
      // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
      // the ExternalSorter won't spill to disk.
      val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
      sorter.insertAll(aggregatedIter)
      context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
      context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
      sorter.iterator
    case None =>
      aggregatedIter
  }
}
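The two combine branches above differ only in whether the incoming records are raw values or combiners that the map side has already pre-aggregated. Below is a hedged, in-memory sketch of the two paths, written in terms of the createCombiner / mergeValue / mergeCombiners functions an Aggregator is built from; the object and method names are illustrative, and the sketch omits the spilling the real code may perform.

object CombineSketch {
  // Reduce-side combine of raw (K, V) records, used when mapSideCombine is false.
  def combineValuesByKey[K, V, C](
      records: Iterator[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Map[K, C] =
    records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v)))
    }

  // Reduce-side merge of (K, C) combiners that each map task has already pre-aggregated.
  def combineCombinersByKey[K, C](
      combiners: Iterator[(K, C)],
      mergeCombiners: (C, C) => C): Map[K, C] =
    combiners.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }
}

// Word-count style usage:
// CombineSketch.combineValuesByKey(Iterator(("a", 1), ("a", 1), ("b", 1)),
//   (v: Int) => v, (c: Int, v: Int) => c + v)                      // Map(a -> 2, b -> 1)
// CombineSketch.combineCombinersByKey(Iterator(("a", 2), ("a", 3), ("b", 1)),
//   (x: Int, y: Int) => x + y)                                      // Map(a -> 5, b -> 1)

In the real Aggregator the same merges go through an ExternalAppendOnlyMap, which is what allows the combine step to spill to disk when memory runs short.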
4. Task locality handling
a. The DriverActor receives