Spark的shuffle读操作是性能杀手,原因是Shuffle读操作需要从多个Map节点拉取数据到Reduce节点,所有的Reduce结果是否还要经过一次总计算?
package spark.examples import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object SparkWordCount { def main(args: Array[String]) { System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2"); val conf = new SparkConf() conf.setAppName("SparkWordCount") conf.setMaster("local") val sc = new SparkContext(conf) val rdd = sc.textFile("file:///D:/word.in") println(rdd.toDebugString) val rdd1 = rdd.flatMap(_.split(" ")) println("rdd1:" + rdd1.toDebugString) val rdd2 = rdd1.map((_, 1)) println("rdd2:" + rdd2.toDebugString) val rdd3 = rdd2.reduceByKey(_ + _); println("rdd3:" + rdd3.toDebugString) rdd3.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis()); sc.stop } }
1. ResultTask的runTask方法
override def runTask(context: TaskContext): U = { // Deserialize the RDD and the func using the broadcast variables. val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) metrics = Some(context.taskMetrics) func(context, rdd.iterator(partition, context)) }
2. 首先反序列化得到rdd和func,其中rdd是该stage的最后一个RDD(final RDD),而func是就是PairRDDFunctions.scala的saveAsHadoopDataset方法内部定义的writeToFile函数。writeToFile函数实现了wordcount最终的写磁盘操作(rdd3.saveAsTextFile)。在调用writeToFile写磁盘之前,需要首先从Mapper节点读取到数据,然后对它进行整合,这个在ResultTask的runTask方法的最后一句 func(context, rdd.iterator(partition, context))中的rdd.iterator方法实现的。
这在Spark Shuffle写操作过程中分析,就是读取级联的读取它的父RDD的compute方法完成读取的读取操作
3.数据读取操作(rdd.iterator)
3.1 RDD类定义的iterator的模板方法,
final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { ///如果这个RDD的StorageLevel不为NONE,那么逻辑是什么? SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) } else { //否则计算或者读取Checkpoint检查点 computeOrReadCheckpoint(split, context) } }
3.2 WordCount的第二个Stage总共包含两个RDD,最后一个是MappedRDD,第一个是ShuffledMapRDD,MappedRDD从ShuffledMapRDD获取具体的数据,然后执行MappedRDD自身携带的函数,这个函数的定义是在RDD的saveAsTextFile的函数x=>(NullWritable.get(), new Text(x.toString))
def saveAsTextFile(path: String) { this.map(x => (NullWritable.get(), new Text(x.toString))) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) }
3.3 MappedRDD的compute方法
override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).map(f) ///调用它的parent RDD的iterator方法,这里是ShuffledRDD }
3.4 ShuffledRDD的compute方法
///split是当前要计算的分片 override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] //ShuffleManager是SortShuffleManager、 ///获取的ReaderHashShuffleReader,注意此处虽然是Sort based Shuffle,但是读仍然是HashShuffleReader ///获取reader的参照是shuffleHandle,分区的头尾,Spark只支持一次读取一个分片 SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) .read() .asInstanceOf[Iterator[(K, C)]] }
3.5 SortShuffleManager的getReader方法
/** * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). * Called on executors by reduce tasks. */ override def getReader[K, C]( handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext): ShuffleReader[K, C] = { // We currently use the same block store shuffle fetcher as the hash-based shuffle. new HashShuffleReader(////构造时,只需要提供ShuffleHandle,parttition开始结束位 handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context) }
3.6 HashShuffleReader的read方法
/** Read the combined key-values for this reduce task */ override def read(): Iterator[Product2[K, C]] = { val ser = Serializer.getSerializer(dep.serializer)///获取序列化器 ///关键方法,根据shuffleId获取迭代数据的Iterator val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser) ///对数据进行aggregate,得到一个整合后的Iterator val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { ////Reduce端进行结果合并,map端是否已经combine过,采用两种不同的方式 ////如果map端做了combine,那么调用combineCombinersByKey, if (dep.mapSideCombine) { new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context)) } else { ////如果map端没combine,那么调用combineValuesByKey,这个应该是跟map端做combine使用相同的方法 new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context)) } } else if (dep.aggregator.isEmpty && dep.mapSideCombine) { throw new IllegalStateException("Aggregator is empty for map-side combine") } else { // Convert the Product2s to pairs since this is what downstream RDDs currently expect iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2)) } // Sort the output if there is a sort ordering defined. ///继续对数据进行排序操作,如果定义了keyOrdering,如果没有定义keyOrdering,那么如何定义之? ///对Key进行排序? dep.keyOrdering match { case Some(keyOrd: Ordering[K]) => // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled, // the ExternalSorter won't spill to disk. val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) sorter.insertAll(aggregatedIter) context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled sorter.iterator case None => aggregatedIter } }
3.7 BlockStoreShuffleFetcher伴生对象只定义了一个fetch方法
private[hash] object BlockStoreShuffleFetcher extends Logging { def fetch[T]( shuffleId: Int, ////shuffleId,由dep.shuffleHandle.shuffleId提供 reduceId: Int, ///startPartition被理解为reduceId? context: TaskContext, serializer: Serializer) : Iterator[T] = { logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId)) val blockManager = SparkEnv.get.blockManager //获取blockManager val startTime = System.currentTimeMillis ///根据shuffleId和reduceId获取statuses对象,因为MapOutputTracker存放在SparkEnv中,SparkEnv类似于集群级别的共享变量 ///简单的说就是获取MapOutputputLocation /// Called from executors to get the server URIs and output sizes of the map outputs of a given shuffle. ///statuses是一个二元元组的集合,每个元组的第一个元素是BlockManagerId对象(包含三方面的信息,executorId_,host_,port_),第二个是数据的长度 val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format( shuffleId, reduceId, System.currentTimeMillis - startTime)) ////BlockManagerId封装了什么信息? ///ArrayBuffer的元素是Tuple二元组类型,分别是Int和Long类型 val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]] //statuses.zipWithIndex是个二元组,第一元素又是一个二元元组 ///address表示BlockManagerId对象 ///size表示数据大小 ///index表示索引?有什么用? for (((address, size), index) <- statuses.zipWithIndex) { ///这是什么操作?根据address获取一个二元组,然后把(index,size)赋值给它 ///splitsByAddress此时有一个元素(address,(index,size)) splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size)) } ///构造blockByAddress,得到一个Seq,元素类型是元组:(BlockManagerId, Seq[(BlockId, Long)]) ///元组的第一个元素是address,元组的第二个元素是一个新元组的集合,每个元组的第一个是一个元素是BlockId(由shuffleId,index, reduceId),第二个元素是数据的size val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map { case (address, splits) => (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2))) } ///定义一个内部函数 def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = { val blockId = blockPair._1 val blockOption = blockPair._2 blockOption match { case Success(block) => { block.asInstanceOf[Iterator[T]] } case Failure(e) => { blockId match { case ShuffleBlockId(shufId, mapId, _) => val address = statuses(mapId.toInt)._1 throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e) case _ => throw new SparkException( "Failed to get block " + blockId + ", which is not a shuffle block", e) } } } } ///这是什么类? val blockFetcherItr = new ShuffleBlockFetcherIterator( context, SparkEnv.get.blockManager.shuffleClient, blockManager, blocksByAddress, serializer, SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024) ///对blockFetcherItr集合ununpackBlock方法然后压平结果 val itr = blockFetcherItr.flatMap(unpackBlock) ///构造 CompletionIterator对象? val completionIter = CompletionIterator[T, Iterator[T]](itr, { context.taskMetrics.updateShuffleReadMetrics() }) ///包装了completionIter迭代器,组合模式 new InterruptibleIterator[T](context, completionIter) } }
3.8 ShuffleBlockFetcherIterator类
a. 从下面的类文档中可以看出,ShuffleBlockFetcherIterator类,用于fetch数据块,对于位于本地的block,从本地BlockManager获取;对于远端的数据块,使用BlockTransferService获取数据(BlockTransferService使用Netty作为底层的数据传输模块)
b. 最后的结果是获取了一个迭代器,迭代的每条记录是一个(BlockId,values)元组
c. throttle的含义是节流阀
/** * An iterator that fetches multiple blocks. For local blocks, it fetches from the local block * manager. For remote blocks, it fetches them using the provided BlockTransferService. * * This creates an iterator of (BlockID, values) tuples so the caller can handle blocks in a * pipelined fashion as they are received. * * The implementation throttles the remote fetches to they don't exceed maxBytesInFlight to avoid * using too much memory. * * @param context [[TaskContext]], used for metrics update * @param shuffleClient [[ShuffleClient]] for fetching remote blocks * @param blockManager [[BlockManager]] for reading local blocks * @param blocksByAddress list of blocks to fetch grouped by the [[BlockManagerId]]. * For each block we also require the size (in bytes as a long field) in * order to throttle the memory usage. * @param serializer serializer used to deserialize the data. * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point. */
d.在BlockStoreShuffleFetcher中,ShuffleBlockFetcherIterator的构造如下,对比下构造函数
注意ShuffleClient是从SparkEnv的BlockManager获取的,
val blockFetcherItr = new ShuffleBlockFetcherIterator( context, SparkEnv.get.blockManager.shuffleClient, blockManager, blocksByAddress, serializer, SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
e. ShuffleClient在BlockManager中的定义
// Client to read other executors' shuffle files. This is either an external service, or just the // standard BlockTranserService to directly connect to other Executors. private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { //启用了外部ExternalShuffleService val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores) new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled()) } else { blockTransferService //否则使用blockTransferService: BlockTransferService,在Spark1.2中使用了基于Netty的数据传输NettyBlockTransferService }
3.9 BlockStoreShuffleFetcher的fetch方法调用blockFetcherItr.flatMap(unpackBlock)解析
结论:
在Reduce阶段,还会执行combine操作,进行深度的合并的操作,这是必然的,假如不同的Mapper上有多个单词A,那么计数的结果应该是把所有Mapper上的A进行合并得到最后的结果。
相关推荐
《Spark源码剖析》PDF 文件很可能会深入到这些技术细节,包括类结构、算法实现以及关键代码的解析,帮助读者更好地理解和优化 Spark 应用。通过深入学习 Spark 源码,开发者可以更好地掌握 Spark 内部工作原理,从而...
《Apache Spark源码剖析》则会深入到Spark的源代码层面,帮助读者理解其实现细节: 1. **源码结构**:Spark源代码的主要模块和包划分,以及关键类的职责。 2. **调度机制**:DAGScheduler和TaskScheduler的工作流程...
Spark 2.2.0是Apache Spark的一个重要版本,它带来了许多增强...通过深入研究Spark 2.2.0的源码,开发者可以更好地理解其内部机制,定制化自己的大数据处理流程,同时也能为未来的版本贡献代码,推动Spark的持续发展。
在GitHub上,Spark的源代码被广泛分享和贡献,为开发者提供了深入理解其内部机制和学习新功能的机会。"Spark github源码 例子很有价值"这个标题表明,通过分析这些源码,我们可以学习到Spark如何实现其核心功能,如...
7. **源码解析**:书中可能涵盖了Spark源码的解析,如任务调度、内存管理、shuffle过程等,帮助读者深入理解Spark的内部工作原理。 8. **aas-master实例**:这个文件夹很可能是包含了一系列的Spark应用实例,覆盖了...
本文将深入探讨如何利用Spark构建一个基于外卖数据的分析系统,以及在这个过程中涉及到的关键技术和应用。 首先,我们需要理解Spark的核心特性。Spark提供了一种内存计算模型,它将数据存储在内存中,从而极大地...
《Spark大数据分析与实战》课程是一门深入探讨Apache Spark在大数据处理领域的应用和技术的课程,其课后练习答案集提供了对课程所讲授知识的巩固和实践。这是一份珍贵的配套教学资源,旨在帮助学生更好地理解和掌握...
3. Shuffle过程:理解Spark如何处理shuffle操作,如`groupByKey`,涉及的HashPartitioner和SortBasedPartitioner等。 4.容错机制:研究RDD的 lineage(血统),了解如何通过重计算丢失的分区来恢复数据。 5. ...
Shuffle 读,根据新的分区策略重新组合数据。 10. **存储管理** Spark 支持多种存储级别,如内存、磁盘和序列化。存储层负责管理数据的缓存和回收,以优化内存使用。 11. **通信层** Spark 使用 Tachyon 或 ...
6. **Shuffle 操作**:在 Spark 中,Shuffle 是数据重新分区的过程,通常发生在 join、groupByKey 或 reduceByKey 等操作中。Shuffle 会触发网络传输,对性能影响较大,理解其内部机制有助于优化作业。 7. **容错...
源代码分析可以帮助我们理解任务调度、数据分区和Shuffle过程的细节。 3. **YARN资源管理系统**:随着Hadoop的发展,YARN(Yet Another Resource Negotiator)取代了最初的JobTracker,负责集群资源的管理和任务...
Shuffle是Spark中数据重新分布的过程,涉及数据在网络间的移动。优化Shuffle可以显著提高性能,如使用合适的分区策略、减少网络传输和避免Shuffle产生的临时文件。 **Spark调度和资源管理** Spark的调度器分为粗...
《Spark大数据处理实战练习题详解》 Spark作为大数据处理领域的重要工具,因其高效、易用的特性备受开发者青睐。为了帮助大家深入理解和掌握Spark的核心功能,我们整理了一系列的Spark考试练习题,涵盖从基础概念到...
- spark-3.1.2.tgz:这是一个tar归档文件,经过gzip压缩,通常包含源代码、文档、配置文件和编译后的二进制文件。 - spark-3.1.2-bin-hadoop2.7.tgz:这个版本除了包含基本的Spark组件外,还集成了Hadoop 2.7的二...
6. **Shuffle过程**:在Spark作业中,shuffle是一个重要的操作,它涉及重新组织数据以满足任务间的依赖。在Spark 2.2.0中,shuffle管理得到了优化,减少了网络传输和磁盘I/O,提高了性能。 7. **动态资源调度**:...
在大数据分析过程中,Spark提供了RDD(弹性分布式数据集)作为基本的数据抽象,它是不可变的、分区的并行数据集合。RDD可以通过转换操作(如map、filter)和行动操作(如count、save)进行操作。此外,DataFrame和...
这个版本的源码包可以从Spark的官方网站下载,用于深入学习其内部机制、理解代码结构和实现,以及根据个人需求进行定制化编译。 源码包中包含的主要目录和文件如下: 1. `core/`:Spark的核心模块,包含任务调度、...
2. DataFrame/Dataset API:提供了一种声明式编程方式,使得代码更加简洁易读,同时利用 Catalyst 优化器进行底层执行计划的优化。 3. SQL支持:Spark SQL与Hive兼容,允许用户使用标准SQL查询数据,同时也支持...
05_尚硅谷大数据技术之Spark内核.docx探讨了Spark的存储系统、执行计划优化、shuffle过程以及Tachyon和Alluxio等内存数据存储系统的集成,让读者深入了解Spark的运行机制。 五、Spark性能调优与故障处理 06_Spark...
例如,新的Shuffle服务可以提高shuffle操作的性能,而Tungsten项目则通过代码生成优化了DataFrame/Dataset的执行效率。 8. **SQL改进**: Spark SQL在3.0.0版本中引入了对ansi SQL标准的支持,增强了对窗口函数的...