`
bit1129
  • 浏览: 1076881 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark六十九】Hash Based Shuffle之二Shuffle Write + ConsolidationFiles

 
阅读更多

在http://bit1129.iteye.com/blog/2180214一文中,分析了Hash based shuffle的写过程。其中分析的是,未开始map端产生的数据文件做consolidate的流程,不开启则会创建M*R个文件,如果M和R都很大,比如2000*600,那么120万个小文件的读写将是性能瓶颈,这也是Hadoop采用sort based shuffle以及Spark在1.2的时候,将sort based shuffle作为默认shuffle算法,因为它们产生的输出文件就是确定的R个。

在Hash based shuffle中,Spark通过开启consolidationFiles选项,可以将文件进行consolidation,此时产生的文件个数就是M个

 

本文分析Spark开启consolidationFiles选项,mapper将文件进行consolidate的原理和过程

 

1. 总体框架图



 

上图基本准确


2.实例代码:

package spark.examples.shuffle

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object SparkHashShuffleConsolidationFile {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2");
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount")
    conf.setMaster("local")
    //Hash based Shuffle;
    conf.set("spark.shuffle.manager", "hash");

    //使用文件聚合
    conf.set("spark.shuffle.consolidateFiles", "true");

    val sc = new SparkContext(conf)
    //10个以上的分区,每个分区对应一个Map Task
    //读取一个1M的文件
    val rdd = sc.textFile("file:///D:/1.txt", 10); //十个以上的mapper分区
    val rdd1 = rdd.flatMap(_.split(" "))
    val rdd2 = rdd1.map((_, 1))
    val rdd3 = rdd2.reduceByKey(_ + _, 6); ///6个Reducer Task
    rdd3.saveAsTextFile("file:///D:/wordcount" + System.currentTimeMillis());

    println(rdd3.toDebugString)
    sc.stop
  }
}

2.1 结果产生了六个输出文件:

C:\Users\hadoop\AppData\Local\Temp\spark-local-20150219165442-cf2d>tree /f
文件夹 PATH 列表
卷序列号为 4E9D-390C
C:.
├─02
│      merged_shuffle_0_5_0
│
├─03
│      merged_shuffle_0_4_0
│
├─04
│      merged_shuffle_0_3_0
│
├─05
│      merged_shuffle_0_2_0
│
├─06
│      merged_shuffle_0_1_0
│
├─07
│      merged_shuffle_0_0_0
│
├─0c
├─0d
├─0e
├─11
└─13

2.2 将reducer的个数改为8,结果产生了8个输出文件

 

C:\Users\hadoop\AppData\Local\Temp\spark-local-20150219165701-135d>tree /f
文件夹 PATH 列表
卷序列号为 4E9D-390C
C:.
├─00
│      merged_shuffle_0_7_0
│
├─01
│      merged_shuffle_0_6_0
│
├─02
│      merged_shuffle_0_5_0
│
├─03
│      merged_shuffle_0_4_0
│
├─04
│      merged_shuffle_0_3_0
│
├─05
│      merged_shuffle_0_2_0
│
├─06
│      merged_shuffle_0_1_0
│
├─07
│      merged_shuffle_0_0_0
│
├─0c
├─0d
├─0e
├─11
└─13

 

2.3系统内核为4,将mater的local改为local[3],reducer个数仍然为8,结果产生了24个文件

C:\Users\hadoop\AppData\Local\Temp\spark-local-20150219171357-b6b4>tree /f
文件夹 PATH 列表
卷序列号为 4E9D-390C
C:.
├─00
│      merged_shuffle_0_5_2
│      merged_shuffle_0_6_1
│      merged_shuffle_0_7_0
│
├─01
│      merged_shuffle_0_4_2
│      merged_shuffle_0_5_1
│      merged_shuffle_0_6_0
│
├─02
│      merged_shuffle_0_3_2
│      merged_shuffle_0_4_1
│      merged_shuffle_0_5_0
│
├─03
│      merged_shuffle_0_2_2
│      merged_shuffle_0_3_1
│      merged_shuffle_0_4_0
│
├─04
│      merged_shuffle_0_1_2
│      merged_shuffle_0_2_1
│      merged_shuffle_0_3_0
│
├─05
│      merged_shuffle_0_0_2
│      merged_shuffle_0_1_1
│      merged_shuffle_0_2_0
│
├─06
│      merged_shuffle_0_0_1
│      merged_shuffle_0_1_0
│
├─07
│      merged_shuffle_0_0_0
│
├─0c
├─0d
├─0e
├─11
├─13
├─3e
│      merged_shuffle_0_7_2
│
└─3f
        merged_shuffle_0_6_2
        merged_shuffle_0_7_1

 

2.4系统内核为4,将mater的local改为local[6],reducer个数仍然为8,结果产生了48个文件

文件夹 PATH 列表
卷序列号为 4E9D-390C
C:.
├─00
│      merged_shuffle_0_2_5
│      merged_shuffle_0_3_4
│      merged_shuffle_0_4_3
│      merged_shuffle_0_5_2
│      merged_shuffle_0_6_1
│      merged_shuffle_0_7_0
│      
├─01
│      merged_shuffle_0_1_5
│      merged_shuffle_0_2_4
│      merged_shuffle_0_3_3
│      merged_shuffle_0_4_2
│      merged_shuffle_0_5_1
│      merged_shuffle_0_6_0
│      
├─02
│      merged_shuffle_0_0_5
│      merged_shuffle_0_1_4
│      merged_shuffle_0_2_3
│      merged_shuffle_0_3_2
│      merged_shuffle_0_4_1
│      merged_shuffle_0_5_0
│      
├─03
│      merged_shuffle_0_0_4
│      merged_shuffle_0_1_3
│      merged_shuffle_0_2_2
│      merged_shuffle_0_3_1
│      merged_shuffle_0_4_0
│      
├─04
│      merged_shuffle_0_0_3
│      merged_shuffle_0_1_2
│      merged_shuffle_0_2_1
│      merged_shuffle_0_3_0
│      
├─05
│      merged_shuffle_0_0_2
│      merged_shuffle_0_1_1
│      merged_shuffle_0_2_0
│      
├─06
│      merged_shuffle_0_0_1
│      merged_shuffle_0_1_0
│      
├─07
│      merged_shuffle_0_0_0
│      
├─0c
├─0d
├─0e
├─11
├─13
├─3b
│      merged_shuffle_0_7_5
│      
├─3c
│      merged_shuffle_0_6_5
│      merged_shuffle_0_7_4
│      
├─3d
│      merged_shuffle_0_5_5
│      merged_shuffle_0_6_4
│      merged_shuffle_0_7_3
│      
├─3e
│      merged_shuffle_0_4_5
│      merged_shuffle_0_5_4
│      merged_shuffle_0_6_3
│      merged_shuffle_0_7_2
│      
└─3f
        merged_shuffle_0_3_5
        merged_shuffle_0_4_4
        merged_shuffle_0_5_3
        merged_shuffle_0_6_2
        merged_shuffle_0_7_1
        

 

 

结论:使用consolidateFiles选项后,

1)不论Mapper多少,产生的文件个数以merged_shuffle_开头,

2)个数为Reducer的个数*并行度(这里的并行度有可能大于系统内核数)

3)产出的结果为乱码,也就是压缩过的

 

merged_shuffle_m_r_n

m: shuffleId

r: reducerId

n: fileId

 

3. 流程

3.1 ShuffleMapTask的runTask方法

 

 override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      ///根据mapper的一个PartionId获取一个writer对象,此处是HashShuffleWriter
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      ///将partition数据转换成iteratable对象,传给writer.write方法
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      return writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }
 

 

3.2 调用HashShuffleWriter的write方法

 

  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    val iter = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) { ///map端combine
        dep.aggregator.get.combineValuesByKey(records, context)
      } else {
        records
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      records
    }

    for (elem <- iter) {
      //根据Reducer的分区算法获取当前遍历的elem应该属于哪个分区
      val bucketId = dep.partitioner.getPartition(elem._1)
      ///根据bucketId得到要执行写操作的writer
      ///一个partition里面的数据elem不同会写向不同的reducer writer
      ///因为多个mapper同时操作,那么如何保证它们写入是不冲突呢?
      shuffle.writers(bucketId).write(elem)
    }
  }
 

 

3.3 执行shuffle.writers(bucketId).write(elem)方法

 

3.3.1 shuffle是如下代码创建的

 

 

  private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
    writeMetrics)

 

 

shuffleBlockManager是FileShuffleBlockManager类型的对象,在forMapTask这个方法中,就使用了consolidateFiles选项,默认为false,需要将它设置为true

 

 

  private val consolidateShuffleFiles = conf.getBoolean("spark.shuffle.consolidateFiles", false)
 

 

3.3.2 FileShuffleBlockManager.forMapTask方法的代码

 

 

//shuffle变量的类型是ShuffleWriterGroup 
//shuffleId
//mapId:mapper partition的index
//numBuckets:reducer的partition总数
//serializer
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
      writeMetrics: ShuffleWriteMetrics) = {
    new ShuffleWriterGroup {
      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
      ///suffleState仅仅在consolidateShuffleFiles为true时使用
      ///
      private val shuffleState = shuffleStates(shuffleId)
      private var fileGroup: ShuffleFileGroup = null
      //writers的元素个数都为Reduce个数个
      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        ///使用了consolidateShuffleFiles,表示每个mapper都是复用R个文件?
        ///首先获取一个未使用的FileGroup
        fileGroup = getUnusedFileGroup()
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
         ///blockId,类型为ShuffleBlockId,bucketId就是reduceId 
         val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
         ///获取一个BlockObjectWriter 
         blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
            writeMetrics)
        }
      } else {
        //没用shuffleState变量
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          val blockFile = blockManager.diskBlockManager.getFile(blockId)
          // Because of previous failures, the shuffle file may already exist on this machine.
          // If so, remove it.
          if (blockFile.exists) {
            if (blockFile.delete()) {
              logInfo(s"Removed existing shuffle file $blockFile")
            } else {
              logWarning(s"Failed to remove existing shuffle file $blockFile")
            }
          }
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
        }
      }
 

 3.3.3 writers的初始化是在if(consolidateShuffleFiles)代码中,首先调用getUnusedFileGroup获取一个ShuffleFileGroup对象,然后再执行如下方法,填充长度为numBuckets的数组(数组类型为BlockObjectWriter)。

 

      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        ///获取未使用的FileGroup,如果已经在队列里,则直接返回,否则新创建一个
        fileGroup = getUnusedFileGroup()
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          ///fileGroup(bucketId)返回的是一个文件,应该是往这个bucketId中写入数据的文件
          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
            writeMetrics)
        }

 

3.3.4getUnusedFileGroup方法

 

 

     
   //这里面ShuffleFileGroup有重用的机制在里面,当一个ShuffleFileGroup被回收后,它可以被其它的map task使用,因此,
   //多个map task有可能往同一个ShuffleFileGroup中写入数据
   //正因为如此,ShuffleFileGroup中的每个文件是可能分段的,每个map task写入一段(称为FileSegment)
   //虽然ShuffleFileGroup中的每个File都可能有多个FileSegment,但是它们都属于一个Reducer要处理的数据,因此Reducer只要去这整个文件即可,而无需考虑文件内容已经分段,而这正是hash based shuffle write的好处,否则
   //如果考虑排序的话,那么需要对所有的segment进行merge 排序
   //
   private def getUnusedFileGroup(): ShuffleFileGroup = {
        //shuffleState.unusedFileGroups是ConcurrentLinkedQueue,poll操作是从队列头取走一个
        ///这里是一个优化,unusedFileGroups是一个队列,有取就有放,recycleFileGroup方法在回收一个FileGroup后就放入这个unusedFileGroups队列中,这样的目的是
        ///尽可能的复用已经创建的ShuffleFileGroup,但是也引入了不同的map task往同一个ShuffleFileGroup中写入文件
        ///复用也就意味着每个Mapper Task执行时,使用的ShuffleFileGroup的是一样的(不会因为mapper partition的不同而导致FileGroup不同)
        val fileGroup = shuffleState.unusedFileGroups.poll()
        //如果fileGroup不为空,则复用;否则新建一个
        if (fileGroup != null) fileGroup else newFileGroup()
      }

      ///新创建一个FileGroup
      private def newFileGroup(): ShuffleFileGroup = {
        //由于并行度,而导致这段代码同时运行,因此fileId会有多个,比如上面的local[3]产生了3个fileId
        //多个Map Task会共享这个ShuffleStage对象
        val fileId = shuffleState.nextFileId.getAndIncrement()
        //files是长度为numBuckets的File数组,因此,一个ShuffleFileGroup有R个File对象
        //map task同时运行会产生独立的fileId,可以把fileId想象成避免多线程同时操作同一个文件而引入的一个避免互斥的解决方法
        val files = Array.tabulate[File](numBuckets) { bucketId =>
          //文件名定义:一共有R个
          val filename = physicalFileName(shuffleId, bucketId, fileId)
          ///获取文件
          blockManager.diskBlockManager.getFile(filename)
        }
        //构造ShuffleFileGroup对象,包含了R个文件
        //构造ShuffleFileGroup是的入参是fileId以及files,这个files中每个File都有fileId作为其文件名的一部分
        val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
        shuffleState.allFileGroups.add(fileGroup)
        fileGroup
      }

 

 phyisicalFileName:

 

  private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
    "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
  }

 可见此时的shuffle结果文件名,有shuffleId,bucketId和fileId,这样的文件一共多少个?

 

 3.3.5构造好了ShuffleFileGroup对象,执行如下的代码逻辑:

 

        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          //fileGroup是个ShuffleFileGroup对象,fileGroup(bucketId)是什么含义?
          //返回DiskBlockObjectWriter
          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
            writeMetrics)
        }

 

3.3.6 fileGroup(bucketId)

fileGroup对象是一个有shuffleId,fileId和files作为入参构造的,其中files是个长度为R的File数组,每个File都有fileId作为文件名的一部分,而fileGroup(bucketId)使用了Scala的apply语法,这个语句会调用ShuffleFileGroup的apply方法,apply方法定义如下:

   def apply(bucketId: Int) = files(bucketId)

 可见它是去files数组中的bucketId对应的File

 

 

3.3.7

执行语句

 blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize, writeMetrics)

 

将返回DiskBlockObjectWriter,它里面挂了一个唯一的文件(注意:这个文件有可能已经有数据了)

 

3.3.8 DiskBlockObjectWriter的write方法

 

  override def write(value: Any) {
    if (!initialized) {
      open()
    }

    objOut.writeObject(value) //调用SerializationStream,写入数据

    if (writesSinceMetricsUpdate == 32) { //每调用write32次,则调用upateBytesWritten更新一下写入数据的字节数
      writesSinceMetricsUpdate = 0
      updateBytesWritten()
    } else {
      writesSinceMetricsUpdate += 1
    }
  }

 

 3.3.9在DiskBlockObjectWriter的类说明中说明了这个类有可能是通过追加的方式写入的,

 /**
   * Cursors used to represent positions in the file.
   *
   * xxxxxxxx|--------|---       |
   *         ^        ^          ^
   *         |        |        finalPosition
   *         |      reportedPosition
   *       initialPosition
   *
   * initialPosition: Offset in the file where we start writing. Immutable.
   * reportedPosition: Position at the time of the last update to the write metrics. 
   * finalPosition: Offset where we stopped writing. Set on closeAndCommit() then never changed.
   * -----: Current writes to the underlying file.
   * xxxxx: Existing contents of the file.
   */

 

 

 

 

  • 大小: 186.9 KB
分享到:
评论

相关推荐

    spark shuffle简介

    3. **Spark 1.4.0**:虽然Hash-based Shuffle仍然是默认方式,但Spark开始尝试优化Shuffle,引入了Sort-based Shuffle。这种方式在写入阶段就对数据进行了排序,使得相同键值的数据在同一块中,减少了Shuffle过程中...

    Spark 的两种核心 Shuffle 详解.pdf

    Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的 Shuffle。

    Spark-shuffle机制.pdf

    #### 二、Spark Shuffle与Hadoop MapReduce Shuffle比较 1. **Hadoop MapReduce Shuffle流程** - 在Map阶段,每个maptask都有一个内存缓冲区,默认大小为100MB,用于存储map任务的输出结果。 - 当缓冲区接近满时...

    Spark Shuffle优化-参数调优1

    Shuffle 操作是 Spark 中最耗时的操作之一,而 shuffle 优化是Spark 优化的关键。 本文将对 Spark Shuffle 优化的参数进行详细的介绍。 spark.reducer.maxSizeInFlight Spark 的 reducer 任务的 buffer 缓冲大小...

    Spark技术内幕深入解析Spark内核架构设计与实现原理

    10. **Spark Shuffle优化**:包括减少shuffle数据量、使用更高效的shuffle机制如Sort-based Shuffle和Hash-based Shuffle,以及合理配置executor内存和并行度,以提高性能。 11. **资源管理和调度**:Spark可以与...

    spark调优介绍

    5. 使用sort-based shuffle代替hash-based shuffle:sort-based shuffle可以提供更好的排序效果,减少数据交换。 以上策略需要结合实际应用场景灵活调整,持续监控Spark应用的性能指标,如executor内存使用情况、...

    spark调优.rar

    6. **Shuffle操作优化**:Shuffle是Spark性能瓶颈之一,减少不必要的Shuffle(如通过Coalesce操作)和优化Shuffle管理(如使用更高效的Hash Shuffle或Sort Shuffle)能显著提升性能。 7. **宽依赖与窄依赖**:理解...

    大规模游戏社交网络节点相似性算法及其应用-8-5 美团 Spark Shuffle 架构演进.zip

    随着技术的发展,美团在Spark Shuffle上引入了内存优化策略,如使用RDMA(Remote Direct Memory Access)技术减少网络传输延迟,以及采用更高效的Hash Partitioner和Bucketing策略降低数据冲突。此外,美团还引入了...

    spark原理示意图,执行计划,shuffle,架构,检查点,缓存,广播

    为了优化shuffle过程,Spark使用Hash Shuffling和Sort Shuffling策略,后者通过排序保证了数据的稳定性。 Spark的架构是基于Master-Worker模式,由一个或多个Driver(通常是用户的应用程序)和多个Executor组成。...

    Spark性能优化基础篇

    shuffle调优则需要理解Spark的shuffle过程,通过调整hash分区策略、增加shuffle write buffer大小等手段,优化数据传输和合并过程。 总的来说,Spark性能优化是一个综合的过程,需要从开发阶段就考虑效率问题,并...

    SparkDemo12

    - **Shuffle优化**:减少不必要的数据传输,如减少shuffle write和优化hash partitioner。 7. **Spark配置** - `spark.master`:指定Spark运行的模式和地址。 - `spark.executor.instances`:设置Executor的数量...

    hashin-strain-3d_hashin_三维hashin_三维hashin失效_失效准则_3D—Hashin_

    **三维Hashin失效准则详解** 在复合材料领域,失效分析是至关重要的,它关系到材料的性能预测和结构安全。Hashin失效准则是一种广泛应用的多向复合材料失效理论,由Shlomo Hashin于1962年提出,主要用于评估多向受...

    Spark大数据商业实战三部曲_内核解密_商业案例_性能调优 实例源码

    4. Shuffle优化:设置合适的hash分区器和并行度,减少数据碰撞,提高shuffle效率。 5. SQL查询优化:使用广播变量、避免冗余计算,以及对JOIN操作进行优化。 四、实例源码分析 压缩包中的`code-of-spark-big-data-...

    各种Hash函数(JAVA版)

    RS-Hash Function Value: " + ghl.RSHash(key)); System.out.println(" 2. JS-Hash Function Value: " + ghl.JSHash(key)); System.out.println(" 3. PJW-Hash Function Value: " + ghl.PJWHash(key)); System....

    hashcat-5.1.0.rar

    This version combines the previous CPU-based hashcat (now called hashcat-legacy) and GPU-based oclHashcat. Hashcat is released as open source software under the MIT license. Current Version ...

    HASHIN.rar_ABAQUS_Hashin失效准则 abaqus_abaqus hashin_abaqus 三维Hashi

    标题中的"HASHIN.rar_ABAQUS_Hashin失效准则 abaqus_abaqus hashin_abaqus 三维Hashi"表明这是一个关于ABAQUS软件中应用Hashin失效准则进行三维分析的示例或教程。ABAQUS是一款广泛应用的有限元分析软件,尤其在结构...

    hashcat [hashcat wiki].rar

    This version combines the previous CPU-based hashcat (now called hashcat-legacy) and GPU-based oclHashcat. Hashcat is released as open source software under the MIT license. Current Version ...

    Spark Adaptive Execution

    在连接操作方面,Spark SQL支持多种连接策略,包括广播哈希连接(Broadcast Hash Join)和排序合并连接(Sort Merge Join)。自适应执行引擎能够根据中间结果的实际大小,优化连接策略的选择。比如,Spark SQL默认的...

    hashcat for windows

    Hashcat is the self-proclaimed world's fastest password recovery tool. It had a proprietary code base until 2015, but is now released as free software. Versions are available for Linux, OS X, and ...

    GeoHash核心原理解析

    在GeoHash编码中,二进制编码的纬度和经度被交替组合成一个新的二进制序列,然后将这个序列转换为一个base32编码的字符串。这种编码方式使得可以快速地比较字符串来确定地理位置的相似性,以及进行前缀匹配来查询...

Global site tag (gtag.js) - Google Analytics