`
baishuo491
  • 浏览: 78306 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

spark源码分析--spark的任务调度(standalone模式)

阅读更多
原创,转载请注明出处  http://baishuo491.iteye.com/blog/1994026 ,作者邮箱:vc_java@hotmail.com,新浪微博:爱看历史的码农--白硕

在sparkContext的建立过程中(更细致的说是clientActor的preStart回调函数中),会向master发送RegisterApplication消息
master ! RegisterApplication(appDescription)
  当master收到RegisterApplication请求后:
  app = addApplication(description, sender)
  .....
  waitingApps += app
  .....
  schedule()
  通过传入的(appDescription)和发送者,创建一个addApplication,然后把app加入到等待队列waitingApps中,之后再调用schedule函数进行调度 
  再来看看schedule的流程:
    当等待队列waitingApps里有数据的时候,对waitingApps里的每个元素app做如下操作:
      从已经注册的works里面,选择合适的works列表usableWorkers
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE).filter(canUse(app, _)).sortBy(_.coresFree).reverse
从usableWorkers的空闲cpu中,选择合适数量的cpu,为当前app进行分配
      这之后调用launchExecutor,并把app的状态设置为running
launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome);
app.state = ApplicationState.RUNNING
 
再来看看launchExecutor这个函数
先在每个传入的workerInfo参数里面,记录当前的app,和已经消耗的cpu和内存
      接着向worker发送LaunchExecutor消息,向client发送ExecutorAdded消息
    
 worker.addExecutor(exec)
      worker.actor ! LaunchExecutor(
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
      exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
     
  我们再来看看worker收到LaunchExecutor后的执行步骤:
创建一个实际执行任务的ExecutorRunner并启动它
      更新已经使用的cpu和内存数量
      然后向master发送ExecutorStateChanged消息
 val manager = new ExecutorRunner(
        appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
     
ExecutorRunner的start的函数,主要任务就是启动一个java后台线程,这个线程执行fetchAndRunExecutor函数,这个函数的主要流程如下:
      拼接出一个字符串,其内容是用JAVA_HOME\bin\java命令,执行一个类StandaloneExecutorBackend
      这个执行过程不在当前的jvm下执行。而是通过ProcessBuilder新建了一个jvm进程,单独执行的,相当于就是启动了一个StandaloneExecutorBackend实例
val command = buildCommandSeq()
      val builder = new ProcessBuilder(command: _*).directory(executorDir)
      val env = builder.environment()
      for ((key, value) <- appDesc.command.environment) {
        env.put(key, value)
      }
      ......
      process = builder.start()

看看StandaloneExecutorBackend做了什么:
      创建了一个actor,在这个actor的在构造过程中,就是向客户端的driver发送RegisterExecutor(executorId, hostPort, cores)消息,然后开始等待消息     
上面发送的那个消息,会被driver端的StandaloneSchedulerBackend接受到
      它回复一个RegisteredExecutor(sparkProperties)消息,在修改了一系列的资源记录后,调用makeOffers()函数
      makeOffers()的作用是在拼装好要执行的任务列表tasks之后,把一个重要的变量hasLaunchedTask设置成true(借助scheduler.resourceOffers函数)
      这之后,向StandaloneExecutorBackend发送LaunchTask(task)消息
      这里可以看到RegisteredExecutorLaunchTask消息都是发送给了StandaloneExecutorBackend,我们来看看它收到消息后的动作    
      在StandaloneExecutorBackend收到RegisteredExecutor(sparkProperties)消息后,会创建一个excutor变量,里面有创建了一个ThreadPoolExecutor线程池,名叫threadPool
      当它再收到LaunchTask(task)消息之后,就通过executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)函数,调用刚刚创建的threadPool线程池,执行传入的task任务
     
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
      而TaskRunner就是这个任务真正被执行的地方
  • 大小: 219.2 KB
分享到:
评论

相关推荐

    spark-2.1.1-bin-hadoop2.7.tgz.7z

    1. **Spark Core**:这是Spark的基础,提供了分布式任务调度、内存管理、错误恢复和与其他存储系统的接口。 2. **Spark SQL**:用于处理结构化数据,它支持SQL查询并通过DataFrame API提供了与多种数据源的交互。 3....

    spark-1.6.0-bin-hadoop2.4.tgz

    5. **启动Spark**:启动Spark的Master和Worker节点,如果使用的是standalone模式,可以通过`sbin/start-all.sh`命令启动。 6. **测试运行**:使用简单的Spark应用,如WordCount,验证Spark是否安装和配置成功。 在...

    spark-2.4.0-bin-hadoop2.6.tgz

    5. **运行模式**:Spark支持多种运行模式,包括本地模式(方便开发测试)、standalone模式(Spark自带的集群管理器)、YARN模式(使用Hadoop的资源管理器)和Mesos模式(Mesos集群管理器)。在Hadoop 2.6环境中,...

    spark-2.4.4-bin-hadoop2.6.tgz

    Spark支持多种部署模式,包括本地模式、Standalone模式、YARN模式和Mesos模式,可以根据实际需求选择合适的部署方式。 8. **Spark Job调度**: Spark使用FIFO和Fair Scheduler,根据作业优先级和资源需求进行任务...

    spark-2.1.0-bin-without-hadoop.tgz.7z

    此外,还需要设置SPARK_HOME环境变量,并在启动时指定master节点,例如本地模式(`--master local[n]`)、standalone模式(`--master spark://&lt;master_ip&gt;:&lt;port&gt;`)或YARN模式(`--master yarn-client`或`--master ...

    spark-2.4.0-bin-hadoop2.7.zip

    Spark支持四种运行模式:本地模式(用于测试)、集群模式(如YARN、Mesos或standalone)、Spark on Kubernetes以及云服务提供商的托管Spark。 6. **编程接口**: Spark提供了Python(PySpark)、Java、Scala和R的...

    spark-2.3.3-bin-hadoop2.6.tgz

    用户可以将"spark-2.3.3-bin-hadoop2.6"解压后在本地模式、集群模式(如YARN、Mesos或standalone)下运行Spark。配置文件如`conf/spark-defaults.conf`用于设置各种参数,如内存分配、日志级别等。 5. **编程接口*...

    spark-2.4.5-bin-without-hadoop.tgz.7z

    同时,可能需要配置其他的资源管理器,如YARN或Mesos,或者使用Standalone模式自建Spark集群。 Spark的核心组件主要包括:Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图计算)。Spark ...

    spark-2.3.1-bin-hadoop2.6.tgz

    10. **资源管理和调度**:Spark使用自己的资源管理器(称为standalone模式),也可以通过YARN或Mesos等外部资源管理系统运行。 11. **性能优化**:Spark利用内存计算,将中间结果存储在内存中,减少磁盘I/O,提高...

    Spark-2.3.1源码解读

    standalone模式下executor调度策略 Spark Sql源码阅读 Spark Sql源码阅读 hive on spark调优 Spark SQL 多维聚合分析应用案例 Spark Streaming源码阅读 动态发现新增分区 Dstream join 操作和 RDD join 操作的...

    Spark The Definitive Guide-201712

    8. **Spark部署**:Spark可以运行在多种模式下,如本地模式、standalone模式、Mesos、YARN或Kubernetes。理解这些部署选项及其优缺点对于实际操作至关重要。 9. **案例研究**:书中可能包含多个真实世界的应用场景...

    Spark源码分析3-The connect between driver,master and excutor

    Spark支持两种主节点模式:standalone模式下的Master节点和Mesos或YARN资源管理器。当驱动器向Master注册时,Master会为应用程序分配资源,确保作业的执行。 3. **执行器(Executor)**: 执行器是运行在工作节点...

    spark源码-master

    "Spark源码-Master"通常指的是Spark项目的主要控制节点,即集群管理器的源代码,它负责调度任务、监控资源分配以及管理Spark作业的生命周期。 在Spark中,Master节点是集群的中心管理者,它维护着整个集群的状态,...

    spark-3.2.1 安装包 集成 hadoop2.7

    5. 使用`spark-submit`脚本提交Spark作业到YARN上运行,或者在本地模式或standalone模式下启动Spark Shell进行交互式测试。 Spark的使用场景广泛,涵盖了数据批处理、实时流处理、机器学习和图计算等。在大数据领域...

    spark-2.1.1-bin-hadoop2.6.tgz

    6. **Spark部署模式**:Spark支持本地模式、集群模式(standalone、Mesos、YARN)以及云部署。用户可以根据自己的需求选择合适的部署策略。 7. **Spark性能优化**:包括调整Executor数量、内存分配、shuffle操作的...

    spark-3.0.3-bin-hadoop2.7.tgz

    4. 启动Spark集群,可以是standalone模式、Mesos模式或YARN模式。 5. 编写Spark应用程序,使用Scala、Java、Python或R语言。 6. 使用`spark-submit`工具提交你的Spark应用到集群上执行。 通过Spark,用户可以快速...

    Spark源码剖析

    在 Standalone 模式下,Spark 自带了简单的调度器,而在集群环境中,它可以利用资源管理器来更有效地分配任务。 7. **容错机制**:Spark 通过 lineage(血统)来实现容错,如果某部分数据丢失,可以通过重新计算...

    spark期末考核--一班.zip

    - **Standalone模式**:Spark自带的集群管理模式。 - **YARN模式**:集成于Hadoop YARN资源管理器。 - **Mesos模式**:可在Mesos集群上运行。 5. **Spark优化** - **Tuning参数**:如executor内存、core数量等...

    Spark学习总结-入门.rar_Spark!_spark_spark入门_大数据 spark

    Spark可以运行在多种模式下,包括本地模式、standalone模式、Mesos、YARN和Kubernetes,以适应不同的集群环境。 7. **Spark性能优化** - **内存管理**:通过配置缓存策略和调整executor内存分配,可以提高数据...

    spark 分布式集群搭建

    在 Spark Standalone 模式下,Master 负责接收任务并将任务分发给 Worker 节点。具体操作如下: - 下载 wordcount 示例代码。 - 使用 Maven 打包项目并上传至 Spark 集群。 - 使用以下命令提交作业: ```bash ./...

Global site tag (gtag.js) - Google Analytics