`
bit1129
  • 浏览: 1069944 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark七十一】Hash Based Shuffle之三Shuffle Write Map side combine

 
阅读更多

在这篇http://bit1129.iteye.com/blog/2186325博文中,分析了hash based shuffle write开启consolidationFiles选项的过程。本文,则关注将Iteratable

 

1. 如下代码是HashShuffleWriter.write方法

在将partition的数据写入到磁盘前,进行map端的shuffle

  /** Write a bunch of records to this task's output */
  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    ///对输入的partition对应Iteratable集合进行map端combine
    val iter = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) { //如果定义了dep.aggregator以及dep.mapSideCombine则进行map端combine
        dep.aggregator.get.combineValuesByKey(records, context)
      } else {
        records
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      records
    }

    for (elem <- iter) {
      val bucketId = dep.partitioner.getPartition(elem._1)
      shuffle.writers(bucketId).write(elem)
    }
  }

 

 

2. 调用dep.aggregator.get.combineValuesByKey(records, context)进行map端combine

其中aggregator是Aggregator类型的对象,它在构造时需要传入如下参数:

 

 

case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

//代码体.....

}

 

3. Aggregator.combineValuesByKey方法体:

 

 @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
    combineValuesByKey(iter, null)

  //iter:是map端输入的可遍历的数据集合
  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
                         context: TaskContext): Iterator[(K, C)] = {

    if (!isSpillEnabled) { //不启用spill到磁盘,那么数据集合中的所有数据将在内存中进行combine
      val combiners = new AppendOnlyMap[K,C] //combiners是一个AppendOnlyMap, 可以想象成内存内的HashMap
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (iter.hasNext) { //遍历集合
        kv = iter.next()
        //kv._1是Key,changeValue接收两个参数,kv键以及update
        combiners.changeValue(kv._1, update) 
      }
      combiners.iterator //返回
    } else {
      //构造参数哪来的?
      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
      
      combiners.insertAll(iter)
      // Update task metrics if context is not null
      // TODO: Make context non optional in a future release
      Option(context).foreach { c =>
        c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
        c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
      }
      combiners.iterator
    }
  }

 

 

4. 只在内存内combine(AppendOnlyMap)

 

4.1 使用AppendOnlyMap,代码的关键是update函数,以及combiner.changeValue(kv._1, update)

 

      val combiners = new AppendOnlyMap[K,C]
      var kv: Product2[K, V] = null
      ///通过闭包特性,update函数内可以访问kv
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (iter.hasNext) {
        kv = iter.next()
        //kv键值对的key以及update函数作为入参
        combiners.changeValue(kv._1, update)
      }
      combiners.iterator

 

4.2 AppendOnlyMap的changeValue方法

 

/**
   * Set the value for key to updateFunc(hadValue, oldValue), where oldValue will be the old value
   * for key, if any, or null otherwise. Returns the newly updated value.
   */
  //key:kv键值对的key,
  //updateFunc的函数类型是(Boolean,V)=>V
  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) { ///如果键值null,对键为空的处理
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = updateFunc(haveNullValue, nullValue) //
      haveNullValue = true
      return nullValue
    }
    //通过对k进行rehash算出它Hash值
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      //data是数组
      //kv期望放到2*pos位置,其中k放到2*pos的位置,v放到2*pos+1的位置
      val curKey = data(2 * pos)
  
      //if-else if-else做寻址探测
      //如果data(2*pos)位置上的key已经存在,且与k相同,那么表示它们需要聚合
      if (k.eq(curKey) || k.equals(curKey)) {
        //调用updateFunc,入参是值已经存在(true),那个位置上的值(old value)
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        //赋心智
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        //返回新值
        return newValue
      } else if (curKey.eq(null)) {
        //HashMap上data(2 * pos)的值为null,
        //将kv的v写入,updateFunc的入参是值不存在,同时旧值为null
        val newValue = updateFunc(false, null.asInstanceOf[V])
        //将data(2*pos)的值由null改为k
        data(2 * pos) = k
        //设置新值
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        //AppendOnlyMap的长度增1,表示新元素加入
        incrementSize()
        return newValue
      } else {
        //重新寻址,寻址的算法,每次的步长是平方探测,1,2,4,8?
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
  }

 

4.3 incrementSize肩负着resize table的职责

  /** Increase table size by 1, rehashing if necessary */
  private def incrementSize() {
    curSize += 1
    if (curSize > growThreshold) {
      growTable()
    }
  }

 

 

 

5. 内存+磁盘combine(ExternalAppendOnlyMap)

特点:

1. 内存放不下可以放到磁盘

2. 放到磁盘前,首先进行排序

3. 最后对所有spill到磁盘的文件做归并排序

 

 6. 向MapOutputTrackerMaster汇报写入数据的位置以及文件中FileSegment的位置

http://www.cnblogs.com/fxjwind/p/3522219.html

分享到:
评论

相关推荐

    spark shuffle简介

    3. **Spark 1.4.0**:虽然Hash-based Shuffle仍然是默认方式,但Spark开始尝试优化Shuffle,引入了Sort-based Shuffle。这种方式在写入阶段就对数据进行了排序,使得相同键值的数据在同一块中,减少了Shuffle过程中...

    Spark 的两种核心 Shuffle 详解.pdf

    Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的 Shuffle。

    Spark Shuffle优化-参数调优1

    Shuffle 操作是 Spark 中最耗时的操作之一,而 shuffle 优化是Spark 优化的关键。 本文将对 Spark Shuffle 优化的参数进行详细的介绍。 spark.reducer.maxSizeInFlight Spark 的 reducer 任务的 buffer 缓冲大小...

    Spark-shuffle机制.pdf

    **Spark Shuffle**是指在Spark应用程序执行过程中,数据从一个节点或分区移动到另一个节点或分区的过程。这种数据重分布通常发生在诸如`groupByKey`, `reduceByKey`, `join`等操作中。Shuffle操作涉及到大量的磁盘I/...

    Spark技术内幕深入解析Spark内核架构设计与实现原理

    10. **Spark Shuffle优化**:包括减少shuffle数据量、使用更高效的shuffle机制如Sort-based Shuffle和Hash-based Shuffle,以及合理配置executor内存和并行度,以提高性能。 11. **资源管理和调度**:Spark可以与...

    hash_map的简单应用

    hash_map

    hash_map的详解

    ### hash_map详解 #### 0. 为什么需要hash_map? 在软件开发中,经常会遇到需要高效存储和查找键值对(key-value)的情况。例如,在管理人物名称及其相关信息时,我们希望能够快速地添加、查找和更新数据。传统的...

    spark调优.rar

    6. **Shuffle操作优化**:Shuffle是Spark性能瓶颈之一,减少不必要的Shuffle(如通过Coalesce操作)和优化Shuffle管理(如使用更高效的Hash Shuffle或Sort Shuffle)能显著提升性能。 7. **宽依赖与窄依赖**:理解...

    spark调优介绍

    5. 使用sort-based shuffle代替hash-based shuffle:sort-based shuffle可以提供更好的排序效果,减少数据交换。 以上策略需要结合实际应用场景灵活调整,持续监控Spark应用的性能指标,如executor内存使用情况、...

    c++中hash_table以及std::map应用案例

    代码重点是hash_table,附加std::map与其做对比,实现的是一条sql语句:select c_nationkey, c_mktsegment, count(*), max(c_acctbal) from aaa_customer_1g group by c_nationkey, c_mktsegment order by c_...

    spark原理示意图,执行计划,shuffle,架构,检查点,缓存,广播

    为了优化shuffle过程,Spark使用Hash Shuffling和Sort Shuffling策略,后者通过排序保证了数据的稳定性。 Spark的架构是基于Master-Worker模式,由一个或多个Driver(通常是用户的应用程序)和多个Executor组成。...

    大规模游戏社交网络节点相似性算法及其应用-8-5 美团 Spark Shuffle 架构演进.zip

    Spark是一种流行的分布式大数据处理框架,Shuffle是其中的重要阶段,它发生在Map和Reduce任务之间,负责重新组织数据以确保正确性。在处理大规模数据时,Shuffle的效率和稳定性对整个作业性能至关重要。 早期的...

    Spark大数据商业实战三部曲_内核解密_商业案例_性能调优 实例源码

    《Spark大数据商业实战三部曲》是一套深度探讨Spark技术在商业环境中的应用、内核解析及性能优化的实例源码集合。这份压缩包包含了丰富的实践案例和代码,旨在帮助读者深入理解Spark的核心机制,掌握大数据处理的...

    Hash_map 实现源码

    哈希映射(Hash Map)是一种常见的数据结构,它提供了键值对(Key-Value Pair)的快速存储和检索功能。在C++中,STL(Standard Template Library)提供了一个名为`std::unordered_map`的容器,它是基于哈希表实现的...

    HASHIN.rar_ABAQUS_Hashin失效准则 abaqus_abaqus hashin_abaqus 三维Hashi

    标题中的"HASHIN.rar_ABAQUS_Hashin失效准则 abaqus_abaqus hashin_abaqus 三维Hashi"表明这是一个关于ABAQUS软件中应用Hashin失效准则进行三维分析的示例或教程。ABAQUS是一款广泛应用的有限元分析软件,尤其在结构...

    Spark性能优化基础篇

    shuffle调优则需要理解Spark的shuffle过程,通过调整hash分区策略、增加shuffle write buffer大小等手段,优化数据传输和合并过程。 总的来说,Spark性能优化是一个综合的过程,需要从开发阶段就考虑效率问题,并...

    hashin-strain-3d_hashin_三维hashin_三维hashin失效_失效准则_3D—Hashin_

    **三维Hashin失效准则详解** 在复合材料领域,失效分析是至关重要的,它关系到材料的性能预测和结构安全。Hashin失效准则是一种广泛应用的多向复合材料失效理论,由Shlomo Hashin于1962年提出,主要用于评估多向受...

    Hash map 哈希表

    哈希表,也被称为Hash Map,是计算机科学中一种高效的数据结构,用于存储键值对。它通过一种称为哈希函数的过程将键映射到数组的特定位置,从而实现快速的查找、插入和删除操作。在哈希表中,查找时间通常接近常数...

    哈希映射 hash map

    哈希映射(Hash Map),又称为哈希表,是一种数据结构,用于高效地存储和检索键值对。它基于哈希表的概念,利用哈希函数将键(Key)映射到一个固定大小的数组(桶)中的特定位置,以此实现快速访问。哈希表最大的...

    linux hash_map

    Linux下的`hash_map`是一种基于哈希表的数据结构,它提供了高效的键值对存储功能。与`map`不同,`hash_map`不使用二叉查找树(如红黑树),而是利用哈希函数来实现快速查找。哈希表通常由数组和散列函数组成,数组的...

Global site tag (gtag.js) - Google Analytics