`

SparkStreaming是如何完成不停的循环处理的代码浅析

阅读更多
一直很好奇Sparkstreaming的ssc.start是怎么做到不停的一直定时循环处理数据的, 看了一下源码, 大致明白了整个过程, 记录分享一下。

入口为StreamingContext的start方法:

在构造StreamingContext的时候 state就初始化为INITIALIZED , 并且定义了一个JobScheduler scheduler

代码里面很明白, 在初始化的时候, 执行了JobScheduler的start方法。


def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()

            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              scheduler.start()
            }
            state = StreamingContextState.ACTIVE
          } catch {
            case NonFatal(e) =>
              logError("Error starting the context, marking it as stopped", e)
              scheduler.stop(false)
              state = StreamingContextState.STOPPED
              throw e
          }
          StreamingContext.setActiveContext(this)
        }
        shutdownHookRef = ShutdownHookManager.addShutdownHook(
          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
        // Registering Streaming Metrics at the start of the StreamingContext
        assert(env.metricsSystem != null)
        env.metricsSystem.registerSource(streamingSource)
        uiTab.foreach(_.attach())
        logInfo("StreamingContext started")
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }




那么我们看一下JobScheduler的start方法里面都做了什么:
1.启动定义了一个eventLoop
2.启动定义了一个ReceiverTracker
3.启动定义了一个jobGenerator



def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }



eventloop主要来处理Job相关的event:
JobStarted(job, startTime) => handleJobStart(job, startTime)
JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
ErrorReported(m, e) => handleError(m, e)

然后ReceiverTracker 主要用来启动receiver和接收数据的。 回头有空了再详细了解一下receiver是怎么工作的, 到时候再写一篇文章

JobScheduler里面最重要的就是JobGenerator的start方法,



def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }



这里面也做了两个主要的事:
1.定义了一个新的eventLoop 用来处理以下情况:
GenerateJobs(time) => generateJobs(time)
ClearMetadata(time) => clearMetadata(time)
DoCheckpoint(time, clearCheckpointDataLater) =>      doCheckpoint(time, clearCheckpointDataLater)
ClearCheckpointData(time) => clearCheckpointData(time)

2.startFirstTime 方法就是不停的做loop, 每次到我们设定的Duration的时候 再submitJob

先看一下#1:
其实在构造JobGenerator的时候, 我们已经构造了一个timer, 这个timer就是用来call back GenerateJobs(time)的: eventLoop.post


  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")



在#1定义完eventloop后, 正常情况下 第一次就会执行startFirstTime ,



  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }


在这么方法里面就会去执行 timer.start动作, start的动作里面主要就是去启动一个thread:

  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }


这个thread定义就是:
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }




他的run里面就跑了loop方法, 一看方法名字, 我们就知道StreamingContext能循环submitJob 的功能估计就在这个方法里面了, 我们看一下代码, 他干了什么:


private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}

private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }



代码很简单, 主要功能就是clock.waitTillTime(nextTime) 确认一个Duration到了, 然后执行callback(nextTime) 方法, 再设定nexttime到下一个Duration。

关键就是这个callback做的是什么。 从上面的代码我们可以看出来 具体执行的是在timer里面做的, 在我们构造timer的时候我们已经构造了callback这个方法, 具体就是在JobGenerator里面的:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")


这个callback其实就是eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

那么GenerateJobs()做的又是什么呢? 网上翻, 可以看到我们有一个eventloop里面定义了GenerateJobs(time)去执行generateJobs()方法, 这个方法里面又做了什么呢? 还是看代码:

 /** Generate jobs and perform checkpoint for the given `time`.  */
  private def generateJobs(time: Time) {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }



可以看到他主要就是做submitJobSet这个动作, 那么这个方法里面又干了些什么呢?

  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }





好啦, 越来越明白了, 这里面就是去执行Job, 在JobHandler 里面还有job.run

可能有点绕, 但是也基本明白了StreamingContext是怎么工作的, 从初始化, 循环, 再到submitJob, 整个过程大概就是这个样子。

接下来马上要花一些时间玩一下SparkSQL, 到时候看看能不能写几篇他的文章出来, 最后再啃ML这个硬骨头  打算再花一个多月时间弄懂SparkSQL和ML后, 好好玩一下hadoop的生态系统, 目前只对MR大致会用, 不知道内部是怎么运行的, 有机会好好看一下


1
7
分享到:
评论

相关推荐

    算法文档无代码浅析“最小表示法”思想在字符串循环同构问题中的应用

    算法文档通常会介绍算法的具体实现步骤,但在本例中,由于是“无代码浅析”,文档可能不会提供完整的编程代码,而是着重于算法的理论基础、处理逻辑、应用场景以及算法复杂度等。在IT专业领域,理解算法的原理对于...

    C语言循环结构教学浅析.pdf

    循环结构通常是在顺序结构基础上建立的,顺序结构需要编写若干次相同的代码以完成重复任务,而循环结构则可以减少这种重复,通过循环控制语句重复执行相关代码块。 循环结构程序的设计方法主要包括两种类型:当型...

    C语言循环语句浅析.pdf

    本文将深入浅析C语言中的循环语句,包括其类型、工作原理和实际应用。 一、循环语句的种类 1. `for` 循环:C语言中的`for`循环通常用于已知循环次数的情况。它由三个部分组成:初始化、条件检查和迭代操作。例如:...

    组织机构代码的沉淀数据处理浅析.pdf

    《组织机构代码的沉淀数据处理浅析》 组织机构代码,作为我国国民经济和社会信息化工作的重要组成部分,对于确保数据信息质量,提升全社会的应用水平具有关键意义。近年来,全国范围内的组织机构代码赋码颁证工作已...

    浅析PHP语言中的循环语句.pdf

    while循环语句与if语句的不同就在于if语句只有在条件为真的情况下才执行后续的代码段一次,而while循环语句只要其条件为真时,就会重复执行代码段。 do...while循环语句与while循环语句类似,但是它首先执行循环体...

    浅析高浓缩倍率循环冷却水处理技术

    高浓缩倍率循环冷却水处理技术的发展在我国起步较晚,70年代初,我国引进引进了循环冷却水处理技术,它的优越性和带来的经济效益很快被人们公认,并由化工行业迅速推广到石化、热电等其它行业。特别是80年代后期,高浓缩...

    c/vc++/MFC异常处理/结构化异常处理 浅析

    本篇文章将深入浅析C、C++中的异常处理机制以及MFC中的异常处理策略。 首先,我们来看C语言的异常处理。C语言本身并不直接支持异常处理,但可以通过返回错误码或者设置全局变量的方式进行错误处理。这种方式称为...

    浅析框架与代码规范.pdf

    ### 浅析框架与代码规范 #### 代码规范的重要性及实践方法 在现代软件开发过程中,代码规范扮演着至关重要的角色。它不仅有助于提高代码的可读性和可维护性,还能促进团队之间的协作效率,减少潜在的错误和缺陷。...

    浅析中央空调循环水水质处理技术.doc

    【中央空调循环水水质处理技术】 中央空调在现代生活中扮演着重要的角色,它为人们提供舒适的生活和工作环境,同时也被广泛应用于工业生产中的温度和湿度控制。然而,随着空调系统的普及,循环水系统的水质问题日益...

    城市垃圾的危害及处理技术浅析.doc

    城市垃圾的危害及处理技术浅析 城市垃圾对环境和人类健康的威胁不容忽视。垃圾问题已成为城市发展面临的重要挑战。本文将探讨城市垃圾的危害及其常见的处理技术。 1. 城市垃圾的危害 - 危及人体健康:垃圾中的...

    建筑工程地基处理技术浅析.doc

    建筑工程地基处理技术浅析.doc

    代码生成原理浅析PPT

    本文档是无垠式代码生成器研发阶段的理论探讨过程,对理解无垠式代码生成器源码和和平之翼代码生成器这两种动词算子式代码生成器是很重要的。

    Java程序设计中异常处理技术浅析.pdf

    Java程序设计中异常处理技术浅析

    浅析JAVA异常处理机制.pdf

    ### 浅析JAVA异常处理机制 #### 一、Java异常处理机制概述 异常处理是Java语言中的一个重要机制,它能够确保程序在遇到不可预料的情况时仍能维持稳定运行。异常处理主要包括三个方面:捕获异常、控制程序流程以及...

    浅析建筑垃圾的循环利用.docx

    "浅析建筑垃圾的循环利用" 建筑垃圾的循环利用是当前建筑行业面临的重要挑战之一。建筑垃圾的产生对环境和生态系统造成了严重的影响,包括占用土地、影响空气质量、对水域的污染、破坏城市软环境、安全隐患等。然而...

    Netty实现原理浅析.pdf

    这样做的好处是可以有效避免长时间运行的任务阻塞事件循环线程,提高系统的并发处理能力。 总的来说,Netty的设计哲学强调了灵活性和可扩展性,这使得它能够在广泛的网络应用程序中发挥出色的表现。无论是简单的...

    组织机构代码在社会活动中的功能浅析-浅析组织机构代码在社.doc

    组织机构代码在社会活动中的功能浅析-浅析组织机构代码在社.doc

    浅析基于Spark技术的网络安全大数据分析平台.pdf

    3. 数据处理:Spark Core负责数据计算,Spark SQL用于结构化查询,Spark Streaming用于实时流处理,MLlib则提供了机器学习算法,用于异常检测和威胁预测。 4. 安全分析:结合规则匹配、统计分析和机器学习等方法,...

    浅析KIS标准版结转销售成本业务的处理.pdf

    浅析KIS标准版结转销售成本业务的处理.pdf

    浅析C语言程序代码的优化.pdf

    - **循环优化**:尽量减少循环次数,如提前退出循环、减少循环内部操作、使用向量操作等。 - **分支预测**:理解和利用处理器的分支预测机制,避免过多的分支跳转导致性能下降。 4. **编程风格优化**: - **清晰...

Global site tag (gtag.js) - Google Analytics