`
wbj0110
  • 浏览: 1602723 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Apache Spark源码走读之3 -- Task运行期之函数调用关系分析

阅读更多

概要

本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回。

准备

  1. spark已经安装完毕
  2. spark运行在local mode或local-cluster mode

local-cluster mode

local-cluster模式也称为伪分布式,可以使用如下指令运行

MASTER=local[1,2,1024] bin/spark-shell

[1,2,1024] 分别表示,executor number, core number和内存大小,其中内存大小不应小于默认的512M

Driver Programme的初始化过程分析

初始化过程的涉及的主要源文件

  1. SparkContext.scala       整个初始化过程的入口
  2. SparkEnv.scala          创建BlockManager, MapOutputTrackerMaster, ConnectionManager, CacheManager
  3. DAGScheduler.scala       任务提交的入口,即将Job划分成各个stage的关键
  4. TaskSchedulerImpl.scala 决定每个stage可以运行几个task,每个task分别在哪个executor上运行
  5. SchedulerBackend
    1. 最简单的单机运行模式的话,看LocalBackend.scala
    2. 如果是集群模式,看源文件SparkDeploySchedulerBackend

初始化过程步骤详解

步骤1: 根据初始化入参生成SparkConf,再根据SparkConf来创建SparkEnv, SparkEnv中主要包含以下关键性组件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager

 private[spark] val env = SparkEnv.create(
    conf,
    "",
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port").toInt,
    isDriver = true,
    isLocal = isLocal)
  SparkEnv.set(env)

步骤2:创建TaskScheduler,根据Spark的运行模式来选择相应的SchedulerBackend,同时启动taskscheduler,这一步至为关键

  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
  taskScheduler.start()

TaskScheduler.start目的是启动相应的SchedulerBackend,并启动定时器进行检测

override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
            SPECULATION_INTERVAL milliseconds) {
        checkSpeculatableTasks()
      }
    }
  }

步骤3:以上一步中创建的TaskScheduler实例为入参创建DAGScheduler并启动运行

@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
  dagScheduler.start()

步骤4:启动WEB UI

ui.start()

RDD的转换过程

还是以最简单的wordcount为例说明rdd的转换过程

sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

上述一行简短的代码其实发生了很复杂的RDD转换,下面仔细解释每一步的转换过程和转换结果

步骤1:val rawFile = sc.textFile("README.md")

textFile先是生成hadoopRDD,然后再通过map操作生成MappedRDD,如果在spark-shell中执行上述语句,得到的结果可以证明所做的分析

scala> sc.textFile("README.md")
14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0, maxMem=311387750
14/04/23 13:11:48 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB)
14/04/23 13:11:48 DEBUG BlockManager: Put block broadcast_0 locally took  277 ms
14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took  281 ms
res0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13

步骤2: val splittedText = rawFile.flatMap(line => line.split(" "))

flatMap将原来的MappedRDD转换成为FlatMappedRDD

 def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =                                                                                                  new FlatMappedRDD(this, sc.clean(f))

步骤3:val wordCount = splittedText.map(word => (word, 1))

利用word生成相应的键值对,上一步的FlatMappedRDD被转换成为MappedRDD

步骤4:val reduceJob = wordCount.reduceByKey(_ + _),这一步最复杂

步骤2,3中使用到的operation全部定义在RDD.scala中,而这里使用到的reduceByKey却在RDD.scala中见不到踪迹。reduceByKey的定义出现在源文件PairRDDFunctions.scala

细心的你一定会问reduceByKey不是MappedRDD的属性和方法啊,怎么能被MappedRDD调用呢?其实这背后发生了一个隐式的转换,该转换将MappedRDD转换成为PairRDDFunctions

implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
    new PairRDDFunctions(rdd)

这种隐式的转换是scala的一个语法特征,如果想知道的更多,请用关键字"scala implicit method"进行查询,会有不少的文章对此进行详尽的介绍。

接下来再看一看reduceByKey的定义

  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
    reduceByKey(defaultPartitioner(self), func)
  }

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }

  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializerClass: String = null): RDD[(K, C)] = {
    if (getKeyClass().isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else if (mapSideCombine) {
      val combined = self.mapPartitionsWithContext((context, iter) => {
        aggregator.combineValuesByKey(iter, context)
      }, preservesPartitioning = true)
      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
        .setSerializer(serializerClass)
      partitioned.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      // Don't apply map-side combiner.
      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
      values.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    }
  }

reduceByKey最终会调用combineByKey, 在这个函数中PairedRDDFunctions会被转换成为ShuffleRDD,当调用mapPartitionsWithContext之后,shuffleRDD被转换成为MapPartitionsRDD

Log输出能证明我们的分析

res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at :13

RDD转换小结

小结一下整个RDD转换过程

HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffleRDD->MapPartitionsRDD

整个转换过程好长啊,这一切的转换都发生在任务提交之前。

运行过程分析

数据集操作分类

在对任务运行过程中的函数调用关系进行分析之前,我们也来探讨一个偏理论的东西,作用于RDD之上的Transformantion为什么会是这个样子?

对这个问题的解答和数学搭上关系了,从理论抽象的角度来说,任务处理都可归结为“input->processing->output"。input和output对应于数据集dataset.

在此基础上作一下简单的分类

  1. one-one 一个dataset在转换之后还是一个dataset,而且dataset的size不变,如map
  2. one-one 一个dataset在转换之后还是一个dataset,但size发生更改,这种更改有两种可能:扩大或缩小,如flatMap是size增大的操作,而subtract是size变小的操作
  3. many-one 多个dataset合并为一个dataset,如combine, join
  4. one-many 一个dataset分裂为多个dataset, 如groupBy

Task运行期的函数调用

task的提交过程参考本系列中的第二篇文章。本节主要讲解当task在运行期间是如何一步步调用到作用于RDD上的各个operation

  • TaskRunner.run
    • Task.run
      • Task.runTask (Task是一个基类,有两个子类,分别为ShuffleMapTask和ResultTask)
        • RDD.iterator
          • RDD.computeOrReadCheckpoint
            • RDD.compute 

或许当看到RDD.compute函数定义时,还是觉着f没有被调用,以MappedRDD的compute定义为例

  override def compute(split: Partition, context: TaskContext) =                                                                                                      
    firstParent[T].iterator(split, context).map(f)  

注意,这里最容易产生错觉的地方就是map函数,这里的map不是RDD中的map,而是scala中定义的iterator的成员函数map, 请自行参考http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator

堆栈输出

 80         at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111)
 81         at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:154)
 82         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
 83         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
 84         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 85         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 86         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 87         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 88         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 89         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
 90         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 91         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 92         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 93         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 94         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 95         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
 96         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 97         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 98         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
 99         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
100         at org.apache.spark.scheduler.Task.run(Task.scala:53)
101         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)

ResultTask

compute的计算过程对于ShuffleMapTask比较复杂,绕的圈圈比较多,对于ResultTask就直接许多。

override def runTask(context: TaskContext): U = {
    metrics = Some(context.taskMetrics)
    try {
      func(context, rdd.iterator(split, context))
    } finally {
      context.executeOnCompleteCallbacks()
    }
  } 

 计算结果的传递

上面的分析知道,wordcount这个job在最终提交之后,被DAGScheduler分为两个stage,第一个Stage是shuffleMapTask,第二个Stage是ResultTask.

那么ShuffleMapTask的计算结果是如何被ResultTask取得的呢?这个过程简述如下

  1. ShffuleMapTask将计算的状态(注意不是具体的数据)包装为MapStatus返回给DAGScheduler
  2. DAGScheduler将MapStatus保存到MapOutputTrackerMaster中
  3. ResultTask在执行到ShuffleRDD时会调用BlockStoreShuffleFetcher的fetch方法去获取数据
    1. 第一件事就是咨询MapOutputTrackerMaster所要取的数据的location
    2. 根据返回的结果调用BlockManager.getMultiple获取真正的数据

BlockStoreShuffleFetcher的fetch函数伪码

    val blockManager = SparkEnv.get.blockManager

    val startTime = System.currentTimeMillis
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
    logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
      shuffleId, reduceId, System.currentTimeMillis - startTime))

    val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
    val itr = blockFetcherItr.flatMap(unpackBlock) 

注意上述代码中的getServerStatusesgetMultiple,一个是询问数据的位置,一个是去获取真正的数据。

有关Shuffle的详细解释,请参考”详细探究Spark的shuffle实现一文" http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/

 

http://www.cnblogs.com/hseagle/p/3673132.html

分享到:
评论

相关推荐

    Apache Spark源码走读之2 -- Job的提交与运行

    Apache Spark的作业提交与运行机制是其核心组成部分之一,涉及到进程、线程的创建以及任务的调度等多个方面。 首先,要想深入理解Spark作业的提交与运行,需要搭建实验环境。搭建步骤主要包括下载Spark二进制包,...

    Apache Spark源码走读之5 -- DStream处理的容错性分析

    ### Apache Spark源码走读之五:DStream处理的容错性分析 #### 环境搭建与背景 为了深入理解Apache Spark Streaming中DStream处理的容错机制,本文将从一个简单的Spark Streaming示例出发,逐步分析Spark如何确保...

    Apache Spark源码走读之4 -- DStream实时流数据处理

    ### Apache Spark源码走读之四:DStream实时流数据处理 #### 一、系统概述与流数据特性 本文档探讨了Apache Spark Streaming的核心概念之一——**DStream**(Discretized Stream)及其如何实现对实时流数据的有效...

    Apache Spark源码走读:如何进行代码跟读

    ### Apache Spark源码走读:如何进行代码跟读 #### 概述 本文旨在探讨如何有效地进行Apache Spark源码的阅读与理解。Apache Spark作为一款高性能的分布式计算框架,在大数据处理领域占据着重要地位。其核心由Scala...

    Apache_Spark源码走读

    ### Apache Spark 源码解析概述 #### 一、引言 Apache Spark 是一款开源的大规模数据处理框架,因其高效性、灵活性以及易用性在大数据处理领域得到了广泛的应用。对于想要深入了解Spark内部机制的人来说,阅读其...

    ApacheSpark源码走读(二)

    Spark作为一个非常优秀的并行处理框架,集成了一些并行化的算法也是理所当然。Graphx是一些图的常用算法在Spark上的并行化实现,同时提供了丰富的API接口。本文就Graphx的代码架构及PageRank在Graphx中的具体实现做...

    C++代码走读意见--开发注意事项

    ### C++代码走读意见与开发注意事项 #### 内存管理与安全性 在软件开发过程中,尤其是使用C++这类提供底层内存操作的语言时,代码质量和安全性尤为重要。本篇将基于给定的“C++代码走读意见--开发注意事项”文件中...

    Storm源码走读笔记

    本文档是关于Storm源码的详细走读笔记,主要分析了Storm的启动场景、Topology提交过程、worker进程中的线程使用情况、消息传递机制以及 TridentTopology的创建和ack机制等多个方面。 首先,文档提到了Storm集群中的...

    nova-compute源码分析

    ### nova-compute源码分析 #### 一、Nova概述及工作职责 **1.1 Nova的角色与任务** Nova是OpenStack项目中一个至关重要的组成部分,它主要负责虚拟机实例的生命周期管理,包括创建、调度、运行和销毁等功能。具体...

    hadoop源码分析-HDFS部分

    《Hadoop源码分析——HDFS部分》 Hadoop,作为开源大数据处理的基石,其核心组件之一就是HDFS(Hadoop Distributed File System),这是一个高度容错性的分布式文件系统,设计用于运行在廉价硬件上,能够处理大规模...

    mina源码走读与实例

    ### MINA源码走读与实例 #### 一、MINA概述 **MINA**(**M**ulti **I**nterface **N**etwork **A**pplication)是Apache组织下的一款开源网络通信框架,它主要针对TCP/IP、UDP/IP协议栈提供了高效的封装和扩展能力...

    java8源码-java8-source:java8源码走读

    IDEA走读Java源码坏境搭建 新建一个普通java项目(如:java8-source) 创建package(tech.sqlclub.java_source)存放java源码 java源码在$JAVA_HOME/src.zip 解压就行,mac用户JAVA_HOME查看如下图: 通过Debug,撸...

    Blog-Atheros_minstrel速率调整算法源码走读 _琴剑飘零1

    《Atheros Minstrel 速率调整算法源码解析》 Minstrel 速率调整算法是 Atheros 无线网卡驱动中用于优化无线通信性能的关键算法,它通过动态调整发送速率来适应网络环境的变化,以提高无线网络的吞吐量和稳定性。...

    代码走读时需要关注的内容分析

    ### 代码走读时需要关注的内容分析 #### 一、准备工作 在进行代码走读之前,准备工作至关重要。这部分主要包括以下几个方面: 1. **设计文档**:确保开发人员已获得一个设计文档来理解代码,该文档应该是最新的...

    ceph源码 io读写流程分析串讲

    由于项目需要,最近深入细致的了解了ceph的读写流程,并且跟项目组做了一个代码串讲。附上串讲用的ppt。 个人认为,理解了ceph的io流水线模型,是理解整个io读写流程的关键。

    Blog-Atheros_Ath9k速率调整算法源码走读_琴剑飘零1

    本文基于博客《【Atheros】Ath9k速率调整算法源码走读》的内容,深入探讨了Atheros Ath9k驱动中的速率调整算法,并与Minstrel算法进行了对比分析。 #### 二、速率控制概述 在Atheros Ath9k驱动中,支持两种速率...

    spark-2.1.1:spark原始物走读注解解

    《Spark 2.1.1:深度解析与源码阅读笔记》 Spark作为一个开源的分布式计算框架,以其高效、易用和灵活性深受大数据处理领域的欢迎。Spark 2.1.1是其发展中的...希望这份源码阅读笔记能为你的Spark之旅提供有力的支持。

Global site tag (gtag.js) - Google Analytics