`

spark内核揭秘-07-DAGScheduler源码解读初体验

阅读更多

当构建完TaskScheduler之后,我们需要构建DAGScheduler这个核心对象:

进入其构造函数中:

可以看出构建DAGScheduler实例的时候需要把TaskScheduler实例对象作为参数传入。

LiveListenerBus:

MapOutputTrackerMaster:

BlockManagerMaster:

通过阅读代码,我们可以发现DAGScheduler实例化的时候,调用了initializeEventProcessActor()方法

 

private def initializeEventProcessActor() {
  // blocking the thread until supervisor is started, which ensures eventProcessActor is
  // not null before any job is submitted
  // 阻塞当前线程,等待supervisor启动,这样可以确保Job提交时,eventProcessActor not null
implicit val timeout = Timeout(30 seconds)
  val initEventActorReply =
    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
  eventProcessActor = Await.result(initEventActorReply, timeout.duration).
    asInstanceOf[ActorRef]
}

initializeEventProcessActor()

DAGSchedulerEventProcessActor:

 

 

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
    // valid when the messages arrive
    // 设置taskScheduler对DAGScheduler的引用句柄。在此处设置保证了Job提交时候
    // eventProcessActor已经准备就绪
dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  /**
   * The main event loop of the DAG scheduler.
   */
def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def postStop() {
    // Cancel any active jobs in postStop hook
dagScheduler.cleanUpAfterSchedulerStop()
  }
}

 

 

可以看出核心在于实例化eventProcessActor对象,eventProcessActor会负责接收和发送DAGScheduler的消息,是DAGScheduler的通信载体。

分享到:
评论

相关推荐

    spark-3.1.3-bin-without-hadoop.tgz

    这个"spark-3.1.3-bin-without-hadoop.tgz"压缩包是Spark的3.1.3版本,不含Hadoop依赖的二进制发行版。这意味着在部署时,你需要自行配置Hadoop环境,或者在不依赖Hadoop的环境中运行Spark。 Spark的核心特性包括...

    编译的spark-hive_2.11-2.3.0和 spark-hive-thriftserver_2.11-2.3.0.jar

    spark-hive_2.11-2.3.0 spark-hive-thriftserver_2.11-2.3.0.jar log4j-2.15.0.jar slf4j-api-1.7.7.jar slf4j-log4j12-1.7.25.jar curator-client-2.4.0.jar curator-framework-2.4.0.jar curator-recipes-2.4.0....

    spark资源 spark-2.3.2-bin-hadoop2.7 tgz文件

    spark资源 spark-2.3.2-bin-hadoop2.7 tgz文件

    spark-2.1.0-bin-without-hadoop版本的压缩包,直接下载到本地解压后即可使用

    在Ubuntu里安装spark,spark-2.1.0-bin-without-hadoop该版本直接下载到本地后解压即可使用。 Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。它使用内存中缓存和优化的查询执行方式,可针对任何规模...

    spark-3.1.2.tgz & spark-3.1.2-bin-hadoop2.7.tgz.rar

    Spark-3.1.2.tgz和Spark-3.1.2-bin-hadoop2.7.tgz是两个不同格式的Spark发行版,分别以tar.gz和rar压缩格式提供。 1. Spark核心概念: - RDD(弹性分布式数据集):Spark的基础数据结构,是不可变、分区的数据集合...

    spark-assembly-1.5.2-hadoop2.6.0.jar

    《Spark编程核心组件:spark-assembly-1.5.2-hadoop2.6.0.jar详解》 在大数据处理领域,Spark以其高效、易用和灵活性脱颖而出,成为了许多开发者的首选框架。Spark-assembly-1.5.2-hadoop2.6.0.jar是Spark中的一个...

    spark-1.6.0-bin-hadoop2.6.tgz

    Spark-1.6.0-bin-hadoop2.6.tgz 是针对Linux系统的Spark安装包,包含了Spark 1.6.0版本以及与Hadoop 2.6版本兼容的构建。这个安装包为在Linux环境中搭建Spark集群提供了必要的组件和库。 **1. Spark基础知识** ...

    spark-2.4.7-bin-hadoop2.6.tgz

    在解压`spark-2.4.7-bin-hadoop2.6.tgz`后,您会得到一个名为`spark-2.4.7-bin-hadoop2.6`的目录,其中包括以下组件: - `bin/`:包含可执行文件,如`spark-submit`,`pyspark`,`spark-shell`等,用于启动和管理...

    spark-2.0.0-bin-hadoop2.6.tgz

    本资源是spark-2.0.0-bin-hadoop2.6.tgz百度网盘资源下载,本资源是spark-2.0.0-bin-hadoop2.6.tgz百度网盘资源下载

    apache-doris-spark-connector-2.3_2.11-1.0.1

    Spark Doris Connector(apache-doris-spark-connector-2.3_2.11-1.0.1-incubating-src.tar.gz) Spark Doris Connector Version:1.0.1 Spark Version:2.x Scala Version:2.11 Apache Doris是一个现代MPP分析...

    spark-3.1.3-bin-hadoop3.2.tgz

    在这个特定的压缩包"spark-3.1.3-bin-hadoop3.2.tgz"中,我们得到了Spark的3.1.3版本,它已经预编译为与Hadoop 3.2兼容。这个版本的Spark不仅提供了源码,还包含了预编译的二进制文件,使得在Linux环境下快速部署和...

    spark-2.4.7-bin-without-hadoop

    为了使用"spark-2.4.7-bin-without-hadoop",你需要首先下载并解压提供的spark-2.4.7-bin-without-hadoop.tgz文件。解压后,你可以找到包含Spark所有组件的目录结构,包括Spark的可执行文件、配置文件以及相关的库...

    spark-3.2.1-bin-hadoop2.7.tgz

    这个名为"spark-3.2.1-bin-hadoop2.7.tgz"的压缩包是Spark的一个特定版本,即3.2.1,与Hadoop 2.7版本兼容。在Linux环境下,这样的打包方式方便用户下载、安装和运行Spark。 Spark的核心设计理念是快速数据处理,...

    spark-3.2.0-bin-hadoop3.2.tgz

    Spark是Apache软件基金会下的一个开源大数据处理框架,其主要特点是高效、通用、易用和可扩展。...解压"spark-3.2.0-bin-hadoop3.2.tgz"后,用户可以直接在Hadoop 3.2环境下运行Spark,享受其带来的高性能计算体验。

    spark-3.0.0-bin-hadoop2.7.tgz

    Spark-3.0.0-bin-hadoop2.7.tgz 是Spark 3.0.0版本的预编译二进制包,其中包含了针对Hadoop 2.7版本的兼容性构建。这个版本的发布对于数据科学家和大数据工程师来说至关重要,因为它提供了许多性能优化和新功能。 1...

    spark-hive-thriftserver_2.11-2.1.3-SNAPSHOT-123456.jar

    spark-hive-thriftserver_2.11-2.1.spark-hive-thrift

    spark-3.0.0-bin-hadoop3.2

    在本场景中,我们讨论的是Spark的3.0.0版本,与Hadoop3.2相结合的二进制发行版——"spark-3.0.0-bin-hadoop3.2"。这个压缩包是为了在Windows操作系统下运行Spark而设计的,因此标签明确指出它是适用于Windows平台的...

    spark-streaming-kafka-0-8_2.11-2.4.0.jar

    spark-streaming-kafka-0-8_2.11-2.4.0.jar

    spark-3.2.4-bin-hadoop3.2-scala2.13 安装包

    在本安装包“spark-3.2.4-bin-hadoop3.2-scala2.13”中,包含了用于运行Spark的核心组件以及依赖的Hadoop版本和Scala编程语言支持。以下是对这些关键组成部分的详细解释: 1. **Spark**: Spark的核心在于它的弹性...

    spark-2.3.4-bin-hadoop2.7.tgz

    在本案例中,我们关注的是Spark的2.3.4版本,它预编译为与Hadoop 2.7兼容的版本,打包成"spark-2.3.4-bin-hadoop2.7.tgz"的压缩文件。这个压缩包包含了运行Spark所需的所有组件,包括Java库、Python库(pyspark)、...

Global site tag (gtag.js) - Google Analytics