1. N个partition,会产生N个MapTask,如果不指定ReduceTask的个数,那么默认情况下,ReduceTask个数也为N
2. N个partition,即N个MapTask,同时有N个ReduceTask(默认是N,不指定ReduceTask个数的情况下),那么会产生不大于N的.data文件以及不大于N的index文件。
即文件个数不受ReduceTask个数的影响。
如下所示:index个数是5,data个数是4,这些数字跟Reducer个数无关,而与Mapper个数有关。那么为什么data文件个数不是5个,这些文件是Mapper如何写入的?其中包含了什么内容,接下来分析。
hadoop$ pwd /tmp/spark-local-20150129214306-5f46 hadoop$ cd spark-local-20150129214306-5f46/ hadoop$ ls 0c 0d 0e 0f 11 13 15 29 30 32 36 38 hadoop$ find . -name "*.index" | xargs ls -l -rw-rw-r-- 1 hadoop hadoop 48 1月 29 21:43 ./0c/shuffle_0_4_0.index -rw-rw-r-- 1 hadoop hadoop 48 1月 29 21:43 ./0d/shuffle_0_3_0.index -rw-rw-r-- 1 hadoop hadoop 48 1月 29 21:43 ./0f/shuffle_0_1_0.index -rw-rw-r-- 1 hadoop hadoop 48 1月 29 21:43 ./30/shuffle_0_0_0.index -rw-rw-r-- 1 hadoop hadoop 48 1月 29 21:43 ./32/shuffle_0_2_0.index hadoop$ find . -name "*.data" | xargs ls -l -rw-rw-r-- 1 hadoop hadoop 884 1月 29 21:43 ./0c/shuffle_0_0_0.data -rw-rw-r-- 1 hadoop hadoop 685 1月 29 21:43 ./15/shuffle_0_1_0.data -rw-rw-r-- 1 hadoop hadoop 710 1月 29 21:43 ./29/shuffle_0_3_0.data -rw-rw-r-- 1 hadoop hadoop 693 1月 29 21:43 ./36/shuffle_0_2_0.data
ShuffleMapTask执行逻辑
既然N个partition写入到若干个data文件中,那么当Reducer过来拉取数据时,必须得知道,属于它的那部分数据
data和index文件名中的reduceId都是0,原因是(见IndexShuffleBlockManager)
def getDataFile(shuffleId: Int, mapId: Int): File = { ///相同的shuffleId和mapId指向同一个文件 blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0)) } private def getIndexFile(shuffleId: Int, mapId: Int): File = { ///相同的shuffleId和mapId指向同一个文件 blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0)) }
def writePartitionedFile( blockId: BlockId, context: TaskContext, outputFile: File): Array[Long] = { // Track location of each range in the output file val lengths = new Array[Long](numPartitions) if (bypassMergeSort && partitionWriters != null) { // We decided to write separate files for each partition, so just concatenate them. To keep // this simple we spill out the current in-memory collection so that everything is in files. spillToPartitionFiles(if (aggregator.isDefined) map else buffer) partitionWriters.foreach(_.commitAndClose()) var out: FileOutputStream = null var in: FileInputStream = null try { out = new FileOutputStream(outputFile, true) for (i <- 0 until numPartitions) { in = new FileInputStream(partitionWriters(i).fileSegment().file) val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled) in.close() in = null lengths(i) = size } } finally { if (out != null) { out.close() } if (in != null) { in.close() } } } else { // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by // partition and just write everything directly. ///this.partitionedIterator包含了数据集合 for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { //如果elements有元素 val writer = blockManager.getDiskWriter( blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) for (elem <- elements) { ///遍历集合,写入文件 writer.write(elem) ///追加数据,遍历结束后再关闭 } writer.commitAndClose() ///关闭文件 val segment = writer.fileSegment() ///这是什么东东? lengths(id) = segment.length } } } context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled context.taskMetrics.diskBytesSpilled += diskBytesSpilled context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m => if (curWriteMetrics != null) { m.shuffleBytesWritten += curWriteMetrics.shuffleBytesWritten m.shuffleWriteTime += curWriteMetrics.shuffleWriteTime } } lengths }
相关推荐
spark.shuffle.blockTransferService netty shuffle过程中,传输数据的方式,两种选项,netty或nio,spark 1.2开始,默认就是netty,比较简单而且性能较高,spark 1.5开始nio就是过期的了,而且spark 1.6中会去除掉 ...
SparkShuffle思维导图,xmind
3. **Spark 1.4.0**:虽然Hash-based Shuffle仍然是默认方式,但Spark开始尝试优化Shuffle,引入了Sort-based Shuffle。这种方式在写入阶段就对数据进行了排序,使得相同键值的数据在同一块中,减少了Shuffle过程中...
这个参数控制 Spark 1.5 以后的 ShuffleManager,有三种可选的,hash、sort 和 tungsten-sort。sort-based ShuffleManager 会更高效实用内存,并且避免产生大量的 map side 磁盘文件,从 Spark 1.2 开始就是默认的...
spark-2.2.0-yarn-shuffle.jar
Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的 Shuffle。
- shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold(默认200)。 - 不是排序类的shuffle算子。 - 每个下游task创建一个临时磁盘文件,数据按照key的hash值写入对应的文件。 - 写入磁盘前先...
3. **Bypass Merge SortShuffle**: 当满足特定条件(map任务数量小于`spark.shuffle.sort.bypassMergeThreshold`且非聚合类shuffle算子)时,Spark会使用bypass机制,跳过排序过程。每个task会为每个reducer创建一个...
Spark Shuffle是大数据处理框架Apache Spark中的关键组成部分,它在数据处理流程中扮演着至关重要的角色,连接了Map和Reduce操作,决定了数据如何在集群中重新分布。 Shuffle过程涉及到数据的分区、排序和网络传输,...
当数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数的值时(默认为 200),就会启用 bypass 机制。在普通运行机制下,数据会先写入一个内存数据结构中,然后根据不同的 Shuffle 算子选择不同的数据结构。...
spark-1.6.1-yarn-shuffle.jar 下载。spark-1.6.1-yarn-shuffle.jar 下载。spark-1.6.1-yarn-shuffle.jar 下载。
我们都知道,Shuffle 操作在 Spark 中是一种昂贵的操作。在 Facebook,单个 Job 的 Shuffle 就可能往磁盘中写入 300TB 的数据;而且 shuffle reads 也是一种低效的操作,这会大大延长作业的整体执行时间,并且消耗...
SortShuffleManager在某些情况下可以跳过排序,即当Shuffle reduce task数量小于`spark.shuffle.sort.bypassMergeThreshold`参数值时。这可以减少排序的计算开销,但可能导致更多的磁盘文件生成。如果不需要排序,...
SortShuffleManager 是 Spark 中的一种重要类,基于 Sort-base 的 Shuffle,用于在 Mapper 中的每一个 ShuffleMapTask 中产生两个文件:Data 文件和 Index 文件。SortShuffleManager 的主要方法包括: * ...
Spark ExternalShuffleService 配置使用及性能提升 Spark ExternalShuffleService 是一个长期存在于 NodeManager 进程中的辅助服务,旨在提升 Spark 系统中 Shuffle 计算的性能。该服务可以减少 Executor 的压力,...
一、shuffle原理分析 1.1 shuffle概述 Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上...而Spark也会有自己的shuffle实现过程。 1.2 Spark中的 shuffle 介绍 在DAG调度的过程中,Stage 阶段的
我们都知道,Shuffle 操作在 Spark 中是一种昂贵的操作。在 Facebook,单个 Job 的 Shuffle 就可能往磁盘中写入 300TB 的数据;而且 shuffle reads 也是一种低效的操作,这会大大延长作业的整体执行时间,并且消耗...
spark-network-shuffle_2.11-2.1.3-SNAPSHOT.jar
6. **性能优化**:Spark3.2.2包含了一系列性能优化,如减少网络传输、提高shuffle效率、内存管理改进等。此外,还引入了Tungsten项目的新的编码器技术,进一步减少了内存占用。 7. **机器学习库MLlib**:Spark ...
【Spark Shuffle Service在中通的优化实践】 Spark Shuffle Service是Apache Spark中用于处理Stage间数据交换的关键组件。在中通快递的业务场景中,随着业务量的快速增长,传统的基于Hive+MapReduce的离线计算方式...