`

Spark源码分析6-Worker

 
阅读更多

Worker 主要负责管理excutor和driver,并向master报告excutor和driver的状态

Worker的启动

类似于master,创建了worker的actor

 

private[spark] object Worker {

  def main(argStrings: Array[String]) {
    val args = new WorkerArguments(argStrings)
    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir)
    actorSystem.awaitTermination()
  }

  def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
      masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
      : (ActorSystem, Int) =
  {
    // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
    val conf = new SparkConf
    val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
    val actorName = "Worker"
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
      conf = conf)
    actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
      masterUrls, systemName, actorName,  workDir, conf), name = actorName)
    (actorSystem, boundPort)
  }

 worker actor 启动的时候调用PreStart函数,创建了webUI system,调用metricsSystem注册了workerSource,并向master注册worker

  override def preStart() {
    assert(!registered)
    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
      host, port, cores, Utils.megabytesToString(memory)))
    sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
    logInfo("Spark home: " + sparkHome)
    createWorkDir()
    webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    webUi.start()
    registerWithMaster()

    metricsSystem.registerSource(workerSource)
    metricsSystem.start()
  }

 worker需要处理的事件

override def receive = {
    case RegisteredWorker(masterUrl, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterUrl)
      registered = true
      changeMaster(masterUrl, masterWebUiUrl)
      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)

    case SendHeartbeat =>
      masterLock.synchronized {
        if (connected) { master ! Heartbeat(workerId) }
      }

    case MasterChanged(masterUrl, masterWebUiUrl) =>
      logInfo("Master has changed, new master is at " + masterUrl)
      changeMaster(masterUrl, masterWebUiUrl)

      val execs = executors.values.
        map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
      sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)

    case Heartbeat =>
      logInfo(s"Received heartbeat from driver ${sender.path}")

    case RegisterWorkerFailed(message) =>
      if (!registered) {
        logError("Worker registration failed: " + message)
        System.exit(1)
      }

    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
        // TODO (pwendell): We shuld make sparkHome an Option[String] in
        // ApplicationDescription to be more explicit about this.
        val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
        val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
          self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
        executors(appId + "/" + execId) = manager
        manager.start()
        coresUsed += cores_
        memoryUsed += memory_
        masterLock.synchronized {
          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
        }
      }

    case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
      masterLock.synchronized {
        master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
      }
      val fullId = appId + "/" + execId
      if (ExecutorState.isFinished(state)) {
        val executor = executors(fullId)
        logInfo("Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
        executors -= fullId
        finishedExecutors(fullId) = executor
        coresUsed -= executor.cores
        memoryUsed -= executor.memory
      }

    case KillExecutor(masterUrl, appId, execId) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId)
      } else {
        val fullId = appId + "/" + execId
        executors.get(fullId) match {
          case Some(executor) =>
            logInfo("Asked to kill executor " + fullId)
            executor.kill()
          case None =>
            logInfo("Asked to kill unknown executor " + fullId)
        }
      }

    case LaunchDriver(driverId, driverDesc) => {
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
      drivers(driverId) = driver
      driver.start()

      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
    }

    case KillDriver(driverId) => {
      logInfo(s"Asked to kill driver $driverId")
      drivers.get(driverId) match {
        case Some(runner) =>
          runner.kill()
        case None =>
          logError(s"Asked to kill unknown driver $driverId")
      }
    }

    case DriverStateChanged(driverId, state, exception) => {
      state match {
        case DriverState.ERROR =>
          logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
        case DriverState.FINISHED =>
          logInfo(s"Driver $driverId exited successfully")
        case DriverState.KILLED =>
          logInfo(s"Driver $driverId was killed by user")
      }
      masterLock.synchronized {
        master ! DriverStateChanged(driverId, state, exception)
      }
      val driver = drivers.remove(driverId).get
      finishedDrivers(driverId) = driver
      memoryUsed -= driver.driverDesc.mem
      coresUsed -= driver.driverDesc.cores
    }

    case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
      logInfo(s"$x Disassociated !")
      masterDisconnected()

    case RequestWorkerState => {
      sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
        finishedExecutors.values.toList, drivers.values.toList,
        finishedDrivers.values.toList, activeMasterUrl, cores, memory,
        coresUsed, memoryUsed, activeMasterWebUiUrl)
    }
  }

 

 

 

分享到:
评论

相关推荐

    Spark源码分析3-The connect between driver,master and excutor

    《Spark源码分析3——驱动器、主节点与执行器之间的连接》 在Spark的分布式计算框架中,驱动器(Driver)、主节点(Master)和执行器(Executor)是核心组件,它们之间的通信和协作构成了Spark作业执行的基础。本文将深入...

    Apache Spark源码读解

    在深入探讨Apache Spark源码之前,我们先了解一些基础知识。Apache Spark是一个用于大规模数据处理的开源集群计算系统,它提供了统一的框架来处理批处理、流处理以及机器学习等多种场景。Spark的核心组件是`...

    大数据Spark源码

    源码分析可以了解DataFrame如何与RDD互操作,以及 Catalyst优化器的工作原理。 总结,Spark源码的学习是一个深入理解大数据处理流程和技术细节的过程。通过源码,我们可以了解到Spark如何高效地调度任务,如何处理...

    深入理解spark:核心思想与源码分析 高清版本

    8. Spark源码分析:书中可能涵盖了Spark源码的深度分析,帮助读者理解其内部工作机制,如调度系统、存储层次、容错机制等,这对于优化Spark应用和解决性能问题非常有价值。 9. 性能优化:Spark的性能优化是学习的...

    深入理解Spark核心思想与源码分析

    《深入理解Spark核心思想与源码分析》这本书深入剖析了Apache Spark这一大数据处理框架的核心设计理念与实现机制。Spark作为分布式计算的重要工具,以其高效的内存计算、易用的API和广泛的应用场景深受业界青睐。...

    《深入理解Spark 核心思想与源码分析》耿嘉安 完整版带书签

    《深入理解Spark核心思想与源码分析》是耿嘉安撰写的一本专著,全面而深入地探讨了Apache Spark这一大数据处理框架的核心理念和技术细节。这本书不仅覆盖了Spark的基础概念,还深入到了源码层面,为读者揭示了Spark...

    SparkCore源码阅读

    ### Spark Core 源码分析之部署方式 #### 前言 Apache Spark 是一个用于大规模数据处理的开源计算系统,其核心模块 Spark Core 提供了基础的并行计算框架和分布式部署方式的支持。本文主要关注 Spark 的部署方式,...

    spark源码阅读笔记

    ### Spark源码解析:Master与Worker机制 #### Spark概述及特点 Spark是一个高效的数据处理框架,它由加州大学伯克利分校的AMP实验室研发。该框架支持多种编程语言(包括Java、Scala、Python和R等),使开发者可以...

    Spark2.6.3源码

    源码分析对于深入理解Spark的工作原理和进行二次开发至关重要。 首先,Java目录是Spark源码的核心部分,它包含了Spark的大部分核心组件和功能的实现。以下是一些主要的子目录及其相关的知识点: 1. **core**:这是...

    Apache Spark源码剖析

    源码分析可以帮助我们理解这些算法如何并行化和优化,以及如何利用Spark的内存计算优势。 GraphX提供了图处理API,允许用户定义图的顶点和边,以及对图进行各种操作,如PageRank和最短路径算法。源码分析将揭示...

    spark源码阅读笔记(详)

    本次源码分析基于**Spark 1.2版本**,并聚焦于**standalone模式**,即独立部署模式。此模式下,Spark服务完全自包含,无需依赖其他资源管理系统。它是Spark YARN和Mesos模式的基础。 #### 四、Master与Worker的启动...

    Spark-内核源码解析.docx

    为了深入了解 Spark 的内部机制,我们需要对其内核源码进行深入分析。 Application/App 在 Spark 中,Application 指的是用户编写的 Spark 应用程序/代码,包含了 Driver 功能代码和分布在集群中多个节点上运行的 ...

    spark研究分析&restful架构

    - 启动 Spark Master 和 Worker 节点。 - 验证集群状态。 **示例: 计算 Pi** 在 CentOS 上使用 Spark 进行 Pi 的计算是一个经典的入门示例。可以通过编写简单的程序来实现,该程序使用 Monte Carlo 方法估计 Pi ...

    带你深入理解Spark核心思想走进Sprak的源码分析

    六、Spark源码分析 1. RDD创建:深入源码,我们可以看到如何通过`sparkContext.parallelize`或`sparkContext.textFile`创建RDD。 2. Action与Transformation:研究`count`、`map`等操作的实现,理解它们如何触发...

    Apache_Spark源码走读

    本文旨在通过对Apache Spark源码的初步解读,帮助读者建立起对Spark核心概念和技术细节的理解。 #### 二、基本概念 ##### 1. RDD(Resilient Distributed Dataset) - **定义**:弹性分布式数据集(Resilient ...

    【讲义-第10期Spark公益大讲堂】Spark on Yarn-.pdf

    对于深入理解Spark on Yarn的工作机制,需要对其实现源码进行详细分析。这主要包括以下几个方面: - **ApplicationMaster的实现**:了解ApplicationMaster是如何与ResourceManager和NodeManager通信的。 - **资源...

    深入理解Spark:核心思想与源码分析

    《深入理解Spark:核心思想与源码分析》是一本针对Apache Spark进行深度解析的专业书籍,旨在帮助读者透彻掌握Spark的核心理念与实现机制。Spark作为大数据处理领域的重要框架,其高性能、易用性和弹性分布式计算的...

    15.Spark源码分析

    Spark源码分析 各个组件介绍 后面补充。。。。 StandAlone模式 在StandAlone模式的start-all的shell启动脚本下,在当前机器执行了JAVA_HOME/bin/java -cp ….Master和在配置的slave的机器中执行 JAVA_HOME/bin/java ...

Global site tag (gtag.js) - Google Analytics