`
wbj0110
  • 浏览: 1602790 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Spark源码系列(六)Shuffle的过程解析

阅读更多

Spark大会上,所有的演讲嘉宾都认为shuffle是最影响性能的地方,但是又无可奈何。之前去百度面试hadoop的时候,也被问到了这个问题,直接回答了不知道。

这篇文章主要是沿着下面几个问题来开展:

1、shuffle过程的划分?

2、shuffle的中间结果如何存储?

3、shuffle的数据如何拉取过来?

Shuffle过程的划分

Spark的操作模型是基于RDD的,当调用RDD的reduceByKey、groupByKey等类似的操作的时候,就需要有shuffle了。再拿出reduceByKey这个来讲。

  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

reduceByKey的时候,我们可以手动设定reduce的个数,如果不指定的话,就可能不受控制了。

复制代码
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse for (r <- bySize if r.partitioner.isDefined) { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { new HashPartitioner(rdd.context.defaultParallelism)
    } else { new HashPartitioner(bySize.head.partitions.size)
    }
  }
复制代码

如果不指定reduce个数的话,就按默认的走:

1、如果自定义了分区函数partitioner的话,就按你的分区函数来走。

2、如果没有定义,那么如果设置了spark.default.parallelism,就使用哈希的分区方式,reduce个数就是设置的这个值。

3、如果这个也没设置,那就按照输入数据的分片的数量来设定。如果是hadoop的输入数据的话,这个就多了。。。大家可要小心啊。

设定完之后,它会做三件事情,也就是之前讲的3次RDD转换。

 View Code

1、在第一个MapPartitionsRDD这里先做一次map端的聚合操作。

2、ShuffledRDD主要是做从这个抓取数据的工作。

3、第二个MapPartitionsRDD把抓取过来的数据再次进行聚合操作。

4、步骤1和步骤3都会涉及到spill的过程。

怎么做的聚合操作,回去看RDD那章。

Shuffle的中间结果如何存储

作业提交的时候,DAGScheduler会把Shuffle的过程切分成map和reduce两个Stage(之前一直被我叫做shuffle前和shuffle后),具体的切分的位置在上图的虚线处。

map端的任务会作为一个ShuffleMapTask提交,最后在TaskRunner里面调用了它的runTask方法。

复制代码
  override def runTask(context: TaskContext): MapStatus = {
    val numOutputSplits = dep.partitioner.numPartitions
    metrics = Some(context.taskMetrics)

    val blockManager = SparkEnv.get.blockManager
    val shuffleBlockManager = blockManager.shuffleBlockManager var shuffle: ShuffleWriterGroup = null var success = false try { // serializer为空的情况调用默认的JavaSerializer,也可以通过spark.serializer来设置成别的 val ser = Serializer.getSerializer(dep.serializer) // 实例化Writer,Writer的数量=numOutputSplits=前面我们说的那个reduce的数量 shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser) // 遍历rdd的元素,按照key计算出来它所在的bucketId,然后通过bucketId找到相应的Writer写入 for (elem <- rdd.iterator(split, context)) {
        val pair = elem.asInstanceOf[Product2[Any, Any]]
        val bucketId = dep.partitioner.getPartition(pair._1)
        shuffle.writers(bucketId).write(pair)
      } // 提交写入操作. 计算每个bucket block的大小 var totalBytes = 0L var totalTime = 0L val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter => writer.commit()
        writer.close()
        val size = writer.fileSegment().length
        totalBytes += size
        totalTime += writer.timeWriting()
        MapOutputTracker.compressSize(size)
      } // 更新 shuffle 监控参数. val shuffleMetrics = new ShuffleWriteMetrics
      shuffleMetrics.shuffleBytesWritten = totalBytes
      shuffleMetrics.shuffleWriteTime = totalTime
      metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

      success = true new MapStatus(blockManager.blockManagerId, compressedSizes)
    } catch { case e: Exception => // 出错了,取消之前的操作,关闭writer if (shuffle != null && shuffle.writers != null) { for (writer <- shuffle.writers) {
          writer.revertPartialWrites()
          writer.close()
        }
      } throw e
    } finally { // 关闭writer if (shuffle != null && shuffle.writers != null) { try {
          shuffle.releaseWriters(success)
        } catch { case e: Exception => logError("Failed to release shuffle writers", e)
        }
      } // 执行注册的回调函数,一般是做清理工作  context.executeOnCompleteCallbacks()
    }
  }
复制代码

遍历每一个记录,通过它的key来确定它的bucketId,再通过这个bucket的writer写入数据。

下面我们看看ShuffleBlockManager的forMapTask方法吧。

复制代码
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleWriterGroup {
      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets)) private val shuffleState = shuffleStates(shuffleId) private var fileGroup: ShuffleFileGroup = null val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        fileGroup = getUnusedFileGroup()
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) // 从已有的文件组里选文件,一个bucket一个文件,即要发送到同一个reduce的数据写入到同一个文件  blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
        }
      } else {
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => // 按照blockId来生成文件,文件数为map数*reduce数 val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          val blockFile = blockManager.diskBlockManager.getFile(blockId) if (blockFile.exists) { if (blockFile.delete()) {
              logInfo(s"Removed existing shuffle file $blockFile")
            } else {
              logWarning(s"Failed to remove existing shuffle file $blockFile")
            }
          }
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
        }
      }
复制代码

1、map的中间结果是写入到本地硬盘的,而不是内存。

2、默认是一个Executor的中间结果文件是M*R(M=map数量,R=reduce的数量),设置了spark.shuffle.consolidateFiles为true之后是R个文件,根据bucketId把要分到同一个reduce的结果写入到一个文件中。

3、consolidateFiles采用的是一个reduce一个文件,它还记录了每个map的写入起始位置,所以查找的时候先通过reduceId查找到哪个文件,再通过mapId查找索引当中的起始位置offset,长度length=(mapId + 1).offset -(mapId).offset,这样就可以确定一个FileSegment(file, offset, length)。

4、Finally,存储结束之后, 返回了一个new MapStatus(blockManager.blockManagerId, compressedSizes),把blockManagerId和block的大小都一起返回。

个人想法,shuffle这块和hadoop的机制差别不大,tez这样的引擎会赶上spark的速度呢?还是让我们拭目以待吧!

Shuffle的数据如何拉取过来

ShuffleMapTask结束之后,最后走到DAGScheduler的handleTaskCompletion方法当中(关于中间的过程,请看《图解作业生命周期》)。

 View Code

1、把结果添加到Stage的outputLocs数组里,它是按照数据的分区Id来存储映射关系的partitionId->MapStaus。

2、stage结束之后,通过mapOutputTracker的registerMapOutputs方法,把此次shuffle的结果outputLocs记录到mapOutputTracker里面。

这个stage结束之后,就到ShuffleRDD运行了,我们看一下它的compute函数。

SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)

它是通过ShuffleFetch的fetch方法来抓取的,具体实现在BlockStoreShuffleFetcher里面。

 View Code

1、MapOutputTrackerWorker向MapOutputTrackerMaster获取shuffle相关的map结果信息。

2、把map结果信息构造成BlockManagerId --> Array(BlockId, size)的映射关系。

3、通过BlockManager的getMultiple批量拉取block。

4、返回一个可遍历的Iterator接口,并更新相关的监控参数。

我们继续看getMultiple方法。

复制代码
 def getMultiple(
      blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
      serializer: Serializer): BlockFetcherIterator = {
    val iter = if (conf.getBoolean("spark.shuffle.use.netty", false)) { new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
      } else { new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
      }

    iter.initialize()
    iter
  }
复制代码

分两种情况处理,分别是netty的和Basic的,Basic的就不讲了,就是通过ConnectionManager去指定的BlockManager那里获取数据,上一章刚好说了。

我们讲一下Netty的吧,这个是需要设置的才能启用的,不知道性能会不会好一些呢?

看NettyBlockFetcherIterator的initialize方法,再看BasicBlockFetcherIterator的initialize方法,发现Basic的不能同时抓取超过48Mb的数据。

复制代码
    override def initialize() { // 分开本地请求和远程请求,返回远程的FetchRequest val remoteRequests = splitLocalRemoteBlocks() // 抓取顺序随机 for (request <- Utils.randomize(remoteRequests)) {
        fetchRequestsSync.put(request)
      } // 默认是开6个线程去进行抓取 copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))// 读取本地的block  getLocalBlocks()
   }
复制代码

在NettyBlockFetcherIterator的sendRequest方法里面,发现它是通过ShuffleCopier来试下的。

  val cpier = new ShuffleCopier(blockManager.conf)
   cpier.getBlocks(cmId, req.blocks, putResult)

这块接下来就是netty的客户端调用的方法了,我对这个不了解。在服务端的处理是在DiskBlockManager内部启动了一个ShuffleSender的服务,最终的业务处理逻辑是在FileServerHandler。

它是通过getBlockLocation返回一个FileSegment,下面这段代码是ShuffleBlockManager的getBlockLocation方法。

复制代码
  def getBlockLocation(id: ShuffleBlockId): FileSegment = { // Search all file groups associated with this shuffle. val shuffleState = shuffleStates(id.shuffleId) for (fileGroup <- shuffleState.allFileGroups) {
      val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId) if (segment.isDefined) { return segment.get }
    } throw new IllegalStateException("Failed to find shuffle block: " + id)
  }
复制代码

先通过shuffleId找到ShuffleState,再通过reduceId找到文件,最后通过mapId确定它的文件分片的位置。但是这里有个疑问了,如果启用了consolidateFiles,一个reduce的所需数据都在一个文件里,是不是就可以把整个文件一起返回呢,而不是通过N个map来多次读取?还是害怕一次发送一个大文件容易失败?这就不得而知了。

到这里整个过程就讲完了。可以看得出来Shuffle这块还是做了一些优化的,但是这些参数并没有启用,有需要的朋友可以自己启用一下试试效果。

http://www.luobo360.com/article/140

分享到:
评论

相关推荐

    Spark源码剖析

    7. 实验设计:为了加深对Spark源码分析的理解,课程可能会设置一系列实验。这些实验可能会涉及修改Spark源码以观察特定行为的变化,或者是基于源码分析来优化Spark程序的性能。 8. 最佳实践:最后,课程可能会分享...

    Spark源码深度解读

    Spark源码解读迷你 RDD、Spark Submit、Job、Runtime、Scheduler、Spark Storage、Shuffle、Standlone算法、Spark On yarn。。。

    Spark-2.3.1源码解读

    Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 Spark的缓存,变量,shuffle数据等清理及机制 Spark-submit关于参数及部署模式的部分解析 GroupByKey VS ReduceByKey OrderedRDDFunctions...

    spark 源码解读迷你书

    《Spark源码解读迷你书》是一本专注于深入理解Apache Spark核心源码的书籍,适合对大数据处理技术有深厚兴趣并且想要探索Spark内部机制的读者。在阅读这本书之前,建议先搭建好Spark开发环境,比如使用Intelij IDEA...

    大数据Spark源码

    《深入解析Spark源码:大数据处理的基石》 Spark,作为大数据处理领域的明星框架,以其高效、易用和可扩展性赢得了广泛的认可。Spark源码的学习对于深入理解其内部机制,提升开发效率,以及解决实际问题具有至关...

    spark-2.4.0源码

    深入理解Spark源码,有助于开发者优化应用程序性能、调试问题,甚至为Spark贡献代码。Spark的源码是用Java和Scala编写的,因此熟悉这两种语言对于理解源码至关重要。同时,理解Scala的Actor模型和Akka框架也是解析...

    spark源码以及官方的示例(方便阅读源码学习)

    3. **Shuffle过程**:在Spark中,Shuffle是数据重新分布的过程,涉及到MapTask和ReduceTask。源码解析可以帮助理解数据分发和排序的细节。 4. **存储机制**:Spark支持内存和磁盘混合存储,源码可以揭示如何管理...

    07-尚硅谷大数据技术之Spark源码1

    Spark 源码解析 Spark 是一种基于内存的分布式计算框架,主要用于大数据处理。它的源码中包含了许多复杂的组件和算法,本文将对 Spark 源码中的关键组件和算法进行解析。 环境准备(Yarn 集群) 在 Spark 中,...

    spark源码阅读笔记

    Spark是Apache软件基金会下的一个开源大数据处理框架,以其高效的计算模型和易用性而闻名。...通过对Spark源码的深入学习,开发者可以更好地掌握大数据处理的底层机制,提升在大规模数据环境中的编程能力。

    Spark-内核源码解析.docx

    Spark 内核源码解析 Spark 是一个大规模数据处理框架,它的核心原理和架构组件对开发者和研究人员来说非常重要。为了深入了解 Spark 的内部机制,我们需要对其内核源码进行深入分析。 Application/App 在 Spark ...

    spark-2.2.0源码

    6. **Spark Shuffle**:在大规模数据处理中,Shuffle是一个关键步骤,它涉及到数据的重新分布。源码中详细阐述了如何在节点间进行数据传输和重排。 7. **容错机制**:Spark的容错机制是基于RDD的血统(Lineage),...

    spark新手上路之源码解析.pdf

    Apache Spark 源码的理解 Apache Spark 是一个用于大规模数据处理的开源框架,以其高效的内存计算和弹性容错性而闻名。Spark 提供了一个统一的数据处理平台,支持批处理、交互式查询、流处理和机器学习等多种计算...

    深入理解Spark核心思想与源码分析

    通过阅读Spark源码,可以深入了解任务调度、内存管理、容错机制等内部工作原理,这对于调优和解决实际问题非常有帮助。 **10. 性能调优** Spark的性能调优涉及多个方面,包括配置参数调整(如executor内存、并行度...

    ApacheSpark设计与实现.pdf+ApacheSpark源码剖析.pdf+Spark原著中文版.pdf

    《Apache Spark源码剖析》则会深入到Spark的源代码层面,帮助读者理解其实现细节: 1. **源码结构**:Spark源代码的主要模块和包划分,以及关键类的职责。 2. **调度机制**:DAGScheduler和TaskScheduler的工作流程...

    spark源码阅读笔记(详)

    #### 六、Spark源码解析方法论 1. **从Main函数开始**: 分布式程序的解析往往需要从系统的初始化部分入手,理解系统如何启动以及各组件之间的交互。 2. **关注核心组件**: 如Master和Worker的启动过程、Driver与...

    spark-2.4.3源码

    《Spark 2.4.3源码解析:深入大数据处理的核心》 Spark,作为大数据处理领域的重要工具,因其高效、易用和通用性而备受推崇。Spark 2.4.3是Apache Spark的一个稳定版本,发布于2019年5月7日,其核心组件spark-core_...

    深入理解Spark:核心思想及源码分析.pdf

    书中可能会深入到Spark源码,解析其任务调度、内存管理和数据交换等关键机制,帮助读者理解Spark是如何高效运行的。例如,DAGScheduler如何将作业拆分为任务,以及TaskScheduler如何将任务分配给Executor。 7. **...

Global site tag (gtag.js) - Google Analytics