Spark streaming 程序的运行过程是将DStream的操作转化成RDD的操作,Spark Streaming 和 Spark Core 的关系如下图(图片来自spark官网)
Spark Streaming 会按照程序设定的时间间隔不断动态生成Job来处理输入数据,这里的Job生成是指将Spark Streaming 的程序翻译成Spark内核的RDD操作,翻译的过程并不会触发Job的运行,Spark Streaming 会将翻译的处理逻辑封装在Job对象中,最后会将Job提交到集群上运行。这就是Spark Streaming 运行的基本过程。下面详细介绍Job动态生成和提交过程。
首先,当SparkStreaming的start方法调用后,整个Spark Streaming 程序开始运行,按照指定的时间间隔生成Job并提交给集群运行,在生成Job的工程中主要核心对象有
1.JobScheduler
2.JobGenerator
3.DStreamGraph
4.DStream
其中, JobScheduler 负责启动JobGenerator生成Job,并提交生成的Job到集群运行,这里的Job不是在spark core 中提到的job,它只是作业运行的代码模板,是逻辑级别的,可以类比java线程中的Runnable接口实现,不是真正运行的作业, 它封装了由DStream转化而来的RDD操作.JobGenerator负责定时调用DStreamingGraph的generateJob方法生成Job和清理Dstream的元数据, DStreamGraph持有构成DStream图的所有DStream对象,并调用DStream的generateJob方法生成具体Job对象.DStream生成最终的Job交给JobScheduler 调度执行。整体过程如下图所示:
下面结合源码分析每一步过程 (源码中黄色背景部分为核心逻辑代码,例如 : scheduler.start()) :
首先,StreamingContext起动时调用start方法
try{
validate()
// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start"){
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,"false")
savedProperties.set(SerializationUtils.clone(
sparkContext.localProperties.get()).asInstanceOf[Properties])
scheduler.start()
}
state =StreamingContextState.ACTIVE
}catch{
caseNonFatal(e)=>
logError("Error starting the context, marking it as stopped", e)
scheduler.stop(false)
state =StreamingContextState.STOPPED
throw e
}
其中调用了scheduler的start方法,此处的scheduler 就是 org.apache.spark.streaming.scheduler.JobScheduler 对象,
StreamingContext持有org.apache.spark.streaming.scheduler.JobScheduler对象的引用。
下面看一下JobScheduler的start方法:
eventLoop =newEventLoop[JobSchedulerEvent]("JobScheduler"){
override protected def onReceive(event:JobSchedulerEvent):Unit= processEvent(event)
override protected def onError(e:Throwable):Unit= reportError("Error in job scheduler", e)
}
eventLoop.start()
// attach rate controllers of input streams to receive batch completion updates
for{
inputDStream <- ssc.graph.getInputStreams
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
listenerBus.start()
receiverTracker =newReceiverTracker(ssc)
inputInfoTracker =newInputInfoTracker(ssc)
executorAllocationManager =ExecutorAllocationManager.createIfEnabled(
ssc.sparkContext,
receiverTracker,
ssc.conf,
ssc.graph.batchDuration.milliseconds,
clock)
executorAllocationManager.foreach(ssc.addStreamingListener)
receiverTracker.start()
jobGenerator.start()
executorAllocationManager.foreach(_.start())
logInfo("Started JobScheduler")
可以看到JobScheduler调用了jobGenerator的start方法和eventLoop的start方法,eventLoop用来接收JobSchedulerEvent消息,并交给processEvent函数进行处理
代码如下:
private def processEvent(event:JobSchedulerEvent){
try{
event match {
caseJobStarted(job, startTime)=> handleJobStart(job, startTime)
caseJobCompleted(job, completedTime)=> handleJobCompletion(job, completedTime)
caseErrorReported(m, e)=> handleError(m, e)
}
}catch{
case e:Throwable=>
reportError("Error in job scheduler", e)
}
}
可以看到JobScheduler中的eventLoop只处理JobStarted,JobCompleted和ErrorReported 三类消息,这三类消息的处理不是Job动态生成的核心逻辑代码先略过,(注意:后面JobGenerator中也有个eventLoop不要和这里的eventLoop混淆。)
JobGenerator的start方法首先new了一个EventLoop对象eventLoop,并复写onReceive(),将收到的JobGeneratorEvent 消息交给 processEvent 方法处理.源码如下:
/** Start generation of jobs */ def start(): Unit = synchronized { if (eventLoop != null) return // generator has already been started // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock. // See SPARK-10125 checkpointWriter eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = { jobScheduler.reportError("Error in job generator", e) } } eventLoop.start() if (ssc.isCheckpointPresent) { restart() } else { startFirstTime() } }
JobGenerator创建了eventLoop对象之后调用该对象的start方法,启动监听进程,准备接收JobGeneratorEvent类型消息交给processEvent函数处理,然后调用了startFirstTime方法,该方法启动DStreamGraph和定时器,定时器启动后根据程序设定的时间间隔给eventLoop对象发送GenerateJobs消息,如下图:
eventLoop对象收到 GenerateJobs 消息交个processEvent方法处理,processEvent收到该消息,调用generateJobs方法处理,源码如下:
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time:Time){
// Checkpoint all RDDs marked for checkpointing to ensure their lineages are
// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS,"true")
Try{
jobScheduler.receiverTracker.allocateBlocksToBatch(time)// allocate received blocks to batch
graph.generateJobs(time)// generate jobs using allocated block
} match {
caseSuccess(jobs)=>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
caseFailure(e)=>
jobScheduler.reportError("Error generating jobs for time "+ time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater =false))
}
JobGenerator中的generateJobs方法主要关注两行代码,首先调用graph的generateJobs方法,给方法返回Success(jobs) 或者 Failure(e),其中的jobs就是该方法返回的Job对象集合,如果Job创建成功,再调用JobScheduler的submitJobSet方法将job提交给集群执行。
首先分析Job对象的产生,DStreamGraph 的start方法源码:
def generateJobs(time:Time):Seq[Job]={
logDebug("Generating jobs for time "+ time)
val jobs =this.synchronized{
outputStreams.flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
}
}
logDebug("Generated "+ jobs.length +" jobs for time "+ time)
jobs
}
DStreamGraph 的start方法源码调用了outputStream对象的generateJob方法,ForeachDStream重写了该方法:
ForeachDStream的generateJob 将用户编写的DStream处理函数封装在jobFunc中,并将其传入Job对象,至此Job的生成。
接下来分析Job提交过程,JobScheduler负责Job的提交,核心代码在submitJobSet方法中:
def submitJobSet(jobSet:JobSet){
if(jobSet.jobs.isEmpty){
logInfo("No jobs added for time "+ jobSet.time)
}else{
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(newJobHandler(job)))
logInfo("Added jobs for time "+ jobSet.time)
}
}
其中jobExecutor对象是一个线程池,JobHandler实现了Runnable接口,在JobHandler 的run方法中会调用传入的job对象的run方法。
疑问:Job的run方法执行是如何触发RDD的Action操作从而出发job的真正运行的呢?我们下次再具体分析,请随时关注博客更新!
相关推荐
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
1.Spark及其生态圈简介.pdf2.Spark编译与部署(上)--基础环境搭建.pdf2.Spark编译与部署(下)--Spark编译安装.pdf2.Spark编译与部署(中)--Hadoop编译安装.pdf3.Spark编程模型(上)--概念及SparkShell实战.pdf3....
《Spark技术内幕深入解析Spark内核架构设计与实现原理》这本书深入探讨了Apache Spark这一分布式计算框架的核心架构和实现机制,旨在帮助读者全面理解Spark的工作原理,并能够有效地利用其进行大数据处理。...
《Spark技术内幕深入解析Spark内核架构设计与实现原理》这本书深入探讨了Apache Spark这一分布式计算框架的核心架构和实现机制,对于理解Spark的工作原理及其在大数据处理中的应用具有极高的价值。以下是对其中主要...
《Spark技术内幕:深入解析Spark内核架构设计与实现原理》是一本专注于深入探究Apache Spark核心技术的书籍。这本书旨在帮助读者理解Spark的内部工作机制,包括其架构设计、分布式计算模型以及机器学习库MLlib的实现...
《Spark技术内幕:深入解析Spark内核架构设计与实现原理》是一本专注于Spark技术深度剖析的书籍,旨在帮助读者理解Spark的核心架构、设计理念以及其实现机制。这本书高清完整,包含了完整的书签,方便读者查阅和学习...
《Apress.Pro.Spark.Streaming.The.Zen.of.Real-Time.Analytics.Using.Apache.Spark》这本书专注于探讨如何利用Apache Spark进行实时数据分析,是Spark流处理技术的深入解析。Apache Spark作为一个快速、通用且可...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
《Spark大数据处理:技术、应用与性能优化》是大数据技术丛书中的一本,全面解析了Apache Spark这一核心的大数据处理框架。Spark以其高效、灵活和易用性在大数据领域占据了重要地位,它不仅支持批处理,还支持实时流...
2.理解Spark Streaming的工作原理。 3.学会使用Spark Streaming处理流式数据。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)Spark Streaming处理套接字流 1:编写...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
Apache Spark是一个开源的分布式大数据处理框架,它能够处理大规模数据...6. **多语言支持**:Spark支持Scala、Java、Python和R等多种编程语言。 7. **多数据源支持**:Spark可以读取HDFS、S3、 Cassandra、HBase等多种
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
例如,图2-4展示了Spark Streaming与Storm在吞吐量上的比较结果,而图6-3、图6-4和图6-5则分别展示了Storm、Spark Streaming和Samza的架构图。通过这些对比,技术人员可以更好地了解各自的技术优势和适用场景。 7. ...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...
7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...