`

spark-学习笔记--23 DAGScheduler 的stage划分

 
阅读更多

DAGScheduler 的stage划分

 

 

    /**  org.apache.spark.scheduler.DAGScheduler 中的 submitStage
	*   提交stage, 先提交缺失的父stage
	*/
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)  //注意这里
        logDebug("missing: " + missing)
        if (missing.isEmpty) {// 所有的父stage都提交完了 再提交自己
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)//注意这里
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

 

 

 

  private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] => //注意shufDep宽依赖会产生新的stage
                val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>      //注意这里  narrowDep 
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  } 

 

分享到:
评论

相关推荐

    spark-hive-2.11和spark-sql-以及spark-hadoop包另付下载地址

    在标题"spark-hive-2.11和spark-sql-以及spark-hadoop包另付下载地址"中,我们关注的是Spark与Hive的特定版本(2.11)的集成,以及Spark SQL和Spark对Hadoop的支持。这里的2.11可能指的是Scala的版本,因为Spark是用...

    spark-3.1.2.tgz & spark-3.1.2-bin-hadoop2.7.tgz.rar

    Spark-3.1.2.tgz和Spark-3.1.2-bin-hadoop2.7.tgz是两个不同格式的Spark发行版,分别以tar.gz和rar压缩格式提供。 1. Spark核心概念: - RDD(弹性分布式数据集):Spark的基础数据结构,是不可变、分区的数据集合...

    编译的spark-hive_2.11-2.3.0和 spark-hive-thriftserver_2.11-2.3.0.jar

    spark-hive_2.11-2.3.0 spark-hive-thriftserver_2.11-2.3.0.jar log4j-2.15.0.jar slf4j-api-1.7.7.jar slf4j-log4j12-1.7.25.jar curator-client-2.4.0.jar curator-framework-2.4.0.jar curator-recipes-2.4.0....

    spark-2.0.0-bin-hadoop2.6.tgz

    本资源是spark-2.0.0-bin-hadoop2.6.tgz百度网盘资源下载,本资源是spark-2.0.0-bin-hadoop2.6.tgz百度网盘资源下载

    spark-assembly-1.5.2-hadoop2.6.0.jar

    《Spark编程核心组件:spark-assembly-1.5.2-hadoop2.6.0.jar详解》 在大数据处理领域,Spark以其高效、易用和灵活性脱颖而出,成为了许多开发者的首选框架。Spark-assembly-1.5.2-hadoop2.6.0.jar是Spark中的一个...

    apache-doris-spark-connector-2.3_2.11-1.0.1

    Spark Doris Connector(apache-doris-spark-connector-2.3_2.11-1.0.1-incubating-src.tar.gz) Spark Doris Connector Version:1.0.1 Spark Version:2.x Scala Version:2.11 Apache Doris是一个现代MPP分析...

    spark-2.4.7-bin-hadoop2.6.tgz

    在解压`spark-2.4.7-bin-hadoop2.6.tgz`后,您会得到一个名为`spark-2.4.7-bin-hadoop2.6`的目录,其中包括以下组件: - `bin/`:包含可执行文件,如`spark-submit`,`pyspark`,`spark-shell`等,用于启动和管理...

    spark-1.6.0-bin-hadoop2.6.tgz

    Spark-1.6.0-bin-hadoop2.6.tgz 是针对Linux系统的Spark安装包,包含了Spark 1.6.0版本以及与Hadoop 2.6版本兼容的构建。这个安装包为在Linux环境中搭建Spark集群提供了必要的组件和库。 **1. Spark基础知识** ...

    spark-3.2.4-bin-hadoop3.2-scala2.13 安装包

    综上所述,“spark-3.2.4-bin-hadoop3.2-scala2.13”安装包是构建和运行Spark应用程序的基础,涵盖了大数据处理、流处理、机器学习等多个领域,为开发者提供了高效、灵活的数据处理平台。通过深入理解和熟练运用,...

    spark-3.1.3-bin-without-hadoop.tgz

    这个"spark-3.1.3-bin-without-hadoop.tgz"压缩包是Spark的3.1.3版本,不含Hadoop依赖的二进制发行版。这意味着在部署时,你需要自行配置Hadoop环境,或者在不依赖Hadoop的环境中运行Spark。 Spark的核心特性包括...

    spark-2.4.8-bin-hadoop2.7.tgz

    安装和配置Spark 2.4.8时,你需要根据你的环境调整配置文件,如`spark-env.sh`或`spark-defaults.conf`,以适应你的Hadoop集群或本地环境。在使用Spark时,你可以通过`spark-submit`命令提交应用程序,或者直接在...

    spark-3.1.2-bin-hadoop3.2.tgz

    5. **交互式Shell**:Spark提供了一个名为`spark-shell`的交互式环境,方便开发人员测试和调试代码。 **Spark与Hadoop 3.2的兼容性** Hadoop 3.2引入了许多新特性,如: 1. **多命名空间**:支持多个HDFS命名空间...

    spark-2.2.2-bin-hadoop2.7.tgz

    1. `bin`:存放可执行脚本,如`spark-submit`用于提交Spark应用,`spark-shell`提供交互式Shell环境。 2. `conf`:配置文件夹,存放默认配置模板,如`spark-defaults.conf`,用户可以根据需求自定义配置。 3. `jars`...

    spark-2.3.1-bin-hadoop2.7.zip

    - `bin`:包含Spark的可执行脚本,如`spark-shell`(Scala交互式环境)、`pyspark`(Python交互式环境)和`spark-submit`(提交Spark应用)等。 - `conf`:配置文件目录,其中`spark-defaults.conf`是默认配置,可以...

    spark--bin-hadoop3-without-hive.tgz

    本压缩包“spark--bin-hadoop3-without-hive.tgz”提供了Spark二进制版本,针对Hadoop 3.1.3进行了编译和打包,这意味着它已经与Hadoop 3.x兼容,但不包含Hive组件。在CentOS 8操作系统上,这个版本的Spark已经被...

    spark-2.1.0-bin-without-hadoop版本的压缩包,直接下载到本地解压后即可使用

    在Ubuntu里安装spark,spark-2.1.0-bin-without-hadoop该版本直接下载到本地后解压即可使用。 Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。它使用内存中缓存和优化的查询执行方式,可针对任何规模...

    spark-3.2.1-bin-hadoop2.7.tgz

    总结一下,"spark-3.2.1-bin-hadoop2.7.tgz"是一个专为Linux设计的Spark版本,与Hadoop 2.7兼容,提供了高效的大数据处理能力,涵盖了核心计算、SQL查询、流处理、机器学习和图计算等多个方面。在实际应用中,开发者...

    spark-3.1.3-bin-hadoop3.2.tgz

    在这个特定的压缩包"spark-3.1.3-bin-hadoop3.2.tgz"中,我们得到了Spark的3.1.3版本,它已经预编译为与Hadoop 3.2兼容。这个版本的Spark不仅提供了源码,还包含了预编译的二进制文件,使得在Linux环境下快速部署和...

    spark-2.4.0-bin-hadoop2.7.tgz

    然后,你可以通过`spark-submit`命令提交Spark作业到集群,或者使用`pyspark`或`spark-shell`启动交互式环境。 在实际应用中,Spark常被用于大数据分析、实时数据处理、机器学习模型训练和图数据分析。由于其内存...

    spark-3.2.0-bin-hadoop3.2.tgz

    这个压缩包"spark-3.2.0-bin-hadoop3.2.tgz"包含了Spark 3.2.0版本的二进制文件,以及针对Hadoop 3.2的兼容构建。 Spark的核心组件包括:Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图...

Global site tag (gtag.js) - Google Analytics