`
wbj0110
  • 浏览: 1617593 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Spark Core源码分析: RDD基础

阅读更多

RDD

 

RDD初始参数:上下文和一组依赖

Java代码  收藏代码
  1. abstract class RDD[T: ClassTag](  
  2.     @transient private var sc: SparkContext,  
  3.     @transient private var deps: Seq[Dependency[_]]  
  4.   ) extends Serializable  

 

以下需要仔细理清:

A list of Partitions

Function to compute split (sub RDD impl)

A list of Dependencies

Partitioner for K-V RDDs (Optional)

Preferred locations to compute each spliton (Optional)

 

Dependency

 

Dependency代表了RDD之间的依赖关系,即血缘

 

RDD中的使用

RDD给子类提供了getDependencies方法来制定如何依赖父类RDD

Java代码  收藏代码
  1. protected def getDependencies: Seq[Dependency[_]] = deps  

事实上,在获取first parent的时候,子类经常会使用下面这个方法

Java代码  收藏代码
  1. protected[spark] def firstParent[U: ClassTag] = {  
  2.   dependencies.head.rdd.asInstanceOf[RDD[U]]  
  3. }  

可以看到,Seq里的第一个dependency应该是直接的parent,从而从第一个dependency类里获得了rdd,这个rdd就是父RDD。

一般的RDD子类都会这么实现compute和getPartition方法,以SchemaRDD举例:

Java代码  收藏代码
  1. override def compute(split: Partition, context: TaskContext): Iterator[Row] =  
  2.     firstParent[Row].compute(split, context).map(_.copy())  
  3.   
  4. override def getPartitions: Array[Partition] = firstParent[Row].partitions  

compute()方法调用了第一个父类的compute,把结果RDD copy返回

getPartitions返回的就是第一个父类的partitions

 

下面看一下Dependency类及其子类的实现。

宽依赖和窄依赖

Java代码  收藏代码
  1. abstract class Dependency[T](val rdd: RDD[T]) extends Serializable  

Dependency里传入的rdd,就是父RDD本身。

继承结构如下:

 

NarrowDependency代表窄依赖,即父RDD的分区,最多被子RDD的一个分区使用。所以支持并行计算。

子类需要实现方法:

Java代码  收藏代码
  1. def getParents(partitionId: Int): Seq[Int]  

OneToOneDependency表示父RDD和子RDD的分区依赖是一对一的。

 

RangeDependency表示在一个range范围内,依赖关系是一对一的,所以初始化的时候会有一个范围,范围外的partitionId,传进去之后返回的是Nil。

下面介绍宽依赖。

Java代码  收藏代码
  1. class ShuffleDependency[K, V](  
  2.     @transient rdd: RDD[_ <: Product2[K, V]],  
  3.     val partitioner: Partitioner,  
  4.     val serializer: Serializer = null)  
  5.   extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {  
  6.   
  7.   // 上下文增量定义的Id  
  8.   val shuffleId: Int = rdd.context.newShuffleId()  
  9.   
  10.   // ContextCleaner的作用和实现在SparkContext章节叙述  
  11.   rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))  
  12. }  

宽依赖针对的RDD是KV形式的,需要一个partitioner指定分区方式(下一节介绍),需要一个序列化工具类,序列化工具目前的实现如下:

 

宽依赖和窄依赖对失败恢复时候的recompute有不同程度的影响,宽依赖可能是要全部计算的。

 

Partition

Partition具体表示RDD每个数据分区。

Partition提供trait类,内含一个index和hashCode()方法,具体子类实现与RDD子类有关,种类如下:

在分析每个RDD子类的时候再涉及。

Partitioner

Partitioner决定KV形式的RDD如何根据key进行partition

Java代码  收藏代码
  1. abstract class Partitioner extends Serializable {  
  2.   def numPartitions: Int // 总分区数  
  3.   def getPartition(key: Any): Int  
  4. }  

在ShuffleDependency里对应一个Partitioner,来完成宽依赖下,子RDD如何获取父RDD。

默认Partitioner

Partitioner的伴生对象提供defaultPartitioner方法,逻辑为:

传入的RDD(至少两个)中,遍历(顺序是partition数目从大到小)RDD,如果已经有Partitioner了,就使用。如果RDD们都没有Partitioner,则使用默认的HashPartitioner。而HashPartitioner的初始化partition数目,取决于是否设置了spark.default.parallelism,如果没有的话就取RDD中partition数目最大的值。

如果上面这段文字看起来费解,代码如下:

Java代码  收藏代码
  1. def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {  
  2.   val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse  
  3.   for (r <- bySize if r.partitioner.isDefined) {  
  4.     return r.partitioner.get  
  5.   }  
  6.   if (rdd.context.conf.contains("spark.default.parallelism")) {  
  7.     new HashPartitioner(rdd.context.defaultParallelism)  
  8.   } else {  
  9.     new HashPartitioner(bySize.head.partitions.size)  
  10.   }  
  11. }  


HashPartitioner

HashPartitioner基于java的Object.hashCode。会有个问题是Java的Array有自己的hashCode,不基于Array里的内容,所以RDD[Array[_]]或RDD[(Array[_], _)]使用HashPartitioner会有问题。

 

顾名思义,getPartition方法实现如下

Java代码  收藏代码
  1. def getPartition(key: Any): Int = key match {  
  2.   case null => 0  
  3.   case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)  
  4. }  

RangePartitioner

RangePartitioner处理的KV RDD要求Key是可排序的,即满足Scala的Ordered[K]类型。所以它的构造如下:

Java代码  收藏代码
  1. class RangePartitioner[K <% Ordered[K]: ClassTag, V](  
  2.     partitions: Int,  
  3.     @transient rdd: RDD[_ <: Product2[K,V]],  
  4.     private val ascending: Boolean = true)  
  5.   extends Partitioner {  

内部会计算一个rangBounds(上界),在getPartition的时候,如果rangBoundssize小于1000,则逐个遍历获得;否则二分查找获得partitionId。

Persist

默认cache()过程是将RDD persist在内存里,persist()操作可以为RDD重新指定StorageLevel,

Java代码  收藏代码
  1. class StorageLevel private(  
  2.     private var useDisk_ : Boolean,  
  3.     private var useMemory_ : Boolean,  
  4.     private var useOffHeap_ : Boolean,  
  5.     private var deserialized_ : Boolean,  
  6.     private var replication_ : Int = 1)  

Java代码  收藏代码
  1. object StorageLevel {  
  2.   val NONE = new StorageLevel(falsefalsefalsefalse)  
  3.   val DISK_ONLY = new StorageLevel(truefalsefalsefalse)  
  4.   val DISK_ONLY_2 = new StorageLevel(truefalsefalsefalse2)  
  5.   val MEMORY_ONLY = new StorageLevel(falsetruefalsetrue)  
  6.   val MEMORY_ONLY_2 = new StorageLevel(falsetruefalsetrue2)  
  7.   val MEMORY_ONLY_SER = new StorageLevel(falsetruefalsefalse)  
  8.   val MEMORY_ONLY_SER_2 = new StorageLevel(falsetruefalsefalse2)  
  9.   val MEMORY_AND_DISK = new StorageLevel(truetruefalsetrue)  
  10.   val MEMORY_AND_DISK_2 = new StorageLevel(truetruefalsetrue2)  
  11.   val MEMORY_AND_DISK_SER = new StorageLevel(truetruefalsefalse)  
  12.   val MEMORY_AND_DISK_SER_2 = new StorageLevel(truetruefalsefalse2)  
  13.   val OFF_HEAP = new StorageLevel(falsefalsetruefalse// Tachyon  

RDD的persist()和unpersist()操作,都是由SparkContext执行的(SparkContext的persistRDD和unpersistRDD方法)。

 

Persist过程是把该RDD存在上下文的TimeStampedWeakValueHashMap里维护起来。也就是说,其实persist并不是action,并不会触发任何计算。

Unpersist过程如下,会交给SparkEnv里的BlockManager处理。

Java代码  收藏代码
  1. private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {  
  2.   env.blockManager.master.removeRdd(rddId, blocking)  
  3.   persistentRdds.remove(rddId)  
  4.   listenerBus.post(SparkListenerUnpersistRDD(rddId))  
  5. }  

Checkpoint

RDD Actions api里提供了checkpoint()方法,会把本RDD save到SparkContext CheckpointDir

目录下。建议该RDD已经persist在内存中,否则需要recomputation。

 

如果该RDD没有被checkpoint过,则会生成新的RDDCheckpointData。RDDCheckpointData类与一个RDD关联,记录了checkpoint相关的信息,并且记录checkpointRDD的一个状态,

[ Initialized --> marked for checkpointing--> checkpointing in progress --> checkpointed ]

内部有一个doCheckpoint()方法(会被下面调用)。

执行逻辑

真正的checkpoint触发,在RDD私有方法doCheckpoint()里。doCheckpoint()会被DAGScheduler调用,且是在此次job里使用这个RDD完毕之后,此时这个RDD就已经被计算或者物化过了。可以看到,会对RDD的父RDD进行递归。

Java代码  收藏代码
  1. private[spark] def doCheckpoint() {  
  2.   if (!doCheckpointCalled) {  
  3.     doCheckpointCalled = true  
  4.     if (checkpointData.isDefined) {  
  5.       checkpointData.get.doCheckpoint()  
  6.     } else {  
  7.       dependencies.foreach(_.rdd.doCheckpoint())  
  8.     }  
  9.   }  
  10. }  

RDDCheckpointData的doCheckpoint()方法关键代码如下:
Java代码  收藏代码
  1. // Create the output path for the checkpoint  
  2. val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)  
  3. val fs = path.getFileSystem(rdd.context.hadoopConfiguration)  
  4. if (!fs.mkdirs(path)) {  
  5.   throw new SparkException("Failed to create checkpoint path " + path)  
  6. }  
  7.   
  8. // Save to file, and reload it as an RDD  
  9. val broadcastedConf = rdd.context.broadcast(  
  10.   new SerializableWritable(rdd.context.hadoopConfiguration))  
  11. // 这次runJob最终调的是dagScheduler的runJob  
  12. rdd.context.runJob(rdd,   
  13. CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)  
  14. // 此时rdd已经记录到磁盘上  
  15. val newRDD = new CheckpointRDD[T](rdd.context, path.toString)  
  16. if (newRDD.partitions.size != rdd.partitions.size) {  
  17.   throw new SparkException("xxx")  
  18. }  

runJob最终调的是dagScheduler的runJob。做完后,生成一个CheckpointRDD。

具体CheckpointRDD相关内容可以参考其他章节。

API

子类需要实现的方法

Java代码  收藏代码
  1. // 计算某个分区  
  2. def compute(split: Partition, context: TaskContext): Iterator[T]  
  3.   
  4. protected def getPartitions: Array[Partition]  
  5. // 依赖的父RDD,默认就是返回整个dependency序列  
  6. protected def getDependencies: Seq[Dependency[_]] = deps  
  7.   
  8. protected def getPreferredLocations(split: Partition): Seq[String] = Nil  

Transformations

略。
 

Actions

略。
 
 

SubRDDs

部分RDD子类的实现分析,包括以下几个部分:

1)子类本身构造参数

2)子类的特殊私有变量

3)子类的Partitioner实现

4)子类的父类函数实现

Java代码  收藏代码
  1. def compute(split: Partition, context: TaskContext): Iterator[T]  
  2. protected def getPartitions: Array[Partition]  
  3. protected def getDependencies: Seq[Dependency[_]] = deps  
  4. protected def getPreferredLocations(split: Partition): Seq[String] = Nil  

CheckpointRDD

Java代码  收藏代码
  1. class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)  
  2.   extends RDD[T](sc, Nil)  

CheckpointRDDPartition继承自Partition,没有什么增加。

有一个被广播的hadoop conf变量,在compute方法里使用(readFromFile的时候用)

Java代码  收藏代码
  1. val broadcastedConf = sc.broadcast(  
  2. new SerializableWritable(sc.hadoopConfiguration))  

getPartitions: Array[Partition]方法:

根据checkpointPath去查看Path下有多少个partitionFile,File个数为partition数目。getPartitions方法返回的Array[Partition]内容为New CheckpointRDDPartition(i),i为[0, 1, …, partitionNum]

 

getPreferredLocations(split:Partition): Seq[String]方法:

文件位置信息,借助hadoop core包,获得block location,把得到的结果按照host打散(flatMap)并过滤掉localhost,返回。

 

compute(split: Partition, context:TaskContext): Iterator[T]方法:

调用CheckpointRDD.readFromFile(filebroadcastedConf,context)方法,其中file为hadoopfile path,conf为广播过的hadoop conf。

Hadoop文件读写及序列化

伴生对象提供writeToFile方法和readFromFile方法,主要用于读写hadoop文件,并且利用env下的serializer进行序列化和反序列化工作。两个方法具体实现如下:

Java代码  收藏代码
  1. def writeToFile[T](  
  2.  path: String,  
  3.  broadcastedConf: Broadcast[SerializableWritable[Configuration]],  
  4.  blockSize: Int = -1  
  5. )(ctx: TaskContext, iterator: Iterator[T]) {  

创建hadoop文件的时候会若存在会抛异常。把hadoop的outputStream放入serializer的stream里,serializeStream.writeAll(iterator)写入。

 

writeToFile的调用在RDDCheckpointData类的doCheckpoint方法里,如下:

Java代码  收藏代码
  1. rdd.context.runJob(rdd,   
  2. CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)  

Java代码  收藏代码
  1. def readFromFile[T](  
  2.   path: Path,  
  3.   broadcastedConf: Broadcast[SerializableWritable[Configuration]],  
  4.   context: TaskContext  
  5. ): Iterator[T] = {  

打开Hadoop的inutStream,读取的时候使用env下的serializer得到反序列化之后的流。返回的时候,DeserializationStream这个trait提供了asIterator方法,每次next操作可以进行一次readObject。

在返回之前,调用了TaskContext提供的addOnCompleteCallback回调,用于关闭hadoop的inputStream。

NewHadoopRDD

Java代码  收藏代码
  1. class NewHadoopRDD[K, V](  
  2.     sc : SparkContext,  
  3.     inputFormatClass: Class[_ <: InputFormat[K, V]],  
  4.     keyClass: Class[K],  
  5.     valueClass: Class[V],  
  6.     @transient conf: Configuration)  
  7.   extends RDD[(K, V)](sc, Nil)  
  8.   with SparkHadoopMapReduceUtil  

Java代码  收藏代码
  1. private[spark] class NewHadoopPartition(  
  2.     rddId: Int,  
  3.     val index: Int,  
  4.     @transient rawSplit: InputSplit with Writable)  
  5.   extends Partition {  
  6.   
  7.   val serializableHadoopSplit = new SerializableWritable(rawSplit)  
  8.   
  9.   override def hashCode(): Int = 41 * (41 + rddId) + index  
  10. }  

getPartitions操作:

根据inputFormatClass和conf,通过hadoop InputFormat实现类的getSplits(JobContext)方法得到InputSplits。(ORCFile在此处的优化)

这样获得的split同RDD的partition直接对应。

 

compute操作:

针对本次split(partition),调用InputFormat的createRecordReader(split)方法,

得到RecordReader<K,V>。这个RecordReader包装在Iterator[(K,V)]类内,复写Iterator的next()和hasNext方法,让compute返回的InterruptibleIterator[(K,V)]能够被迭代获得RecordReader取到的数据。

 

getPreferredLocations(split: Partition)操作:

Java代码  收藏代码
  1. theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")  

在NewHadoopPartition里SerializableWritable将split序列化,然后调用InputSplit本身的getLocations接口,得到有数据分布节点的nodes name列表。

 

WholeTextFileRDD

NewHadoopRDD的子类

Java代码  收藏代码
  1. private[spark] class WholeTextFileRDD(  
  2.     sc : SparkContext,  
  3.     inputFormatClass: Class[_ <: WholeTextFileInputFormat],  
  4.     keyClass: Class[String],  
  5.     valueClass: Class[String],  
  6.     @transient conf: Configuration,  
  7.     minSplits: Int)  
  8.   extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {  

复写了getPartitions方法:

NewHadoopRDD有自己的inputFormat实现类和recordReader实现类。在spark/input package下专门写了这两个类的实现。感觉是种参考。

 

InputFormat

WholeTextFileRDD在spark里实现了自己的inputFormat。读取的File以K,V的结构获取,K为path,V为整个file的content。

 

复写createRecordReader以使用WholeTextFileRecordReader

 

复写setMaxSplitSize方法,由于用户可以传入minSplits数目,计算平均大小(splits files总大小除以split数目)的时候就变了。

 

RecordReader

复写nextKeyValue方法,会读出指定path下的file的内容,生成new Text()给value,结果是String。如果文件正在被别的进行打开着,会返回false。否则把file内容读进value里。

 

使用场景

在SparkContext下提供wholeTextFile方法,

Java代码  收藏代码
  1. def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits):  
  2.   RDD[(String, String)]  

用于读取一个路径下的所有text文件,以K,V的形式返回,K为一个文件的path,V为文件内容。比较适合小文件。

http://jgsj.iteye.com/blog/2050688

分享到:
评论

相关推荐

    深入理解Spark:核心思想及源码分析.pdf

    通过阅读《深入理解Spark:核心思想及源码分析》,读者将能够全面了解Spark的工作原理,提升解决实际问题的能力,并为成为Spark专家奠定坚实基础。对于希望在大数据领域深化技术理解的开发者,这本书无疑是宝贵的...

    Spark源码分析.pdf

    《Spark源码分析》这本书是针对那些希望深入了解大数据处理框架Spark以及与其紧密相关的Hadoop技术的专业人士所编写的。Spark作为一个快速、通用且可扩展的数据处理引擎,已经在大数据领域占据了重要地位,而深入...

    Spark core 源码解读与扩展

    在深入分析Spark Core的源码和架构时,我们可以更好地理解其内部工作机制,并能够在基础上进行扩展以满足特定的大数据处理需求。 ### Spark Core 源码解读 Spark Core源码的解读涉及到理解以下几个关键部分: 1. ...

    spark-2.4.0源码

    它提供了RDD(弹性分布式数据集)这一基础抽象,使得数据处理可以并行化且具有容错性。 2. **Spark SQL**:Spark SQL是Spark用于结构化数据处理的模块,它可以与传统SQL数据库进行交互,并支持DataFrame和Dataset ...

    spark-2.2.0源码

    通过阅读和分析Spark 2.2.0的源码,开发者不仅可以深入理解Spark的工作原理,还能学习到如何设计和实现大规模分布式系统,这对于提升大数据处理技能和解决实际问题有着极大的帮助。同时,这也是一个极好的学习分布式...

    spark源码分析

    《Spark源码分析》 Spark,作为大数据处理领域的重要框架,以其高效、易用和弹性伸缩等特性,被广泛应用于数据处理、机器学习和实时流处理等多个场景。本资料将深入探讨Spark的核心思想和源码,帮助读者从底层原理...

    Spark-2.3.1源码解读

    Spark Core源码阅读 Spark Context 阅读要点 Spark的缓存,变量,shuffle数据等清理及机制 Spark-submit关于参数及部署模式的部分解析 GroupByKey VS ReduceByKey OrderedRDDFunctions那些事 高效使用...

    spark2.2.0源码包(含分析文档),包含机器学习mlib 及ml

    7. **源码解析**:源码分析文档可能详细讲解了Spark的各个组件是如何协同工作的,包括RDD的生命周期、调度器的工作方式、DataFrame/Dataset的优化策略、机器学习算法的实现细节等。 对于机器学习开发者而言,MLlib...

    Spark github源码 例子很有价值

    1. **Core Spark**:包括RDD(弹性分布式数据集)的实现,这是Spark的基础抽象,提供了并行计算的能力。 2. **Spark SQL**:Spark对SQL查询的支持,它允许用户使用SQL语句处理数据,并与DataFrame和Dataset接口相...

    spark2.3源码下载

    5. **弹性分布式数据集(RDD)改进**:虽然DataFrame和Dataset逐渐成为主流,但RDD作为Spark的基础,也在2.3.0中得到了优化,如更好的错误恢复机制和内存使用效率。 6. **性能优化**:通过Tungsten项目,Spark ...

    Spark 核心思想与源码分析.7z

    《Spark核心思想与源码分析》是一份深入探讨Apache Spark技术的资料,旨在帮助读者理解Spark的内在工作原理,从而更好地应用和优化这个大数据处理框架。Spark作为一个分布式计算框架,以其高效、易用和可扩展性赢得...

    深入理解Spark 核心思想与源码分析

    源码分析方面,Spark的设计遵循模块化的架构,主要包括以下几个部分: 1. **Spark Core**:提供基本的分布式计算框架,包括DAG调度、任务调度、内存管理、错误恢复和集群资源管理。 2. **Spark SQL**:Spark的SQL...

    spark 优秀资源源码(个人整理)

    源码分析可以帮助我们深入理解其内部运行机制,例如DAG(有向无环图)调度、RDD(弹性分布式数据集)的创建与操作、Stage划分以及Task执行等。 2. Spark与ETL的整合: ETL是数据仓库系统中的关键步骤,Spark提供了...

    spark 3.2.1 源码下载

    源码下载是开发者深入理解Spark工作原理、调试代码或定制功能的基础步骤。获取Spark 3.2.1源码,你可以直接访问Apache Spark的官方网站或者通过Git仓库来下载。下载完成后,解压文件,你会得到名为"spark-3.2.1"的...

    spark(svn自动下载)源码

    Spark源码分析首先需要了解其核心组件和架构。Spark主要由以下几个部分组成: 1. **Spark Core**:这是Spark的基础,提供分布式任务调度、内存管理、错误恢复以及与其他存储系统交互的能力。它定义了RDD(弹性...

    spark-2.2.0.tgz源码

    2. **RDD(弹性分布式数据集)**:Spark 2.2.0中的关键概念是RDD,它是Spark的基础数据抽象,是一个不可变、分区的数据集合,可以在集群中的多个节点上并行操作。RDD具有容错性,如果某个分区丢失,可以通过其他分区...

    spark高级数据分析源代码

    1. Spark Core:Spark的基础组件,提供了分布式任务调度、内存管理、错误恢复等基础功能。 2. Spark SQL:Spark的SQL接口,可以处理结构化和半结构化数据,与Hive兼容,支持DataFrame和Dataset API。 3. Spark ...

    Spark2.6.3源码

    1. **core**:这是Spark的基础模块,包含了任务调度、内存管理、容错机制以及与Hadoop交互的关键组件。例如,`RDD`(弹性分布式数据集)的定义和操作,`DAGScheduler`和`TaskScheduler`的调度策略,`Storage`模块中的...

    spark1.0.0源码

    通过分析Spark 1.0.0的源码,开发者不仅可以了解Spark的工作原理,还可以学习到分布式计算、内存管理和容错设计等方面的精髓,这对于深入理解和优化Spark应用非常有帮助。同时,这也为后续版本的更新和升级提供了...

    spark-2.4.4-bin-hadoop2.6.tgz

    - **Spark Core**:基础执行引擎,负责任务调度、内存管理、故障恢复等。 - **Spark SQL**:提供SQL和DataFrame/Dataset API,用于结构化和半结构化数据处理,与Hive兼容。 - **Spark Streaming**:处理连续数据...

Global site tag (gtag.js) - Google Analytics