`

SparkStreaming源码全方位解析--

 
阅读更多

 复制大牛的分析,因为他的博客在国内打不开
http://guaver.info/spark-streaming-code-analysis/

 

SparkStreaming源码全方位解析

09 MARCH 2015

最近在做基于Kafka + Spark Streaming的实时计算,今天研究了下Spark Streaming源码,在此记录下。主要以WordCount为例,具体讲解Spark Streaming的实现细节。


从WordCount说起

一个最简单的基于Spark Streaming的WordCount,代码如下:

object SocketWordCount extends App {  
  val conf = new SparkConf().
      setMaster("local[*]").setAppName("WordCount")
  val ssc = new StreamingContext(conf, Seconds(10))
  val lines = ssc.socketTextStream("localhost", 9999)
  val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  wordCounts.print
  ssc.start
  ssc.awaitTermination
}

这个WordCount小程序很简单。首先创建一个SparkContext对象(与创建SparkContext不同,需要指定一个时间间隔);然后通过ssc.socketTextStream创建InputDStream,然后对DStream进行各种transformation,调用print将结果输出;最后调用ssc.start启动程序即可。

更多Spark Streaming资料,详见官网教程 Spark Streaming Programming Guide


创建StreamingContext

val ssc = new StreamingContext(conf, Seconds(10))

StreamingContext内部包含一个SparkContext,可以直接传入构造函数,或者通过传入的SparkConf新建;如果设置Checkpoint,可以通过Checkpoint.sparkConf新建。(注:为简化流程,后续解读均不涉及check point,write ahead log等细节)

除SparkContext,如下初始化组件需要注意:

DSreamGraph:主要含有一个inputStreams数组和一个outputStreams数组。

JobScheduler:调度SparkSteaming任务,主要包含一个ReceiverTracker(Receiver跟踪器),一个JobGenerator(JobGenerator生成器),以及一个JobScheduler Actor。


创建InputDSteam

val lines = ssc.socketTextStream("localhost", 9999)

socketTextStream函数新建并返回了一个SocketInputDStream对象,其继承关系依次为:SocketInputDStream <= ReceiverInputDStream <= InputDStream <= DStream。

首先,我们来看一看ReceiverInputDStream这个抽象类:

  • 继承于InputDStream,表示此InputDStream必须在worker节点上启动一个receiver来接收外部的数据。(InputDStream除了ReceiverInputDStream子类外,还有一个ConstantInputDStream子类,标示每个时间点都返回相同的RDD,主要用于测试)
  • 基类InputDStream初始化时,会调用ssc.graph.addInputStream(this),将自己加入到graph的inputStreams中。
  • 包含一个由ssc生成的streamId,作为receiver input stream的唯一标示。
  • 实现了基类的start和stop方法,函数体均为空(没有必要实现)。
  • 实现了基类的compute(validTime: Time)方法,主要功能为从receiverTracker读取此streamId在一个batchTime的所有blocks,最后返回一个new BlockRDD[T](ssc.sc, blockIds),用于后续的计算。
  • 最后留了一个def getReceiver(): Receiver[T]方法,供子类实现。

SocketInputDStream很简单,只是实现了基类的getReceiver方法,新建并返回了一个SocketReceiver对象。

SocketReceiver是一个Receiver实现。实现一个Receiver很简单,只需要实现继承Receiver类,并实现onStart()和onStop()方法即可。更多关于Receiver的内容,详见官网教程 Custom Receiver Guide

OK,至此我们对InputDSteam和Receiver的构造有了一个基本了解,至于到底如何使用,我们接着放下看。


DStream转化

val wordCounts = lines.flatMap(.split(" ")).map((, 1)).reduceByKey(_ + _) wordCounts.print

Spark Core运算的基本单位是RDD,而Spark Streaming则是在DStream之上进行计算;RDD有一系列的transformation和action,DStream也有很多transformationoutput operatioin。DStream是一个RDD的时间序列,其实最终计算还是会转移到RDD的计算上。

我们首先来看下DStream的构造(其实上节已经看其子类InputDSteam的实现):

  • 子类需实现如下三个函数
    • def slideDuration: Duration // DStream生成一个RDD的时间间隔
    • def dependencies: List[DStream[_]] // 所依赖的父DStream(与RDD相似,也有依赖关系)
    • def compute (validTime: Time): Option[RDD[T]] // 对于一个给定的时间生成一个RDD(应该还记得上一节ReceiverInputDStream返回的new BlockRDD[T](ssc.sc, blockIds)吧)
  • 除此之外就是各种transformation和output operation的实现了。
  • def getOrCompute(time: Time): Option[\RDD[T]],我们需要注意下这个函数,DStream含有一个generatedRDDs: HashMap[\Time, RDD[T]],首先会看generatedRDDs中是否有time对应的RDDs,若有直接返回;否则,调用compute(time)进行计算;最后再将新计算的newRDD加入到generatedRDDs进行缓存。

下面我们分别选一个transformation和output operation实现来看下。

首先我们看下flatMap函数实现:

def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {  
  new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }

很简单,返回了一个FlatMappedDStream。FlatMappedDStream实现也很简单,分别实现了slideDuration、dependencies、compute这三个函数。

override def dependencies = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {  
 parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}

通过Compute函数,可见其会调用getOrCompute,获取parent DStream在某个时间点的RDD,然后对RDD信息转换,生成新的RDD。

接下来,我们再来看下print函数的实现:

def print() {  
  def foreachFunc = (rdd: RDD[T], time: Time) => {
    val first11 = rdd.take(11)
    first11.take(10).foreach(println)
  }
  new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}

print()最后新建并返回了一个ForEachDStream,而所有output operation均是如此,我们再来看下ForEachDStream的实现:

override def compute(validTime: Time): Option[RDD[Unit]] = None

override def generateJob(time: Time): Option[Job] = {  
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        ssc.sparkContext.setCallSite(creationSite)
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}

其compute函数返回None,但是多了一个generateJob函数,生成new Job(time, jobFunc)对象,而Job之后会被调度。


启动StreamingContext

ssc.start

很简单,启动JobScheduler,而JobScheduler接着启动了ReceiverTrackerJobGenerator

ReceiverTracker主要负责原始数据的读入,而JobGenerator主要负责具体Job的触发与执行。下面我将分三个小节来分别讲解这两个核心组件,ReceiverTracker内容较多,分receiver启动和外部数据读取两个小节讲解。


ReceiverTracker源码分析(一) receiver启动

ReceiverTracker启动后,会创建ReceiverTrackerActor,响应RegisterReceiver、AddBlock、ReportError、DeregisterReceiver事件。

接着启动ReceiverLauncher线程,这个线程将通过startReceivers函数,启动这个集群上的所有receivers。

我们看下startReceivers函数:

  • 首先其从inputStreams获取所有的receivers
  • 然后将其封装成为tempRDD
  • 定义函数startReceiver
  • 最后调用ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))函数,将所有的receiver通过封装成RDD,分发到集群的节点上,并启动startReceiver函数。(借住RDD分发任务,非常巧妙!)

下面重点庄转移到startReceiver函数,其新建了一个ReceiverSupervisorImpl,对receiver的一个包装类,然后启动supervisor,之后awaitTermination阻塞。

接着看ReceiverSupervisorImpl启动做了什么。调用两个函数onStart()和startReceiver()

  • onStart()由ReceiverSupervisorImpl实现,主要是启动blockGenerator。(ReceiverSupervisorImpl初始化时,会一并初始化blockGenerator,传入BlockGeneratorListener。BlockGenerator的具体含义在下一节会讲到)
  • startReceiver()由ReceiverSupervisor基类实现
    • 主要是调用receiver.onStart(),终于启动receiver了!!!
    • 然后调用ReceiverSupervisorImpl的onReceiverStart(),即为向ReceiverTrackerActor发送RegisterReceiver消息,将receiver加入元信息receiverInfo中。

至此,Receiver的启动过程完毕!!!


ReceiverTracker源码分析(二) 外部数据读取

数据的读入由receiver触发,receiver启动后会读取外部数据源的消息,有两种方法将其存储:

  • 调用store(dataItem: T),存储单条消息
    • 最终调用receiver对应的ReceiverSupervisorImpl的pushSingle(dataItem)方法
    • pushSingle调用blockGenerator.addData(data),将消息写入到 blockGenerator的currentBuffer中。(放入currentBuffer之前会有一个流控函数,配置参数spark.streaming.receiver.maxRate)
    • 上节讲到会启动blockGenerator。
      1. 会定时启动updateCurrentBuffer函数),将currentBuffer生成Block,放入blocksForPushing队列。(spark.streaming.blockInterval,默认200毫秒)
      2. 启动blockPushingThread线程,获取blocksForPushing队列中的blocks,并调用pushAndReportBlock方法。(调用比较曲折,先调用listener.onPushBlock,再调用ReceiverSupervisorImpl的pushArrayBuffer,最后再调用ReceiverSupervisorImpl的pushAndReportBlock)
  • 调用store(dataBuffer: ArrayBuffer[T]),消息批量存储
    • 会调用receiver对应的ReceiverSupervisorImpl的pushArrayBuffer方法
    • 最后直接调用pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)方法

可见使用store(dataItem: T)无需自己生成Block,且有自动流控措施,但是当receiver挂掉的时候currentBuffer中的messages和blocksForPushing中的blocks均有可能会丢失。所以Unreliable Receivers可以使用,而对于Reliable Receivers,必须使用store(dataBuffer: ArrayBuffer[T])。详见官网 Custom Receiver Guide

最后再来说一说pushAndReportBlock方法:

  • 首先获取一个blockId
  • 然后调用 val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock),接着调用BlockManagerBasedBlockHandler的storeBlock,最后 blockManager.putIterator,将Block信息存入blockManage!!!
  • 最后向ReceiverTrackerActor发送AddBlock消息,将ReceivedBlockInfo(streamId, numRecords, blockStoreResult),ReceiverTrackerActor接着调用receivedBlockTracker.addBlock(receivedBlockInfo),最后加入getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo。将Block信息存入ReceiverTrackerActor的streamIdToUnallocatedBlockQueues,供计算使用!!!

至此,外部数据读取过程完毕!!!


JobGenerator源码分析

JobGenerator启动后,创一个JobGeneratorActor,响应GenerateJobs(time)、ClearMetadata(time)、DoCheckpoint(time)、ClearCheckpointData(time)等事件。

然后调用startFirstTime函数,依次启动graph(就是inputStreams和outputStreams的一些初始化,不讲了),并且根据配置的batchDuration定时向JobGeneratorActor发送GenerateJobs消息。

OK,来看看GenerateJobs做了什么:

  • receiverTracker.allocateBlocksToBatch(time) // 从streamIdToUnallocatedBlockQueues中获取这个batchDuration的所有streamId对应Blocks,加入到timeToAllocatedBlocks(batchTime) = allocatedBlocks
  • val jobs = graph.generateJobs(time) // 生成这个batchDuration内所有的jobs
    • 遍历所有的outputStreams,分别调用其generateJob(time)方法,生成job
  • val receivedBlockInfos = jobScheduler.receiverTracker.getBlocksOfBatch(time) // 获取timeToAllocatedBlocks(batchTime)对应的所有blocks
  • jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos)) // 提交JobSet
    • 遍历JobSet中的每个job,讲new JobHandler(job)加入线程池jobExecutor执行(线程池大小:spark.streaming.concurrentJobs,默认为1)
    • JobHandler开始执行时,首先向JobGeneratorActor发送JobStarted消息,然后调用job的run()方法,进而调用func()函数,及最后的foreachFunc()。foreachFunc最终将作用于这个batchDuration的outputStream对应的RDD上,进而产生Spark的任务!!!
    • 执行完毕后,向JobGeneratorActor发送JobCompleted消息。

至此,Job调度过程完毕!!!


总结:Spark Streaming与Spark Core的联系

总体来说,Spark Streaming的实现以Spark Core为基础,通过ReceiverTracker来读取外部数据,通过JobGenerator定期生成计算任务,整体结构实现清晰明确。

Spark Streaming用到Spark Core的地方在总结下:

  1. Receiver分发至各个节点并执行,使用了Spark Core提交RDD任务的过程,很巧妙;
  2. 外部数据源读入的数据存入BlockManager;
  3. 对于InputDStream,每隔batchDuration切分的RDD,DStream间的transformation,即为RDD的transformation;
  4. 提交的任务最终转化为一个Spark Core的RDD计算任务。

后续计划

  1. Spark Streaming No Data Loss 源码分析
  2. KafkaReceiver 源码分析
  3. Kafka + Spark Streaming 整合最佳实践
分享到:
评论

相关推荐

    spark pdf大全

    全面覆盖了Spark的技术特性、应用场景以及性能优化策略,帮助读者从理论到实践,全方位提升Spark技能。 8. **深入理解Spark:核心思想与源码分析.pdf**: 类似于前面的源码剖析,这本书进一步探讨Spark的设计哲学...

    模拟风电场监控项目

    这个项目提供了完整的源码和详细说明,确保在Window 10/11环境下能顺利运行,并且包含了演示用的图片和部署教程,为学习者提供了全方位的学习资源。 1. **软件工程**:此项目展示了软件开发的完整生命周期,从需求...

    Windows系统字体转unifont字体v2.0(grub4dos字体生成工具)

    在IT领域,尤其是在图形设计和跨平台应用开发中,字体的选择和使用是非常关键的。"Windows系统字体转unifont字体工具"是一个专门用于将Windows操作系统中的字体转换为unifont格式的工具。这个工具的主要目标是帮助用户解决在不同操作系统之间字体兼容性的问题,特别是对于那些需要支持大量字符集,如Unicode的项目。 Unifont是一种开放源代码的字体,包含了几乎所有的Unicode字符,因此在多语言环境和开源软件中特别受欢迎。它提供了一种统一的视觉体验,确保无论在哪种操作系统或设备上,都能准确显示各种语言的文字。然而,Windows系统默认的字体并不包含所有Unicode字符,这可能导致在某些情况下无法正确显示非标准字符。 转换过程涉及以下几个核心知识点: 1. **字体格式**:Windows系统中常见的字体格式有TrueType(.ttf)和OpenType(.otf),而unifont是一种特殊的Bitmap字体,通常以.gz ufnt或.ttf.gz形式存在。Bitmap字体将每个字符绘制为位图,适合低分辨率屏幕或嵌入式系统,但可能在高分辨率下显得模糊。

    uClinux源代码中Make文件完全解析.pdf.rar

    uClinux源代码中Make文件完全解析.pdf.rar

    最新更新!上市公司股吧舆论数据(2008-2023年)

    ## 介绍 进入互联网新媒体时代,“股吧”作为一类专门针对上市公司的社交媒介,已经成为中小投资者分享投资经验和发表对公司运营意见的重要平台,股吧舆论作为投资者情绪的反映,直接影响股票的市场表现。 ## 一、上市公司股吧舆论数据的介绍 “股吧”作为新兴社交媒体代表,本身并不提供信息,仅提供多方交互平台,其将个体间的实时交流和回应形成公众关注和舆论;因此,股吧舆论数据可以帮助研究人员深入分析网络舆论与企业表现之间的关系,并为投资者提供情绪波动的参考依据。 本分享数据年份为2008年到2023年,数据来源于东方财富网股吧,涉及A股上市公司的讨论情况,涵盖了股吧发帖数量、阅读量、评论次数等多个维度。 ## 二、数据指标

    【东海证券-2025研报】公司深度报告:AIOT次新品显著放量,产品矩阵拓展布局新市场.pdf

    【东海证券-2025研报】公司深度报告:AIOT次新品显著放量,产品矩阵拓展布局新市场.pdf

    基于SpringBoot的图书管理系统(源码+数据库+万字文档+ppt)358

    图书管理系统,系统包含两种角色:管理员、用户,系统分为前台和后台两大模块,主要功能如下。 前台: - 首页:展示系统的概览信息。 - 图书信息:展示图书的详细信息。 - 公告信息:展示图书馆相关的通知公告。 - 在线咨询:提供在线客服咨询服务。 - 个人中心:用户可以登录后进入个人中心 后台: 管理员角色: - 个人中心:管理员可以管理个人信息,修改密码等。 - 用户管理:管理员可以对用户的信息进行增删改查等操作。 - 图书分类管理:管理员可以管理图书分类信息,添加、修改、删除分类名称及其描述。 - 图书信息管理:管理员可以管理图书的基本信息。 - 系统管理:管理员可以管理系统的一些通用配置。 二、项目技术 编程语言:Java 数据库:MySQL 项目管理工具:Maven 前端技术:Vue 后端技术:SpringBoot 三、运行环境 操作系统:Windows、macOS都可以 JDK版本:JDK1.8以上都可以 开发工具:IDEA、Ecplise、Myecplise都可以 数据库: MySQL5.7以上都可以 Maven:任意版本都可以

    企业数字化转型IT信息化战略.pdf

    企业数字化转型IT信息化战略.pdf

    LB1Q-PHP+MySql_1个通用条件工资成绩等通用查询系统手机加强版版(Utf-8)_2024最终版.zip

    LB1Q-PHP+MySql_1个通用条件工资成绩等通用查询系统手机加强版版(Utf-8)_2024最终版

    基于Python卷积神经网络人脸识别驾驶员疲劳检测与预警系统+源码+文档(毕业设计)

    基于Python卷积神经网络人脸识别驾驶员疲劳检测与预警系统+源码+文档(毕业设计)本资源中的源码都是经过本地编译过可运行的,评审分达到95分以上,适合正在准备毕业设计或者大作业的学生和实战人员,可作为毕业设计、大作业,资源项目的难度比较适中,内容都是经过助教老师审定过的能够满足学习、使用需求,如果有需要的话可以放心下载使用。 基于Python卷积神经网络人脸识别驾驶员疲劳检测与预警系统+源码+文档(毕业设计)基于Python卷积神经网络人脸识别驾驶员疲劳检测与预警系统+源码+文档(毕业设计)基于Python卷积神经网络人脸识别驾驶员疲劳检测与预警系统+源码+文档(毕业设计)基于Python卷积神经网络人脸识别驾驶员疲劳检测与预警系统+源码+文档(毕业设计)基于Python卷积神经网络人脸识别驾驶员疲劳检测与预警系统+源码+文档(毕业设计)基于Python卷积神经网络人脸识别驾驶员疲劳检测与预警系统+源码+文档(毕业设计)基于Python卷积神经网络人脸识别驾驶员疲劳检测与预警系统+源码+文档(毕业设计)基于Python卷积神经网络人脸识别驾驶员疲劳检测与预警系统+源码+文档(毕业设

    江门市乡镇边界,矢量边界,shp格式

    矢量边界,行政区域边界,精确到乡镇街道,可直接导入arcgis使用

    平芯微PW4584应用电路pcb,ad格式

    平芯微PW4584应用电路pcb,ad格式

    基于SpringBoot的民宿管理系统(源码+数据库+万字文档)336

    民宿管理系统,系统包含两种角色:管理员、用户,系统分为前台和后台两大模块,主要功能如下。 前台功能: 1. 首页:展示民宿的相关信息和推荐房源。 2. 房间信息:用户可以查看房间的详细信息。 3. 论坛:用户可以在论坛上进行讨论和交流。 4. 公告信息:展示民宿的公告信息,包括优惠活动、重要通知等。 5. 个人中心:用户可以管理个人信息、订单记录、收藏房源等。 后台功能: 用户: 1. 个人中心:管理个人信息。 2. 房间信息管理:管理房间信息,包括添加、编辑、删除房间信息。 3. 论坛管理:管理论坛帖子,包括查看、删除、置顶等操作。 4. 公告信息管理:管理公告信息,包括添加、编辑、删除公告等操作。 管理员: 1. 个人中心:管理个人信息。 2. 管理员管理:管理其他管理员账号,包括添加、编辑、删除管理员等操作。 3. 基础数据管理:管理基础数据,包括地区信息、设施信息等。 4. 房间信息管理:管理房间信息,包括添加、编辑、删除房间信息。 5. 论坛管理:对论坛帖子进行管理,包括查看、删除、置顶等操作。 6. 公告信息管理:管理公告信息,包括添加、编辑、删除公告等操作。

    30012第9章MCS-51单片机IO接口技术20140413.ppt

    30012第9章MCS-51单片机IO接口技术20140413.ppt

    【最新版】 JESD22-A104F.01 2023.rar

    【最新版】 JESD22-A104F.01 2023.rar

    基于SpringBoot的船舶维保管理系统(源码+数据库+万字文档)381

    船舶维保管理系统,系统包含四种角色:管理员、船家、维保人员、维保公司,系统分为前台和后台两大模块,主要功能如下。 船家: - 个人中心:管理个人信息。 - 公告管理:查看和发布系统公告。 - 船舶管理:管理自己的船舶信息。 - 维保公司管理:选择和管理维保公司。 - 维保计划管理:制定和管理船舶的维保计划。 - 故障上报管理:上报船舶的故障情况。 维保公司: - 个人中心:管理个人信息。 - 公告管理:查看和发布系统公告。 - 船舶管理:管理负责的船舶信息。 - 维保人员管理:管理维保人员的信息。 - 维保计划管理:制定和管理船舶的维保计划。 - 故障上报管理:接收和处理船舶的故障上报。 - 维修成本管理:记录和统计维修过程中的成本。 维保人员: - 个人中心:管理个人信息。 - 公告管理:查看系统公告。 - 船舶管理:管理负责的船舶信息。 - 维保计划管理:查看和执行船舶的维保计划。 - 故障上报管理:上报船舶的故障情况。 - 维修成本管理:记录和统计维修过程中的成本。

    LW3T-PHP+TXT_3个通用条件工资成绩等通用查询系统电脑网页版版(Utf-8)_2024最终版.zip

    LW3T-PHP+TXT_3个通用条件工资成绩等通用查询系统电脑网页版版(Utf-8)_2024最终版

    【东兴证券-2025研报】食品饮料行业:市场预期逐渐改变,关注短期估值修复行情.pdf

    【东兴证券-2025研报】食品饮料行业:市场预期逐渐改变,关注短期估值修复行情.pdf

    基于SpringBoot的毕业就业信息管理系统(源码+数据库+万字文档)328

    毕业就业信息管理系统,系统包含三种角色:管理员、公司、用户,系统分为前台和后台两大模块,主要功能如下。 前台: 首页:展示毕业就业信息管理平台的相关内容。 公司:浏览和搜索招聘公司的信息。 简历:学生上传和管理自己的简历,进行简历投递。 公告信息:查看和发布最新的就业相关公告信息。 职位招聘:浏览和搜索最新的职位招聘信息。 个人中心:管理个人信息和简历。 后台(管理员): 个人中心:管理个人信息。 管理员管理:管理其他管理员账号的信息。 基础数据管理:管理系统中的基础数据, 公司管理:管理招聘公司的信息, 简历管理:管理学生上传的简历信息, 就业统计管理:统计毕业生的就业情况。 公告信息管理:发布和管理最新的就业相关公告信息。 简历投递管理:管理简历投递情况, 学生管理:管理学生账号信息, 职位招聘管理:管理职位招聘信息, 轮播图信息:管理系统的轮播图信息, 用户: 个人中心:管理个人信息和简历。 简历管理:上传和管理个人的简历信息。 就业统计管理:查看和更新个人的就业情况。 公告信息管理:查看最新的就业相关公告信息。 简历投递管理:查看个人简历投递情况和审核进度。 职位招聘

    君の850云端女孩全面屏.apk

    君の850云端女孩全面屏.apk

Global site tag (gtag.js) - Google Analytics