Spark支持三种模式的部署:YARN、Standalone以及Mesos。本篇说到的Worker只有在Standalone模式下才有。Worker节点是Spark的工作节点,用于执行提交的作业。我们先从Worker节点的启动开始介绍。
Spark中Worker的启动有多种方式,但是最终调用的都是org.apache.spark.deploy.worker.Worker类,启动Worker节点的时候可以传很多的参数:内存、核、工作目录等。如果你不知道如何传递,没关系,help一下即可:
01 |
[wyp @iteblog spark]$ ./bin/spark- class org.apache.spark.deploy.worker.Worker -h
|
02 |
Spark assembly has been built with Hive, including Datanucleus jars on classpath |
03 |
Usage: Worker [options] <master> |
05 |
Master must be a URL of the form spark:
|
08 |
-c CORES, --cores CORES Number of cores to use
|
09 |
-m MEM, --memory MEM Amount of memory to use (e.g. 1000M, 2G)
|
10 |
-d DIR, --work-dir DIR Directory to run apps in ( default : SPARK_HOME/work)
|
11 |
-i HOST, --ip IP Hostname to listen on (deprecated, please use --host or -h)
|
12 |
-h HOST, --host HOST Hostname to listen on
|
13 |
-p PORT, --port PORT Port to listen on ( default : random)
|
14 |
--webui-port PORT Port for web UI ( default : 8081 )
|
从上面的输出我们可以看出Worker的启动支持多达7个参数!这样每个都这样输入岂不是很麻烦?其实,我们不用担心,Worker节点启动地时候将先读取conf/spark-env.sh里面的配置,这些参数配置的解析都是由Worker中的WorkerArguments类进行解析的。如果你没有设置内存,那么将会把Worker启动所在机器的所有内存(会预先留下1G内存给操作系统)分给Worker,具体的代码实现如下:
01 |
def inferDefaultMemory() : Int = {
|
02 |
val ibmVendor = System.getProperty( "java.vendor" ).contains( "IBM" )
|
05 |
val bean = ManagementFactory.getOperatingSystemMXBean()
|
07 |
val beanClass = Class.forName( "com.ibm.lang.management.OperatingSystemMXBean" )
|
08 |
val method = beanClass.getDeclaredMethod( "getTotalPhysicalMemory" )
|
09 |
totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024 ).toInt
|
11 |
val beanClass = Class.forName( "com.sun.management.OperatingSystemMXBean" )
|
12 |
val method = beanClass.getDeclaredMethod( "getTotalPhysicalMemorySize" )
|
13 |
totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024 ).toInt
|
16 |
case e : Exception = > {
|
18 |
System.out.println( "Failed to get total physical memory. Using " + totalMb + " MB" )
|
22 |
math.max(totalMb - 1024 , 512 )
|
同样,如果你没设置cores,那么Spark将会获取你机器的所有可用的核作为参数传进去。解析完参数之后,将运行preStart函数,进行一些启动相关的操作,比如判断是否已经向Master注册过,创建工作目录,启动Worker的WEB UI,向Master进行注册等操作,如下:
01 |
override def preStart() {
|
03 |
logInfo( "Starting Spark worker %s:%d with %d cores, %s RAM" .format(
|
04 |
host, port, cores, Utils.megabytesToString(memory)))
|
05 |
logInfo( "Spark home: " + sparkHome)
|
07 |
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
|
08 |
webUi = new WorkerWebUI( this , workDir, Some(webUiPort))
|
12 |
metricsSystem.registerSource(workerSource)
|
Worker向Master注册的超时时间为20秒,如果在这20秒内没有成功地向Master注册,那么将会进行重试,重试的次数为3,如过重试的次数大于等于3,那么将无法启动Worker,这时候,你就该看看你的网络环境或者你的Master是否存在问题了。
Worker在运行的过程中将会触发许多的事件, 比如:RegisteredWorker、SendHeartbeat、WorkDirCleanup以及MasterChanged等等,收到不同的事件,Worker进行不同的操作。比如,如果需要运行一个作业,Worker将会启动一个或多个ExecutorRunner,具体的代码可参见receiveWithLogging函数:
01 |
override def receiveWithLogging = {
|
02 |
case RegisteredWorker(masterUrl, masterWebUiUrl) = >
|
05 |
case WorkDirCleanup = >
|
07 |
case MasterChanged(masterUrl, masterWebUiUrl) = >
|
11 |
case RegisterWorkerFailed(message) = >
|
13 |
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores _ , memory _ ) = >
|
15 |
case ExecutorStateChanged(appId, execId, state, message, exitStatus) = >
|
17 |
case KillExecutor(masterUrl, appId, execId) = >
|
19 |
case LaunchDriver(driverId, driverDesc) = > {
|
22 |
case KillDriver(driverId) = > {
|
24 |
case DriverStateChanged(driverId, state, exception) = > {
|
27 |
case x : DisassociatedEvent if x.remoteAddress == masterAddress = >
|
29 |
case RequestWorkerState = > {
|
上面的代码是经过处理的,其实receiveWithLogging 方法是从ActorLogReceive继承下来的。
当Worker节点Stop的时候,将会执行postStop函数,如下:
1 |
override def postStop() {
|
3 |
registrationRetryTimer.foreach( _ .cancel())
|
4 |
executors.values.foreach( _ .kill())
|
5 |
drivers.values.foreach( _ .kill())
|
杀掉所有还未执行完的executors、drivers等,操作。这方法也是从Actor继承下来的。
本文只是简单地介绍了Worker节点的一些环境,启动等相关的代码,关于它如何和Master通信;如何启动Executor;如何启动Driver都没有涉及,如果你想更好地了解Worker的运行情况,请参见Worker相关的代码吧。
http://www.iteblog.com/archives/1145
分享到:
相关推荐
8. Spark源码分析:书中可能涵盖了Spark源码的深度分析,帮助读者理解其内部工作机制,如调度系统、存储层次、容错机制等,这对于优化Spark应用和解决性能问题非常有价值。 9. 性能优化:Spark的性能优化是学习的...
源码分析可以了解DataFrame如何与RDD互操作,以及 Catalyst优化器的工作原理。 总结,Spark源码的学习是一个深入理解大数据处理流程和技术细节的过程。通过源码,我们可以了解到Spark如何高效地调度任务,如何处理...
Spark源码分析** 通过阅读Spark源码,可以深入了解任务调度、内存管理、容错机制等内部工作原理,这对于调优和解决实际问题非常有帮助。 **10. 性能调优** Spark的性能调优涉及多个方面,包括配置参数调整(如...
《Spark源码分析3——驱动器、主节点与执行器之间的连接》 在Spark的分布式计算框架中,驱动器(Driver)、主节点(Master)和执行器(Executor)是核心组件,它们之间的通信和协作构成了Spark作业执行的基础。本文将深入...
《深入理解Spark核心思想与源码分析》是耿嘉安撰写的一本专著,全面而深入地探讨了Apache Spark这一大数据处理框架的核心理念和技术细节。这本书不仅覆盖了Spark的基础概念,还深入到了源码层面,为读者揭示了Spark...
### Spark Core 源码分析之部署方式 #### 前言 Apache Spark 是一个用于大规模数据处理的开源计算系统,其核心模块 Spark Core 提供了基础的并行计算框架和分布式部署方式的支持。本文主要关注 Spark 的部署方式,...
在深入探讨Apache Spark源码之前,我们先了解一些基础知识。Apache Spark是一个用于大规模数据处理的开源集群计算系统,它提供了统一的框架来处理批处理、流处理以及机器学习等多种场景。Spark的核心组件是`...
### Spark源码解析:Master与Worker机制 #### Spark概述及特点 Spark是一个高效的数据处理框架,它由加州大学伯克利分校的AMP实验室研发。该框架支持多种编程语言(包括Java、Scala、Python和R等),使开发者可以...
源码分析对于深入理解Spark的工作原理和进行二次开发至关重要。 首先,Java目录是Spark源码的核心部分,它包含了Spark的大部分核心组件和功能的实现。以下是一些主要的子目录及其相关的知识点: 1. **core**:这是...
源码分析可以帮助我们理解这些算法如何并行化和优化,以及如何利用Spark的内存计算优势。 GraphX提供了图处理API,允许用户定义图的顶点和边,以及对图进行各种操作,如PageRank和最短路径算法。源码分析将揭示...
六、Spark源码分析 1. RDD创建:深入源码,我们可以看到如何通过`sparkContext.parallelize`或`sparkContext.textFile`创建RDD。 2. Action与Transformation:研究`count`、`map`等操作的实现,理解它们如何触发...
本次源码分析基于**Spark 1.2版本**,并聚焦于**standalone模式**,即独立部署模式。此模式下,Spark服务完全自包含,无需依赖其他资源管理系统。它是Spark YARN和Mesos模式的基础。 #### 四、Master与Worker的启动...
《深入理解Spark:核心思想与源码分析》是一本针对Apache Spark进行深度解析的专业书籍,旨在帮助读者透彻掌握Spark的核心理念与实现机制。Spark作为大数据处理领域的重要框架,其高性能、易用性和弹性分布式计算的...
Spark 0.6.0源码分析 Spark是Apache软件基金会下的一个开源大数据处理框架,以其高效的内存计算和弹性分布式数据集(Resilient Distributed Datasets, RDDs)为核心特性,广泛应用于数据处理、机器学习和图形计算等...
Spark源码分析 各个组件介绍 后面补充。。。。 StandAlone模式 在StandAlone模式的start-all的shell启动脚本下,在当前机器执行了JAVA_HOME/bin/java -cp ….Master和在配置的slave的机器中执行 JAVA_HOME/bin/java ...
本文旨在通过对Apache Spark源码的初步解读,帮助读者建立起对Spark核心概念和技术细节的理解。 #### 二、基本概念 ##### 1. RDD(Resilient Distributed Dataset) - **定义**:弹性分布式数据集(Resilient ...
《Spark源码探索之旅》 Spark,作为大数据处理领域中的明星框架,因其高效、易用和可扩展性而备受赞誉。Spark源码的探索对于理解其内部工作机制、优化性能以及进行二次开发至关重要。本文将围绕Spark的核心概念和...
`Worker原理剖析与源码分析.html`(9)会介绍Spark集群中的Worker节点,它是任务执行的物理实体,负责启动executor进程并管理其生命周期。源码分析将深入到Worker如何接收和执行任务,以及如何报告状态。 最后,`...