`
bit1129
  • 浏览: 1069734 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark八十四】Spark零碎知识点记录

 
阅读更多

1. ShuffleMapTask的shuffle数据在什么地方记录到MapOutputTracker中的

ShuffleMapTask的runTask方法负责写数据到shuffle map文件中。当任务执行完成成功,DAGScheduler会收到通知,在DAGScheduler的handleTaskCompletion方法中完成记录到MapOutputTracker中

 

    event.reason match {
      case Success =>
        listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
          event.reason, event.taskInfo, event.taskMetrics))
        stage.pendingTasks -= task
        task match {
          case rt: ResultTask[_, _] =>
            stage.resultOfJob match {
              case Some(job) =>
                if (!job.finished(rt.outputId)) {
                  updateAccumulators(event)
                  job.finished(rt.outputId) = true
                  job.numFinished += 1
                  // If the whole job has finished, remove it
                  if (job.numFinished == job.numPartitions) {
                    markStageAsFinished(stage)
                    cleanupStateForJobAndIndependentStages(job)
                    listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                  }

                  // taskSucceeded runs some user code that might throw an exception. Make sure
                  // we are resilient against that.
                  try {
                    job.listener.taskSucceeded(rt.outputId, event.result)
                  } catch {
                    case e: Exception =>
                      // TODO: Perhaps we want to mark the stage as failed?
                      job.listener.jobFailed(new SparkDriverExecutionException(e))
                  }
                }
              case None =>
                logInfo("Ignoring result from " + rt + " because its job has finished")
            }

          case smt: ShuffleMapTask =>
            updateAccumulators(event)
            ///从通知事件中获得MapStatus对西那个
            val status = event.result.asInstanceOf[MapStatus]
            ////ExecutorId
            val execId = status.location.executorId
            logDebug("ShuffleMapTask finished on " + execId)
            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
              logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
            } else {
              stage.addOutputLoc(smt.partitionId, status)
            }
            if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) {
              markStageAsFinished(stage)
              logInfo("looking for newly runnable stages")
              logInfo("running: " + runningStages)
              logInfo("waiting: " + waitingStages)
              logInfo("failed: " + failedStages)
              if (stage.shuffleDep.isDefined) {
                // We supply true to increment the epoch number here in case this is a
                // recomputation of the map outputs. In that case, some nodes may have cached
                // locations with holes (from when we detected the error) and will need the
                // epoch incremented to refetch them.
                // TODO: Only increment the epoch number if this is not the first time
                //       we registered these map outputs.
               ///在此处将MapOutput注册到mapOutputTracker中
               mapOutputTracker.registerMapOutputs(
                  stage.shuffleDep.get.shuffleId,
                  stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
                  changeEpoch = true)
              }
              clearCacheLocs()
              if (stage.outputLocs.exists(_ == Nil)) {
                // Some tasks had failed; let's resubmit this stage
                // TODO: Lower-level scheduler should also deal with this
                logInfo("Resubmitting " + stage + " (" + stage.name +
                  ") because some of its tasks had failed: " +
                  stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
                submitStage(stage)
              } else {
                val newlyRunnable = new ArrayBuffer[Stage]
                for (stage <- waitingStages) {
                  logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
                }
                for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
                  newlyRunnable += stage
                }
                waitingStages --= newlyRunnable
                runningStages ++= newlyRunnable
                for {
                  stage <- newlyRunnable.sortBy(_.id)
                  jobId <- activeJobForStage(stage)
                } {
                  logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
                  submitMissingTasks(stage, jobId)
                }
              }
            }
          }

 

2. ShuffleMapTask在写shuffle map数据时(调用SortShuffleWriter.write方法),首先写内存,当内存不够使用时,将spill到磁盘;

 

 override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      sorter = new ExternalSorter[K, V, C](
        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      sorter.insertAll(records) ///Spill到磁盘
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      sorter = new ExternalSorter[K, V, V](
        None, Some(dep.partitioner), None, dep.serializer)
      sorter.insertAll(records)
    }

    val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
    val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
    val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) ///写到磁盘文件中
    shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  }

 

 

3. ResultTask在都去ShuffledRDD中的数据时(通过调用HashShufflerReader),首先读取到内存,当内存不够使用时,将spill到磁盘

 

 

 

override def read(): Iterator[Product2[K, C]] = {
    val ser = Serializer.getSerializer(dep.serializer)
    ///将shuffle数据转换成可遍历的Iterator对象
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    ///从Mapper端读取数据前,做Combine
    ///combine时,可能会spill到磁盘
      if (dep.mapSideCombine) {
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context)) 
      } else {
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }

    // Sort the output if there is a sort ordering defined.
    ///对output排序,可能spill到磁盘
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
        // the ExternalSorter won't spill to disk.
        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
        sorter.insertAll(aggregatedIter)
        context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
        context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
        sorter.iterator
      case None =>
        aggregatedIter
    }
  }
 

 

4. 任务本地性处理

a.DriverActor收到

 

 

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    Spark知识点汇总

    该文件是小弟在学习spark期间,将spark的所学内容汇总到一起的一个思维导图,内容包括了spark的实战代码和其他技术集成。小弟学术不精,在技术上难免会有疏忽,若有什么地方写错,望海涵并积极指出。

    Spark汇总知识点

    spark相关的知识点整理出来的xmind,仅供参考~ 学习在于总结,希望能够帮助大家更快速的熟悉、了解或者复习这些内容。

    spark知识点个人总结

    Spark是Apache软件基金会下的一个开源大数据处理框架,以其高效、灵活和易用的特性在大数据领域备受关注。本文将深入探讨Spark的核心概念、主要组件、编程模型以及与Java的交互方式,帮助读者全面理解Spark在Java...

    基于Spark的人工智能知识图谱构建1

    【基于Spark的人工智能知识图谱构建】 随着大数据时代的来临,计算机科学的各个领域都面临着海量信息处理的挑战。知识图谱作为一种强大的语义处理工具,它能够有效地组织和表达复杂的概念及其相互关系,帮助机器...

    Spark知识体系-高频知识点汇总及面试常见问题总结

    Spark知识体系-高频知识点汇总及面试常见问题总结

    Spark 编程指南简体中文版.pdf

    以下是该资源中的知识点总结: Spark 基础 * Spark Shell:交互式 shell,用于快速上手 Spark * Spark 初始化:如何初始化 Spark,包括 SparkContext 和 RDD 的介绍 * RDDs:Resilient Distributed Datasets,...

    spark考试(练习题)编程!

    Spark考试(练习题)编程!...本文档提供了一些 Spark 考试的练习题,涵盖了 SparkStreaming、HBase、Kafka 等知识点。通过这些练习题,我们可以更好地了解 Spark 的相关知识点,并提高自己的编程能力。

    spark-timeSeries.rar_scala 时间序列_spark ARIMA_spark arima_spark 滑

    本文将深入探讨使用Scala语言在Spark平台上实现ARIMA(自回归积分滑动平均模型)和Holt-Winters三次指数平滑法进行时间序列预测的知识点。 一、ARIMA模型 ARIMA(Autoregressive Integrated Moving Average)模型是...

    spark知识点整理.docx

    以下是一些核心的Spark知识点: 1. **RDD(弹性分布式数据集)**:RDD是Spark中最基础的数据抽象,它是一个不可变、分区的数据集合。RDD具有五种特性: - **分区列表**:RDD由多个分区组成,每个分区存储在集群的...

    spark基础知识整理

    spark基础知识思维导图整理,包括SparkCore和SparkSQL

    spark安装包+spark实验安装软件

    Spark是Apache基金会下的一个开源大数据处理框架,以其高效、易用和可扩展性著称。Spark的核心设计理念是基于内存计算,极大地提升了数据处理速度。在本压缩包中,"spark-3.4.0-bin-without-hadoop"是Spark的一个预...

    Spark基本知识调查

    2014年的记录显示,使用Spark处理的数据量达百TB级仅需23分钟,而1PB级数据则需234分钟,相较之下,Hadoop MapReduce处理102.5TB数据需要72分钟。在排序基准测试中,Spark的排序速度可以达到1.42TB/分钟,而Hadoop仅...

    大数据Spark纯净版安装包,用于快速集成Hive on Spark

    Spark纯净版安装包是一个用于安装Apache Spark的软件包,该软件包提供了Spark的基本功能和组件,但不包含任何额外的扩展或依赖项。纯净版安装包旨在提供一个轻量级、简单易用的Spark安装选项,适用于用户希望快速...

    spark全套视频教程

    本教程通过一系列视频课程,将涵盖以下关键知识点: 1. **Spark概述**:首先,我们会介绍Spark的基本概念,包括其设计理念、核心组件以及与Hadoop等其他大数据框架的区别。理解Spark的DAG执行模型对于后续的学习至...

    spark基础知识

    在本资料包中,我们将围绕"Spark基础知识"进行探讨,包括Spark的架构、RDD概念、Spark SQL以及Spark与Scala、Hadoop的关联。 首先,Spark的架构基于“计算向数据移动”的理念,它由Driver、Executor和Cluster ...

    spark笔记整理文档

    通过检查点和持久化策略,Spark能够在节点故障时恢复状态。此外,动态资源调度可以根据集群资源变化调整Executor数量。 9. Spark SQL与Hive集成: Spark SQL可以直接读取Hive Metastore中的表,使得Hive用户可以...

    ApacheSpark设计与实现.pdf+ApacheSpark源码剖析.pdf+Spark原著中文版.pdf

    书中可能涵盖了以下知识点: 1. **Spark概述**:Spark的诞生背景,其与Hadoop MapReduce的对比,以及Spark的主要优势。 2. **RDD概念**:RDD的定义,其不可变性和分区特性,以及转换和行动操作的原理。 3. **Spark...

    Spark-Core学习知识笔记整理

    Spark-Core文档是本人经三年总结笔记汇总而来,对于自我学习Spark核心基础知识非常方便,资料中例举完善,内容丰富。具体目录如下: 目录 第一章 Spark简介与计算模型 3 1 What is Spark 3 2 Spark简介 3 3 Spark...

    适配CDH6.3.2的Spark3.2.2

    以下是关于Spark3.2.2的一些关键知识点: 1. **DataFrame/Dataset API**:Spark3.2.2继续强化了DataFrame和Dataset API,这是Spark SQL的核心,提供了类型安全和更强的编译时检查。它们允许开发人员使用Java、Scala...

    大数据Spark企业级实战

    《大数据Spark企业级实战》详细解析了企业级Spark开发所需的几乎所有技术内容,涵盖Spark的架构设计、Spark的集群搭建、Spark内核的解析、Spark SQL、MLLib、GraphX、Spark Streaming、Tachyon、SparkR、Spark多语言...

Global site tag (gtag.js) - Google Analytics