RDD的依赖关系
Rdd之间的依赖关系通过rdd中的getDependencies来进行表示,
在提交job后,会通过在 DAGShuduler.submitStage-->getMissingParentStages
privatedef getMissingParentStages(stage: Stage): List[Stage] = {
valmissing = new HashSet[Stage]
valvisited = new HashSet[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
if (getCacheLocs(rdd).contains(Nil)) {
for (dep <- rdd.dependencies) {
depmatch {
caseshufDep: ShuffleDependency[_,_] =>
valmapStage = getShuffleMapStage(shufDep, stage.jobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
casenarrowDep: NarrowDependency[_] =>
visit(narrowDep.rdd)
}
}
}
}
}
visit(stage.rdd)
missing.toList
}
在以上代码中得到rdd的相关dependencies,每一个rdd生成时传入rdd的dependencies信息。
如SparkContext.textFile,时生成的HadoopRDD时。此RDD的默认为dependencys为Nil.
Nil是一个空的列表。
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
Dependency分为ShuffleDependency与NarrowDependency。
其中NarrowDependency又包含OneToOneDependency/RangeDependency
Dependency唯一的成员就是rdd, 即所依赖的rdd, 或parent rdd
abstractclass Dependency[T](valrdd: RDD[T]) extends Serializable
OneToOneDependency关系:
最简单的依赖关系, 即parent和child里面的partitions是一一对应的, 典型的操作就是map, filter
其实partitionId就是partition在RDD中的序号, 所以如果是一一对应,
那么parent和child中的partition的序号应该是一样的,如下是OneToOneDependency的定义
/**
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
此类的Dependency中parent中的partitionId与childRDD中的partitionId是一对一的关系。
也就是partition本身范围不会改变, 一个parition经过transform还是一个partition,
虽然内容发生了变化, 所以可以在local完成,此类场景通常像mapreduce中只有map的场景,
第一个RDD执行完成后的MAP的parition直接运行第二个RDD的Map,也就是local执行。
overridedef getParents(partitionId: Int) = List(partitionId)
}
RangeDependency关系:
此类应用虽然仍然是一一对应, 但是是parent RDD中的某个区间的partitions对应到child RDD中的某个区间的partitions
典型的操作是union, 多个parent RDD合并到一个child RDD, 故每个parent RDD都对应到child RDD中的一个区间
需要注意的是, 这里的union不会把多个partition合并成一个partition, 而是的简单的把多个RDD中的partitions放到一个RDD里面, partition不会发生变化,
rdd参数,parentRDD
inStart参数,parentRDD的partitionId计算的起点位置。
outStart参数,childRDD中计算parentRDD的partitionId的起点位置,
length参数,parentRDD中partition的个数。
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
overridedef getParents(partitionId: Int) = {
检查partitionId的合理性,此partitionId在childRDD的partitionId中的范围需要合理。
if (partitionId >= outStart && partitionId < outStart + length) {
计算出ParentRDD的partitionId的值。
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
典型的应用场景union的场景把两个RDD合并到一个新的RDD中。
def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
使用union的, 第二个参数是, 两个RDD的array, 返回值就是把这两个RDD union后产生的新的RDD
ShuffleDependency关系:
此类依赖首先要求是Product2与PairRDDFunctions的k,v的形式,这样才能做shuffle,和hadoop一样。
其次, 由于需要shuffle, 所以当然需要给出partitioner,默认是HashPartitioner 如何完成shuffle
然后, shuffle不象map可以在local进行, 往往需要网络传输或存储, 所以需要serializerClass
默认是JavaSerializer,一个类名,用于序列化网络传输或者以序列化形式缓存起来的各种对象。
默认情况下Java的序列化机制可以序列化任何实现了Serializable接口的对象,
但是速度是很慢的,
因此当你在意运行速度的时候我们建议你使用spark.KryoSerializer 并且配置 Kryo serialization。
可以是任何spark.Serializer的子类。
最后, 每个shuffle需要分配一个全局的id, context.newShuffleId()的实现就是把全局id累加
class ShuffleDependency[K, V](
@transient rdd: RDD[_ <: Product2[K, V]],
valpartitioner: Partitioner,
valserializerClass: String = null)
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
valshuffleId: Int = rdd.context.newShuffleId()
}
生成RDD过程分析
生成rdd我们还是按wordcount中的例子来说明;
val file = sc.textFile("/hadoop-test.txt")
valcounts = file.flatMap(line => line.split(" "))
.map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("/newtest.txt")
1.首先SparkContext.textFile通过调用hadoopFile生成HadoopRDD实例,
textFile-->hadoopFile-->HadoopRDD,此时RDD的Dependency为Nil,一个空的列表。
此时的HadoopRDD为RDD<K,V>,每执行next方法时返回一个Pair,也就是一个KV(通过compute函数)
2.textFile得到HadoopRDD后,调用map函数,
map中每执行一次得到一个KV(compute中getNext,new NextIterator[(K, V)] ),
取出value的值并toString,生成MappedRDD<String>。此RDD的上层RDD就是1中生成的RDD。
同时此RDD的Dependency为OneToOneDependency。
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minSplits).map(pair => pair._2.toString)
}
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
以上代码中传入的this其实就是1中生成的HadoopRDD.
3.flatMap函数,把2中每一行输出通过一定的条件修改成0到多个新的item.生成FlatMappedRDD实例,
同时根据implicit隐式转换生成PairRDDFunctions。下面两处代码中的红色部分。
在生成FlatMappedRDD是,此时的上一层RDD就是2中生成的RDD。
同时此RDD的Dependency为OneToOneDependency。
class FlatMappedRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: T => TraversableOnce[U])
extends RDD[U](prev)
implicitdef rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
new PairRDDFunctions(rdd)
4.map函数,由于3中生成的FlatMappedRDD生成出来的结果,通过implicit的隐式转换生成PairRDDFunctions。
此时的map函数需要生成隐式转换传入的RDD<K,V>的一个RDD,
因此map函数的执行需要生成一个MappedRDD<K,V> 的RDD,同时此RDD的Dependency为OneToOneDependency。
以下代码的红色部分。 ---RDD[(K, V)]。。
valcounts = file.flatMap(line => line.split(" "))
.map(word => (word, 1)).reduceByKey(_ + _)
5.reduceByKey函数,此函数通过implicit的隐式转换中的函数来进行,主要是传入一个计算两个value的函数。
reduceByKey这类的shuffle的RDD时,最终生成一个ShuffleRDD,
此RDD生成的Dependency为ShuffleDependency。
具体说明在下面的reduceByKey代码中,
首先在每一个map生成MapPartitionsRDD把各partitioner中的数据通过进行合并。合并通过Aggregator实例。
最后通过对合并后的MapPartitionsRDD,此RDD相当于mapreduce中的combiner,生成ShuffleRDD.
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
reduceByKey(defaultPartitioner(self), func)
}
def combineByKey[C](createCombiner: V => C,//创建combiner,通过V的值创建C
mergeValue: (C, V) => C,//combiner已经创建C已经有一个值,把第二个的V叠加到C中,
mergeCombiners: (C, C) => C,//把两个C进行合并,其实就是两个value的合并。
partitioner: Partitioner,//Shuffle时需要的Partitioner
mapSideCombine: Boolean = true,//为了减小传输量, 很多combine可以在map端先做,
比如叠加, 可以先在一个partition中把所有相同的key的value叠加, 再shuffle
serializerClass: String = null): RDD[(K, C)] = {
if (getKeyClass().isArray) {
if (mapSideCombine) {
thrownew SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
thrownew SparkException("Default partitioner cannot partition array keys.")
}
}
生成一个Aggregator实例。
valaggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
如果RDD本身的partitioner与传入的partitioner相同,表示不需要进行shuffle
if (self.partitioner == Some(partitioner)) {
生成MapPartitionsRDD,直接在map端当前的partitioner下调用Aggregator.combineValuesByKey。
把相同的key的value进行合并。
self.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} elseif (mapSideCombine) {
生成MapPartitionsRDD,先在map端当前的partitioner下调用Aggregator.combineValuesByKey。
把相同的key的value进行合并。
combineValuesByKey中检查如果key对应的C如果不存在,通过createCombiner创建C,
否则key已经存在C时,通过mergeValue把新的V与上一次的C进行合并,
mergeValue其实就是传入的reduceByKey(_ + _) 括号中的函数,与reduce端函数相同。
valcombined = self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)
生成 ShuffledRDD,进行shuffle操作,因为此时会生成ShuffleDependency,重新生成一个新的stage.
valpartitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializerClass)
在上一步完成,也就是shuffle完成,重新在reduce端进行合并操作。通过Aggregator.combineCombinersByKey
spark这些地方的方法定义都是通过动态加载执行的函数的方式,所以可以做到map端执行完成后reduce再去执行后续的处理。
因为函数在map时只是进行了定义,reduce端才对函数进行执行。
partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
} else {
不执行map端的合并操作,直接shuffle,并在reduce中执行合并。
// Don't apply map-side combiner.
valvalues = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
}
}
相关推荐
最后,文档对 Spark 中的 RDD 依赖关系进行了深入讨论,包括 RDD 的定义、依赖关系和签名机制。作者还介绍了 Refresher 机制,即通过对 RDD 依赖关系的管理来实现检查点机制。 本文档详细介绍了 Apache Spark 中的...
上述代码中的RDD依赖关系决定了Stage的划分: - dateRDD到reduceRDD是窄依赖,可以进行流水线处理。 - reduceRDD到addOneRDD也是窄依赖。 - addOneRDD到groupRDD是宽依赖,因为groupByKey会导致数据的Shuffle操作。 ...
- **Lineage机制**:Lineage是Spark用来追踪RDD依赖关系的一种机制,它可以帮助Spark在发生失败时进行高效的恢复。深入理解Lineage机制有助于提高程序的容错能力和性能。 - **Stage划分**:在Spark作业执行过程中,...
窄依赖和宽依赖是RDD依赖关系的分类,窄依赖允许高效计算,而宽依赖可能需要更复杂的调度和数据重分布。 7. Hadoop和Spark各有优势,Hadoop擅长批处理,而Spark更适合交互式分析和实时处理。Spark的Stage和TaskSet...
讲解RDD 依赖关系,包括窄依赖、宽依赖; 解析Spark 中DAG 逻辑视图;对RDD 内部的计算机制及计算过程进行深度解析;讲解Spark RDD 容错原理及其四大核心要点解析对Spark RDD 中Runtime 流程进行解析;通过一个...
- **RDD依赖关系**:RDD之间的依赖关系定义了任务调度的拓扑结构。 - **DAG生成和Stage划分**:Spark将用户代码转换为DAG,再根据数据依赖划分成Stage,每个Stage对应一次磁盘或网络IO操作。 - **累加器和广播...
在Spark中,RDD的函数传递和依赖关系是理解Spark计算模型的关键。 在本篇【SparkCore篇04】中,我们讨论了如何在RDD操作中处理函数传递以及与此相关的序列化问题。在Spark中,用户自定义的函数(如`Searcher`类中的...
- **宽依赖和窄依赖**:优化RDD依赖关系以减少数据重排的开销。 - **持久化策略**:选择合适的RDD持久化级别,如MEMORY_ONLY、DISK_ONLY、MEMORY_AND_DISK等。 7. **Spark部署模式**:介绍如何在本地模式、集群...
当数据丢失,系统可以通过重新计算丢失的RDD依赖关系恢复。 6. ** Shuffle过程**:Shuffle是Spark中数据重新分布的过程,涉及到网络传输和磁盘I/O。源码中优化了HashShuffleManager和SortShuffleManager,以降低...
8. **Stage**: Job的逻辑分组,由DAGScheduler根据RDD依赖关系划分。 9. **DAGScheduler**: 构建基于Stage的DAG,并提交TaskScheduler。 10. **Task Scheduler**: 负责Task的分配和执行。 Spark运行流程包括: 1. ...
这种依赖关系定义了RDD的转换历史,使得当某个RDD的计算出错时,可以通过重做依赖它的RDD的计算来恢复。 4. **分区器(可选)**:对于键值对类型的RDD,可以设置分区器来决定数据如何分布到各个分区,通常用于优化...
2. **基于Lineage的高效容错机制**:当某个节点上的RDD分区出现故障时,Spark能够根据RDD的依赖关系(即“血统”或“Lineage”),从最近的一个可靠的父RDD重新计算丢失的数据分区,而无需从头开始计算整个数据集。...
- **RDD之间的依赖关系**:当对RDD进行转换操作时,会产生新的RDD实例,这些新旧RDD之间形成一种依赖关系。这种依赖关系有助于在部分数据丢失时,仅重新计算丢失的部分而非整个数据集。 - **分片函数(Partitioner...
依赖是指RDD之间的血缘关系,新RDD记录了生成它的旧RDD的转换过程。 RDD的血缘关系分为窄依赖和宽依赖。窄依赖是上游RDD的一个分区只被下游RDD的一个分区使用,这样的依赖结构使得并行计算更高效。而宽依赖则是下游...
6. **宽依赖与窄依赖**:RDD之间的依赖关系分为两种类型。窄依赖是指每个父RDD分区最多被一个子RDD分区使用,这样的依赖易于并行化。宽依赖则是指一个子RDD分区可能依赖所有父RDD分区,这可能导致shuffle操作,影响...