`
m635674608
  • 浏览: 5042880 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spark(四) -- Spark工作机制

 
阅读更多

一、应用执行机制

一个应用的生命周期即,用户提交自定义的作业之后,Spark框架进行处理的一系列过程。

在这个过程中,不同的时间段里,应用会被拆分为不同的形态来执行。

1、应用执行过程中的基本组件和形态

Driver:
运行在客户端或者集群中,执行Application的main方法并创建SparkContext,调控整个应用的执行。

Application:
用户自定义并提交的Spark程序。

Job:
一个Application可以包含多个Job,每个Job由Action操作触发。

Stage:
比Job更小的单位,一个Job会根据RDD之间的依赖关系被划分为多个Stage,每个Stage中只存有RDD之间的窄依赖,即Transformation算子。

TaskSet:
每个Stage中包含的一组相同的Task。

Task:
最后被分发到Executor中执行的具体任务,执行Stage中包含的算子。

明确了一个应用的生命周期中会有哪些组件参与之后,再来看看用户是怎么提交Spark程序的。

2、应用的两种提交方式

Driver进程运行在客户端(Client模式):

即用户在客户端直接运行程序。
程序的提交过程大致会经过以下阶段:

  1. 用户运行程序。
  2. 启动Driver进行(包括DriverRunner和SchedulerBackend),并向集群的Master注册。
  3. Driver在客户端初始化DAGScheduler等组件。
  4. Woker节点向Master节点注册并启动Executor(包括ExecutorRunner和ExecutorBackend)。
  5. ExecutorBackend启动后,向Driver内部的SchedulerBackend注册,使得Driver可以找到计算节点。
  6. Driver中的DAGScheduler解析RDD生成Stage等操作。
  7. Driver将Task分配到各个Executor中并行执行。

Driver进程运行在集群中(某个Worker节点,Cluster模式):

即用户将Spark程序提交给Master分配执行。
大致会经过一下流程:

  1. 用户启动客户端,提交Spark程序给Master。
  2. Master针对每个应用分发给指定的Worker启动Driver进行。
  3. Worker收到命令之后启动Driver进程(即DriverRunner和其中的SchedulerBackend),并向Master注册。
  4. Master指定其他Worker启动Executor(即ExecutorRunner和其内部的ExecutorBackend)。
  5. ExecutorBackend向Driver中的SchedulerBackend注册。
  6. Driver中的DAGScheduler解析RDD生产Stage等。
  7. Executor内部启动线程池并行化执行Task。

可以看到,两种程序的提交方式在处理过程中,仅仅是在哪个地方启动Driver进程的区别而已。
为Client模式中时(使用Spark Shell直接执行的程序),Driver就在客户端上。
为Cluster模式时(提交Spark程序到Master),Driver运行与集群中的某个Worker节点。

二、调度与任务分配模块

Spark框架就像一个操作系统一样,有着自己的作业调度策略,当集群运行在不同的模式下,调度不同级别的单位,使用的策略也是有所不同的。

1、Application之间的调度

当有多个用户提交多个Spark程序时,Spark是如何调度这些应用并合理地分配资源呢?

Standalone模式下,默认使用FIFO,每个app会独占所有资源

可以通过以下几个参数调整集群相关的资源:

  • spark.cores.max:调整app可以在整个集群中申请的CPU core数量
  • spark.deploy.defaultCores:默认的CPU core数量
  • spark.executor.memory:限制每个Executor可用的内存

在Mesos模式下,可以使用

  • spark.mesos.coarse=true设置静态配置资源的策略
  • 使用mesos://URL且不配置spark.mesos.coarse=true(每个app会有独立固定的内存分配,空闲时其他机器可以使用其资源)

在Yarn模式下,提交作业时可以使用

  • 通过–num-executors控制分配多少个Executor给app
  • –executor-memory和–executor-cores分别控制Executor的内存和CPU core

2、Application内部的Job调度机制

一个Application中,由各个Action触发的多个Job之间也是存在调度关系的。

Action操作实现上是调用了SparkContext的runJob方法提交Job。

Spark中调度Job有两种策略

FIFO:

  • 第一个Job分配其所需的所有资源
  • 第二个Job如果还有剩余资源的话就分配,否则等待

FAIR:

  • 使用轮询的方式调度Job

可以通过配置spark.scheduler.mode调整Job的调度方式

另外也可以配置调度池,具体参考官方文档
或者参考conf/fairscheduler.xml.template文件。

3、Job中的Stage调度

Stage是由DAGScheduler组件生产的,在源码中,有三个比较特殊的变量:

  • waitingStages:存储等待执行的Stages
  • runningStages:存储正在执行的Stages
  • failedStages:存储执行失败的Stage

Spark会通过广度优先遍历找到最开始的Stage执行,若有父Stage没有执行完则等待。

4、Stage中的Task调度

暂未了解。。。

三、I/O制度

Spark虽然是基于内存计算的框架,但是不可避免的也会接触到一些存储层,那么在和存储层交互的时候,Spark做了哪些工作?

1、序列化

序列化的本质就是将对象转换为字节流,可以理解为将链表中存储的非连续空间的数据存储转化为连续空间存储的数组中

Spark为什么要做序列化操作?

内存或磁盘中RDD会含有对象的存储,而在节点间数据的传输时,序列化之后的数据可以节约空间和提高效率。

2、压缩

压缩是日常生活中的一个常见操作,好处显而易见,节约空间,从而就可以获得时间上的效率。

Spark中序列化之后的数据可以进行压缩以减少空间开销。
Spark支持两种压缩算法

  • Snappy算法:高压缩速度
  • LZF算法:高压缩比

在不同的场景中选择不同的压缩算法可以有效的提高程序运行的效率。

压缩配置方式:

  1. 启动前在spark-env.sh中设置:export SPARK_JAVA_OPTS=”-Dspark.broadcast.compress”
  2. 在应用程序中配置
    conf.getBoolean(“spark.broadcast.compress,true”)
    conf.set(“spark.broadcast.compress”,true)

3、块管理

RDD从物理上看是一个元数据结构,记录着Block和Node之间的映射关系。

存储RDD是以Block块为单位的,每个分区对应一个块,PartitionID通过元数据信息可以映射到Block。

BlockManager管理和接口、块读写流程、数据块读写管理等细节待继续深入了解。

四、通信模块

Spark中使用Akka作为通信框架

  • Actors是一组包含状态和行为的对象
  • 一个Actor接收到其他Actor的信息之后可以根据需求做出各种反应
  • Client、Master、Worker等都是一个Actor

Spark各个组件的之间协调工作都是基于Akka机制来的,待深入了解的有:

  • Client Actor通信代码逻辑
  • Master Actor通信代码逻辑
  • Worker Actor消息处理逻辑

五、容错机制

之前讲过,RDD之间的算子操作会形成DAG图,RDD之间的依赖关系会形成Lineage。

要理解Lineage机制首先要明确两种依赖的概念:

  • Shuffle Dependencies(宽依赖)
    父分区可以被多个子分区所用
    即多对多的关系

  • Narrow Dependencies(窄依赖)
    父分区最多被一个子分区所用
    即一对一或者多对一的关系

当出现某个节点计算错误的时候,会顺着RDD的操作顺序往回走

一旦是Narrow Dependencies错误,重新计算父RDD分区即可,因为其不依赖其他节点

而如果Shuffle Dependencies错误,重算代价较高,因为一旦重新计算其依赖的父RDD分区,会造成冗余计算

这时候就需要人为的添加检查点来提高容错机制的执行效率

什么情况下需要加CheckPoint

  • DAG中的Lineage过长,如果重算开销太大,故在特定几个Shuffle Dependencies上做CheckPoint是有价值的。
  • Checkpoint会产生磁盘开销,因为其就是将数据持久化到磁盘中,所以做检查点的RDD最好是已经在内存中缓存了。

六、Shuffle机制

Shuffle的定义:对无规则的数据进行重组排序等过程

为什么要Shuffle:分布式计算中数据是分布在各个节点上计算的,而汇总统计等操作需要在所有数据上执行

Spark中Shuffle经历的阶段:

Shuffle Write
    将各个节点数据写入到指定分区
        1、根据下一个Stage分区数分成相应的Bucket
        2、将Bucket写入磁盘
Shuffle Fetch
    获取各个分区发送的数据
        1、在存储有Shuffle数据节点的磁盘Fetch需要的数据
        2、Fetch到本地之后进行自定义的聚集函数操作

最后记录一下提交Spark作业的方法
在spark的bin目录下
执行spark-submit脚本
./spark-submit \
–class 入口函数所在的类名全称 \
–master spark master节点的地址(默认端口7077)\
–executor-memory 指定worker中Executor的内存 \
–total-executor-cores 100 \
jar文件所在的目录 \

 

http://blog.csdn.net/qq1010885678/article/details/45728173

分享到:
评论

相关推荐

    spark-3.1.2-bin-hadoop3.2.tgz

    3. **弹性**:Spark提供了容错机制,当工作节点失败时,可以自动恢复丢失的任务。 4. **内存计算**:Spark利用内存进行计算,显著提高了处理速度,减少了磁盘I/O。 5. **交互式Shell**:Spark提供了一个名为`spark-...

    spark-3.2.4-bin-hadoop3.2-scala2.13 安装包

    7. **安全性和监控**: Spark 3.2.4支持身份验证和授权,可以集成Kerberos等安全机制,确保数据和集群的安全。同时,它提供了一套监控和日志系统,如Web UI、metrics系统和事件日志,帮助用户监控作业状态和诊断问题...

    spark-2.3.0-bin-hadoop2.7版本.zip

    3. **多模式支持**:Spark支持多种工作模式,包括本地模式、Standalone集群模式、YARN模式(与Hadoop2.7集成)以及Mesos模式,适应不同的部署环境。 4. **组件丰富**:Spark包含多个模块,如Spark Core、Spark SQL...

    spark-2.4.7-bin-hadoop2.7.tgz

    - **Spark Core**:Spark的基础,提供分布式任务调度、内存管理和错误恢复机制。 - **Spark SQL**:Spark的一个模块,用于处理结构化数据,支持SQL查询和DataFrame API,可以与多种数据源(如Hive、Parquet、JSON...

    spark-2.3.4-bin-hadoop2.7.tgz

    Spark的主要优势在于其内存计算机制,它将数据存储在内存中,从而显著提高了数据处理速度,相比传统的MapReduce模型,性能提升可达数十倍。此外,Spark提供了SQL查询支持(通过Spark SQL),机器学习库(MLlib),图...

    spark-3.0.0-bin-without-hadoop.tgz

    Spark Core是Spark的基础,提供了分布式任务调度和内存管理机制。它负责任务的分发、监控和故障恢复,确保整个集群的稳定运行。此外,Spark Core还支持弹性数据集(Resilient Distributed Datasets, RDDs),这是...

    spark-3.0.1-bin-hadoop2.7.tgz

    在这个版本中,可能包含了改进的容错机制和更高效的窗口操作,以应对实时分析的挑战。 MLlib是Spark的机器学习库,包含了大量的机器学习算法,如分类、回归、聚类、协同过滤等。在Spark 3.0.1中,可能有新的算法...

    spark-2.4.3-bin-hadoop2.7.zip

    例如,加入了更多的SQL函数,支持更多的数据源,以及改进了类型推断机制,使代码更简洁。 2. SQL性能优化:包括优化查询计划、改进Join操作性能以及对Parquet格式的读写速度提升,使得数据分析更快更高效。 3. ...

    spark-2.4.0-bin-without-hadoop.tgz

    Apache Spark 是一个强大的分布式计算框架,专为大数据处理而设计,具有高效、易用和可扩展的特性。Spark 2.4.0 版本是该框架的一个重要里程碑,它包含了许多新特性和性能优化。"spark-2.4.0-bin-without-hadoop" 这...

    spark-2.0.2-bin-hadoop2.4.tgz

    6. **测试Spark**:编写一个简单的Spark程序,如Word Count,验证Spark是否正常工作。 Spark 2.0.2版本引入了一些重要的改进和新特性,比如DataFrame和Dataset API的增强,这使得Spark SQL更加高效且易于使用。此外...

    spark-2.3.4-bin-hadoop2.6.tgz

    2. **Structured Streaming**:Spark的流处理能力进一步增强,提供了更强大的保证和容错机制,支持多路数据源和多路数据输出。 3. **机器学习改进**:MLlib中的算法进行了优化,提供了更丰富的模型评估指标,同时也...

    spark-2.4.6-bin-hadoop2.6.tgz

    此外,它还引入了DataFrame的UDF(用户自定义函数)注册和调用机制,使得用户可以更方便地自定义复杂的数据处理逻辑。 对于Spark Streaming,此版本优化了DStream操作的内存管理,降低了内存溢出的风险,并增强了对...

    spark-2.4.0-bin-hadoop2.7

    8. **容错与性能**:Spark 通过检查点和数据复制等机制实现了高容错性。同时,其内存计算模型极大地减少了 I/O 开销,提高了数据处理速度。Spark 2.4.0 在这些方面做了优化,提升了整体的运行效率。 综上所述,...

    spark-3.2.1 安装包 下载 hadoop3.2

    对于大规模生产环境,通常会选择YARN或Kubernetes,因为它们提供了更好的资源管理和容错机制。 总的来说,Spark 3.2.1是大数据处理领域的重要工具,它的强大功能和高效的性能使其在数据科学、机器学习和实时分析等...

    spark-1.6.3-bin-hadoop2.4.tgz

    RDD提供了一种高度容错的机制,即使在节点故障时也能保持数据完整性。 在这个压缩包中,"spark-1.6.3-bin-hadoop2.4"目录下,你会找到多个子目录和文件,如: 1. `bin`:包含Spark的命令行工具,如`spark-shell`...

    spark-2.4.5-bin-hadoop2.7.rar

    在Spark 2.4.5 中,RDD 的容错机制得到了强化,使得数据处理更具健壮性。此外,DataFrame 和 Dataset 的引入,为开发者提供了更高级别的抽象,简化了开发过程,同时保持了高性能。 DataFrame 是基于Spark SQL的,它...

    spark-streaming-kafka-0-8-assembly_2.11-2.4.5

    `ReceiverStream`则依赖于Spark的接收器机制,数据由Spark工作节点上的接收器获取并存储在内存中。 **库的使用** 在使用pyspark时,我们需要将`spark-streaming-kafka-0-8-assembly_2.11-2.4.5.jar`添加到Spark的类...

    spark1.6.0-src.rar

    Spark是Apache软件基金会下的一...通过阅读和分析源码,不仅可以深入理解Spark的工作机制,还可以学习到分布式计算、内存管理和数据处理的最佳实践。这对于任何想要在大数据领域深入研究的人来说,都是极其宝贵的资源。

    spark-2.2.0-bin-hadoop2.6.tgz

    这种方式允许Spark与Hadoop集群无缝集成,同时利用Hadoop的资源管理和容错机制。 在压缩包`spark-2.2.0-bin-hadoop2.6`中,通常包含以下组件: 1. `bin/`:包含Spark的可执行文件,如`spark-submit`,用于提交...

    spark-2.1.2-bin-hadoop2.6.tgz

    - **容错性**:理解Spark的容错机制,如RDD持久化和检查点,确保任务的可靠执行。 - **性能优化**:利用数据分区、宽依赖优化、广播变量等手段提升Spark应用的性能。 以上就是关于"spark-2.1.2-bin-hadoop2.6.tgz...

Global site tag (gtag.js) - Google Analytics