`
zhangxiong0301
  • 浏览: 359661 次
社区版块
存档分类
最新评论

SPARK调度机制--翻译(原创)

阅读更多

spark作业调度

概述

         spark有多种方式调度各个计算所需的资源.

         首先,每个application(即sparkContext实例)有一组独立的Executor进程。各种cluster manageryarn Resource managerstandalone master等等)为application间的调度提供自己的方式。

         其次,一个application内的各个jobspark action实例)也可能并行执行,前提是各个job是在独立的线程中提交。这种情况在处理网络请求的场景下是正常的,如sharkspark提供了fairscheduler来调度application内的job

 

调度application

         每个spark application有一组专有的executor jvm,这组jvm只用来跑这个application的任务,以及只存储这个application的数据。如果是多用户共享集群,不同的cluster manager,会有不同的资源分配(或调度)方式:

standalone模式:默认情况下,提交到集群的applicationFIFO方式调度,每个被调度的application会使用所有的集群资源。你可以配置spark.cores.max来控制application的资源使用量;若未设置spark.core.max,也可以通过spark.deploy.defaultCores设置默认值。除了控制core的使用,也可以通过spark.executor.memory指定每个executor的内存使用量。

Mesosmemos模式下,通过spark.mesos.coarse=true的配置可以启用静态分区(static partition)功能。同standalone模式一样,也可以通过spark.cores.maxspark.executor.memory分别配置core和内存的使用量。

yarn:yarn模式下,--num-executors控制application使用的executor的数量,--executor-memory  --executor-cores分别控制core和内存使用。

 

         对于memos,还可以实现CPU核心动态共享(dynamic sharing of CPU cores)。这种模式下,每个application同样具有固定和独立的内存,只是在计算空闲时,可以把core分配给其他application使用。这种模式在具有大量不是很活跃的application的情况下很有用。此模式的一个缺点是:当一个application需要重新获得属于自己的核心时,可能由于借用coreapplication正在计算而需要等待,造成不可预测的延迟。开启这种模式,只需要使用memos://URL以及spark.mesos.coarse设置为false(默认值就是false)。

请注意,目前没有一种模式提供application间的内存共享。如果有这种需求,我们推荐使用单个sever application对外提供服务,以便共享一个RDDshark JDBC就是用这种方式提供SQL查询的。

 

动态资源分配(Dynamic Resource Allocation

 

         spark1.2开始,可以根据application的负载动态地增加和减少分配给application的资源。也就是说,你的application在不需要资源的时候会把资源退还给集群,而在需要的时候重新申请获得资源。这在spark集群上有多个application时候很有用。当分配给某个application的资源处于空闲状态,这些资源会退还到集群的资源池从而被其他application使用。spark中的动态资源分配的粒度是executor,通过spark.dynamicAllocation.enabled=true即可开启。

 

         目前这个功能是关闭的,仅在yarn中有效。未来发行版本会在standalonememos coarse-grained模式应用。虽然memos目前在fine-grained模式下有类似的动态资源共享,但是开启动态资源分配可以减少memos在粗粒度的调度延迟。

 

配置

         所有配置都在spark.dynamicAllocation.*的命名空间下。为了使用动态资源分配特性,application必须设置spark.dynamicAllocation.enabled=true,以及通过spark.dynamicAllocation.minExecutors spark.dynamicAllocation.maxExecutors分别设置executor数量的上下界。

         另外,spark应用必须使用独立的shuffle service。这个shuffle service的目的是保存executor产生的文件并提供给后续任务使用,从而使得executor可以安全的移除。要启用独立的shuffle sevice,需要设置spark.shuffle.service.enabled=true。在yarn中,实现该shuffle service的类是 org.apache.spark.yarn.network.YarnShuffleService,该服务会运行在所有的nodemanager。具体启动shuffle service 的步骤如下:

1.编译spark,同时指定YARN profile

2.找到spark-<version>-yarn-shuffle.jar。这是应该是shuffle sevice。如果在编译时候没有指定--tgz,则这个jar包在 $SPARK_HOME/network/yarn/target/scala-<version>目录下;否则就在打包后的发行版的lib目录。

3.拷贝上述jar包到yarn集群的所有nodemanager

4.在每个nodemanageryarn-site.xml配置文件里: yarn.nodemanager.aux-services配置项添加spark_shuffle ;配置项yarn.nodemanager.aux-services.spark_shuffle.class添加org.apache.spark.network.yarn.YarnShuffleService。另外设置所有和spark.shuffle.service.* 相关的配置项。

5.重启所有nodemanager

 

资源分配策略

         从上层看,spark应该在不需要的时候减少executor,在需要的时候动态增加executor。虽然没有确切的方式去预测即将被去掉的executor会马上被重新用到,或者即将被添加的executor马上会空闲,我们需要一些启发式算法来动态增减executor

请求策略

         开启动态分配策略后,application会在task因没有足够资源被挂起的时候去动态申请资源,这种情况意味着该application现有的executor无法满足所有task并行运行。spark一轮一轮的申请资源,当有task挂起或等待spark.dynamicAllocation.schedulerBacklogTimeout时间的时候,会开始动态资源分配;之后会每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout时间申请一次,直到申请到足够的资源。每次申请的资源量是指数增长的,即1,2,4,8等。

之所以采用指数增长,出于两方面考虑。其一,开始申请的少是考虑到可能application会马上得到满足;其次要成倍增加,是为了防止application需要很多资源,儿该方式可以在很少次数的申请之后得到满足。

 

删除executor策略

         很简单,当applicationexecutor空闲时间超过spark.dynamicAllocation.executorIdleTimeout后,就会将其删除掉。

华丽的移除executor

         在动态资源分配之前,当application完成或者executor运行出错次数超过限定值时,executor就会安全退出,此时executor的所有状态将不再需要。在动态资源分配情况下,当executor被移除时,application还在执行。如果application要访问被移除executor的状态,则需要重新计算其状态。因此,spark需要一种机制确保移除executor之前保存executor的状态。

         这个需求对shuffle是很重要的。在shuffle过程中,spark executor首先保存它的map结果文件到本地磁盘上,然后以server的身份供其他executor来获取文件。对于某些executor执行比其他executor慢很多的情况,动态资源分配会移除空闲的executor,当后续任务需要被移除executor的结果文件时,就要重新计算结果文件了。

保存已移除executor的结果文件的方式就是使用独立的shuffle server。这个server是一个在所有nodemanager都会长期运行的进程。当开启独立shuffle server时,executor将会从该server直接读取文件而不是相互之间获取文件。这样executor产生的结果文件就会比executor具有更长的生命周期。

         另外,除了executorshuffle文件,executor还会在磁盘或内存缓存数据。当executor被移除后,这些缓存的数据将不再可用。目前还没有解决方案。

 

 

application内调度

         在一个spark application内,由不同线程提交的各个job可以并行执行,job指的是action以及关联的task

         默认情况下,spark调度器采用FIFO方式。每个job被分成多个stage,第一个job会得到所有可用的资源并计算,运行完成后第二个job开始运行。如果第一个job不需要所有资源,则第二个job可以马上执行;但是当第一个作业很大的时候,第二个作业则会有很大的延迟。

 

         spark 0.8版本开始,spark开始支持公平调度job。这样,不同jobtasksround-robin的方式调度,从而job之间以近似公平的方式被调度。这也意味着一个长时间运行的作业运行时提交一个短作业,则这个短作业也可以在合理的时间内完成。为了使用公平调度,需要设置spark.scheduler.mode=FAIR 


val conf =newSparkConf().setMaster(...).setAppName(...)

conf.set("spark.scheduler.mode","FAIR")

val sc =newSparkContext(conf)

公平调度池

         公平调度支持把job归类到组,每个池里的job采用不同的调度策略。例如,可以为重要的jobs创建高优先级的池,或者把不同用户的job放到不同的组,然后给用户配置相同的资源量从而不至于某些用户的作业少而得到更少的资源。

         在无任何干预的情况下,job被提交到默认池。可以通过设置spark.scheduler.pool本地属性设置job的池。例如,sc.setLocalProperty("spark.scheduler.pool", "pool1")

         当设置了这个本地属性后,所有由这个线程提交的job都会进入这个池。清除一个池也很简单:sc.setLocalProperty("spark.scheduler.pool", null)

 

池的默认行为

         默认情况下,各个池会用相同分量的集群资源,但池内jobs采用FIFO调度。例如,每个用户创建了一个池,则这些用户将有等量的集群资源运行自己的job,但是每个用户的作业将按顺序执行,而不会是后面的job在前面的job运行完之前运行。

 

池属性配置

         池的一些属性是可以修改的,每个池包含三个属性:

schedulingModeFIFOFAIR

weight:这个用来控制此池相对于其他池对集群资源的使用量,默认值为1,即所有池平分集群资源。如果配置一个池的weight2,则分配给该池的资源将是其他池的两倍。

minShare:除了全局的weight,也可以通过设置minShare来保证某个池的最少资源分配量。公平调度器会尽量满足所有池的minShare量,然后才会考虑按weight分配多余的资源。通过minShare可以保证一个池总是能得到一定数量的资源。该属性默认值为0.这个数值应该是core的数量。

 

         池的属性可以通过XML文件制定,类似conf/fairscheduler.xml.template。然后在代码里加上,conf.set("spark.scheduler.allocation.file", "/path/to/file"),就可以了。

一个文件的例子是:

<?xml version="1.0"?>
<allocations>
  <poolname="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <poolname="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>

 

</allocations>
分享到:
评论

相关推荐

    spark-3.1.2.tgz & spark-3.1.2-bin-hadoop2.7.tgz.rar

    Spark-3.1.2.tgz和Spark-3.1.2-bin-hadoop2.7.tgz是两个不同格式的Spark发行版,分别以tar.gz和rar压缩格式提供。 1. Spark核心概念: - RDD(弹性分布式数据集):Spark的基础数据结构,是不可变、分区的数据集合...

    spark-assembly-1.5.2-hadoop2.6.0.jar

    Spark-assembly-1.5.2-hadoop2.6.0.jar中的优化包括RDD(弹性分布式数据集)的缓存策略、Task调度优化、内存管理优化等,以确保在大数据处理中实现高效的性能。 7. 开发和调试: 开发者在本地开发时,可以直接...

    spark-2.1.0-bin-without-hadoop版本的压缩包,直接下载到本地解压后即可使用

    在Ubuntu里安装spark,spark-2.1.0-bin-without-hadoop该版本直接下载到本地后解压即可使用。 Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。它使用内存中缓存和优化的查询执行方式,可针对任何规模...

    spark-3.2.1-bin-hadoop2.7.tgz

    这个名为"spark-3.2.1-bin-hadoop2.7.tgz"的压缩包是Spark的一个特定版本,即3.2.1,与Hadoop 2.7版本兼容。在Linux环境下,这样的打包方式方便用户下载、安装和运行Spark。 Spark的核心设计理念是快速数据处理,...

    spark-2.0.0-bin-hadoop2.6.tgz

    本资源是spark-2.0.0-bin-hadoop2.6.tgz百度网盘资源下载,本资源是spark-2.0.0-bin-hadoop2.6.tgz百度网盘资源下载

    spark-3.1.3-bin-without-hadoop.tgz

    这个"spark-3.1.3-bin-without-hadoop.tgz"压缩包是Spark的3.1.3版本,不含Hadoop依赖的二进制发行版。这意味着在部署时,你需要自行配置Hadoop环境,或者在不依赖Hadoop的环境中运行Spark。 Spark的核心特性包括...

    spark-3.2.4-bin-hadoop3.2-scala2.13 安装包

    在本安装包“spark-3.2.4-bin-hadoop3.2-scala2.13”中,包含了用于运行Spark的核心组件以及依赖的Hadoop版本和Scala编程语言支持。以下是对这些关键组成部分的详细解释: 1. **Spark**: Spark的核心在于它的弹性...

    spark-3.2.0-bin-hadoop3.2.tgz

    这个压缩包"spark-3.2.0-bin-hadoop3.2.tgz"包含了Spark 3.2.0版本的二进制文件,以及针对Hadoop 3.2的兼容构建。 Spark的核心组件包括:Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图...

    spark-3.1.3-bin-hadoop3.2.tgz

    在这个特定的压缩包"spark-3.1.3-bin-hadoop3.2.tgz"中,我们得到了Spark的3.1.3版本,它已经预编译为与Hadoop 3.2兼容。这个版本的Spark不仅提供了源码,还包含了预编译的二进制文件,使得在Linux环境下快速部署和...

    apache-doris-spark-connector-2.3_2.11-1.0.1

    Spark Doris Connector(apache-doris-spark-connector-2.3_2.11-1.0.1-incubating-src.tar.gz) Spark Doris Connector Version:1.0.1 Spark Version:2.x Scala Version:2.11 Apache Doris是一个现代MPP分析...

    spark-1.6.0-bin-hadoop2.6.tgz

    Spark-1.6.0-bin-hadoop2.6.tgz 是针对Linux系统的Spark安装包,包含了Spark 1.6.0版本以及与Hadoop 2.6版本兼容的构建。这个安装包为在Linux环境中搭建Spark集群提供了必要的组件和库。 **1. Spark基础知识** ...

    spark-3.0.0-bin-hadoop2.7.tgz

    Spark-3.0.0-bin-hadoop2.7.tgz 是Spark 3.0.0版本的预编译二进制包,其中包含了针对Hadoop 2.7版本的兼容性构建。这个版本的发布对于数据科学家和大数据工程师来说至关重要,因为它提供了许多性能优化和新功能。 1...

    spark-3.1.2-bin-hadoop3.2.tgz

    5. **交互式Shell**:Spark提供了一个名为`spark-shell`的交互式环境,方便开发人员测试和调试代码。 **Spark与Hadoop 3.2的兼容性** Hadoop 3.2引入了许多新特性,如: 1. **多命名空间**:支持多个HDFS命名空间...

    spark-3.0.0-bin-hadoop3.2

    在本场景中,我们讨论的是Spark的3.0.0版本,与Hadoop3.2相结合的二进制发行版——"spark-3.0.0-bin-hadoop3.2"。这个压缩包是为了在Windows操作系统下运行Spark而设计的,因此标签明确指出它是适用于Windows平台的...

    spark-2.4.0-bin-hadoop2.7.tgz

    然后,你可以通过`spark-submit`命令提交Spark作业到集群,或者使用`pyspark`或`spark-shell`启动交互式环境。 在实际应用中,Spark常被用于大数据分析、实时数据处理、机器学习模型训练和图数据分析。由于其内存...

    spark-hive-thriftserver_2.11-2.1.3-SNAPSHOT-123456.jar

    spark-hive-thriftserver_2.11-2.1.spark-hive-thrift

    spark-2.3.1-bin-hadoop2.7.zip

    - `bin`:包含Spark的可执行脚本,如`spark-shell`(Scala交互式环境)、`pyspark`(Python交互式环境)和`spark-submit`(提交Spark应用)等。 - `conf`:配置文件目录,其中`spark-defaults.conf`是默认配置,可以...

    spark-assembly-1.5.2-hadoop2.6.0jar包

    Spark-assembly-1.5.2-hadoop2.6.0.jar是Apache Spark的一个关键组件,主要用于在Scala环境中开发Spark应用程序。这个特定的版本(1.5.2)与Hadoop 2.6.0兼容,这意味着它可以无缝地运行在支持Hadoop 2.6.0的集群上...

    spark-2.3.4-bin-hadoop2.7.tgz

    在本案例中,我们关注的是Spark的2.3.4版本,它预编译为与Hadoop 2.7兼容的版本,打包成"spark-2.3.4-bin-hadoop2.7.tgz"的压缩文件。这个压缩包包含了运行Spark所需的所有组件,包括Java库、Python库(pyspark)、...

    spark-3.2.0-bin-hadoop3-without-hive

    本压缩包“spark-3.2.0-bin-hadoop3-without-hive”则特别针对不包含 Hive 支持的环境进行打包,更专注于基础的 Spark 与 Hadoop 3 的集成。 首先,让我们深入了解 Spark 3.2.0 的主要改进。这一版本引入了新的 SQL...

Global site tag (gtag.js) - Google Analytics