`
wbj0110
  • 浏览: 1617566 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Spark Core源码分析: Spark任务模型

阅读更多

概述

 

一个Spark的Job分为多个stage,最后一个stage会包括一个或多个ResultTask,前面的stages会包括一个或多个ShuffleMapTasks。

ResultTask执行并将结果返回给driver application。

ShuffleMapTask将task的output根据task的partition分离到多个buckets里。一个ShuffleMapTask对应一个ShuffleDependency的partition,而总partition数同并行度、reduce数目是一致的。

 

 

Task

 

Task的代码在scheduler package下。

抽象类Task构造参数如下:

Java代码  收藏代码
  1. private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable  

 

Task对应一个stageId和partitionId。

提供runTask()接口、kill()接口等。

提供killed变量、TaskMetrics变量、TaskContext变量等。

 

除了上述基本接口和变量,Task的伴生对象提供了序列化和反序列化应用依赖的jar包的方法。原因是Task需要保证工作节点具备本次Task需要的其他依赖,注册到SparkContext下,所以提供了把依赖转成流写入写出的方法。

 

 

Task的两种实现

 

ShuffleMapTask

 

ShuffleMapTask构造参数如下,

Java代码  收藏代码
  1. private[spark] class ShuffleMapTask(  
  2.     stageId: Int,  
  3.     var rdd: RDD[_],  
  4.     var dep: ShuffleDependency[_,_],  
  5.     _partitionId: Int,  
  6.     @transient private var locs: Seq[TaskLocation])  
  7.   extends Task[MapStatus](stageId, _partitionId)  

RDD partitioner对应的是ShuffleDependency。

 

ShuffleMapTask复写了MapStatus向外读写的方法,因为向外读写的内容包括:stageId,rdd,dep,partitionId,epoch和split(某个partition)。对于其中的stageId,rdd,dep有统一的序列化和反序列化操作并会cache在内存里,再放到ObjectOutput里写出去。序列化操作使用的是Gzip,序列化信息会维护在serializedInfoCache=newHashMap[Int, Array[Byte]]。这部分需要序列化并保存的原因是:stageId,rdd,dep真正代表了本次Shuffle Task的信息,为了减轻master节点负担,把这部分序列化结果cache了起来。

 

 

Stage执行逻辑

 

主要步骤如下:

Java代码  收藏代码
  1. val ser = Serializer.getSerializer(dep.serializer)  
  2. shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)  

这一步是初始化一个ShuffleWriterGroup,Group里面是一个BlockObjectWriter数组。

 

Java代码  收藏代码
  1. for (elem <- rdd.iterator(split, context)) {  
  2. val pair = elem.asInstanceOf[Product2[Any, Any]]  
  3.   val bucketId = dep.partitioner.getPartition(pair._1)  
  4.   shuffle.writers(bucketId).write(pair)  
  5. }  

这一步是为每个Writer对应一个bucket,调用每个BlockObjectWriter的write()方法写数据

 

 

 

Java代码  收藏代码
  1. var totalBytes = 0L  
  2. var totalTime = 0L  
  3. val compressedSizes: Array[Byte] =   
  4. shuffle.writers.map { writer: BlockObjectWriter =>  
  5.     writer.commit()  
  6.     writer.close()  
  7. val size = writer.fileSegment().length  
  8.     totalBytes += size  
  9. totalTime += writer.timeWriting()  
  10. MapOutputTracker.compressSize(size)  
  11. }  

这一步是执行writer.commit(),并得到结果file segment大小,对总大小压缩

 

Java代码  收藏代码
  1. val shuffleMetrics = new ShuffleWriteMetrics  
  2. shuffleMetrics.shuffleBytesWritten = totalBytes  
  3. shuffleMetrics.shuffleWriteTime = totalTime  
  4. metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)  
  5.   
  6. success = true  
  7. new MapStatus(blockManager.blockManagerId, compressedSizes)  

这一步是记录metrcis信息,最后返回一个MapStatus类,里面是本地ShuffleMapTask结果的相关信息。

 

最后会release writers,让对应的shuffle文件得到记录和重用(ShuffleBlockManager管理这些file,这些file是Shuffle Task中一组Writer写的对象)。

主要把下图看懂。

 

重要类

介绍涉及到的重要外部类,帮助理解。

ShuffleBlockManager

 

整体梳理:

ShuffleState维护了两个ShuffleFileGroup的ConcurrentLinkedQueue,以记录目前shuffle的state。

ShuffleState记录了一次shuffle操作的文件组状态,在ShuffleBlockManager内用Map为每个shuffleId维护了一个ShuffleState。

每个shuffleId通过forMapTask()方法得到一组writer,即ShuflleWriterGroup。这组里的writers共享一个shuffleId和mapId,但是每个对应不同的bucketId和file。在为writer分配FileGroup的时候,会从shuffleId对应的shuffle state里先取unusedFileGroup,如果不存在,则在HDFS上新建File。

对于HDFS上的目标file,writer是可以append写的。在新建file的时候,是根据shuffleId和bucket number和一个递增的fileId来创建新的文件的。

 

 

 

ShuffleFileGroup的重用files和记录mapId,index,offset这块似懂非懂。

 

重要方法:

Java代码  收藏代码
  1. def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleWriterGroup {} }  

 

该方法被一个ShuffleMapTask调用,传入了这次shuffle操作的id,mapId是partitionId。Buckects数目等于分区数目。该方法返回的ShuffleWriterGroup里面是一组DiskBlockObjectWriter,每一个writer都属于这一次shuffle操作,所以他们有共同的shuffleId,mapId,但是他们对应了不同的bucket,并且各自对应一个file。

 

在shuffle run里的调用和参数传入:

Java代码  收藏代码
  1. val ser = Serializer.getSerializer(dep.serializer)  
  2. shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)  

shuffleId是由ShuffleDependency获得的全局唯一id,代表本次shuffle任务id

mapId等于partitionId

Bucket数目等于分区数目

 

产生writers:

Writer类型是DiskBlockObjectWriter,数目等于buckets数目。bufferSize的设置:

Java代码  收藏代码
  1. conf.getInt("spark.shuffle.file.buffer.kb"100) * 1024  

blockId产生自:

Java代码  收藏代码
  1. blockId = ShuffleBlockId(shuffleId, mapId, bucketId)  

 

在生成writer的时候调用的是BlockManager的getDiskWriter方法,ShuffleBlockManager初始化的时候绑定BlockManager。

Java代码  收藏代码
  1. private[spark] class DiskBlockObjectWriter(  
  2.     blockId: BlockId,  
  3.     file: File,  
  4.     serializer: Serializer,  
  5.     bufferSize: Int,  
  6.     compressStream: OutputStream => OutputStream,  
  7.     syncWrites: Boolean)  
  8.   extends BlockObjectWriter(blockId)  

 

ShuffleFileGroup:私有内部类,对应了一组shuffle files,每个file对应一个reducer。一个Mapper会分到一个ShuffleFileGroup,把mapper的结果写到这组File里去。

 

MapStatus

 

注意到ShuffleMapTask的类型是MapStatus类。MapStatus类是ShuffleMapTask要返回给scheduler的执行结果,包括两个东西:

Java代码  收藏代码
  1. class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])  

 

前者是run这次task的block manager地址(BlockManagerId是一个类,保存了executorId,host, port, nettyPort),后者是output大小,该值会传给接下来的reduce任务。该size是被MapOutputTracker压缩过的。

 

 

MapStatus类提供了两个方法如下,ShuffleMapTask进行了复写。

Java代码  收藏代码
  1. def writeExternal(out: ObjectOutput) {  
  2.   location.writeExternal(out)  
  3.   out.writeInt(compressedSizes.length)  
  4.   out.write(compressedSizes)  
  5. }  
  6.   
  7. def readExternal(in: ObjectInput) {  
  8.   location = BlockManagerId(in)  
  9.   compressedSizes = new Array[Byte](in.readInt())  
  10.   in.readFully(compressedSizes)  
  11. }  

 

 

BlockManagerId

 

 

BlockManagerId类构造依赖executorId, host, port, nettyPort这些信息。伴生对象维护了一个blockManagerIdCache,实现为ConcurrentHashMap[BlockManagerId,BlockManagerId]() 。

比如MapStatus的readExternal方法把ObjectInput传入BlockManagerId构造函数的时候,BlockManagerId的apply()方法就会根据ObjectInput取出executorId, host, port,nettyPort信息,把这个BlockManagerIdobj维护到blockManagerIdCache

 

 

ResultTask

 

构造参数

Java代码  收藏代码
  1. private[spark] class ResultTask[T, U](  
  2.     stageId: Int,  
  3.     var rdd: RDD[T],  
  4.     var func: (TaskContext, Iterator[T]) => U,  
  5.     _partitionId: Int,  
  6.     @transient locs: Seq[TaskLocation],  
  7.     var outputId: Int)  
  8.   extends Task[U](stageId, _partitionId) with Externalizable {  

 

ResultTask比较简单,runTask方法调用的是rdd的迭代器:

Java代码  收藏代码
  1. override def runTask(context: TaskContext): U = {  
  2.   metrics = Some(context.taskMetrics)  
  3.   try {  
  4.     func(context, rdd.iterator(split, context))  
  5.   } finally {  
  6.     context.executeOnCompleteCallbacks()  
  7.   }  
  8. }  

 

进程模型 vs. 线程模型

Spark同节点上的任务以多线程的方式运行在一个JVM进程中。

 

优点:

启动任务快

共享内存,适合内存密集型任务

Executor所占资源可重复利用

 

缺点:

同节点上的所有任务运行在一个进程中,会出现严重的资源争用,难以细粒度控制每个任务的占用资源。MapReduce为Map Task和Reduce Task设置不同资源,细粒度控制任务占用资源量。

 

MapReduce的每个Task都是一个JVM进程,都要经历:资源申请->运行任务->释放资源的过程

 

每个节点可以有一个或多个Executor,Executor配有一定数量slots,Executor内可以跑多个Result Task和ShuffleMap Task。

在共享内存方面,broadcast的变量会在每个executor里存一份,这个executor内的任务可以共享。

 

http://jgsj.iteye.com/blog/2050690

分享到:
评论

相关推荐

    深入理解Spark:核心思想及源码分析.pdf

    《深入理解Spark:核心思想及源码分析》这本书旨在帮助读者深入掌握Apache Spark这一大数据处理框架的核心原理与实现细节。Spark作为一个快速、通用且可扩展的数据处理系统,已经在大数据领域得到了广泛应用。它提供...

    Spark源码分析.pdf

    通过分析Spark源码,我们可以了解其内部工作机制,如任务调度、数据分发、故障恢复等机制。例如,DAGScheduler如何将作业拆分成Stage,TaskScheduler如何将任务分配到Executor,以及如何利用BlockManager和Storage层...

    spark-2.4.0源码

    10. **Spark UI和事件日志**:Spark提供了Web UI来监控作业状态,源码中可以看到如何实现这些可视化工具,同时Spark还支持事件日志,便于后期分析作业行为。 深入理解Spark源码,有助于开发者优化应用程序性能、...

    spark1.3.1源码下载

    - `core/`:包含Spark核心组件的源码,如`spark-core`、`spark-network`等子模块。 - `sql/`:Spark SQL的相关代码,包括DataFrame API和Hive的集成。 - `streaming/`:Spark Streaming的实现,包括DStream...

    spark源码分析

    总结,Spark源码分析是一个涵盖广泛的主题,涉及Spark的设计理念、核心组件、内存管理、任务调度等多个方面。深入学习Spark源码,不仅能提升我们对大数据处理的理解,还能为日常开发工作提供有力的支持。

    Apache Spark源码走读:如何进行代码跟读

    在进行源码分析的过程中,往往需要对代码进行一些修改来辅助理解和测试。如果不想提交这些临时修改到版本控制系统,可以使用git的`stash`功能暂时保存这些更改,以便随时恢复而不影响其他工作。 **示例**:假设你...

    spark2.3源码下载

    Spark Core是基础,提供分布式任务调度和内存管理;Spark SQL负责处理SQL查询和数据处理,支持Hive等传统SQL系统;Spark Streaming处理实时流数据,提供DStream抽象;MLlib是机器学习库,包含多种算法和工具。 二、...

    Spark-2.4.5官网下载源码包

    8. Spark Streaming的微批处理模型:将实时流数据转化为一系列小的批处理作业,实现低延迟处理。 9. MLlib:Spark的机器学习库,包括分类、回归、聚类、协同过滤等多种算法,以及模型选择和评估工具。 10. GraphX:...

    Spark github源码 例子很有价值

    "Spark github源码 例子很有价值"这个标题表明,通过分析这些源码,我们可以学习到Spark如何实现其核心功能,如弹性分布式数据集(RDD)、DataFrame、Dataset以及Spark SQL等。 描述中提到"github无法下载的可以直接...

    spark2.2.0源码包(含分析文档),包含机器学习mlib 及ml

    7. **源码解析**:源码分析文档可能详细讲解了Spark的各个组件是如何协同工作的,包括RDD的生命周期、调度器的工作方式、DataFrame/Dataset的优化策略、机器学习算法的实现细节等。 对于机器学习开发者而言,MLlib...

    深入理解Spark 核心思想与源码分析

    源码分析方面,Spark的设计遵循模块化的架构,主要包括以下几个部分: 1. **Spark Core**:提供基本的分布式计算框架,包括DAG调度、任务调度、内存管理、错误恢复和集群资源管理。 2. **Spark SQL**:Spark的SQL...

    spark-2.2.0.tgz源码

    4. **DAG执行模型**:Spark通过将任务拆分为一系列任务阶段,并构建有向无环图(DAG)来执行任务。每个任务阶段由一系列可以并行运行的任务组成,这些任务之间通过RDD的依赖关系连接。 5. **存储系统集成**:Spark...

    spark 3.2.1 源码下载

    1. **Core Spark**:这是Spark的核心模块,负责任务调度、内存管理、故障恢复以及与存储系统的交互。它提供了一种弹性分布式数据集(Resilient Distributed Datasets, RDDs)的概念,RDD是Spark处理数据的基本单元。...

    spark(svn自动下载)源码

    Spark源码分析首先需要了解其核心组件和架构。Spark主要由以下几个部分组成: 1. **Spark Core**:这是Spark的基础,提供分布式任务调度、内存管理、错误恢复以及与其他存储系统交互的能力。它定义了RDD(弹性...

    Spark2.6.3源码

    源码分析对于深入理解Spark的工作原理和进行二次开发至关重要。 首先,Java目录是Spark源码的核心部分,它包含了Spark的大部分核心组件和功能的实现。以下是一些主要的子目录及其相关的知识点: 1. **core**:这是...

    spark高级数据分析源代码

    1. Spark Core:Spark的基础组件,提供了分布式任务调度、内存管理、错误恢复等基础功能。 2. Spark SQL:Spark的SQL接口,可以处理结构化和半结构化数据,与Hive兼容,支持DataFrame和Dataset API。 3. Spark ...

    spark-2.4.4-bin-hadoop2.6.tgz

    - **Spark Core**:基础执行引擎,负责任务调度、内存管理、故障恢复等。 - **Spark SQL**:提供SQL和DataFrame/Dataset API,用于结构化和半结构化数据处理,与Hive兼容。 - **Spark Streaming**:处理连续数据...

    spark-2.2.1.tar.gz 源码

    - **Spark Core**:负责任务调度,使用DAG(有向无环图)来表示任务,通过TaskScheduler接口实现任务调度。内存管理采用Tachyon或HDFS作为持久化存储,容错机制基于检查点和事件日志。 - **Spark SQL**:提供了...

    Spark 核心思想与源码分析.7z

    《Spark核心思想与源码分析》是一份深入探讨Apache Spark技术的资料,旨在帮助读者理解Spark的内在工作原理,从而更好地应用和优化这个大数据处理框架。Spark作为一个分布式计算框架,以其高效、易用和可扩展性赢得...

    spark-2.2.0源码

    总结,Spark 2.2.0源码的分析,不仅可以帮助我们理解Spark的内在机制,还能指导我们在实际项目中更好地利用Spark进行大数据处理。通过对源码的深入学习,我们可以发现并解决性能瓶颈,实现更高效的数据处理和分析。...

Global site tag (gtag.js) - Google Analytics