原创,转载请注明出处 http://baishuo491.iteye.com/blog/1994026 ,作者邮箱:vc_java@hotmail.com,新浪微博:爱看历史的码农--白硕
在sparkContext的建立过程中(更细致的说是clientActor的preStart回调函数中),会向master发送RegisterApplication消息
master ! RegisterApplication(appDescription)
当master收到RegisterApplication请求后:
app = addApplication(description, sender)
.....
waitingApps += app
.....
schedule()
通过传入的(appDescription)和发送者,创建一个addApplication,然后把app加入到等待队列waitingApps中,之后再调用schedule函数进行调度
再来看看schedule的流程:
当等待队列waitingApps里有数据的时候,对waitingApps里的每个元素app做如下操作:
从已经注册的works里面,选择合适的works列表usableWorkers
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE).filter(canUse(app, _)).sortBy(_.coresFree).reverse
从usableWorkers的空闲cpu中,选择合适数量的cpu,为当前app进行分配
这之后调用launchExecutor,并把app的状态设置为running
launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome);
app.state = ApplicationState.RUNNING
再来看看launchExecutor这个函数
先在每个传入的workerInfo参数里面,记录当前的app,和已经消耗的cpu和内存
接着向worker发送LaunchExecutor消息,向client发送ExecutorAdded消息
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
exec.application.driver ! ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
我们再来看看worker收到LaunchExecutor后的执行步骤:
创建一个实际执行任务的ExecutorRunner并启动它
更新已经使用的cpu和内存数量
然后向master发送ExecutorStateChanged消息
val manager = new ExecutorRunner(
appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
memoryUsed += memory_
master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
ExecutorRunner的start的函数,主要任务就是启动一个java后台线程,这个线程执行fetchAndRunExecutor函数,这个函数的主要流程如下:
拼接出一个字符串,其内容是用JAVA_HOME\bin\java命令,执行一个类StandaloneExecutorBackend
这个执行过程不在当前的jvm下执行。而是通过ProcessBuilder新建了一个jvm进程,单独执行的,相当于就是启动了一个StandaloneExecutorBackend实例
val command = buildCommandSeq()
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value) <- appDesc.command.environment) {
env.put(key, value)
}
......
process = builder.start()
看看StandaloneExecutorBackend做了什么:
创建了一个actor,在这个actor的在构造过程中,就是向客户端的driver发送RegisterExecutor(executorId, hostPort, cores)消息,然后开始等待消息
上面发送的那个消息,会被driver端的StandaloneSchedulerBackend接受到
它回复一个
RegisteredExecutor(sparkProperties)消息,在修改了一系列的资源记录后,调用makeOffers()函数
makeOffers()的作用是在拼装好要执行的任务列表tasks之后,把一个重要的变量hasLaunchedTask设置成true(借助scheduler.resourceOffers函数)
这之后,向StandaloneExecutorBackend发送
LaunchTask(task)消息
这里可以看到
RegisteredExecutor和
LaunchTask消息都是发送给了StandaloneExecutorBackend,我们来看看它收到消息后的动作
在StandaloneExecutorBackend收到
RegisteredExecutor(sparkProperties)消息后,会创建一个excutor变量,里面有创建了一个ThreadPoolExecutor线程池,名叫threadPool
当它再收到
LaunchTask(task)消息之后,就通过executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)函数,调用刚刚创建的threadPool线程池,执行传入的task任务
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
而TaskRunner就是这个任务真正被执行的地方
- 大小: 219.2 KB
分享到:
相关推荐
1. **Spark Core**:这是Spark的基础,提供了分布式任务调度、内存管理、错误恢复和与其他存储系统的接口。 2. **Spark SQL**:用于处理结构化数据,它支持SQL查询并通过DataFrame API提供了与多种数据源的交互。 3....
5. **启动Spark**:启动Spark的Master和Worker节点,如果使用的是standalone模式,可以通过`sbin/start-all.sh`命令启动。 6. **测试运行**:使用简单的Spark应用,如WordCount,验证Spark是否安装和配置成功。 在...
5. **运行模式**:Spark支持多种运行模式,包括本地模式(方便开发测试)、standalone模式(Spark自带的集群管理器)、YARN模式(使用Hadoop的资源管理器)和Mesos模式(Mesos集群管理器)。在Hadoop 2.6环境中,...
Spark支持多种部署模式,包括本地模式、Standalone模式、YARN模式和Mesos模式,可以根据实际需求选择合适的部署方式。 8. **Spark Job调度**: Spark使用FIFO和Fair Scheduler,根据作业优先级和资源需求进行任务...
此外,还需要设置SPARK_HOME环境变量,并在启动时指定master节点,例如本地模式(`--master local[n]`)、standalone模式(`--master spark://<master_ip>:<port>`)或YARN模式(`--master yarn-client`或`--master ...
Spark支持四种运行模式:本地模式(用于测试)、集群模式(如YARN、Mesos或standalone)、Spark on Kubernetes以及云服务提供商的托管Spark。 6. **编程接口**: Spark提供了Python(PySpark)、Java、Scala和R的...
用户可以将"spark-2.3.3-bin-hadoop2.6"解压后在本地模式、集群模式(如YARN、Mesos或standalone)下运行Spark。配置文件如`conf/spark-defaults.conf`用于设置各种参数,如内存分配、日志级别等。 5. **编程接口*...
同时,可能需要配置其他的资源管理器,如YARN或Mesos,或者使用Standalone模式自建Spark集群。 Spark的核心组件主要包括:Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图计算)。Spark ...
10. **资源管理和调度**:Spark使用自己的资源管理器(称为standalone模式),也可以通过YARN或Mesos等外部资源管理系统运行。 11. **性能优化**:Spark利用内存计算,将中间结果存储在内存中,减少磁盘I/O,提高...
standalone模式下executor调度策略 Spark Sql源码阅读 Spark Sql源码阅读 hive on spark调优 Spark SQL 多维聚合分析应用案例 Spark Streaming源码阅读 动态发现新增分区 Dstream join 操作和 RDD join 操作的...
8. **Spark部署**:Spark可以运行在多种模式下,如本地模式、standalone模式、Mesos、YARN或Kubernetes。理解这些部署选项及其优缺点对于实际操作至关重要。 9. **案例研究**:书中可能包含多个真实世界的应用场景...
Spark支持两种主节点模式:standalone模式下的Master节点和Mesos或YARN资源管理器。当驱动器向Master注册时,Master会为应用程序分配资源,确保作业的执行。 3. **执行器(Executor)**: 执行器是运行在工作节点...
"Spark源码-Master"通常指的是Spark项目的主要控制节点,即集群管理器的源代码,它负责调度任务、监控资源分配以及管理Spark作业的生命周期。 在Spark中,Master节点是集群的中心管理者,它维护着整个集群的状态,...
5. 使用`spark-submit`脚本提交Spark作业到YARN上运行,或者在本地模式或standalone模式下启动Spark Shell进行交互式测试。 Spark的使用场景广泛,涵盖了数据批处理、实时流处理、机器学习和图计算等。在大数据领域...
6. **Spark部署模式**:Spark支持本地模式、集群模式(standalone、Mesos、YARN)以及云部署。用户可以根据自己的需求选择合适的部署策略。 7. **Spark性能优化**:包括调整Executor数量、内存分配、shuffle操作的...
4. 启动Spark集群,可以是standalone模式、Mesos模式或YARN模式。 5. 编写Spark应用程序,使用Scala、Java、Python或R语言。 6. 使用`spark-submit`工具提交你的Spark应用到集群上执行。 通过Spark,用户可以快速...
在 Standalone 模式下,Spark 自带了简单的调度器,而在集群环境中,它可以利用资源管理器来更有效地分配任务。 7. **容错机制**:Spark 通过 lineage(血统)来实现容错,如果某部分数据丢失,可以通过重新计算...
- **Standalone模式**:Spark自带的集群管理模式。 - **YARN模式**:集成于Hadoop YARN资源管理器。 - **Mesos模式**:可在Mesos集群上运行。 5. **Spark优化** - **Tuning参数**:如executor内存、core数量等...
Spark可以运行在多种模式下,包括本地模式、standalone模式、Mesos、YARN和Kubernetes,以适应不同的集群环境。 7. **Spark性能优化** - **内存管理**:通过配置缓存策略和调整executor内存分配,可以提高数据...
在 Spark Standalone 模式下,Master 负责接收任务并将任务分发给 Worker 节点。具体操作如下: - 下载 wordcount 示例代码。 - 使用 Maven 打包项目并上传至 Spark 集群。 - 使用以下命令提交作业: ```bash ./...