`
bit1129
  • 浏览: 1070061 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark二八Spark Shuffle读过程源代码代码剖析

 
阅读更多

Spark的shuffle读操作是性能杀手,原因是Shuffle读操作需要从多个Map节点拉取数据到Reduce节点,所有的Reduce结果是否还要经过一次总计算?

 

 

package spark.examples

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

object SparkWordCount {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2");
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("file:///D:/word.in")
    println(rdd.toDebugString)
    val rdd1 = rdd.flatMap(_.split(" "))
    println("rdd1:" + rdd1.toDebugString)
    val rdd2 = rdd1.map((_, 1))
    println("rdd2:" + rdd2.toDebugString)
    val rdd3 = rdd2.reduceByKey(_ + _);
    println("rdd3:" + rdd3.toDebugString)
    rdd3.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis());
    sc.stop
  }
}

 

 

 

1. ResultTask的runTask方法

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

 2. 首先反序列化得到rdd和func,其中rdd是该stage的最后一个RDD(final RDD),而func是就是PairRDDFunctions.scala的saveAsHadoopDataset方法内部定义的writeToFile函数。writeToFile函数实现了wordcount最终的写磁盘操作(rdd3.saveAsTextFile)。在调用writeToFile写磁盘之前,需要首先从Mapper节点读取到数据,然后对它进行整合,这个在ResultTask的runTask方法的最后一句 func(context, rdd.iterator(partition, context))中的rdd.iterator方法实现的。

这在Spark Shuffle写操作过程中分析,就是读取级联的读取它的父RDD的compute方法完成读取的读取操作

 

3.数据读取操作(rdd.iterator)

3.1 RDD类定义的iterator的模板方法,

 

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) { ///如果这个RDD的StorageLevel不为NONE,那么逻辑是什么?
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) 
    } else { //否则计算或者读取Checkpoint检查点
      computeOrReadCheckpoint(split, context)
    }
  }

3.2 WordCount的第二个Stage总共包含两个RDD,最后一个是MappedRDD,第一个是ShuffledMapRDD,MappedRDD从ShuffledMapRDD获取具体的数据,然后执行MappedRDD自身携带的函数,这个函数的定义是在RDD的saveAsTextFile的函数x=>(NullWritable.get(), new Text(x.toString))

  def saveAsTextFile(path: String) {
    this.map(x => (NullWritable.get(), new Text(x.toString)))
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

3.3 MappedRDD的compute方法

 

 

  override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).map(f) ///调用它的parent RDD的iterator方法,这里是ShuffledRDD
}
 

 

 3.4 ShuffledRDD的compute方法

 

 

 ///split是当前要计算的分片
 override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]

   //ShuffleManager是SortShuffleManager、
   ///获取的ReaderHashShuffleReader,注意此处虽然是Sort based Shuffle,但是读仍然是HashShuffleReader
   ///获取reader的参照是shuffleHandle,分区的头尾,Spark只支持一次读取一个分片
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }
 

 

3.5 SortShuffleManager的getReader方法

 

 

/**
   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
   * Called on executors by reduce tasks.
   */
  override def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    // We currently use the same block store shuffle fetcher as the hash-based shuffle.
    new HashShuffleReader(////构造时,只需要提供ShuffleHandle,parttition开始结束位
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }
 

 

 3.6 HashShuffleReader的read方法

 

 

  /** Read the combined key-values for this reduce task */
  override def read(): Iterator[Product2[K, C]] = {
    val ser = Serializer.getSerializer(dep.serializer)///获取序列化器

    ///关键方法,根据shuffleId获取迭代数据的Iterator
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
    
    ///对数据进行aggregate,得到一个整合后的Iterator
    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      
      ////Reduce端进行结果合并,map端是否已经combine过,采用两种不同的方式
      ////如果map端做了combine,那么调用combineCombinersByKey,
      if (dep.mapSideCombine) {
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
      } else {
        ////如果map端没combine,那么调用combineValuesByKey,这个应该是跟map端做combine使用相同的方法
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
      throw new IllegalStateException("Aggregator is empty for map-side combine")
    } else {
      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }

    // Sort the output if there is a sort ordering defined.

    ///继续对数据进行排序操作,如果定义了keyOrdering,如果没有定义keyOrdering,那么如何定义之?
    ///对Key进行排序?
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
        // the ExternalSorter won't spill to disk.
        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
        sorter.insertAll(aggregatedIter)
        context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
        context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
        sorter.iterator
      case None =>
        aggregatedIter
    }
  }
 

 

3.7 BlockStoreShuffleFetcher伴生对象只定义了一个fetch方法

 

 

private[hash] object BlockStoreShuffleFetcher extends Logging {
  def fetch[T](
      shuffleId: Int, ////shuffleId,由dep.shuffleHandle.shuffleId提供
      reduceId: Int,  ///startPartition被理解为reduceId?
      context: TaskContext,
      serializer: Serializer)
    : Iterator[T] =
  {
    logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
    val blockManager = SparkEnv.get.blockManager //获取blockManager

    val startTime = System.currentTimeMillis
    ///根据shuffleId和reduceId获取statuses对象,因为MapOutputTracker存放在SparkEnv中,SparkEnv类似于集群级别的共享变量
    ///简单的说就是获取MapOutputputLocation
    /// Called from executors to get the server URIs and output sizes of the map outputs of a given shuffle.
    ///statuses是一个二元元组的集合,每个元组的第一个元素是BlockManagerId对象(包含三方面的信息,executorId_,host_,port_),第二个是数据的长度
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
    logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
      shuffleId, reduceId, System.currentTimeMillis - startTime))
    
    ////BlockManagerId封装了什么信息?
    ///ArrayBuffer的元素是Tuple二元组类型,分别是Int和Long类型
    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
    
     
    //statuses.zipWithIndex是个二元组,第一元素又是一个二元元组
    ///address表示BlockManagerId对象
    ///size表示数据大小
    ///index表示索引?有什么用?
    for (((address, size), index) <- statuses.zipWithIndex) {
      ///这是什么操作?根据address获取一个二元组,然后把(index,size)赋值给它
      ///splitsByAddress此时有一个元素(address,(index,size))
      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
    }
    
    ///构造blockByAddress,得到一个Seq,元素类型是元组:(BlockManagerId, Seq[(BlockId, Long)])
    ///元组的第一个元素是address,元组的第二个元素是一个新元组的集合,每个元组的第一个是一个元素是BlockId(由shuffleId,index, reduceId),第二个元素是数据的size
    val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
      case (address, splits) =>
        (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
    }
    
    ///定义一个内部函数
    def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
      val blockId = blockPair._1
      val blockOption = blockPair._2
      blockOption match {
        case Success(block) => {
          block.asInstanceOf[Iterator[T]]
        }
        case Failure(e) => {
          blockId match {
            case ShuffleBlockId(shufId, mapId, _) =>
              val address = statuses(mapId.toInt)._1
              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
            case _ =>
              throw new SparkException(
                "Failed to get block " + blockId + ", which is not a shuffle block", e)
          }
        }
      }
    }
    
    ///这是什么类?
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      SparkEnv.get.blockManager.shuffleClient,
      blockManager,
      blocksByAddress,
      serializer,
      SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
    
    ///对blockFetcherItr集合ununpackBlock方法然后压平结果
    val itr = blockFetcherItr.flatMap(unpackBlock)
    
    ///构造 CompletionIterator对象?
    val completionIter = CompletionIterator[T, Iterator[T]](itr, {
      context.taskMetrics.updateShuffleReadMetrics()
    })
    ///包装了completionIter迭代器,组合模式
    new InterruptibleIterator[T](context, completionIter)
  }
}
 

 3.8 ShuffleBlockFetcherIterator类

a. 从下面的类文档中可以看出,ShuffleBlockFetcherIterator类,用于fetch数据块,对于位于本地的block,从本地BlockManager获取;对于远端的数据块,使用BlockTransferService获取数据(BlockTransferService使用Netty作为底层的数据传输模块)

b. 最后的结果是获取了一个迭代器,迭代的每条记录是一个(BlockId,values)元组

c. throttle的含义是节流阀

 

/**
 * An iterator that fetches multiple blocks. For local blocks, it fetches from the local block
 * manager. For remote blocks, it fetches them using the provided BlockTransferService.
 *
 * This creates an iterator of (BlockID, values) tuples so the caller can handle blocks in a
 * pipelined fashion as they are received.
 *
 * The implementation throttles the remote fetches to they don't exceed maxBytesInFlight to avoid
 * using too much memory.
 *
 * @param context [[TaskContext]], used for metrics update
 * @param shuffleClient [[ShuffleClient]] for fetching remote blocks
 * @param blockManager [[BlockManager]] for reading local blocks
 * @param blocksByAddress list of blocks to fetch grouped by the [[BlockManagerId]].
 *                        For each block we also require the size (in bytes as a long field) in
 *                        order to throttle the memory usage.
 * @param serializer serializer used to deserialize the data.
 * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point.
 */

 

d.在BlockStoreShuffleFetcher中,ShuffleBlockFetcherIterator的构造如下,对比下构造函数

注意ShuffleClient是从SparkEnv的BlockManager获取的,

    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      SparkEnv.get.blockManager.shuffleClient,
      blockManager,
      blocksByAddress,
      serializer,
      SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)

 

e. ShuffleClient在BlockManager中的定义

 

  // Client to read other executors' shuffle files. This is either an external service, or just the
  // standard BlockTranserService to directly connect to other Executors.
  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { //启用了外部ExternalShuffleService
    val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
  } else {
    blockTransferService //否则使用blockTransferService: BlockTransferService,在Spark1.2中使用了基于Netty的数据传输NettyBlockTransferService
  }

 

 3.9 BlockStoreShuffleFetcher的fetch方法调用blockFetcherItr.flatMap(unpackBlock)解析

 

结论:

 

在Reduce阶段,还会执行combine操作,进行深度的合并的操作,这是必然的,假如不同的Mapper上有多个单词A,那么计数的结果应该是把所有Mapper上的A进行合并得到最后的结果。

 

 

 

分享到:
评论

相关推荐

    Spark源码剖析

    《Spark源码剖析》PDF 文件很可能会深入到这些技术细节,包括类结构、算法实现以及关键代码的解析,帮助读者更好地理解和优化 Spark 应用。通过深入学习 Spark 源码,开发者可以更好地掌握 Spark 内部工作原理,从而...

    ApacheSpark设计与实现.pdf+ApacheSpark源码剖析.pdf+Spark原著中文版.pdf

    《Apache Spark源码剖析》则会深入到Spark的源代码层面,帮助读者理解其实现细节: 1. **源码结构**:Spark源代码的主要模块和包划分,以及关键类的职责。 2. **调度机制**:DAGScheduler和TaskScheduler的工作流程...

    spark2.2.0源码------

    Spark 2.2.0是Apache Spark的一个重要版本,它带来了许多增强...通过深入研究Spark 2.2.0的源码,开发者可以更好地理解其内部机制,定制化自己的大数据处理流程,同时也能为未来的版本贡献代码,推动Spark的持续发展。

    Spark github源码 例子很有价值

    在GitHub上,Spark的源代码被广泛分享和贡献,为开发者提供了深入理解其内部机制和学习新功能的机会。"Spark github源码 例子很有价值"这个标题表明,通过分析这些源码,我们可以学习到Spark如何实现其核心功能,如...

    spark高级分析数据源码

    7. **源码解析**:书中可能涵盖了Spark源码的解析,如任务调度、内存管理、shuffle过程等,帮助读者深入理解Spark的内部工作原理。 8. **aas-master实例**:这个文件夹很可能是包含了一系列的Spark应用实例,覆盖了...

    基于spark的外卖大数据平台分析系统.zip

    本文将深入探讨如何利用Spark构建一个基于外卖数据的分析系统,以及在这个过程中涉及到的关键技术和应用。 首先,我们需要理解Spark的核心特性。Spark提供了一种内存计算模型,它将数据存储在内存中,从而极大地...

    Spark大数据分析与实战课后练习答案.rar

    《Spark大数据分析与实战》课程是一门深入探讨Apache Spark在大数据处理领域的应用和技术的课程,其课后练习答案集提供了对课程所讲授知识的巩固和实践。这是一份珍贵的配套教学资源,旨在帮助学生更好地理解和掌握...

    带你深入理解Spark核心思想走进Sprak的源码分析

    3. Shuffle过程:理解Spark如何处理shuffle操作,如`groupByKey`,涉及的HashPartitioner和SortBasedPartitioner等。 4.容错机制:研究RDD的 lineage(血统),了解如何通过重计算丢失的分区来恢复数据。 5. ...

    spark新手上路之源码解析.pdf

    Shuffle 读,根据新的分区策略重新组合数据。 10. **存储管理** Spark 支持多种存储级别,如内存、磁盘和序列化。存储层负责管理数据的缓存和回收,以优化内存使用。 11. **通信层** Spark 使用 Tachyon 或 ...

    Apache-Spark2.20源码中文注释

    6. **Shuffle 操作**:在 Spark 中,Shuffle 是数据重新分区的过程,通常发生在 join、groupByKey 或 reduceByKey 等操作中。Shuffle 会触发网络传输,对性能影响较大,理解其内部机制有助于优化作业。 7. **容错...

    Hadoop源代码分析(一)

    源代码分析可以帮助我们理解任务调度、数据分区和Shuffle过程的细节。 3. **YARN资源管理系统**:随着Hadoop的发展,YARN(Yet Another Resource Negotiator)取代了最初的JobTracker,负责集群资源的管理和任务...

    Spark技术内幕:深入解析Spark内核架构设计与实现原理

    Shuffle是Spark中数据重新分布的过程,涉及数据在网络间的移动。优化Shuffle可以显著提高性能,如使用合适的分区策略、减少网络传输和避免Shuffle产生的临时文件。 **Spark调度和资源管理** Spark的调度器分为粗...

    spark考试练习题含答案.rar

    《Spark大数据处理实战练习题详解》 Spark作为大数据处理领域的重要工具,因其高效、易用的特性备受开发者青睐。为了帮助大家深入理解和掌握Spark的核心功能,我们整理了一系列的Spark考试练习题,涵盖从基础概念到...

    spark-3.1.2.tgz & spark-3.1.2-bin-hadoop2.7.tgz.rar

    - spark-3.1.2.tgz:这是一个tar归档文件,经过gzip压缩,通常包含源代码、文档、配置文件和编译后的二进制文件。 - spark-3.1.2-bin-hadoop2.7.tgz:这个版本除了包含基本的Spark组件外,还集成了Hadoop 2.7的二...

    spark-2.2.0.tgz源码

    6. **Shuffle过程**:在Spark作业中,shuffle是一个重要的操作,它涉及重新组织数据以满足任务间的依赖。在Spark 2.2.0中,shuffle管理得到了优化,减少了网络传输和磁盘I/O,提高了性能。 7. **动态资源调度**:...

    Spark快速大数据分析图谱.zip

    在大数据分析过程中,Spark提供了RDD(弹性分布式数据集)作为基本的数据抽象,它是不可变的、分区的并行数据集合。RDD可以通过转换操作(如map、filter)和行动操作(如count、save)进行操作。此外,DataFrame和...

    Spark-2.4.5官网下载源码包

    这个版本的源码包可以从Spark的官方网站下载,用于深入学习其内部机制、理解代码结构和实现,以及根据个人需求进行定制化编译。 源码包中包含的主要目录和文件如下: 1. `core/`:Spark的核心模块,包含任务调度、...

    spark版本2.7.4

    2. DataFrame/Dataset API:提供了一种声明式编程方式,使得代码更加简洁易读,同时利用 Catalyst 优化器进行底层执行计划的优化。 3. SQL支持:Spark SQL与Hive兼容,允许用户使用标准SQL查询数据,同时也支持...

    spark全套学习资料.zip

    05_尚硅谷大数据技术之Spark内核.docx探讨了Spark的存储系统、执行计划优化、shuffle过程以及Tachyon和Alluxio等内存数据存储系统的集成,让读者深入了解Spark的运行机制。 五、Spark性能调优与故障处理 06_Spark...

    spark-3.0.0-bin-hadoop2.7.tgz

    例如,新的Shuffle服务可以提高shuffle操作的性能,而Tungsten项目则通过代码生成优化了DataFrame/Dataset的执行效率。 8. **SQL改进**: Spark SQL在3.0.0版本中引入了对ansi SQL标准的支持,增强了对窗口函数的...

Global site tag (gtag.js) - Google Analytics