`
bit1129
  • 浏览: 1070685 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark四十九】Spark Shuffle第四部分:Spark Hash Based Shuffle源代码流程解析

 
阅读更多

关于Spark的Hash based shuffle,其实已经在http://bit1129.iteye.com/blog/2180214中进行了基本的分析,不过那会对shuffle本身就不甚了解,分析之时有只见树木不见森林之惑,所以Hash Based Shuffle的整体流程并没有分析到位,但是那里却对一些常见的易犯迷糊的问题进行了总结,现在看上去,总结的着实不错,是时候从头到尾把自己写的东西看一遍了,现在是总结了原材料,再看就是升华阶段。同时,Spark的100篇博客,此目标不动摇!

 

Shuffle流程,第一个问题是什么时候需要Shuffle。Shuffle是由于RDD之间的依赖关系为ShuffleDependency而产生的,那么ShuffleDependency是什么时候建立的。RDD之间的依赖是DAGScheduler划分Stage的关键所在。给定两个RDD a和b,如果b对a的依赖类型是ShuffleDependency,那么划分Stage时,a是一个Stage(称为stage1)的结束,而b是另一个Stage(称为stage2)的开始。b所在的Stage称为shuffle stage。那么RDD之间的这种依赖关系是什么建立的呢?答案:是在RDD算子操作创建RDD时创建了RDD之间的依赖关系,也就是RDD创建之后,就可以通过getDependencies来获取它的依赖了。

因此本文从DAGScheduler根据ShuffleDependency划分Stage开始入手。

 

DAGScheduler根据ShuffleDependency划分Stage

1. 当作业提交给DAGScheduler,在DAGSchedule的 handleJobSubmitted方法中,参数RDD就是整个job的最后一个RDD,DAGScheduler需要做的是,从后向前回溯整个RDD 的依赖关系图,然后根据RDD之间的依赖关系,创建出不同的Stage,然后从前往后依次将Stage中的任务提交。如下是DAGScheduler的handleJobSubmitted方法

 

 

 private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_], ///这是整个Job的computing chain上的最后一个RDD,RDD之间的依赖已经确立
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int], ///整数数组,是Reducers的个数
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties = null)
  {
    var finalStage: Stage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)///创建Stage,注意第三个参数,ShuffleDependency初始值为None,jobId是在DAGScheduler的submitJob方法中建立的
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    if (finalStage != null) { ///finalStage为null表示什么情况?
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) ///创建Job,参数是finalStage,那么finalStage应该携带者父Stage
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
        job.jobId, callSite.shortForm, partitions.length, allowLocal))
      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents) ///列出finalStage依赖的Stage
      logInfo("Missing parents: " + getMissingParentStages(finalStage))////日志中列出尚未提交执行的父Stages
      val shouldRunLocally =
        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
      if (shouldRunLocally) {
        // Compute very short actions like first() or take() with no parent stages locally.
        listenerBus.post(SparkListenerJobStart(job.jobId, Seq.empty, properties))
        runLocally(job)
      } else {
        jobIdToActiveJob(jobId) = job ///Map缓存
        activeJobs += job ///记录当前获取的Job
        finalStage.resultOfJob = Some(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
        submitStage(finalStage) ///提交作业
      }
    } 
    ////如果finalStage为null,则直接提交
    submitWaitingStages()
  }

 在上面的代码中,有三个主要的方法

newStage

submitStage

submitWaitingStages

下面分别来谈

2. newStage方法

 

2.1 newStage方法

 

  private def newStage(
      rdd: RDD[_], ///这是最后一个Stage的final stage,此处的Dependency关系尚未建立
      numTasks: Int,
      shuffleDep: Option[ShuffleDependency[_, _, _]], ///handleJobSubmitted方法调用时,传入的值为空
      jobId: Int,
      callSite: CallSite)
    : Stage = ///返回
  {
    val parentStages = getParentStages(rdd, jobId) ///根据rdd和jobId获取父Stages,那么getParentStages是否会调用newStage方法呢?
    val id = nextStageId.getAndIncrement() ///StageId
    val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite) ///Stage有id,包含的最后一个RDD,shuffleDep,父Stages,jobId
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }
 2.2 getParentStages源代码:
 /**
   * Get or create the list of parent stages for a given RDD. The stages will be assigned the
   * provided jobId if they haven't already been created with a lower jobId.
   */
  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = { ///针对指定的RDD获取它的Parent Stages
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]] ///RDD堆栈
    def visit(r: RDD[_]) { ///当前visit的RDD,无返回值
      if (!visited(r)) { ///如果还没有遍历这个RDD
        visited += r ///将r放入到已遍历的visited集合中
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) { ////遍历RDD依赖的Dependency,可以是NarrowDependency,可以是ShuffleDependency
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              parents += getShuffleMapStage(shufDep, jobId) ///遇到一个ShfufleDependency, 则返回一个parent stage,
            case _ => ///如果是窄依赖,则直接压入堆栈,下次再遍历这个RDD
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd) ///将最后一个RDD放入
    while (!waitingForVisit.isEmpty) { ///最后一个RDD放入,所以第一次访问时不为空;while循环中的判断逻辑,visit调用完后就会执行一次,而visit则会更新waitingForVisit
      visit(waitingForVisit.pop()) ///visit方法每次访问的都是最后一个压入堆栈的RDD
    }
    parents.toList
  }
 
2.3 getShuffleMapStage方法源代码
  /**
   * Get or create a shuffle map stage for the given shuffle dependency's map side.
   * The jobId value passed in will be used if the stage doesn't already exist with
   * a lower jobId (jobId always increases across jobs.)
   */
  private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): Stage = {
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) => stage ///如果shuffleId和Stage已经建立了对应关系,则直接返回shuffleToMapStage中保存的Stage
      case None => 
        // We are going to register ancestor shuffle dependencies
        registerShuffleDependencies(shuffleDep, jobId) ////注册registerShuffleDependency
        // Then register current shuffleDep
        val stage = ///调用newOrUsedStage方法
          newOrUsedStage(
            shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
            shuffleDep.rdd.creationSite)
        shuffleToMapStage(shuffleDep.shuffleId) = stage
 
        stage
    }
  }

 

getShuffleMapStage获取ShuffleMapStage的逻辑中,主要的两个方法

registerShuffleDependencies

newOrUsedStage

 

 

 

Spark涉及的角色图

 

 

示例程序

package spark.examples

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

object SparkWordCountHashShuffle {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2");
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount")
    conf.setMaster("local[3]")
    //Hash based Shuffle;
    conf.set("spark.shuffle.manager", "hash");
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("file:///D:/word.in.3",4);
    val rdd1 = rdd.flatMap(_.split(" "))
    val rdd2 = rdd1.map((_, 1))
    val rdd3 = rdd2.reduceByKey(_ + _, 3);
    rdd3.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis());
    println(rdd3.toDebugString)
    sc.stop
  }
}

整个DAG图表示如下:

 

MappedRDD[5]///这个rdd没有打印出来,是在saveAsTextFile调用时,内部创建的MappedRDD,将Value转换为(K,V)的形式,K是null,但是DAG构造Stage图时会构造这个
(3) ShuffledRDD[4] at reduceByKey at SparkWordCountHashShuffle.scala:20 []
 +-(5) MappedRDD[3] at map at SparkWordCountHashShuffle.scala:19 []
    |  FlatMappedRDD[2] at flatMap at SparkWordCountHashShuffle.scala:18 []
    |  file:///D:/word.in.3 MappedRDD[1] at textFile at SparkWordCountHashShuffle.scala:17 []
    |  file:///D:/word.in.3 HadoopRDD[0] at textFile at SparkWordCountHashShuffle.scala:17 []
 

 

 

Hash based Shuffle Writer源代码解析

1. hash based shuffle writer操作发生在ShuffleMapTask的runTask中,首先看看runTask方法

 

  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      return writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

分析下runTask做了哪些事情

  • 首先,runTask返回的是一个MapStatus对象,这个里面存放的是什么信息呢?
  • 反序列化出来一个rdd和dep的二元组,此二者所指什么
  • 从SparkEnv中获取shuffleManager,对于Hash Based Shuffle,这个是HashShuffleManager,它获取ShuffleWriter的依据dep.shuffleHandle, partitionId, context三方面信息。那么dep的shuffleHandle何时创建,里面包含了哪些信息,partitionId指的是该Task要处理的partition。它是要做Shuffle的RDD的一个partitionId。对于对于Hash Based Shuffle,这个ShuffleWriter是HashShuffleWriter
  • 调用HashShuffleWriter的write方法将rdd的一个partition写入到map output file中。
  • 写完后,执行HashShuffleWriter的stop方法返回MapStatus对象

接下来,逐步的分析上面的五步具体的逻辑

2. rdd和dep的生成

rdd指的是示例代码中的rdd2,这个rdd指的是要进行shuffle算子操作的RDD,而计算得到的rdd3,(MappedRDD)依赖于这个rdd。

3.dep的shuffleHandle生成

dep中的shuffleHandle是在Dependency中实例化的,如下代码所示,它是调用ShuffleManagerManager的registerShuffle方法完成的

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

 ShuffleManagerManager的registerShuffle方法:

  /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
  override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    new BaseShuffleHandle(shuffleId, numMaps, dependency) ///shuffleId以及numMaps
  }

 

 

 

分享到:
评论

相关推荐

    Spark技术内幕深入解析Spark内核架构设计与实现原理

    10. **Spark Shuffle优化**:包括减少shuffle数据量、使用更高效的shuffle机制如Sort-based Shuffle和Hash-based Shuffle,以及合理配置executor内存和并行度,以提高性能。 11. **资源管理和调度**:Spark可以与...

    Spark 的两种核心 Shuffle 详解.pdf

    Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的 Shuffle。

    spark shuffle简介

    4. **Spark 1.2.0**:Sort-based Shuffle逐渐成为推荐的Shuffle模式,因为它可以提供更好的磁盘和网络效率。然而,Hash-based Shuffle仍然保留,作为低内存使用场景下的备选方案。 5. **Spark 2.0.0**:进一步优化...

    Spark-shuffle机制.pdf

    ### Spark Shuffle机制详解 #### 一、Spark Shuffle概念与作用 **Spark Shuffle**是指在Spark应用程序执行过程中,数据从一个节点或分区移动到另一个节点或分区的过程。这种数据重分布通常发生在诸如`groupByKey`,...

    Spark Shuffle优化-参数调优1

    Spark Shuffle 优化 - 参数调优 1 Spark 是一个基于内存的分布式计算框架,它的核心是shuffle操作。Shuffle 操作是 Spark 中最耗时的操作之一,而 shuffle 优化是Spark 优化的关键。 本文将对 Spark Shuffle 优化...

    spark调优介绍

    5. 使用sort-based shuffle代替hash-based shuffle:sort-based shuffle可以提供更好的排序效果,减少数据交换。 以上策略需要结合实际应用场景灵活调整,持续监控Spark应用的性能指标,如executor内存使用情况、...

    spark调优.rar

    6. **Shuffle操作优化**:Shuffle是Spark性能瓶颈之一,减少不必要的Shuffle(如通过Coalesce操作)和优化Shuffle管理(如使用更高效的Hash Shuffle或Sort Shuffle)能显著提升性能。 7. **宽依赖与窄依赖**:理解...

    Spark大数据商业实战三部曲_内核解密_商业案例_性能调优 实例源码

    《Spark大数据商业实战三部曲》是一套深度探讨Spark技术在商业环境中的应用、内核解析及性能优化的实例源码集合。这份压缩包包含了丰富的实践案例和代码,旨在帮助读者深入理解Spark的核心机制,掌握大数据处理的...

    SparkDemo12

    【SparkDemo12】是一个基于Apache Spark的演示项目,它可能是为了展示Spark的核心功能和用法。Apache Spark是一个用于大规模数据处理的开源集群计算系统,以其高性能、易用性和广泛的功能而闻名。在这个Demo中,我们...

    SparkSourceCodeReading:Spark源代码阅读-spark source code

    在Spark源代码阅读的过程中,我们可以深入理解Spark的工作原理和架构设计,这对于提升开发效率和优化性能至关重要。Spark作为一个分布式计算框架,其核心组件包括:弹性分布式数据集(RDD)、存储系统、调度器、 ...

    Spark大数据处理:技术、应用与性能优化

    4. Shuffle操作:数据在不同节点间重新分配的过程,是Spark性能优化的重点之一。 二、Spark组件与应用 1. Spark Core:基础执行引擎,负责任务调度、内存管理等。 2. Spark SQL:提供SQL接口,支持结构化数据处理...

    spark原理示意图,执行计划,shuffle,架构,检查点,缓存,广播

    为了优化shuffle过程,Spark使用Hash Shuffling和Sort Shuffling策略,后者通过排序保证了数据的稳定性。 Spark的架构是基于Master-Worker模式,由一个或多个Driver(通常是用户的应用程序)和多个Executor组成。...

    Spark性能优化基础篇

    shuffle调优则需要理解Spark的shuffle过程,通过调整hash分区策略、增加shuffle write buffer大小等手段,优化数据传输和合并过程。 总的来说,Spark性能优化是一个综合的过程,需要从开发阶段就考虑效率问题,并...

    spark dataframe 将一列展开,把该列所有值都变成新列的方法

    ### Spark DataFrame将一列展开,把该列所有值都变成新列的方法 在处理大数据时,Apache Spark 是一个非常强大的工具。特别是在数据处理与分析领域,Spark 的 DataFrame API 提供了丰富的功能来帮助用户高效地操作...

    Spark SQL- Relational Data Processing in Spark(Paper).rar

    Spark SQL在处理大型数据集的join操作时,采用多种策略,如shuffle hash join、broadcast hash join等,根据数据量和内存可用性选择最优策略。 11. **Window Functions**: Spark SQL支持窗口函数,这在处理时间...

    sparksqlCmd_Spark!_spark_

    SparkSQL是Apache Spark项目的一部分,它提供了一个用于处理结构化数据的强大工具,允许开发者使用SQL或者DataFrame API来操作数据。SparkSQL集成了Hive,因此它可以读取和写入Hive表,同时支持多种数据源,如HDFS、...

    基于Spark的实时攻击检测报告

    4. 缓存存储:Redis 是一个 key-value 存储系统,支持存储的 value 类型包括 string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和 hash(哈希类型)。Redis 支持 push/pop、add/remove ...

    hashcat [hashcat wiki].rar

    This version combines the previous CPU-based hashcat (now called hashcat-legacy) and GPU-based oclHashcat. Hashcat is released as open source software under the MIT license. Current Version ...

    大规模游戏社交网络节点相似性算法及其应用-8-5 美团 Spark Shuffle 架构演进.zip

    随着技术的发展,美团在Spark Shuffle上引入了内存优化策略,如使用RDMA(Remote Direct Memory Access)技术减少网络传输延迟,以及采用更高效的Hash Partitioner和Bucketing策略降低数据冲突。此外,美团还引入了...

    前端项目-spark-md5.zip

    在"js-spark-md5-master"文件中,包含了Spark-MD5库的源代码和其他相关资源。主要文件结构可能包括: 1. `spark-md5.js`:这是Spark-MD5的核心库文件,包含了MD5算法的实现。 2. `spark-md5.min.js`:压缩后的版本...

Global site tag (gtag.js) - Google Analytics