`
m635674608
  • 浏览: 5029244 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spark源码分析之Worker

 
阅读更多

 Spark支持三种模式的部署:YARN、Standalone以及Mesos。本篇说到的Worker只有在Standalone模式下才有。Worker节点是Spark的工作节点,用于执行提交的作业。我们先从Worker节点的启动开始介绍。
  Spark中Worker的启动有多种方式,但是最终调用的都是org.apache.spark.deploy.worker.Worker类,启动Worker节点的时候可以传很多的参数:内存、核、工作目录等。如果你不知道如何传递,没关系,help一下即可:

01 [wyp@iteblog spark]$ ./bin/spark-class org.apache.spark.deploy.worker.Worker -h
02 Spark assembly has been built with Hive, including Datanucleus jars on classpath
03 Usage: Worker [options] <master>
04  
05 Master must be a URL of the form spark://hostname:port
06  
07 Options:
08   -c CORES, --cores CORES  Number of cores to use
09   -m MEM, --memory MEM     Amount of memory to use (e.g. 1000M, 2G)
10   -d DIR, --work-dir DIR   Directory to run apps in (default: SPARK_HOME/work)
11   -i HOST, --ip IP         Hostname to listen on (deprecated, please use --host or -h)
12   -h HOST, --host HOST     Hostname to listen on
13   -p PORT, --port PORT     Port to listen on (default: random)
14   --webui-port PORT        Port for web UI (default: 8081)

  从上面的输出我们可以看出Worker的启动支持多达7个参数!这样每个都这样输入岂不是很麻烦?其实,我们不用担心,Worker节点启动地时候将先读取conf/spark-env.sh里面的配置,这些参数配置的解析都是由Worker中的WorkerArguments类进行解析的。如果你没有设置内存,那么将会把Worker启动所在机器的所有内存(会预先留下1G内存给操作系统)分给Worker,具体的代码实现如下:

01 def inferDefaultMemory(): Int = {
02     val ibmVendor = System.getProperty("java.vendor").contains("IBM")
03     var totalMb = 0
04     try {
05       val bean = ManagementFactory.getOperatingSystemMXBean()
06       if (ibmVendor) {
07         val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
08         val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
09         totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
10       } else {
11         val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
12         val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
13         totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
14       }
15     } catch {
16       case e: Exception => {
17         totalMb = 2*1024
18         System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
19       }
20     }
21     // Leave out 1 GB for the operating system, but don't return a negative memory size
22     math.max(totalMb - 1024, 512)
23   }

  同样,如果你没设置cores,那么Spark将会获取你机器的所有可用的核作为参数传进去。解析完参数之后,将运行preStart函数,进行一些启动相关的操作,比如判断是否已经向Master注册过,创建工作目录,启动Worker的WEB UI,向Master进行注册等操作,如下:

01 override def preStart() {
02   assert(!registered)
03   logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
04     host, port, cores, Utils.megabytesToString(memory)))
05   logInfo("Spark home: " + sparkHome)
06   createWorkDir()
07   context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
08   webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
09   webUi.bind()
10   registerWithMaster()
11  
12   metricsSystem.registerSource(workerSource)
13   metricsSystem.start()
14 }

  Worker向Master注册的超时时间为20秒,如果在这20秒内没有成功地向Master注册,那么将会进行重试,重试的次数为3,如过重试的次数大于等于3,那么将无法启动Worker,这时候,你就该看看你的网络环境或者你的Master是否存在问题了。
Worker在运行的过程中将会触发许多的事件, 比如:RegisteredWorker、SendHeartbeat、WorkDirCleanup以及MasterChanged等等,收到不同的事件,Worker进行不同的操作。比如,如果需要运行一个作业,Worker将会启动一个或多个ExecutorRunner,具体的代码可参见receiveWithLogging函数:

01 override def receiveWithLogging = {
02     case RegisteredWorker(masterUrl, masterWebUiUrl) =>
03  
04     case SendHeartbeat =>
05     case WorkDirCleanup =>
06  
07     case MasterChanged(masterUrl, masterWebUiUrl) =>
08  
09     case Heartbeat =>
10      
11     case RegisterWorkerFailed(message) =>
12      
13     case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
14       
15     case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
16        
17     case KillExecutor(masterUrl, appId, execId) =>
18        
19     case LaunchDriver(driverId, driverDesc) => {
20       
21  
22     case KillDriver(driverId) => {
23  
24     case DriverStateChanged(driverId, state, exception) => {
25       
26  
27     case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
28     
29     case RequestWorkerState => {
30   }

  上面的代码是经过处理的,其实receiveWithLogging 方法是从ActorLogReceive继承下来的。
  当Worker节点Stop的时候,将会执行postStop函数,如下:

1 override def postStop() {
2   metricsSystem.report()
3   registrationRetryTimer.foreach(_.cancel())
4   executors.values.foreach(_.kill())
5   drivers.values.foreach(_.kill())
6   webUi.stop()
7   metricsSystem.stop()
8 }

  杀掉所有还未执行完的executors、drivers等,操作。这方法也是从Actor继承下来的。
  本文只是简单地介绍了Worker节点的一些环境,启动等相关的代码,关于它如何和Master通信;如何启动Executor;如何启动Driver都没有涉及,如果你想更好地了解Worker的运行情况,请参见Worker相关的代码吧。

 

http://www.iteblog.com/archives/1145

分享到:
评论

相关推荐

    深入理解spark:核心思想与源码分析 高清版本

    8. Spark源码分析:书中可能涵盖了Spark源码的深度分析,帮助读者理解其内部工作机制,如调度系统、存储层次、容错机制等,这对于优化Spark应用和解决性能问题非常有价值。 9. 性能优化:Spark的性能优化是学习的...

    大数据Spark源码

    源码分析可以了解DataFrame如何与RDD互操作,以及 Catalyst优化器的工作原理。 总结,Spark源码的学习是一个深入理解大数据处理流程和技术细节的过程。通过源码,我们可以了解到Spark如何高效地调度任务,如何处理...

    深入理解Spark核心思想与源码分析

    Spark源码分析** 通过阅读Spark源码,可以深入了解任务调度、内存管理、容错机制等内部工作原理,这对于调优和解决实际问题非常有帮助。 **10. 性能调优** Spark的性能调优涉及多个方面,包括配置参数调整(如...

    Spark源码分析3-The connect between driver,master and excutor

    《Spark源码分析3——驱动器、主节点与执行器之间的连接》 在Spark的分布式计算框架中,驱动器(Driver)、主节点(Master)和执行器(Executor)是核心组件,它们之间的通信和协作构成了Spark作业执行的基础。本文将深入...

    《深入理解Spark 核心思想与源码分析》耿嘉安 完整版带书签

    《深入理解Spark核心思想与源码分析》是耿嘉安撰写的一本专著,全面而深入地探讨了Apache Spark这一大数据处理框架的核心理念和技术细节。这本书不仅覆盖了Spark的基础概念,还深入到了源码层面,为读者揭示了Spark...

    SparkCore源码阅读

    ### Spark Core 源码分析之部署方式 #### 前言 Apache Spark 是一个用于大规模数据处理的开源计算系统,其核心模块 Spark Core 提供了基础的并行计算框架和分布式部署方式的支持。本文主要关注 Spark 的部署方式,...

    Apache Spark源码读解

    在深入探讨Apache Spark源码之前,我们先了解一些基础知识。Apache Spark是一个用于大规模数据处理的开源集群计算系统,它提供了统一的框架来处理批处理、流处理以及机器学习等多种场景。Spark的核心组件是`...

    spark源码阅读笔记

    ### Spark源码解析:Master与Worker机制 #### Spark概述及特点 Spark是一个高效的数据处理框架,它由加州大学伯克利分校的AMP实验室研发。该框架支持多种编程语言(包括Java、Scala、Python和R等),使开发者可以...

    Spark2.6.3源码

    源码分析对于深入理解Spark的工作原理和进行二次开发至关重要。 首先,Java目录是Spark源码的核心部分,它包含了Spark的大部分核心组件和功能的实现。以下是一些主要的子目录及其相关的知识点: 1. **core**:这是...

    Apache Spark源码剖析

    源码分析可以帮助我们理解这些算法如何并行化和优化,以及如何利用Spark的内存计算优势。 GraphX提供了图处理API,允许用户定义图的顶点和边,以及对图进行各种操作,如PageRank和最短路径算法。源码分析将揭示...

    带你深入理解Spark核心思想走进Sprak的源码分析

    六、Spark源码分析 1. RDD创建:深入源码,我们可以看到如何通过`sparkContext.parallelize`或`sparkContext.textFile`创建RDD。 2. Action与Transformation:研究`count`、`map`等操作的实现,理解它们如何触发...

    spark源码阅读笔记(详)

    本次源码分析基于**Spark 1.2版本**,并聚焦于**standalone模式**,即独立部署模式。此模式下,Spark服务完全自包含,无需依赖其他资源管理系统。它是Spark YARN和Mesos模式的基础。 #### 四、Master与Worker的启动...

    深入理解Spark:核心思想与源码分析

    《深入理解Spark:核心思想与源码分析》是一本针对Apache Spark进行深度解析的专业书籍,旨在帮助读者透彻掌握Spark的核心理念与实现机制。Spark作为大数据处理领域的重要框架,其高性能、易用性和弹性分布式计算的...

    spark0.6源码

    Spark 0.6.0源码分析 Spark是Apache软件基金会下的一个开源大数据处理框架,以其高效的内存计算和弹性分布式数据集(Resilient Distributed Datasets, RDDs)为核心特性,广泛应用于数据处理、机器学习和图形计算等...

    15.Spark源码分析

    Spark源码分析 各个组件介绍 后面补充。。。。 StandAlone模式 在StandAlone模式的start-all的shell启动脚本下,在当前机器执行了JAVA_HOME/bin/java -cp ….Master和在配置的slave的机器中执行 JAVA_HOME/bin/java ...

    Apache_Spark源码走读

    本文旨在通过对Apache Spark源码的初步解读,帮助读者建立起对Spark核心概念和技术细节的理解。 #### 二、基本概念 ##### 1. RDD(Resilient Distributed Dataset) - **定义**:弹性分布式数据集(Resilient ...

    Spark源码倒腾

    《Spark源码探索之旅》 Spark,作为大数据处理领域中的明星框架,因其高效、易用和可扩展性而备受赞誉。Spark源码的探索对于理解其内部工作机制、优化性能以及进行二次开发至关重要。本文将围绕Spark的核心概念和...

    Spark2.2版本内核源码深度剖析.zip

    `Worker原理剖析与源码分析.html`(9)会介绍Spark集群中的Worker节点,它是任务执行的物理实体,负责启动executor进程并管理其生命周期。源码分析将深入到Worker如何接收和执行任务,以及如何报告状态。 最后,`...

Global site tag (gtag.js) - Google Analytics