在这篇http://bit1129.iteye.com/blog/2186325博文中,分析了hash based shuffle write开启consolidationFiles选项的过程。本文,则关注将Iteratable
1. 如下代码是HashShuffleWriter.write方法
在将partition的数据写入到磁盘前,进行map端的shuffle
/** Write a bunch of records to this task's output */ override def write(records: Iterator[_ <: Product2[K, V]]): Unit = { ///对输入的partition对应Iteratable集合进行map端combine val iter = if (dep.aggregator.isDefined) { if (dep.mapSideCombine) { //如果定义了dep.aggregator以及dep.mapSideCombine则进行map端combine dep.aggregator.get.combineValuesByKey(records, context) } else { records } } else { require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") records } for (elem <- iter) { val bucketId = dep.partitioner.getPartition(elem._1) shuffle.writers(bucketId).write(elem) } }
2. 调用dep.aggregator.get.combineValuesByKey(records, context)进行map端combine
其中aggregator是Aggregator类型的对象,它在构造时需要传入如下参数:
case class Aggregator[K, V, C] ( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) { //代码体..... }
3. Aggregator.combineValuesByKey方法体:
@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0") def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = combineValuesByKey(iter, null) //iter:是map端输入的可遍历的数据集合 def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = { if (!isSpillEnabled) { //不启用spill到磁盘,那么数据集合中的所有数据将在内存中进行combine val combiners = new AppendOnlyMap[K,C] //combiners是一个AppendOnlyMap, 可以想象成内存内的HashMap var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (iter.hasNext) { //遍历集合 kv = iter.next() //kv._1是Key,changeValue接收两个参数,kv键以及update combiners.changeValue(kv._1, update) } combiners.iterator //返回 } else { //构造参数哪来的? val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) combiners.insertAll(iter) // Update task metrics if context is not null // TODO: Make context non optional in a future release Option(context).foreach { c => c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled } combiners.iterator } }
4. 只在内存内combine(AppendOnlyMap)
4.1 使用AppendOnlyMap,代码的关键是update函数,以及combiner.changeValue(kv._1, update)
val combiners = new AppendOnlyMap[K,C] var kv: Product2[K, V] = null ///通过闭包特性,update函数内可以访问kv val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (iter.hasNext) { kv = iter.next() //kv键值对的key以及update函数作为入参 combiners.changeValue(kv._1, update) } combiners.iterator
4.2 AppendOnlyMap的changeValue方法
/** * Set the value for key to updateFunc(hadValue, oldValue), where oldValue will be the old value * for key, if any, or null otherwise. Returns the newly updated value. */ //key:kv键值对的key, //updateFunc的函数类型是(Boolean,V)=>V def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { assert(!destroyed, destructionMessage) val k = key.asInstanceOf[AnyRef] if (k.eq(null)) { ///如果键值null,对键为空的处理 if (!haveNullValue) { incrementSize() } nullValue = updateFunc(haveNullValue, nullValue) // haveNullValue = true return nullValue } //通过对k进行rehash算出它Hash值 var pos = rehash(k.hashCode) & mask var i = 1 while (true) { //data是数组 //kv期望放到2*pos位置,其中k放到2*pos的位置,v放到2*pos+1的位置 val curKey = data(2 * pos) //if-else if-else做寻址探测 //如果data(2*pos)位置上的key已经存在,且与k相同,那么表示它们需要聚合 if (k.eq(curKey) || k.equals(curKey)) { //调用updateFunc,入参是值已经存在(true),那个位置上的值(old value) val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V]) //赋心智 data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] //返回新值 return newValue } else if (curKey.eq(null)) { //HashMap上data(2 * pos)的值为null, //将kv的v写入,updateFunc的入参是值不存在,同时旧值为null val newValue = updateFunc(false, null.asInstanceOf[V]) //将data(2*pos)的值由null改为k data(2 * pos) = k //设置新值 data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] //AppendOnlyMap的长度增1,表示新元素加入 incrementSize() return newValue } else { //重新寻址,寻址的算法,每次的步长是平方探测,1,2,4,8? val delta = i pos = (pos + delta) & mask i += 1 } } null.asInstanceOf[V] // Never reached but needed to keep compiler happy }
4.3 incrementSize肩负着resize table的职责
/** Increase table size by 1, rehashing if necessary */ private def incrementSize() { curSize += 1 if (curSize > growThreshold) { growTable() } }
5. 内存+磁盘combine(ExternalAppendOnlyMap)
特点:
1. 内存放不下可以放到磁盘
2. 放到磁盘前,首先进行排序
3. 最后对所有spill到磁盘的文件做归并排序
6. 向MapOutputTrackerMaster汇报写入数据的位置以及文件中FileSegment的位置
http://www.cnblogs.com/fxjwind/p/3522219.html
相关推荐
3. **Spark 1.4.0**:虽然Hash-based Shuffle仍然是默认方式,但Spark开始尝试优化Shuffle,引入了Sort-based Shuffle。这种方式在写入阶段就对数据进行了排序,使得相同键值的数据在同一块中,减少了Shuffle过程中...
Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的 Shuffle。
Shuffle 操作是 Spark 中最耗时的操作之一,而 shuffle 优化是Spark 优化的关键。 本文将对 Spark Shuffle 优化的参数进行详细的介绍。 spark.reducer.maxSizeInFlight Spark 的 reducer 任务的 buffer 缓冲大小...
**Spark Shuffle**是指在Spark应用程序执行过程中,数据从一个节点或分区移动到另一个节点或分区的过程。这种数据重分布通常发生在诸如`groupByKey`, `reduceByKey`, `join`等操作中。Shuffle操作涉及到大量的磁盘I/...
10. **Spark Shuffle优化**:包括减少shuffle数据量、使用更高效的shuffle机制如Sort-based Shuffle和Hash-based Shuffle,以及合理配置executor内存和并行度,以提高性能。 11. **资源管理和调度**:Spark可以与...
hash_map
### hash_map详解 #### 0. 为什么需要hash_map? 在软件开发中,经常会遇到需要高效存储和查找键值对(key-value)的情况。例如,在管理人物名称及其相关信息时,我们希望能够快速地添加、查找和更新数据。传统的...
6. **Shuffle操作优化**:Shuffle是Spark性能瓶颈之一,减少不必要的Shuffle(如通过Coalesce操作)和优化Shuffle管理(如使用更高效的Hash Shuffle或Sort Shuffle)能显著提升性能。 7. **宽依赖与窄依赖**:理解...
5. 使用sort-based shuffle代替hash-based shuffle:sort-based shuffle可以提供更好的排序效果,减少数据交换。 以上策略需要结合实际应用场景灵活调整,持续监控Spark应用的性能指标,如executor内存使用情况、...
代码重点是hash_table,附加std::map与其做对比,实现的是一条sql语句:select c_nationkey, c_mktsegment, count(*), max(c_acctbal) from aaa_customer_1g group by c_nationkey, c_mktsegment order by c_...
为了优化shuffle过程,Spark使用Hash Shuffling和Sort Shuffling策略,后者通过排序保证了数据的稳定性。 Spark的架构是基于Master-Worker模式,由一个或多个Driver(通常是用户的应用程序)和多个Executor组成。...
Spark是一种流行的分布式大数据处理框架,Shuffle是其中的重要阶段,它发生在Map和Reduce任务之间,负责重新组织数据以确保正确性。在处理大规模数据时,Shuffle的效率和稳定性对整个作业性能至关重要。 早期的...
《Spark大数据商业实战三部曲》是一套深度探讨Spark技术在商业环境中的应用、内核解析及性能优化的实例源码集合。这份压缩包包含了丰富的实践案例和代码,旨在帮助读者深入理解Spark的核心机制,掌握大数据处理的...
哈希映射(Hash Map)是一种常见的数据结构,它提供了键值对(Key-Value Pair)的快速存储和检索功能。在C++中,STL(Standard Template Library)提供了一个名为`std::unordered_map`的容器,它是基于哈希表实现的...
标题中的"HASHIN.rar_ABAQUS_Hashin失效准则 abaqus_abaqus hashin_abaqus 三维Hashi"表明这是一个关于ABAQUS软件中应用Hashin失效准则进行三维分析的示例或教程。ABAQUS是一款广泛应用的有限元分析软件,尤其在结构...
shuffle调优则需要理解Spark的shuffle过程,通过调整hash分区策略、增加shuffle write buffer大小等手段,优化数据传输和合并过程。 总的来说,Spark性能优化是一个综合的过程,需要从开发阶段就考虑效率问题,并...
**三维Hashin失效准则详解** 在复合材料领域,失效分析是至关重要的,它关系到材料的性能预测和结构安全。Hashin失效准则是一种广泛应用的多向复合材料失效理论,由Shlomo Hashin于1962年提出,主要用于评估多向受...
哈希表,也被称为Hash Map,是计算机科学中一种高效的数据结构,用于存储键值对。它通过一种称为哈希函数的过程将键映射到数组的特定位置,从而实现快速的查找、插入和删除操作。在哈希表中,查找时间通常接近常数...
哈希映射(Hash Map),又称为哈希表,是一种数据结构,用于高效地存储和检索键值对。它基于哈希表的概念,利用哈希函数将键(Key)映射到一个固定大小的数组(桶)中的特定位置,以此实现快速访问。哈希表最大的...
Linux下的`hash_map`是一种基于哈希表的数据结构,它提供了高效的键值对存储功能。与`map`不同,`hash_map`不使用二叉查找树(如红黑树),而是利用哈希函数来实现快速查找。哈希表通常由数组和散列函数组成,数组的...