`

Sort-based Shuffle的设计与实现

阅读更多
原文  http://www.cnblogs.com/hseagle/p/3979744.html

概要

Spark 1.1中对spark core的一个重大改进就是引入了sort-based shuffle处理机制,本文就该处理机制的实现进行初步的分析。


通过一个小的实验来直观的感受一下sort-based shuffle算法会产生哪些中间文件,具体实验步骤如下所述。

步骤1: 修改conf/spark-default.conf, 加入如下内容

spark.shuffle.manager SORT

步骤2: 运行spark-shell

SPARK_LOCAL_IP=127.0.0.1 $SPARK_HOME/bin/spark-shell

步骤3: 执行wordcount

sc.textFile("README.md").flatMap(l => l.split(" ")).map(w=>(w,1)).reduceByKey(_ + _).collect

步骤4: 查看生成的中间文件

find /tmp/spark-local* -type f

文件查找结果如下所示

/tmp/spark-local-20140919091822-aa66/0f/shuffle_0_1_0.index
/tmp/spark-local-20140919091822-aa66/30/shuffle_0_0_0.index
/tmp/spark-local-20140919091822-aa66/0c/shuffle_0_0_0.data
/tmp/spark-local-20140919091822-aa66/15/shuffle_0_1_0.data


可以看到生成了两人种后缀的文件,分别为data和index类型,这两者的用途在后续分析中会详细讲述。

如果我们做一下对比实验,将shuffle模式改为Hash,再来观看生成的文件,就会找到区别。将原先配置文件中的 SORT 改为 HASH ,重新启动spark-shell,执行相同的wordcount之后,在tmp目录下找到的文件列表如下。


/tmp/spark-local-20140919092949-14cc/10/shuffle_0_1_3
/tmp/spark-local-20140919092949-14cc/0f/shuffle_0_1_2
/tmp/spark-local-20140919092949-14cc/0f/shuffle_0_0_3
/tmp/spark-local-20140919092949-14cc/0c/shuffle_0_0_0
/tmp/spark-local-20140919092949-14cc/0d/shuffle_0_1_0
/tmp/spark-local-20140919092949-14cc/0d/shuffle_0_0_1
/tmp/spark-local-20140919092949-14cc/0e/shuffle_0_1_1
/tmp/spark-local-20140919092949-14cc/0e/shuffle_0_0_2


两者生成的文件数量差异非常大,具体数值计算如下

    在HASH模式下,每一次shuffle会生成M*R的数量的文件,如上述wordcount例子中,整个job有一次shuffle过程,由于输入文件默认分片为2,故M个数为2,而spark.default.parallelism配置的值为4,故R为4,所以总共生成1*2*4=8个文件。shuffle_0_1_2解读为shuffle+shuffle_id+map_id+reduce_id,故0_1_2表示由第0次shuffle中的第1个maptask生成的文件,该文件内容会被第2个reduce task消费
    在SORT模式下,一个Map Task只生成一个文件,而不管生成的文件要被多少的Reduce消费,故文件个数是M的数量,由于wordcount中的默认分片为2,故只生成两个data文件

多次shuffle

刚才的示例中只有一次shuffle过程,我们可以通过小小的改动来达到两次shuffle,代码如下

sc.textFile("README.md").flatMap(l => l.split(" ")).map(w => (w,1)).reduceByKey(_ + _).map(p=>(p._2,p._1)).groupByKey.collect


上述代码将reduceByKey的结果通过map进行反转,即将原来的(w, count)转换为(count,w),然后根据出现次数进行归类。 groupByKey会再次导致数据shuffle过程。

在HASH模式下产生的文件如下所示


/tmp/spark-local-20140919094531-1cb6/12/shuffle_0_3_3
/tmp/spark-local-20140919094531-1cb6/0c/shuffle_0_0_0
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_2_3
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_3_2
/tmp/spark-local-20140919094531-1cb6/11/shuffle_1_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_2_2
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_3_1
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_0_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_0_3
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_3_0
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_2_1
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_0_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_1_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_0_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_1_0
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_1_0_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_2_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_1_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_0_2
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_0_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_1_0


引入一次新的shuffle,产生了大量的中间文件

如果是使用SORT,效果如何呢?只会增加M个文件,由于在新的shuffle过程中,map task数目为4,所以总共的文件是2+4=6。


/tmp/spark-local-20140919094731-034a/29/shuffle_0_3_0.data
/tmp/spark-local-20140919094731-034a/30/shuffle_0_0_0.index
/tmp/spark-local-20140919094731-034a/15/shuffle_0_1_0.data
/tmp/spark-local-20140919094731-034a/36/shuffle_0_2_0.data
/tmp/spark-local-20140919094731-034a/0c/shuffle_0_0_0.data
/tmp/spark-local-20140919094731-034a/32/shuffle_0_2_0.index
/tmp/spark-local-20140919094731-034a/32/shuffle_1_1_0.index
/tmp/spark-local-20140919094731-034a/0f/shuffle_0_1_0.index
/tmp/spark-local-20140919094731-034a/0f/shuffle_1_0_0.index
/tmp/spark-local-20140919094731-034a/0a/shuffle_1_1_0.data
/tmp/spark-local-20140919094731-034a/2b/shuffle_1_0_0.data
/tmp/spark-local-20140919094731-034a/0d/shuffle_0_3_0.index


值得指出的是shuffle_0和shuffle_1的执行次序问题,数字越大越先执行,由于spark job提交的时候是从后往前倒推的,故0是最后将执行,而前面的先执行。
Sort-based Shuffle的设计思想

sort-based shuffle的总体指导思想是一个map task最终只生成一个shuffle文件,那么后续的reduce task是如何从这一个shuffle文件中得到自己的partition呢,这个时候就需要引入一个新的文件类型即index文件。

其具体实现步骤如下:

    Map Task在读取自己输入的partition之后,将计算结果写入到ExternalSorter
    ExternalSorter会使用一个map来存储新的计算结果,新的计算结果根据partiton分类,如果是有combine操作,则需要将新的值与原有的值进行合并
    如果ExternalSorter中的map占用的内存已经超越了使用的阀值,则将map中的内容spill到磁盘中,每一次spill产生一个不同的文件
    当输入Partition中的所有数据都已经处理完毕之后,这时有可能一部分计算结果在内存中,另一部分计算结果在spill的一到多个文件之中,这时通过merge操作将内存和spill文件中的内容合并整到一个文件里
    最后将每一个partition的在data文件中的起始位置和结束位置写入到index文件

相应的源文件

  
 SortShuffleManager.scala
    SortShuffleWriter.scala
    ExternalSorter.scala
    IndexShuffleBlockManager.scala


几个重要的函数

SortShuffleWriter.write

 override def write(records: Iterator[_ >: Product2[K, V]]): Unit = {
    if (dep.mapSideCombine) {
      if (!dep.aggregator.isDefined) {
        throw new IllegalStateException("Aggregator is empty for map-side combine")
      }
      sorter = new ExternalSorter[K, V, C](
        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      sorter.insertAll(records)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      sorter = new ExternalSorter[K, V, V](
        None, Some(dep.partitioner), None, dep.serializer)
      sorter.insertAll(records)
    }

    val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
    val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
    val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
    shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

    mapStatus = new MapStatus(blockManager.blockManagerId,
      partitionLengths.map(MapOutputTracker.compressSize))
  }


ExternalSorter.insertAll

def insertAll(records: Iterator[_  {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        elementsRead += 1
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpill(usingMap = true)
      }
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        elementsRead += 1
        val kv = records.next()
        buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
        maybeSpill(usingMap = false)
      }
    }
  }


writePartitionedFile将内存中的数据和spill文件中内容一起合并到一个文件当中

def writePartitionedFile(
      blockId: BlockId,
      context: TaskContext,
      outputFile: File): Array[Long] = {

    // Track location of each range in the output file
    val lengths = new Array[Long](numPartitions)

    if (bypassMergeSort && partitionWriters != null) {
      // We decided to write separate files for each partition, so just concatenate them. To keep
      // this simple we spill out the current in-memory collection so that everything is in files.
      spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
      partitionWriters.foreach(_.commitAndClose())
      var out: FileOutputStream = null
      var in: FileInputStream = null
      try {
        out = new FileOutputStream(outputFile)
        for (i <- 0 until numPartitions) {
          in = new FileInputStream(partitionWriters(i).fileSegment().file)
          val size = org.apache.spark.util.Utils.copyStream(in, out, false)
          in.close()
          in = null
          lengths(i) = size
        }
      } finally {
        if (out != null) {
          out.close()
        }
        if (in != null) {
          in.close()
        }
      }
    } else {
      // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by
      // partition and just write everything directly.
      for ((id, elements) <- this.partitionedIterator) {
        if (elements.hasNext) {
          val writer = blockManager.getDiskWriter(
            blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get)
          for (elem 


而数据读取过程中则需要使用IndexShuffleBlockManager来获取Partiton的具体位置

 
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
    // The block is actually going to be a range of a single map output file for this map, so
    // find out the consolidated file, then the offset within that from our index
    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skip(blockId.reduceId * 8)
      val offset = in.readLong()
      val nextOffset = in.readLong()
      new FileSegmentManagedBuffer(
        getDataFile(blockId.shuffleId, blockId.mapId),
        offset,
        nextOffset - offset)
    } finally {
      in.close()
    }
  }
分享到:
评论

相关推荐

    spark shuffle简介

    从Hash-based Shuffle到Sort-based Shuffle再到Push-based Shuffle,每次改进都是为了降低延迟、提高吞吐量和减少资源消耗。理解Spark Shuffle的工作原理及其优化方法,对于有效管理和优化Spark作业的性能至关重要。

    Spark技术内幕深入解析Spark内核架构设计与实现原理

    《Spark技术内幕深入解析Spark内核架构设计与实现原理》这本书深入探讨了Apache Spark这一分布式计算框架的核心架构和实现机制,旨在帮助读者全面理解Spark的工作原理,并能够有效地利用其进行大数据处理。...

    spark调优介绍

    5. 使用sort-based shuffle代替hash-based shuffle:sort-based shuffle可以提供更好的排序效果,减少数据交换。 以上策略需要结合实际应用场景灵活调整,持续监控Spark应用的性能指标,如executor内存使用情况、...

    Spark Shuffle优化-参数调优1

    这个参数控制对于 sort-based ShuffleManager,如果没有进行 map side 聚合,而且 reduce 任务数量少于这个值,那么就不会进行排序。如果你使用 sort ShuffleManager,而且不需要排序,那么可以考虑将这个值加大,...

    数据结构常用算法c++实现

    Array shuffle Prime test(trial division) Prime test(Miller-Rabin's method) 2D Array Arbitary Integer Linear congruential generator Maximum subarray problem Bit-Set Queue Stack Binary Heap Fibonacci ...

    Spark SQL技术架构优化实践.pptx

    - **Bucket Join 改进**:通过预先对数据进行分桶(Bucketing),使得具有相同键的数据被分配到相同的分区中,从而避免 Shuffle,直接进行 Bucket Sort Merge Join,减少网络传输和数据重排的开销。对于不兼容的 ...

    基于MapReduce实现物品协同过滤算法(ItemCF).zip

    3. Shuffle和Sort阶段:Map阶段产生的中间结果按键排序,并分发到相应的Reducer。 4. Reduce阶段:Reducer函数根据物品ID聚合相似度,生成物品的推荐列表。对于每个用户,找出与其已评分物品最相似的N个物品,作为...

    hive参数优化文档

    - **Shuffle优化**:优化Shuffle过程中的排序与合并操作,提高数据传输效率。 3. **Shuffle阶段优化**: - 在Shuffle过程中优化数据传输方式,例如使用更高效的序列化/反序列化方法。 - 调整`io.sort.mb`参数,...

    基于Map_Reduce的分布式数据排序算法分析.pdf

    - Tree-based Sorting:构建二叉树或其他树结构,每个节点代表一部分数据,通过树的遍历实现排序。 每种算法的代价模型包括计算成本、内存使用和网络通信开销,选择哪种算法取决于数据特性、集群资源和性能需求。...

    spark调优.rar

    6. **Shuffle操作优化**:Shuffle是Spark性能瓶颈之一,减少不必要的Shuffle(如通过Coalesce操作)和优化Shuffle管理(如使用更高效的Hash Shuffle或Sort Shuffle)能显著提升性能。 7. **宽依赖与窄依赖**:理解...

    大数据各类性能调优

    - HiveCBO(Cost-Based Optimizer)可以根据成本模型自动选择最优查询计划,从而提高查询性能。 #### 12.6 Kafka ##### 12.6.1 Kafka性能调优 - Kafka的性能调优主要包括以下方面: - 调整分区数。 - 优化Broker...

    大数据面试100题.pdf

    sort-based shuffle在处理大量数据时,需要消耗更多的内存和I/O资源,并且对网络带宽要求较高,可能会成为瓶颈。 spark.storage.memoryFraction参数的含义,实际生产中如何调优: spark.storage.memoryFraction参数...

    hadoop_the_definitive_guide_3nd_edition

    Shuffle and Sort 205 The Map Side 206 The Reduce Side 207 Configuration Tuning 209 Task Execution 212 The Task Execution Environment 212 Speculative Execution 213 Output Committers 215 Task JVM Reuse ...

    VB编程资源大全(英文源码 其它)

    array.zip A simple program that shows how a two-dimensional array works within a VB program.&lt;END&gt;&lt;br&gt;70,Bubblesort.zip A simple Bubble Sort code that shows how the program works within a VB ...

Global site tag (gtag.js) - Google Analytics