1.flow
1.1 shuffle abstract
1.2 shuffle flow
1.3 sort flow in shuffle
1.4 data structure in mem
2.core code paths
//SortShuffleWriter
override def write(records: Iterator[Product2[K, V]]): Unit = {
  //-how is this result collected by partition? via the index file
  //-1 sort the result data
  //-both cases below will spill if over the threshold
  val ts = System.currentTimeMillis()
  //-compare with the reduce-side combine in HashShuffleReader#read(); here is the real map side.
  if (dep.mapSideCombine) { //-acts as the map side's Combiner; note: even if no aggregator is provided, DAGScheduler will add it-
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") //by default,but keyOrdering,eg.
    sorter = new ExternalSorter[K, V, C]( //reduceByKey()
      dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    logInfo("-aggr:" + dep.aggregator + ",key ord:" + dep.keyOrdering + ",ser:" + dep.serializer
      + ",part:" + dep.partitioner + ",dep " + dep)
    sorter.insertAll(records) //-if no ordering is given, the key's hashcode is used to sort within each partition
  } else { //-no combine is given, e.g. groupBy..
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the *reduce side*
    // if the operation being run is *sortByKey.*
    sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
    sorter.insertAll(records)
  }
  logInfo("*total cost of sorting(ms) " + (System.currentTimeMillis() - ts))

  //-2 write to the data file, then the index file
  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) //-same as the shuffle output file in insertAll()
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  //-means the result is always written to disk, even if the data is small.
  val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
  shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
  logInfo("-output file:" + outputFile + ",blockid:" + blockId + ",part len " + partitionLengths.length
    + ",total " + partitionLengths.sum + ",shuffle server id " + blockManager.shuffleServerId
    + ",shuffleId " + dep.shuffleId)

  //-3 encapsulate the result (serialization happens in Executor#launchTask())
  //-used by MapOutputTracker#getServerStatuses()
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) //-contains no real data, only the shuffle server
}

/** Close this writer, passing along whether the map completed */
override def stop(success: Boolean): Option[MapStatus] = {
  try {
    if (stopping) {
      return None
    }
    stopping = true
    if (success) {
      return Option(mapStatus)
    } else {
      // The map task failed, so delete our output data.
      shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
      return None
    }
  } finally {
    // Clean up our sorter, which may have its own intermediate files
    if (sorter != null) {
      val startTime = System.nanoTime()
      sorter.stop()
      context.taskMetrics.shuffleWriteMetrics.foreach(
        _.incShuffleWriteTime(System.nanoTime - startTime))
      sorter = null
    }
  }
}
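The `partitionLengths` array returned by `writePartitionedFile` is what lets a reducer find its slice of the single data file. A minimal sketch of that idea, assuming the index file stores cumulative byte offsets (one more entry than there are partitions); `IndexOffsetsSketch`, `offsets` and `segment` are hypothetical names for illustration, not Spark APIs:

```scala
object IndexOffsetsSketch {
  // Assumption: the index holds cumulative offsets. Entry i is where
  // partition i starts; the last entry equals the data file size.
  def offsets(partitionLengths: Array[Long]): Array[Long] =
    partitionLengths.scanLeft(0L)(_ + _)

  // (start, length) of one reduce partition inside the data file.
  def segment(offsets: Array[Long], reduceId: Int): (Long, Long) =
    (offsets(reduceId), offsets(reduceId + 1) - offsets(reduceId))

  def main(args: Array[String]): Unit = {
    val lengths = Array(10L, 0L, 25L) // made-up partition sizes in bytes
    val offs = offsets(lengths)
    println(offs.mkString(","))
    println(segment(offs, 2))
  }
}
```

With this layout an empty partition simply has two equal neighbouring offsets, so a reducer can skip it without opening the data file.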
//ExternalSorter
/**-this file's data structure is the same as a spilled file's.
 * Write all the data added into this ExternalSorter into *a file* in the disk store. This is
 * called by the SortShuffleWriter and can go through an efficient path of just concatenating
 * binary files if we decided to avoid merge-sorting.
 *
 * @param blockId block ID to write to. The index file will be blockId.name + ".index".-note
 * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
 * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
 */
def writePartitionedFile(
    blockId: BlockId,
    context: TaskContext,
    outputFile: File): Array[Long] = {

  // Track location of each range in the output file
  val lengths = new Array[Long](numPartitions)

  //-1.case
  if (bypassMergeSort && partitionWriters != null) {
    logInfo("-bypass:" + bypassMergeSort + ",pwriters:" + partitionWriters.length)
    // We decided to write separate files for each partition, so just concatenate them. To keep
    // this simple we spill out the current in-memory collection so that everything is in files.
    //-so no order is guaranteed
    spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
    partitionWriters.foreach(_.commitAndClose())
    val out = new FileOutputStream(outputFile, true) //-note: append is 'true' here
    val writeStartTime = System.nanoTime
    util.Utils.tryWithSafeFinally {
      for (i <- 0 until numPartitions) {
        val in = new FileInputStream(partitionWriters(i).fileSegment().file)
        util.Utils.tryWithSafeFinally {
          lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
        } {
          in.close()
        }
      }
    } {
      out.close()
      context.taskMetrics.shuffleWriteMetrics.foreach(
        _.incShuffleWriteTime(System.nanoTime - writeStartTime))
    }
  } else if (spills.isEmpty && partitionWriters == null) { //-reached when no shuffle data was spilled to disk; same as spill()
    logInfo("-no spills occured")
    //2 Case where we only have in-memory data
    val collection = if (aggregator.isDefined) map else buffer
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator) //-same as spillToMergableFile()
    while (it.hasNext) { //-note: this is a double loop
      val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
        context.taskMetrics.shuffleWriteMetrics.get) //-uses the file's append mode, so only one file is created
      val partitionId = it.nextPartition() //-base part id
      while (it.hasNext && it.nextPartition() == partitionId) { //-works because the data is already ordered by partId
        it.writeNext(writer) //-this is a staged write
      }
      writer.commitAndClose() //-all data of one partition has been written out
      val segment = writer.fileSegment()
      lengths(partitionId) = segment.length //-record its size
    }
  } else { //-3.case: spilled files plus remaining in-memory data (everything is spilled out and concatenated into the final file)
    logInfo("-merge spilled file and in-mem data?,part it:" + this.partitionedIterator)
    // Not bypassing merge-sort; get an iterator by partition and just write everything directly.
    for ((id, elements) <- this.partitionedIterator) { //-uses a multiway merge sort
      if (elements.hasNext) {
        val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
          context.taskMetrics.shuffleWriteMetrics.get)
        for (elem <- elements) {
          writer.write(elem._1, elem._2)
        }
        writer.commitAndClose()
        val segment = writer.fileSegment()
        lengths(id) = segment.length
      }
    }
  }

  context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
  context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m =>
    if (curWriteMetrics != null) {
      m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten)
      m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime)
      m.incShuffleRecordsWritten(curWriteMetrics.shuffleRecordsWritten)
    }
  }

  lengths
}
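The in-memory case above relies on a double loop over an iterator that is already sorted by partition id. The same pattern can be sketched with plain Scala collections. Here records are reduced to a hypothetical `(partitionId, sizeInBytes)` pair, and `PartitionLengthsSketch` and `lengths` are illustrative names only; no Spark writer is involved:

```scala
object PartitionLengthsSketch {
  // Input must be sorted by partition id, as the sorter's iterator is.
  // Each element is a hypothetical (partitionId, serialized size in bytes).
  def lengths(sorted: Iterator[(Int, Long)], numPartitions: Int): Array[Long] = {
    val result = new Array[Long](numPartitions)
    val it = sorted.buffered // lets us peek at the next partition id
    while (it.hasNext) { // outer loop: one pass per partition present
      val partitionId = it.head._1
      var written = 0L
      // inner loop: consume records while they belong to this partition
      while (it.hasNext && it.head._1 == partitionId) {
        written += it.next()._2
      }
      result(partitionId) = written // plays the role of segment.length
    }
    result
  }

  def main(args: Array[String]): Unit = {
    val records = Iterator((0, 4L), (0, 6L), (2, 5L)) // made-up sizes
    println(lengths(records, 3).mkString(","))
  }
}
```

Partitions with no records keep length 0, which matches how the `lengths` array is pre-allocated with `numPartitions` entries in the original method.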
3.FAQ(TBD TODO)
Here are some questions I am not clear about, so any clues from you are highly appreciated:
A. When and where does a ResultTask know to fetch result data from a ShuffleMapTask? For example, take a job with:
3 maps (m1, m2, m3) x 2 reduces (r1, r2)
t1: m1, m2, m3 are all running;
t2: m1 finishes while the other maps keep running; r1 is then notified to set up and fetches the result of m1;
t3: m2 finishes. The question is: when does r1 know to fetch the result of m2? My guess is that there should be a shared place where reduces can learn where the maps are and when they finish, but I have not managed to find it in the sources.
B. In BlockManager, see #Question# below:
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  require(blockId != null, "BlockId is null")
  //-1 get the block's replica locations - note: this block is placed in TaskRunner#run() in the case of 'Indirect result'
  val locations = Random.shuffle(master.getLocations(blockId)) //-delivered to BlockManagerMasterEndpoint
  //- #Question# why not identify which BlockManager is closest to this driver, in cluster or even local mode? (important)
  for (loc <- locations) { //-multiple hosts hold the same blockId, so one valid copy is enough
    logDebug(s"Getting remote block $blockId from $loc")
    //-2 fetch the real data
    val data = blockTransferService.fetchBlockSync(
      loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()

    if (data != null) { //-got a copy from one of the replica nodes, return immediately
      if (asBlockResult) {
        return Some(new BlockResult(
          dataDeserialize(blockId, data),
          DataReadMethod.Network,
          data.limit()))
      } else {
        return Some(data)
      }
    }
    logDebug(s"The value of block $blockId is null")
  }
  logDebug(s"Block $blockId not found")
  None
}
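To make the #Question# concrete, here is a sketch of what "prefer the closest replica" could look like: shuffle the locations as `doGetRemote` does, then move replicas on the local host to the front. This is not BlockManager's code; `Loc`, `preferLocal` and `localHost` are hypothetical names, and matching on the host name is only an assumed notion of "closest":

```scala
import scala.util.Random

object LocationOrderSketch {
  // Hypothetical stand-in for a block manager location.
  final case class Loc(host: String, port: Int)

  // Shuffle for load balancing, then try same-host replicas first.
  def preferLocal(locs: Seq[Loc], localHost: String, rnd: Random): Seq[Loc] = {
    val (local, remote) = rnd.shuffle(locs).partition(_.host == localHost)
    local ++ remote
  }

  def main(args: Array[String]): Unit = {
    val locs = Seq(Loc("node-a", 7001), Loc("node-b", 7001), Loc("node-a", 7002))
    println(preferLocal(locs, "node-a", new Random(42)))
  }
}
```

The random shuffle is kept so that equally distant replicas still share the load; only the relative order of local versus remote replicas changes.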