原文 http://www.cnblogs.com/hseagle/p/3979744.html
概要
Spark 1.1中对spark core的一个重大改进就是引入了sort-based shuffle处理机制,本文就该处理机制的实现进行初步的分析。
通过一个小的实验来直观的感受一下sort-based shuffle算法会产生哪些中间文件,具体实验步骤如下所述。
步骤1: 修改conf/spark-default.conf, 加入如下内容
spark.shuffle.manager SORT
步骤2: 运行spark-shell
SPARK_LOCAL_IP=127.0.0.1 $SPARK_HOME/bin/spark-shell
步骤3: 执行wordcount
sc.textFile("README.md").flatMap(l => l.split(" ")).map(w=>(w,1)).reduceByKey(_ + _).collect
步骤4: 查看生成的中间文件
find /tmp/spark-local* -type f
文件查找结果如下所示
可以看到生成了两人种后缀的文件,分别为data和index类型,这两者的用途在后续分析中会详细讲述。
如果我们做一下对比实验,将shuffle模式改为Hash,再来观看生成的文件,就会找到区别。将原先配置文件中的 SORT 改为 HASH ,重新启动spark-shell,执行相同的wordcount之后,在tmp目录下找到的文件列表如下。
两者生成的文件数量差异非常大,具体数值计算如下
在HASH模式下,每一次shuffle会生成M*R的数量的文件,如上述wordcount例子中,整个job有一次shuffle过程,由于输入文件默认分片为2,故M个数为2,而spark.default.parallelism配置的值为4,故R为4,所以总共生成1*2*4=8个文件。shuffle_0_1_2解读为shuffle+shuffle_id+map_id+reduce_id,故0_1_2表示由第0次shuffle中的第1个maptask生成的文件,该文件内容会被第2个reduce task消费
在SORT模式下,一个Map Task只生成一个文件,而不管生成的文件要被多少的Reduce消费,故文件个数是M的数量,由于wordcount中的默认分片为2,故只生成两个data文件
多次shuffle
刚才的示例中只有一次shuffle过程,我们可以通过小小的改动来达到两次shuffle,代码如下
上述代码将reduceByKey的结果通过map进行反转,即将原来的(w, count)转换为(count,w),然后根据出现次数进行归类。 groupByKey会再次导致数据shuffle过程。
在HASH模式下产生的文件如下所示
/tmp/spark-local-20140919094531-1cb6/12/shuffle_0_3_3
/tmp/spark-local-20140919094531-1cb6/0c/shuffle_0_0_0
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_2_3
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_3_2
/tmp/spark-local-20140919094531-1cb6/11/shuffle_1_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_2_2
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_3_1
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_0_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_0_3
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_3_0
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_2_1
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_0_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_1_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_0_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_1_0
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_1_0_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_2_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_1_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_0_2
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_0_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_1_0
引入一次新的shuffle,产生了大量的中间文件
如果是使用SORT,效果如何呢?只会增加M个文件,由于在新的shuffle过程中,map task数目为4,所以总共的文件是2+4=6。
值得指出的是shuffle_0和shuffle_1的执行次序问题,数字越大越先执行,由于spark job提交的时候是从后往前倒推的,故0是最后将执行,而前面的先执行。
Sort-based Shuffle的设计思想
sort-based shuffle的总体指导思想是一个map task最终只生成一个shuffle文件,那么后续的reduce task是如何从这一个shuffle文件中得到自己的partition呢,这个时候就需要引入一个新的文件类型即index文件。
其具体实现步骤如下:
Map Task在读取自己输入的partition之后,将计算结果写入到ExternalSorter
ExternalSorter会使用一个map来存储新的计算结果,新的计算结果根据partiton分类,如果是有combine操作,则需要将新的值与原有的值进行合并
如果ExternalSorter中的map占用的内存已经超越了使用的阀值,则将map中的内容spill到磁盘中,每一次spill产生一个不同的文件
当输入Partition中的所有数据都已经处理完毕之后,这时有可能一部分计算结果在内存中,另一部分计算结果在spill的一到多个文件之中,这时通过merge操作将内存和spill文件中的内容合并整到一个文件里
最后将每一个partition的在data文件中的起始位置和结束位置写入到index文件
相应的源文件
几个重要的函数
SortShuffleWriter.write
ExternalSorter.insertAll
writePartitionedFile将内存中的数据和spill文件中内容一起合并到一个文件当中
而数据读取过程中则需要使用IndexShuffleBlockManager来获取Partiton的具体位置
概要
Spark 1.1中对spark core的一个重大改进就是引入了sort-based shuffle处理机制,本文就该处理机制的实现进行初步的分析。
通过一个小的实验来直观的感受一下sort-based shuffle算法会产生哪些中间文件,具体实验步骤如下所述。
步骤1: 修改conf/spark-default.conf, 加入如下内容
spark.shuffle.manager SORT
步骤2: 运行spark-shell
SPARK_LOCAL_IP=127.0.0.1 $SPARK_HOME/bin/spark-shell
步骤3: 执行wordcount
sc.textFile("README.md").flatMap(l => l.split(" ")).map(w=>(w,1)).reduceByKey(_ + _).collect
步骤4: 查看生成的中间文件
find /tmp/spark-local* -type f
文件查找结果如下所示
/tmp/spark-local-20140919091822-aa66/0f/shuffle_0_1_0.index /tmp/spark-local-20140919091822-aa66/30/shuffle_0_0_0.index /tmp/spark-local-20140919091822-aa66/0c/shuffle_0_0_0.data /tmp/spark-local-20140919091822-aa66/15/shuffle_0_1_0.data
可以看到生成了两人种后缀的文件,分别为data和index类型,这两者的用途在后续分析中会详细讲述。
如果我们做一下对比实验,将shuffle模式改为Hash,再来观看生成的文件,就会找到区别。将原先配置文件中的 SORT 改为 HASH ,重新启动spark-shell,执行相同的wordcount之后,在tmp目录下找到的文件列表如下。
/tmp/spark-local-20140919092949-14cc/10/shuffle_0_1_3 /tmp/spark-local-20140919092949-14cc/0f/shuffle_0_1_2 /tmp/spark-local-20140919092949-14cc/0f/shuffle_0_0_3 /tmp/spark-local-20140919092949-14cc/0c/shuffle_0_0_0 /tmp/spark-local-20140919092949-14cc/0d/shuffle_0_1_0 /tmp/spark-local-20140919092949-14cc/0d/shuffle_0_0_1 /tmp/spark-local-20140919092949-14cc/0e/shuffle_0_1_1 /tmp/spark-local-20140919092949-14cc/0e/shuffle_0_0_2
两者生成的文件数量差异非常大,具体数值计算如下
在HASH模式下,每一次shuffle会生成M*R的数量的文件,如上述wordcount例子中,整个job有一次shuffle过程,由于输入文件默认分片为2,故M个数为2,而spark.default.parallelism配置的值为4,故R为4,所以总共生成1*2*4=8个文件。shuffle_0_1_2解读为shuffle+shuffle_id+map_id+reduce_id,故0_1_2表示由第0次shuffle中的第1个maptask生成的文件,该文件内容会被第2个reduce task消费
在SORT模式下,一个Map Task只生成一个文件,而不管生成的文件要被多少的Reduce消费,故文件个数是M的数量,由于wordcount中的默认分片为2,故只生成两个data文件
多次shuffle
刚才的示例中只有一次shuffle过程,我们可以通过小小的改动来达到两次shuffle,代码如下
sc.textFile("README.md").flatMap(l => l.split(" ")).map(w => (w,1)).reduceByKey(_ + _).map(p=>(p._2,p._1)).groupByKey.collect
上述代码将reduceByKey的结果通过map进行反转,即将原来的(w, count)转换为(count,w),然后根据出现次数进行归类。 groupByKey会再次导致数据shuffle过程。
在HASH模式下产生的文件如下所示
/tmp/spark-local-20140919094531-1cb6/12/shuffle_0_3_3
/tmp/spark-local-20140919094531-1cb6/0c/shuffle_0_0_0
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_2_3
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_3_2
/tmp/spark-local-20140919094531-1cb6/11/shuffle_1_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_2_2
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_3_1
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_0_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_0_3
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_3_0
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_2_1
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_0_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_1_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_0_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_1_0
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_1_0_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_2_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_1_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_0_2
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_0_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_1_0
引入一次新的shuffle,产生了大量的中间文件
如果是使用SORT,效果如何呢?只会增加M个文件,由于在新的shuffle过程中,map task数目为4,所以总共的文件是2+4=6。
/tmp/spark-local-20140919094731-034a/29/shuffle_0_3_0.data /tmp/spark-local-20140919094731-034a/30/shuffle_0_0_0.index /tmp/spark-local-20140919094731-034a/15/shuffle_0_1_0.data /tmp/spark-local-20140919094731-034a/36/shuffle_0_2_0.data /tmp/spark-local-20140919094731-034a/0c/shuffle_0_0_0.data /tmp/spark-local-20140919094731-034a/32/shuffle_0_2_0.index /tmp/spark-local-20140919094731-034a/32/shuffle_1_1_0.index /tmp/spark-local-20140919094731-034a/0f/shuffle_0_1_0.index /tmp/spark-local-20140919094731-034a/0f/shuffle_1_0_0.index /tmp/spark-local-20140919094731-034a/0a/shuffle_1_1_0.data /tmp/spark-local-20140919094731-034a/2b/shuffle_1_0_0.data /tmp/spark-local-20140919094731-034a/0d/shuffle_0_3_0.index
值得指出的是shuffle_0和shuffle_1的执行次序问题,数字越大越先执行,由于spark job提交的时候是从后往前倒推的,故0是最后将执行,而前面的先执行。
Sort-based Shuffle的设计思想
sort-based shuffle的总体指导思想是一个map task最终只生成一个shuffle文件,那么后续的reduce task是如何从这一个shuffle文件中得到自己的partition呢,这个时候就需要引入一个新的文件类型即index文件。
其具体实现步骤如下:
Map Task在读取自己输入的partition之后,将计算结果写入到ExternalSorter
ExternalSorter会使用一个map来存储新的计算结果,新的计算结果根据partiton分类,如果是有combine操作,则需要将新的值与原有的值进行合并
如果ExternalSorter中的map占用的内存已经超越了使用的阀值,则将map中的内容spill到磁盘中,每一次spill产生一个不同的文件
当输入Partition中的所有数据都已经处理完毕之后,这时有可能一部分计算结果在内存中,另一部分计算结果在spill的一到多个文件之中,这时通过merge操作将内存和spill文件中的内容合并整到一个文件里
最后将每一个partition的在data文件中的起始位置和结束位置写入到index文件
相应的源文件
SortShuffleManager.scala SortShuffleWriter.scala ExternalSorter.scala IndexShuffleBlockManager.scala
几个重要的函数
SortShuffleWriter.write
override def write(records: Iterator[_ >: Product2[K, V]]): Unit = { if (dep.mapSideCombine) { if (!dep.aggregator.isDefined) { throw new IllegalStateException("Aggregator is empty for map-side combine") } sorter = new ExternalSorter[K, V, C]( dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) sorter.insertAll(records) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. sorter = new ExternalSorter[K, V, V]( None, Some(dep.partitioner), None, dep.serializer) sorter.insertAll(records) } val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId) val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths) mapStatus = new MapStatus(blockManager.blockManagerId, partitionLengths.map(MapOutputTracker.compressSize)) }
ExternalSorter.insertAll
def insertAll(records: Iterator[_ { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (records.hasNext) { elementsRead += 1 kv = records.next() map.changeValue((getPartition(kv._1), kv._1), update) maybeSpill(usingMap = true) } } else { // Stick values into our buffer while (records.hasNext) { elementsRead += 1 val kv = records.next() buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C]) maybeSpill(usingMap = false) } } }
writePartitionedFile将内存中的数据和spill文件中内容一起合并到一个文件当中
def writePartitionedFile( blockId: BlockId, context: TaskContext, outputFile: File): Array[Long] = { // Track location of each range in the output file val lengths = new Array[Long](numPartitions) if (bypassMergeSort && partitionWriters != null) { // We decided to write separate files for each partition, so just concatenate them. To keep // this simple we spill out the current in-memory collection so that everything is in files. spillToPartitionFiles(if (aggregator.isDefined) map else buffer) partitionWriters.foreach(_.commitAndClose()) var out: FileOutputStream = null var in: FileInputStream = null try { out = new FileOutputStream(outputFile) for (i <- 0 until numPartitions) { in = new FileInputStream(partitionWriters(i).fileSegment().file) val size = org.apache.spark.util.Utils.copyStream(in, out, false) in.close() in = null lengths(i) = size } } finally { if (out != null) { out.close() } if (in != null) { in.close() } } } else { // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by // partition and just write everything directly. for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { val writer = blockManager.getDiskWriter( blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) for (elem
而数据读取过程中则需要使用IndexShuffleBlockManager来获取Partiton的具体位置
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { // The block is actually going to be a range of a single map output file for this map, so // find out the consolidated file, then the offset within that from our index val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) val in = new DataInputStream(new FileInputStream(indexFile)) try { in.skip(blockId.reduceId * 8) val offset = in.readLong() val nextOffset = in.readLong() new FileSegmentManagedBuffer( getDataFile(blockId.shuffleId, blockId.mapId), offset, nextOffset - offset) } finally { in.close() } }
发表评论
-
翻译一下spark sql and dataframes
2016-03-23 15:20 1620概述 spark sql 是一个结构化执行的数据模块,它并不像 ... -
spark 中GC的调优
2016-03-14 11:02 1352注:本文转自:http://www.csdn.net/arti ... -
spark Tungsten-将硬件性能彻底压榨
2016-03-08 11:06 1031Tungsten项目将是Spark自诞生以来内核级别的最大改动 ... -
关于Spark的Broadcast解析
2016-02-20 08:37 4520本文重点关注 数据块切分方法以及P2P下载数据方法 Broad ... -
spark的几个重要概念
2015-12-04 14:09 0本节主要记录以下几个概念 一:RDD的五大特点 二:RDD 窄 ... -
spark部署安装调试
2015-12-02 11:28 737本节记录spark下载-->编译-->安装--&g ... -
spark基本概念
2015-11-12 10:45 787记录一下课堂笔记: ... -
hadoop计算能力调度器配置
2015-10-29 10:39 1017问题出现 hadoop默认调度器是FIFO,其原理就是先按照作 ... -
HBase在各大应用中的优化和改进
2015-10-28 14:59 695Facebook之前曾经透露过Facebook的hbase架构 ... -
一篇很好的解决系统问题过程描述文章
2015-09-23 08:40 498在网上看到的一篇解决h ... -
通过GeoHash核心原理来分析hbase rowkey设计
2015-09-08 15:49 3517注:本文是结合hbase ... -
从OpenTsdb来分析rowkey设计
2015-09-06 16:04 4945讨论此问题前,先理解 ... -
HBase中asynchbase的使用方式
2015-08-25 10:32 8194Hbase的原生java 客户端是完全同步的,当你使用原生AP ... -
Mapreduce优化的点滴
2015-07-16 15:18 827注:转载 1. 使用自定义Writable 自带的Text ... -
hadoop 如何自定义类型
2015-07-15 09:37 1236记录一下hadoop 数据类型章节的笔记,以便后期使用,本文是 ... -
napreduce shuffle 过程记录
2015-07-10 11:23 758在我看来 hadoop的核心是mapre ... -
ZooKeeper伪分布式集群安装及使用
2015-02-13 08:29 9201. zookeeper介绍 ZooKeeper是一个为分 ... -
hadoop-mahout 核心算法总结
2015-02-07 10:08 1554其实大家都知道hadoop为我们提供了一个大的框架,真正的 ... -
推荐引擎内部原理--mahout
2015-01-22 11:11 569转载自:https://www.ibm.com/devel ... -
hadoop 动态添加删除节点
2015-01-20 13:39 661转自:http://www.cnblogs.com/rill ...
相关推荐
从Hash-based Shuffle到Sort-based Shuffle再到Push-based Shuffle,每次改进都是为了降低延迟、提高吞吐量和减少资源消耗。理解Spark Shuffle的工作原理及其优化方法,对于有效管理和优化Spark作业的性能至关重要。
《Spark技术内幕深入解析Spark内核架构设计与实现原理》这本书深入探讨了Apache Spark这一分布式计算框架的核心架构和实现机制,旨在帮助读者全面理解Spark的工作原理,并能够有效地利用其进行大数据处理。...
5. 使用sort-based shuffle代替hash-based shuffle:sort-based shuffle可以提供更好的排序效果,减少数据交换。 以上策略需要结合实际应用场景灵活调整,持续监控Spark应用的性能指标,如executor内存使用情况、...
这个参数控制对于 sort-based ShuffleManager,如果没有进行 map side 聚合,而且 reduce 任务数量少于这个值,那么就不会进行排序。如果你使用 sort ShuffleManager,而且不需要排序,那么可以考虑将这个值加大,...
Array shuffle Prime test(trial division) Prime test(Miller-Rabin's method) 2D Array Arbitary Integer Linear congruential generator Maximum subarray problem Bit-Set Queue Stack Binary Heap Fibonacci ...
- **Bucket Join 改进**:通过预先对数据进行分桶(Bucketing),使得具有相同键的数据被分配到相同的分区中,从而避免 Shuffle,直接进行 Bucket Sort Merge Join,减少网络传输和数据重排的开销。对于不兼容的 ...
3. Shuffle和Sort阶段:Map阶段产生的中间结果按键排序,并分发到相应的Reducer。 4. Reduce阶段:Reducer函数根据物品ID聚合相似度,生成物品的推荐列表。对于每个用户,找出与其已评分物品最相似的N个物品,作为...
- **Shuffle优化**:优化Shuffle过程中的排序与合并操作,提高数据传输效率。 3. **Shuffle阶段优化**: - 在Shuffle过程中优化数据传输方式,例如使用更高效的序列化/反序列化方法。 - 调整`io.sort.mb`参数,...
- Tree-based Sorting:构建二叉树或其他树结构,每个节点代表一部分数据,通过树的遍历实现排序。 每种算法的代价模型包括计算成本、内存使用和网络通信开销,选择哪种算法取决于数据特性、集群资源和性能需求。...
6. **Shuffle操作优化**:Shuffle是Spark性能瓶颈之一,减少不必要的Shuffle(如通过Coalesce操作)和优化Shuffle管理(如使用更高效的Hash Shuffle或Sort Shuffle)能显著提升性能。 7. **宽依赖与窄依赖**:理解...
- HiveCBO(Cost-Based Optimizer)可以根据成本模型自动选择最优查询计划,从而提高查询性能。 #### 12.6 Kafka ##### 12.6.1 Kafka性能调优 - Kafka的性能调优主要包括以下方面: - 调整分区数。 - 优化Broker...
sort-based shuffle在处理大量数据时,需要消耗更多的内存和I/O资源,并且对网络带宽要求较高,可能会成为瓶颈。 spark.storage.memoryFraction参数的含义,实际生产中如何调优: spark.storage.memoryFraction参数...
Shuffle and Sort 205 The Map Side 206 The Reduce Side 207 Configuration Tuning 209 Task Execution 212 The Task Execution Environment 212 Speculative Execution 213 Output Committers 215 Task JVM Reuse ...
array.zip A simple program that shows how a two-dimensional array works within a VB program.<END><br>70,Bubblesort.zip A simple Bubble Sort code that shows how the program works within a VB ...