`
bit1129
  • 浏览: 1069723 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark三十三】Spark Sort based Shuffle

 
阅读更多

1. N个partition,会产生N个MapTask,如果不指定ReduceTask的个数,那么默认情况下,ReduceTask个数也为N

2. N个partition,即N个MapTask,同时有N个ReduceTask(默认是N,不指定ReduceTask个数的情况下),那么会产生不大于N的.data文件以及不大于N的index文件。

即文件个数不受ReduceTask个数的影响。

 

如下所示:index个数是5,data个数是4,这些数字跟Reducer个数无关,而与Mapper个数有关。那么为什么data文件个数不是5个,这些文件是Mapper如何写入的?其中包含了什么内容,接下来分析。

 

hadoop$ pwd
/tmp/spark-local-20150129214306-5f46
hadoop$ cd spark-local-20150129214306-5f46/
hadoop$ ls
0c  0d  0e  0f  11  13  15  29  30  32  36  38
hadoop$ find . -name "*.index" | xargs ls -l
-rw-rw-r-- 1 hadoop hadoop 48  1月 29 21:43 ./0c/shuffle_0_4_0.index
-rw-rw-r-- 1 hadoop hadoop 48  1月 29 21:43 ./0d/shuffle_0_3_0.index
-rw-rw-r-- 1 hadoop hadoop 48  1月 29 21:43 ./0f/shuffle_0_1_0.index
-rw-rw-r-- 1 hadoop hadoop 48  1月 29 21:43 ./30/shuffle_0_0_0.index
-rw-rw-r-- 1 hadoop hadoop 48  1月 29 21:43 ./32/shuffle_0_2_0.index
hadoop$ find . -name "*.data" | xargs ls -l
-rw-rw-r-- 1 hadoop hadoop 884  1月 29 21:43 ./0c/shuffle_0_0_0.data
-rw-rw-r-- 1 hadoop hadoop 685  1月 29 21:43 ./15/shuffle_0_1_0.data
-rw-rw-r-- 1 hadoop hadoop 710  1月 29 21:43 ./29/shuffle_0_3_0.data
-rw-rw-r-- 1 hadoop hadoop 693  1月 29 21:43 ./36/shuffle_0_2_0.data

 

ShuffleMapTask执行逻辑

既然N个partition写入到若干个data文件中,那么当Reducer过来拉取数据时,必须得知道,属于它的那部分数据

 

data和index文件名中的reduceId都是0,原因是(见IndexShuffleBlockManager)

 

 

  def getDataFile(shuffleId: Int, mapId: Int): File = {
    ///相同的shuffleId和mapId指向同一个文件
    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
  }

  private def getIndexFile(shuffleId: Int, mapId: Int): File = {
    ///相同的shuffleId和mapId指向同一个文件
    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
  }

 

 

 

 

  def writePartitionedFile(
      blockId: BlockId,
      context: TaskContext,
      outputFile: File): Array[Long] = {

    // Track location of each range in the output file
    val lengths = new Array[Long](numPartitions)

    if (bypassMergeSort && partitionWriters != null) {
      // We decided to write separate files for each partition, so just concatenate them. To keep
      // this simple we spill out the current in-memory collection so that everything is in files.
      spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
      partitionWriters.foreach(_.commitAndClose())
      var out: FileOutputStream = null
      var in: FileInputStream = null
      try {
        out = new FileOutputStream(outputFile, true)
        for (i <- 0 until numPartitions) {
          in = new FileInputStream(partitionWriters(i).fileSegment().file)
          val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
          in.close()
          in = null
          lengths(i) = size
        }
      } finally {
        if (out != null) {
          out.close()
        }
        if (in != null) {
          in.close()
        }
      }
    } else {
      // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by
      // partition and just write everything directly.
      ///this.partitionedIterator包含了数据集合
      for ((id, elements) <- this.partitionedIterator) {
        if (elements.hasNext) { //如果elements有元素
          val writer = blockManager.getDiskWriter(
            blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get)
          for (elem <- elements) { ///遍历集合,写入文件
            writer.write(elem) ///追加数据,遍历结束后再关闭
          }
          writer.commitAndClose() ///关闭文件
          val segment = writer.fileSegment() ///这是什么东东?
          lengths(id) = segment.length
        }
      }
    }

    context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
    context.taskMetrics.diskBytesSpilled += diskBytesSpilled
    context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m =>
      if (curWriteMetrics != null) {
        m.shuffleBytesWritten += curWriteMetrics.shuffleBytesWritten
        m.shuffleWriteTime += curWriteMetrics.shuffleWriteTime
      }
    }

    lengths
  }

 

 

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    Spark的shuffle调优

    spark.shuffle.blockTransferService netty shuffle过程中,传输数据的方式,两种选项,netty或nio,spark 1.2开始,默认就是netty,比较简单而且性能较高,spark 1.5开始nio就是过期的了,而且spark 1.6中会去除掉 ...

    SparkShuffle.xmind

    SparkShuffle思维导图,xmind

    spark shuffle简介

    3. **Spark 1.4.0**:虽然Hash-based Shuffle仍然是默认方式,但Spark开始尝试优化Shuffle,引入了Sort-based Shuffle。这种方式在写入阶段就对数据进行了排序,使得相同键值的数据在同一块中,减少了Shuffle过程中...

    Spark Shuffle优化-参数调优1

    这个参数控制 Spark 1.5 以后的 ShuffleManager,有三种可选的,hash、sort 和 tungsten-sort。sort-based ShuffleManager 会更高效实用内存,并且避免产生大量的 map side 磁盘文件,从 Spark 1.2 开始就是默认的...

    spark-2.2.0-yarn-shuffle.jar

    spark-2.2.0-yarn-shuffle.jar

    Spark 的两种核心 Shuffle 详解.pdf

    Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的 Shuffle。

    Spark-shuffle机制.pdf

    - shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold(默认200)。 - 不是排序类的shuffle算子。 - 每个下游task创建一个临时磁盘文件,数据按照key的hash值写入对应的文件。 - 写入磁盘前先...

    【Spark内核篇04】SparkShuffle解析1

    3. **Bypass Merge SortShuffle**: 当满足特定条件(map任务数量小于`spark.shuffle.sort.bypassMergeThreshold`且非聚合类shuffle算子)时,Spark会使用bypass机制,跳过排序过程。每个task会为每个reducer创建一个...

    spark shuffle原理

    Spark Shuffle是大数据处理框架Apache Spark中的关键组成部分,它在数据处理流程中扮演着至关重要的角色,连接了Map和Reduce操作,决定了数据如何在集群中重新分布。 Shuffle过程涉及到数据的分区、排序和网络传输,...

    sparkshuffle原理、shuffle操作问题解决和参数调优.doc

    当数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数的值时(默认为 200),就会启用 bypass 机制。在普通运行机制下,数据会先写入一个内存数据结构中,然后根据不同的 Shuffle 算子选择不同的数据结构。...

    spark-1.6.1-yarn-shuffle.jar

    spark-1.6.1-yarn-shuffle.jar 下载。spark-1.6.1-yarn-shuffle.jar 下载。spark-1.6.1-yarn-shuffle.jar 下载。

    Apache Spark Shuffle I/O 在 Facebook 的优化

    我们都知道,Shuffle 操作在 Spark 中是一种昂贵的操作。在 Facebook,单个 Job 的 Shuffle 就可能往磁盘中写入 300TB 的数据;而且 shuffle reads 也是一种低效的操作,这会大大延长作业的整体执行时间,并且消耗...

    【Spark调优篇03】Spark之Shuffle调优1

    SortShuffleManager在某些情况下可以跳过排序,即当Shuffle reduce task数量小于`spark.shuffle.sort.bypassMergeThreshold`参数值时。这可以减少排序的计算开销,但可能导致更多的磁盘文件生成。如果不需要排序,...

    spark-shuffle重要类和方法介绍

    SortShuffleManager 是 Spark 中的一种重要类,基于 Sort-base 的 Shuffle,用于在 Mapper 中的每一个 ShuffleMapTask 中产生两个文件:Data 文件和 Index 文件。SortShuffleManager 的主要方法包括: * ...

    spark-ExternalShuffleService配置使用及性能提升

    Spark ExternalShuffleService 配置使用及性能提升 Spark ExternalShuffleService 是一个长期存在于 NodeManager 进程中的辅助服务,旨在提升 Spark 系统中 Shuffle 计算的性能。该服务可以减少 Executor 的压力,...

    Spark的Shuffle总结分析

    一、shuffle原理分析 1.1 shuffle概述 Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上...而Spark也会有自己的shuffle实现过程。 1.2 Spark中的 shuffle 介绍 在DAG调度的过程中,Stage 阶段的

    Apache Spark Shuffle I/O 在 Facebook 的优化 [PDF]

    我们都知道,Shuffle 操作在 Spark 中是一种昂贵的操作。在 Facebook,单个 Job 的 Shuffle 就可能往磁盘中写入 300TB 的数据;而且 shuffle reads 也是一种低效的操作,这会大大延长作业的整体执行时间,并且消耗...

    spark-network-shuffle_2.11-2.1.3-SNAPSHOT.jar

    spark-network-shuffle_2.11-2.1.3-SNAPSHOT.jar

    适配CDH6.3.2的Spark3.2.2

    6. **性能优化**:Spark3.2.2包含了一系列性能优化,如减少网络传输、提高shuffle效率、内存管理改进等。此外,还引入了Tungsten项目的新的编码器技术,进一步减少了内存占用。 7. **机器学习库MLlib**:Spark ...

    源码 spark shuffle service在中通的优化实践.docx

    【Spark Shuffle Service在中通的优化实践】 Spark Shuffle Service是Apache Spark中用于处理Stage间数据交换的关键组件。在中通快递的业务场景中,随着业务量的快速增长,传统的基于Hive+MapReduce的离线计算方式...

Global site tag (gtag.js) - Google Analytics