- 浏览: 74301 次
一直很好奇Sparkstreaming的ssc.start是怎么做到不停的一直定时循环处理数据的, 看了一下源码, 大致明白了整个过程, 记录分享一下。
入口为StreamingContext的start方法:
在构造StreamingContext的时候 state就初始化为INITIALIZED , 并且定义了一个JobScheduler scheduler
代码里面很明白, 在初始化的时候, 执行了JobScheduler的start方法。
那么我们看一下JobScheduler的start方法里面都做了什么:
1.启动定义了一个eventLoop
2.启动定义了一个ReceiverTracker
3.启动定义了一个jobGenerator
eventloop主要来处理Job相关的event:
JobStarted(job, startTime) => handleJobStart(job, startTime)
JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
ErrorReported(m, e) => handleError(m, e)
然后ReceiverTracker 主要用来启动receiver和接收数据的。 回头有空了再详细了解一下receiver是怎么工作的, 到时候再写一篇文章
JobScheduler里面最重要的就是JobGenerator的start方法,
这里面也做了两个主要的事:
1.定义了一个新的eventLoop 用来处理以下情况:
GenerateJobs(time) => generateJobs(time)
ClearMetadata(time) => clearMetadata(time)
DoCheckpoint(time, clearCheckpointDataLater) => doCheckpoint(time, clearCheckpointDataLater)
ClearCheckpointData(time) => clearCheckpointData(time)
2.startFirstTime 方法就是不停的做loop, 每次到我们设定的Duration的时候 再submitJob
先看一下#1:
其实在构造JobGenerator的时候, 我们已经构造了一个timer, 这个timer就是用来call back GenerateJobs(time)的: eventLoop.post
在#1定义完eventloop后, 正常情况下 第一次就会执行startFirstTime ,
在这么方法里面就会去执行 timer.start动作, start的动作里面主要就是去启动一个thread:
这个thread定义就是:
他的run里面就跑了loop方法, 一看方法名字, 我们就知道StreamingContext能循环submitJob 的功能估计就在这个方法里面了, 我们看一下代码, 他干了什么:
代码很简单, 主要功能就是clock.waitTillTime(nextTime) 确认一个Duration到了, 然后执行callback(nextTime) 方法, 再设定nexttime到下一个Duration。
关键就是这个callback做的是什么。 从上面的代码我们可以看出来 具体执行的是在timer里面做的, 在我们构造timer的时候我们已经构造了callback这个方法, 具体就是在JobGenerator里面的:
这个callback其实就是eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
那么GenerateJobs()做的又是什么呢? 网上翻, 可以看到我们有一个eventloop里面定义了GenerateJobs(time)去执行generateJobs()方法, 这个方法里面又做了什么呢? 还是看代码:
可以看到他主要就是做submitJobSet这个动作, 那么这个方法里面又干了些什么呢?
好啦, 越来越明白了, 这里面就是去执行Job, 在JobHandler 里面还有job.run
可能有点绕, 但是也基本明白了StreamingContext是怎么工作的, 从初始化, 循环, 再到submitJob, 整个过程大概就是这个样子。
接下来马上要花一些时间玩一下SparkSQL, 到时候看看能不能写几篇他的文章出来, 最后再啃ML这个硬骨头 打算再花一个多月时间弄懂SparkSQL和ML后, 好好玩一下hadoop的生态系统, 目前只对MR大致会用, 不知道内部是怎么运行的, 有机会好好看一下
入口为StreamingContext的start方法:
在构造StreamingContext的时候 state就初始化为INITIALIZED , 并且定义了一个JobScheduler scheduler
代码里面很明白, 在初始化的时候, 执行了JobScheduler的start方法。
def start(): Unit = synchronized { state match { case INITIALIZED => startSite.set(DStream.getCreationSite()) StreamingContext.ACTIVATION_LOCK.synchronized { StreamingContext.assertNoOtherContextIsActive() try { validate() // Start the streaming scheduler in a new thread, so that thread local properties // like call sites and job groups can be reset without affecting those of the // current thread. ThreadUtils.runInNewThread("streaming-start") { sparkContext.setCallSite(startSite.get) sparkContext.clearJobGroup() sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false") scheduler.start() } state = StreamingContextState.ACTIVE } catch { case NonFatal(e) => logError("Error starting the context, marking it as stopped", e) scheduler.stop(false) state = StreamingContextState.STOPPED throw e } StreamingContext.setActiveContext(this) } shutdownHookRef = ShutdownHookManager.addShutdownHook( StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) // Registering Streaming Metrics at the start of the StreamingContext assert(env.metricsSystem != null) env.metricsSystem.registerSource(streamingSource) uiTab.foreach(_.attach()) logInfo("StreamingContext started") case ACTIVE => logWarning("StreamingContext has already been started") case STOPPED => throw new IllegalStateException("StreamingContext has already been stopped") } }
那么我们看一下JobScheduler的start方法里面都做了什么:
1.启动定义了一个eventLoop
2.启动定义了一个ReceiverTracker
3.启动定义了一个jobGenerator
def start(): Unit = synchronized { if (eventLoop != null) return // scheduler has already been started logDebug("Starting JobScheduler") eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") { override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e) } eventLoop.start() // attach rate controllers of input streams to receive batch completion updates for { inputDStream <- ssc.graph.getInputStreams rateController <- inputDStream.rateController } ssc.addStreamingListener(rateController) listenerBus.start(ssc.sparkContext) receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) receiverTracker.start() jobGenerator.start() logInfo("Started JobScheduler") }
eventloop主要来处理Job相关的event:
JobStarted(job, startTime) => handleJobStart(job, startTime)
JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
ErrorReported(m, e) => handleError(m, e)
然后ReceiverTracker 主要用来启动receiver和接收数据的。 回头有空了再详细了解一下receiver是怎么工作的, 到时候再写一篇文章
JobScheduler里面最重要的就是JobGenerator的start方法,
def start(): Unit = synchronized { if (eventLoop != null) return // generator has already been started // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock. // See SPARK-10125 checkpointWriter eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = { jobScheduler.reportError("Error in job generator", e) } } eventLoop.start() if (ssc.isCheckpointPresent) { restart() } else { startFirstTime() } }
这里面也做了两个主要的事:
1.定义了一个新的eventLoop 用来处理以下情况:
GenerateJobs(time) => generateJobs(time)
ClearMetadata(time) => clearMetadata(time)
DoCheckpoint(time, clearCheckpointDataLater) => doCheckpoint(time, clearCheckpointDataLater)
ClearCheckpointData(time) => clearCheckpointData(time)
2.startFirstTime 方法就是不停的做loop, 每次到我们设定的Duration的时候 再submitJob
先看一下#1:
其实在构造JobGenerator的时候, 我们已经构造了一个timer, 这个timer就是用来call back GenerateJobs(time)的: eventLoop.post
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
在#1定义完eventloop后, 正常情况下 第一次就会执行startFirstTime ,
private def startFirstTime() { val startTime = new Time(timer.getStartTime()) graph.start(startTime - graph.batchDuration) timer.start(startTime.milliseconds) logInfo("Started JobGenerator at " + startTime) }
在这么方法里面就会去执行 timer.start动作, start的动作里面主要就是去启动一个thread:
def start(startTime: Long): Long = synchronized { nextTime = startTime thread.start() logInfo("Started timer for " + name + " at time " + nextTime) nextTime }
这个thread定义就是:
private val thread = new Thread("RecurringTimer - " + name) { setDaemon(true) override def run() { loop } }
他的run里面就跑了loop方法, 一看方法名字, 我们就知道StreamingContext能循环submitJob 的功能估计就在这个方法里面了, 我们看一下代码, 他干了什么:
private def loop() { try { while (!stopped) { triggerActionForNextInterval() } triggerActionForNextInterval() } catch { case e: InterruptedException => } } } private def triggerActionForNextInterval(): Unit = { clock.waitTillTime(nextTime) callback(nextTime) prevTime = nextTime nextTime += period logDebug("Callback for " + name + " called at time " + prevTime) }
代码很简单, 主要功能就是clock.waitTillTime(nextTime) 确认一个Duration到了, 然后执行callback(nextTime) 方法, 再设定nexttime到下一个Duration。
关键就是这个callback做的是什么。 从上面的代码我们可以看出来 具体执行的是在timer里面做的, 在我们构造timer的时候我们已经构造了callback这个方法, 具体就是在JobGenerator里面的:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
这个callback其实就是eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
那么GenerateJobs()做的又是什么呢? 网上翻, 可以看到我们有一个eventloop里面定义了GenerateJobs(time)去执行generateJobs()方法, 这个方法里面又做了什么呢? 还是看代码:
/** Generate jobs and perform checkpoint for the given `time`. */ private def generateJobs(time: Time) { // Set the SparkEnv in this thread, so that job generation code can access the environment // Example: BlockRDDs are created in this thread, and it needs to access BlockManager // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. SparkEnv.set(ssc.env) Try { jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch graph.generateJobs(time) // generate jobs using allocated block } match { case Success(jobs) => val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) }
可以看到他主要就是做submitJobSet这个动作, 那么这个方法里面又干了些什么呢?
def submitJobSet(jobSet: JobSet) { if (jobSet.jobs.isEmpty) { logInfo("No jobs added for time " + jobSet.time) } else { listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) jobSets.put(jobSet.time, jobSet) jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) logInfo("Added jobs for time " + jobSet.time) } }
好啦, 越来越明白了, 这里面就是去执行Job, 在JobHandler 里面还有job.run
可能有点绕, 但是也基本明白了StreamingContext是怎么工作的, 从初始化, 循环, 再到submitJob, 整个过程大概就是这个样子。
接下来马上要花一些时间玩一下SparkSQL, 到时候看看能不能写几篇他的文章出来, 最后再啃ML这个硬骨头 打算再花一个多月时间弄懂SparkSQL和ML后, 好好玩一下hadoop的生态系统, 目前只对MR大致会用, 不知道内部是怎么运行的, 有机会好好看一下
发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1114最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
源码跟踪executor如何写数据到blockmanager, 以及如何从blockmanager读数据
2016-08-10 19:41 1438之前看了Job怎么submit 以 ... -
Spark中Blockmanager相关代码解析
2016-08-04 19:47 1852前一段时间看了如何划分stage以及如何提交Job, 最后把结 ... -
Spark在submitStage后如何通过clustermanager调度执行task到Driver接收计算结果的代码解析
2016-08-01 14:08 1486前文: http://humingminghz.iteye.c ... -
Spark中saveAsTextFile至stage划分和job提交的源代码分析
2016-07-29 14:20 3381之前看了Spark Streaming和Spark SQL, ... -
SparkSQL DF.agg 执行过程解析
2016-07-19 10:21 4133在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或 ... -
SparkSQL SQL语句解析过程源代码浅析
2016-07-15 19:34 6670前两天一直在忙本职工 ... -
SparkSQL SQL语句解析过程浅析
2016-07-15 19:06 0前两天一直在忙本职工 ... -
SparkStreaming从启动Receiver到收取数据生成RDD的代码浅析
2016-07-08 17:54 2242前面一片文章介绍了SocketTextStream 是如何从b ... -
Sparkstreaming是如何获取数据组成Dstream的源码浅析
2016-07-08 11:23 1486前面一篇文章介绍了SparkStreaming是如何不停的循环 ... -
SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)
2016-07-06 11:24 10171前两天开始研究SparkSQL, 其主要分为HiveConte ... -
SparkStreaming 对Window的reduce的方法解析
2016-06-30 11:57 4738在sparkstreaming中对窗口 ... -
Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析
2016-06-29 19:50 8807最近在玩spark streaming, 感觉到了他的强大。 ... -
关于Eclipse开发环境下 Spark+Kafka 获取topic的时候连接出错
2016-06-28 17:20 7416林林总总玩了Spark快一个月了, 打算试一下kafka的消息 ...
相关推荐
算法文档通常会介绍算法的具体实现步骤,但在本例中,由于是“无代码浅析”,文档可能不会提供完整的编程代码,而是着重于算法的理论基础、处理逻辑、应用场景以及算法复杂度等。在IT专业领域,理解算法的原理对于...
循环结构通常是在顺序结构基础上建立的,顺序结构需要编写若干次相同的代码以完成重复任务,而循环结构则可以减少这种重复,通过循环控制语句重复执行相关代码块。 循环结构程序的设计方法主要包括两种类型:当型...
本文将深入浅析C语言中的循环语句,包括其类型、工作原理和实际应用。 一、循环语句的种类 1. `for` 循环:C语言中的`for`循环通常用于已知循环次数的情况。它由三个部分组成:初始化、条件检查和迭代操作。例如:...
《组织机构代码的沉淀数据处理浅析》 组织机构代码,作为我国国民经济和社会信息化工作的重要组成部分,对于确保数据信息质量,提升全社会的应用水平具有关键意义。近年来,全国范围内的组织机构代码赋码颁证工作已...
while循环语句与if语句的不同就在于if语句只有在条件为真的情况下才执行后续的代码段一次,而while循环语句只要其条件为真时,就会重复执行代码段。 do...while循环语句与while循环语句类似,但是它首先执行循环体...
高浓缩倍率循环冷却水处理技术的发展在我国起步较晚,70年代初,我国引进引进了循环冷却水处理技术,它的优越性和带来的经济效益很快被人们公认,并由化工行业迅速推广到石化、热电等其它行业。特别是80年代后期,高浓缩...
本篇文章将深入浅析C、C++中的异常处理机制以及MFC中的异常处理策略。 首先,我们来看C语言的异常处理。C语言本身并不直接支持异常处理,但可以通过返回错误码或者设置全局变量的方式进行错误处理。这种方式称为...
### 浅析框架与代码规范 #### 代码规范的重要性及实践方法 在现代软件开发过程中,代码规范扮演着至关重要的角色。它不仅有助于提高代码的可读性和可维护性,还能促进团队之间的协作效率,减少潜在的错误和缺陷。...
建筑工程地基处理技术浅析.doc
本文档是无垠式代码生成器研发阶段的理论探讨过程,对理解无垠式代码生成器源码和和平之翼代码生成器这两种动词算子式代码生成器是很重要的。
Java程序设计中异常处理技术浅析
### 浅析JAVA异常处理机制 #### 一、Java异常处理机制概述 异常处理是Java语言中的一个重要机制,它能够确保程序在遇到不可预料的情况时仍能维持稳定运行。异常处理主要包括三个方面:捕获异常、控制程序流程以及...
"浅析建筑垃圾的循环利用" 建筑垃圾的循环利用是当前建筑行业面临的重要挑战之一。建筑垃圾的产生对环境和生态系统造成了严重的影响,包括占用土地、影响空气质量、对水域的污染、破坏城市软环境、安全隐患等。然而...
这样做的好处是可以有效避免长时间运行的任务阻塞事件循环线程,提高系统的并发处理能力。 总的来说,Netty的设计哲学强调了灵活性和可扩展性,这使得它能够在广泛的网络应用程序中发挥出色的表现。无论是简单的...
组织机构代码在社会活动中的功能浅析-浅析组织机构代码在社.doc
3. 数据处理:Spark Core负责数据计算,Spark SQL用于结构化查询,Spark Streaming用于实时流处理,MLlib则提供了机器学习算法,用于异常检测和威胁预测。 4. 安全分析:结合规则匹配、统计分析和机器学习等方法,...
浅析KIS标准版结转销售成本业务的处理.pdf
- **循环优化**:尽量减少循环次数,如提前退出循环、减少循环内部操作、使用向量操作等。 - **分支预测**:理解和利用处理器的分支预测机制,避免过多的分支跳转导致性能下降。 4. **编程风格优化**: - **清晰...
屯兰矿选煤厂在进行重介洗选改造后,为了...分析了煤泥水厂内回收、洗水闭路循环的影响因素,论述了实现煤泥水处理循环利用所采取的措施,通过改造取得了良好效果,实现了煤泥厂内回收、洗水闭路循环,煤泥水的再生利用。
机械工程自动化技术的问题与处理对策浅析研究.pdf