`
wbj0110
  • 浏览: 1617726 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Spark源码系列(二)RDD详解

阅读更多

1、什么是RDD?

上一章讲了Spark提交作业的过程,这一章我们要讲RDD。简单的讲,RDD就是Spark的input,知道input是啥吧,就是输入的数据。

RDD的全名是Resilient Distributed Dataset,意思是容错的分布式数据集,每一个RDD都会有5个特征:

1、有一个分片列表。就是能被切分,和hadoop一样的,能够切分的数据才能并行计算。

2、有一个函数计算每一个分片,这里指的是下面会提到的compute函数。

3、对其他的RDD的依赖列表,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。

4、可选:key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce。

5、可选:每一个分片的优先计算位置(preferred locations),比如HDFS的block的所在位置应该是优先计算的位置。

 对应着上面这几点,我们在RDD里面能找到这4个方法和1个属性,别着急,下面我们会慢慢展开说这5个东东。

复制代码
 //只计算一次     protected def getPartitions: Array[Partition]  //对一个分片进行计算,得出一个可遍历的结果  def compute(split: Partition, context: TaskContext): Iterator[T] //只计算一次,计算RDD对父RDD的依赖 protected def getDependencies: Seq[Dependency[_]] = deps  //可选的,分区的方法,针对第4点,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce   @transient val partitioner: Option[Partitioner] = None //可选的,指定优先位置,输入参数是split分片,输出结果是一组优先的节点位置 protected def getPreferredLocations(split: Partition): Seq[String] = Nil 
复制代码

2、多种RDD之间的转换

下面用一个实例讲解一下吧,就拿我们常用的一段代码来讲吧,然后会把我们常用的RDD都会讲到。

    val hdfsFile = sc.textFile(args(1))
    val flatMapRdd = hdfsFile.flatMap(s => s.split(" "))
    val filterRdd = flatMapRdd.filter(_.length == 2)
    val mapRdd = filterRdd.map(word => (word, 1))
    val reduce = mapRdd.reduceByKey(_ + _)

这里涉及到很多个RDD,textFile是一个HadoopRDD经过map后的MappredRDD,经过flatMap是一个FlatMappedRDD,经过filter方法之后生成了一个FilteredRDD,经过map函数之后,变成一个MappedRDD,通过隐式转换成 PairRDD,最后经过reduceByKey。

我们首先看textFile的这个方法,进入SparkContext这个方法,找到它。

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString)
}

看它的输入参数,path,TextInputFormat,LongWritable,Text,同志们联想到什么?写过mapreduce的童鞋都应该知道哈。

1、hdfs的地址

2、InputFormat的类型

3、Mapper的第一个类型

4、Mapper的第二类型

这就不难理解为什么立马就对hadoopFile后面加了一个map方法,取pair的第二个参数了,最后在shell里面我们看到它是一个MappredRDD了。

那么现在如果大家要用的不是textFile,而是一个别的hadoop文件类型,大家会不会使用hadoopFile来得到自己要得到的类型呢,不要告诉我不会哈,不会的赶紧回去复习mapreduce。

言归正传,默认的defaultMinPartitions的2太小了,我们用的时候还是设置大一点吧。

2.1 HadoopRDD

我们继续追杀下去,看看hadoopFile方法,里面我们看到它做了3个操作。

1、把hadoop的配置文件保存到广播变量里。

2、设置路径的方法

3、new了一个HadoopRDD返回

好,我们接下去看看HadoopRDD这个类吧,我们重点看看它的getPartitions、compute、getPreferredLocations。

先看getPartitions,它的核心代码如下:

    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
    val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    } 

它调用的是inputFormat自带的getSplits方法来计算分片,然后把分片HadoopPartition包装到到array里面返回。

这里顺便顺带提一下,因为1.0又出来一个NewHadoopRDD,它使用的是mapreduce新api的inputformat,getSplits就不要有minPartitions了,别的逻辑都是一样的,只是使用的类有点区别。

我们接下来看compute方法,它的输入值是一个Partition,返回是一个Iterator[(K, V)]类型的数据,这里面我们只需要关注2点即可。

1、把Partition转成HadoopPartition,然后通过InputSplit创建一个RecordReader

2、重写Iterator的getNext方法,通过创建的reader调用next方法读取下一个值。

复制代码
      // 转换成HadoopPartition val split = theSplit.asInstanceOf[HadoopPartition]
      logInfo("Input split: " + split.inputSplit)
      var reader: RecordReader[K, V] = null val jobConf = getJobConf()
      val inputFormat = getInputFormat(jobConf)
        context.stageId, theSplit.index, context.attemptId.toInt, jobConf) // 通过Inputform的getRecordReader来创建这个InputSpit的Reader reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) // 调用Reader的next方法 val key: K = reader.createKey()
      val value: V = reader.createValue()
      override def getNext() = { try {
          finished = !reader.next(key, value)
        } catch { case eof: EOFException => finished = true }
        (key, value)
      }
复制代码

从这里我们可以看得出来compute方法是通过分片来获得Iterator接口,以遍历分片的数据。

getPreferredLocations方法就更简单了,直接调用InputSplit的getLocations方法获得所在的位置。

2.2 依赖

下面我们看RDD里面的map方法

def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

直接new了一个MappedRDD,还把匿名函数f处理了再传进去,我们继续追杀到MappedRDD。

复制代码
private[spark] class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U) extends RDD[U](prev) {
  override def getPartitions: Array[Partition] = firstParent[T].partitions
  override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).map(f)
}
复制代码

MappedRDD把getPartitions和compute给重写了,而且都用到了firstParent[T],这个firstParent是何须人也?我们可以先点击进入RDD[U](prev)这个构造函数里面去。

def this(@transient oneParent: RDD[_]) = this(oneParent.context , List(new OneToOneDependency(oneParent)))

就这样你会发现它把RDD复制给了deps,HadoopRDD成了MappedRDD的父依赖了,这个OneToOneDependency是一个窄依赖,子RDD直接依赖于父RDD,继续看firstParent。

protected[spark] def firstParent[U: ClassTag] = {
  dependencies.head.rdd.asInstanceOf[RDD[U]]
}

由此我们可以得出两个结论:

1、getPartitions直接沿用了父RDD的分片信息

2、compute函数是在父RDD遍历每一行数据时套一个匿名函数f进行处理

好吧,现在我们可以理解compute函数真正是在干嘛的了

它的两个显著作用:

1、在没有依赖的条件下,根据分片的信息生成遍历数据的Iterable接口

2、在有前置依赖的条件下,在父RDD的Iterable接口上给遍历每个元素的时候再套上一个方法

我们看看点击进入map(f)的方法进去看一下

  def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())  }

看黄色的位置,看它的next函数,不得不说,写得真的很妙!

我们接着看RDD的flatMap方法,你会发现它和map函数几乎没什么区别,只是RDD变成了FlatMappedRDD,但是flatMap和map的效果还是差别挺大的。

比如((1,2),(3,4)), 如果是调用了flatMap函数,我们访问到的就是(1,2,3,4)4个元素;如果是map的话,我们访问到的就是(1,2),(3,4)两个元素。

有兴趣的可以去看看FlatMappedRDD和FilteredRDD这里就不讲了,和MappedRDD类似。

2.3 reduceByKey

前面的RDD转换都简单,可是到了reduceByKey可就不简单了哦,因为这里有一个同相同key的内容聚合的一个过程,所以它是最复杂的那一类。

那reduceByKey这个方法在哪里呢,它在PairRDDFunctions里面,这是个隐式转换,所以比较隐蔽哦,你在RDD里面是找不到的。

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }

它调用的是combineByKey方法,过程过程蛮复杂的,折叠起来,喜欢看的人看看吧。

复制代码
def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = {

    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) if (self.partitioner == Some(partitioner)) { // 一般的RDD的partitioner是None,这个条件不成立,即使成立只需要对这个数据做一次按key合并value的操作即可 self.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else if (mapSideCombine) { // 默认是走的这个方法,需要map端的combinber. val combined = self.mapPartitionsWithContext((context, iter) => {
        aggregator.combineValuesByKey(iter, context)
      }, preservesPartitioning = true)
      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
        .setSerializer(serializer)
      partitioned.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
      }, preservesPartitioning = true)
    } else { // 不需要map端的combine,直接就来shuffle val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializer)
      values.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    }
  }
复制代码

按照一个比较标准的流程来看的话,应该是走的中间的这条路径,它干了三件事:

1、给每个分片的数据在外面套一个combineValuesByKey方法的MapPartitionsRDD。

2、用MapPartitionsRDD来new了一个ShuffledRDD出来。

3、对ShuffledRDD做一次combineCombinersByKey。

下面我们先看MapPartitionsRDD,我把和别的RDD有别的两行给拿出来了,很明显的区别,f方法是套在iterator的外边,这样才能对iterator的所有数据做一个合并。

 override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None override def compute(split: Partition, context: TaskContext) = f(context, split.index, firstParent[T].iterator(split, context)) }

 接下来我们看Aggregator的combineValuesByKey的方法吧。

复制代码
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
                         context: TaskContext): Iterator[(K, C)] = { // 是否使用外部排序,是由参数spark.shuffle.spill,默认是true if (!externalSorting) {
      val combiners = new AppendOnlyMap[K,C]
      var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      } // 用map来去重,用update方法来更新值,如果没值的时候,返回值,如果有值的时候,通过mergeValue方法来合并 // mergeValue方法就是我们在reduceByKey里面写的那个匿名函数,在这里就是(_ + _) while (iter.hasNext) {
        kv = iter.next()
        combiners.changeValue(kv._1, update)
      }
      combiners.iterator
    } else { // 用了一个外部排序的map来去重,就不停的往里面插入值即可,基本原理和上面的差不多,区别在于需要外部排序  val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) while (iter.hasNext) {
        val (k, v) = iter.next()
        combiners.insert(k, v)
      }
      combiners.iterator
}
复制代码

这个就是一个很典型的按照key来做合并的方法了,我们继续看ShuffledRDD吧。

ShuffledRDD和之前的RDD很明显的特征是

1、它的依赖传了一个Nil(空列表)进去,表示它没有依赖。

2、它的compute计算方式比较特别,这个在之后的文章说,过程比较复杂。

3、它的分片默认是采用HashPartitioner,数量和前面的RDD的分片数量一样,也可以不一样,我们可以在reduceByKey的时候多传一个分片数量即可。

在new完ShuffledRDD之后又来了一遍mapPartitionsWithContext,不过调用的匿名函数变成了combineCombinersByKey。

combineCombinersByKey和combineValuesByKey的逻辑基本相同,只是输入输出的类型有区别。combineCombinersByKey只是做单纯的合并,不会对输入输出的类型进行改变,combineValuesByKey会把iter[K, V]的V值变成iter[K, C]。

case class Aggregator[K, V, C] (
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C)
  ......
}

 这个方法会根据我们传进去的匿名方法的参数的类型做一个自动转换。

到这里,作业都没有真正执行,只是将RDD各种嵌套,我们通过RDD的id和类型的变化观测到这一点,RDD[1]->RDD[2]->RDD[3]......

3、其它RDD

平常我们除了从hdfs上面取数据之后,我们还可能从数据库里面取数据,那怎么办呢?没关系,有个JdbcRDD!

复制代码
    val rdd = new JdbcRDD(
      sc,
      () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") }, "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", 1, 100, 3,
      (r: ResultSet) => { r.getInt(1) } 
   ).cache()
复制代码

前几个参数大家都懂,我们重点说一下后面1, 100, 3是咋回事?

在这个JdbcRDD里面它默认我们是会按照一个long类型的字段对数据进行切分,(1,100)分别是最小值和最大值,3是分片的数量。

比如我们要一次查ID为1-1000,000的的用户,分成10个分片,我们就填(1, 1000,000, 10)即可,在sql语句里面还必须有"? <= ID AND ID <= ?"的句式,别尝试着自己造句哦!

最后是怎么处理ResultSet的方法,自己爱怎么处理怎么处理去吧。不过确实觉着用得不方便的可以自己重写一个RDD。

 

http://www.luobo360.com/article/136

 
分享到:
评论

相关推荐

    Spark源码剖析

    《Spark源码剖析》PDF 文件很可能会深入到这些技术细节,包括类结构、算法实现以及关键代码的解析,帮助读者更好地理解和优化 Spark 应用。通过深入学习 Spark 源码,开发者可以更好地掌握 Spark 内部工作原理,从而...

    spark最新源码以及二次开发教程

    《Spark最新源码与二次开发详解》 Spark作为一款开源的大数据处理框架,因其高效、易用和灵活性而备受开发者青睐。在Openfire环境中,Spark更扮演着关键的角色,为实时通讯提供支持。本教程旨在深入解析Spark的最新...

    Apache Spark源码走读之3 -- Task运行期之函数调用关系分析

    ### Apache Spark源码走读之3 -- Task运行期之函数调用关系分析 #### 概述 Apache Spark作为一款高效的大数据处理框架,在其内部有着复杂的任务调度与执行机制。本文将深入探讨Spark中Task执行期间的具体流程以及...

    spark源码阅读笔记(详)

    ### Spark源码解析要点 #### 一、Spark概述与特性 **Spark** 是一款由加州大学伯克利分校AMP实验室研发的数据处理框架,它极大简化了大数据应用的开发流程,支持多种编程语言如Java、Scala、Python和R,使得开发者...

    spark源码阅读笔记

    ### Spark源码阅读笔记 #### 一、Spark概述与特性 **Spark** 是一款由加州大学伯克利分校AMP实验室研发的数据处理框架,它极大简化了开发者编写并行应用程序的过程,使得用户能够在集群环境中轻松地运行自己的应用...

    xmqtt spark2.6.1 源码项目

    《Xmqtt Spark2.6.1 源码解析与即时通信详解》 在IT行业中,Spark和Xmqtt是两个非常关键的技术组件。Spark作为大数据处理领域的明星框架,以其高效、易用的特点深受开发者喜爱,而Xmqtt则是一款强大的MQTT(Message...

    Spark从入门到精通

    6、大量全网唯一的知识点:基于排序的wordcount,Spark二次排序,Spark分组取topn,DataFrame与RDD的两种转换方式,Spark SQL的内置函数、开窗函数、UDF、UDAF,Spark Streaming的Kafka Direct API、...

    Spark(1.6.13)源码

    在深入研究Spark源码时,我们可以关注以下几个关键点: - **SparkContext的初始化**:如何连接到集群并创建工作环境。 - **RDD的创建与转换**:理解如何定义和操作RDD,以及它们如何在内存和磁盘间移动。 - **Stage...

    spark-2.2.1.tar.gz 源码

    Spark源码中包含丰富的单元测试,便于开发者理解和验证代码。使用ScalaTest框架,开发者可以编写自己的测试用例。此外,Spark还支持本地模式、伪分布式模式和完全分布式模式进行调试。 5. **扩展与优化**: Spark...

    Spark技术内幕

    此外,源码详解部分会深入到Spark的内部机制,比如任务调度算法、内存管理策略和数据序列化等。 Spark的源码阅读对于深入理解其工作原理至关重要。读者可以从中了解到Spark如何实现高效的分布式计算,以及如何通过...

    Spark大数据中文分词统计Scala语言工程源码

    《Spark大数据中文分词统计Scala语言工程源码详解》 在大数据处理领域,Apache Spark以其高效、易用的特性成为了众多开发者的首选工具。而针对中文数据,分词是进行文本分析的重要步骤,尤其在诸如情感分析、关键词...

    learning-spark-examples-master

    《Spark编程实例详解》 "learning-spark-examples-master"这一项目主要涵盖了使用Apache Spark进行数据处理和分析的实例代码,旨在帮助学习者深入理解Spark的核心功能和使用方式。Spark作为一个快速、通用且可扩展...

    Spark各种demo学习

    理解Spark源码对于深入学习和优化性能至关重要。例如,了解DAGScheduler如何将任务转化为Stage,TaskScheduler如何调度任务到Worker节点,以及Shuffle过程中的数据分区和缓存策略。 四、大数据处理实战 1. 数据清洗...

    spark入门教程

    - **解决编译错误**:如果是因为64位系统导致的编译问题,可能需要重新编译Spark源码。 - **网络配置**:确保所有节点之间的网络通信畅通无阻,可以通过修改网络设置实现。 #### 七、虚拟机配置示例 - **安装...

    基于Spark的电影推荐系统源码.rar

    《基于Spark的电影推荐系统详解》 在当今的互联网时代,个性化推荐系统已经成为各大网站吸引用户、提升用户体验的重要工具。本项目以“懂你”电影网站为例,利用大数据过滤引擎,构建了一个基于Spark的电影推荐系统...

    Spark-SourceCode-Customization-spark source code

    《Spark源码定制详解》 Spark,作为大数据处理领域中的明星框架,以其高效、易用和灵活的特点赢得了广泛赞誉。然而,为了满足特定业务需求,有时我们需要深入理解其内部机制,甚至进行源码定制。本篇文章将围绕...

    精品--毕业设计源码-基于Spark的Kmeans聚类算法优化.zip

    【标题】:“精品--毕业设计源码-基于Spark的Kmeans聚类算法优化.zip” 【内容详解】: Spark是Apache软件基金会的一个开源大数据处理框架,它以高效、易用和可扩展性著称,尤其在大数据分析领域。Spark提供了一个...

    日常总结java + 大数据.zip

    AQS详解JVM 知识JVM 运行时数据区域JVM 垃圾回收JVM 内存溢出异常测试Java 新特性Java串行接口算法相关二叉树聚类算法图论拓扑排序算法深度优先遍历(DFS)和广度优先遍历(BFS)查找斐波那契查找排序快速排序归并排序堆...

Global site tag (gtag.js) - Google Analytics