1.概述
Action算子是触发Spark计算的入口,属于Spark核心逻辑。本文梳理Action触发、计算、返回的整个数据流
本文涉及代码主体位于org.apache.spark.scheduler/rdd/executor几个模块。核心类:RDD、SparkContext、DAGScheduler、TaskSchedulerImpl、CoarseGrainedSchedulerBackEnd、CoarseGrainedExecutorBackEnd、TaskSetManager、Executor、ResultTask、ShuffleMapTask
BTW:吐槽一下Iteye的编辑器实在太不给力了,插入图片好麻烦,从mac版word复制还带格式乱码。。
2.整体调用逻辑图
实线框内为方法名,连接线上为传递的消息实体
红线:提交计算主体调用流 蓝线:返回数据主体调用流 浅蓝线:事件总线消息分发 绿线:Stage提交回调回路
黄底:重要逻辑实现调用栈 绿底:其他模块相关调用 空心箭头:异步调用/远程通信
3.源码详解
Step 1-RDD
// Utils.getIteratorSize _是传入的计算函数,这里是计算每个分区的size,runJob返回Array[Long]。不同Action就是传入不同的func,因此调度逻辑(DagScheduler/TaskScheduler不需要关心具体操作内容)。RDD.foreach()就是直接传入自定义Action
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
Step 2-SparkContext
// 对于每个partition执行getIteratorSize。RDD的物理内容上就是一个Seq,Spark中通过迭代子的形式传递RDD内容的引用,逻辑上可以把Iterator[T]就看做RDD本身。func则传递Action操作信息。最终计算落地到RDD.compute(),后文会说到
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
// cleanedFunc是一个闭包的函数,用到asm来解析class。大致是去掉对闭包无影响的父类、子类、transient属性等,确认闭包可序列化。后续文章再深入分析这个方法。最终getIteratorSize函数传到runJob里
def runJob[T, U: ClassTag]( rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = {
val callSite = getCallSite// callSite是通过StackTrace获取调用端的代码行号,用来记日志
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {// 日志,建议打开
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
// 交给DasScheduler执行,action信息通过cleanedFunc参数传递
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())// 更新进度,也就是控制台看到的进度日志
rdd.doCheckpoint()// 记录checkpoint
}
Step 3-DAGScheduler
// 这步就是异步调用submitJob
def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
waiter.awaitResult() match {// 阻塞等待计算结果
case JobSucceeded =>
logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case JobFailed(exception: Exception) =>
logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
def submitJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = {
// 校验所有partition都存在
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +"Total number of partitions: " + maxPartitions)
}
val jobId = nextJobId.getAndIncrement()// 生成一个jobId
if (partitions.size == 0) {// 没有partition,啥也不用做
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
// JobWaiter是类似Future的异步任务封装
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
// 封装一个提交消息,JobSubmitted实现了DAGSchedulerEvent,EventLoop是一个消息队列,实现类是DAGSchedulerEventProcessLoop
eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
waiter
}
DAGSchedulerEventLoop
private[spark] abstract class EventLoop[E](name: String) extends Logging {
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
private val eventThread = new Thread(name) {
override def run(): Unit = {
while (!stopped.get) {
val event = eventQueue.take()
onReceive(event)
……
}
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
……//其他消息处理
// 回到dagScheduler,这里消息队列只是做了一次平峰。listener是JobWaiter回调对象
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) {
var finalStage: ResultStage = null
try {
// 根据dependency关系划分Stage。Stage是可以并发执行的Task集合,Stage边界为RDD宽依赖
// 实现是一个广度优先遍历,具体代码参见《内幕》4.2.3节,此处略去
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// 生成一个ActiveJob准备执行,这里func信息封装到了Stage对象里,整个DAG的func信息由Stage.parentStage()关联到RDD起来
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()// 清本地缓存
logInfo("Got job %s (%s) with %d output partitions".format(job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
// 注册Job
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
// 提交一个SparkListenerJobStart事件(SparkListenerEvent),listenerBus实现是AsynchronousListenerBus,也是一个异步消息队列
listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage) // 提交Stage
submitWaitingStages() // Stage完成后尝试提交等待的Stage,submitWaitingStage其实是一个回调方法,会在很多地方调用,形成回调回路
}
// 递归提交stage
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { // 运行中/等待中/已失败的Stage不用重复触发
val missing = getMissingParentStages(stage).sortBy(_.id)// 如果存在未完成的Parent Stage,就先提交ParentStage,本Stage等待
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)// 如果Job不存在也就不用计算Stage了
}
}
Step 4-DAGScheduler
/** 提交当前Stage未计算的Task,此时Parent Stage的结果已经available*/
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
stage.pendingPartitions.clear() // 注册PendingTask
/* 获取未计算的分区,分两种实现:1.ShuffleMapStage:通过查找outputLocs(id).isEmpty, outputLocs存放MapStatus对象(ShuffleMapTask计算结果的元数据)
2.ResultStage:通过!job.finished(partitionId)判断 */
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// 仅当当前stage没有部分提交的时候,重置累加器(如果部分提交了,重置会覆盖之前的结果
if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {
stage.resetInternalAccumulators()
}
// 注册状态
val properties = jobIdToActiveJob(jobId).properties
runningStages += stage
// 注册Stage各分区重试(Attempt)状态,OutputCommitCoordinator在Driver和Executor端都有,记录Stage运行状态及HDFS相关权限
// SparkListenerStageSubmitted should be posted before testing whether tasks are serializable. If tasks are not serializable, a SparkListenerStageCompleted event will be posted, which should always come after a corresponding SparkListenerStageSubmitted event.
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
// 为每个mission partition分配候选的Executor地址,这里包含本地优先策略(locality),TaskLocation包含Executor的host和executorId。这个Map的value null表示没有优选的executor(不是说不能执行)
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
val job = s.activeJob.get
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
// 记录一次(已失败的)计算Attempt。StageInfo是一个封装Stage状态的pojo
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
runningStages -= stage
return
}
// 记录一次计算Attempt
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
// 异步注册StageSubmit消息监听,落地到SparkListener这个trait的实现类
// ListenerBus是异步消息总线,用于Driver端整体的消息分发。消息被所有SparkListener实现类订阅(此处略去一些不重要的实现):
// SparkFirehoseListener:用户自定义事件监听,这个类只暴露了一个onEvent抽象方法,可统一处理各类型事件
// JavaSparkListener:用户自定义监听
// SQLListener:Spark-SQL使用
// ExecutorAllocationListener:根据负载动态挂载/卸载Executor,注意Executor是Slave节点上的独立进程
// JobProcessListener:Job进度监听,用于维护各Stage运行状态
// StorgeListener:存储状态监听,包括在WebUI呈现
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// 向所有Executor广播Task内容(RDD序列化后的二进制码)。每个Task都获取一份独立的(序列化的)RDD副本,这使Task之间有更好的隔离性。因此每个Task修改闭包中的引用都是独立的(所以也没有并发问题)This is necessary in Hadoop where the JobConf/Configuration object is not thread-safe.
var taskBinary: Broadcast[Array[Byte]] = null
try {
// ShuffleMapTask, 广播rdd与shuffleDep依赖关系
// ResultTask, 广播rdd与func(func就是Action操作,包括foreach)
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
// closureSerializer默认为JavaSerializer,就是java.io.Serializer默认序列化方式。序列化异常的时候看到的一长串writeExternal堆栈就是这个类调用
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
taskBinary = sc.broadcast(taskBinaryBytes)// 广播
} catch {
// 序列化失败报的错,如果任务无法序列化则直接放弃
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString, Some(e))
runningStages -= stage
return
case NonFatal(e) =>
abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))
runningStages -= stage
return
}
// 创建Task实例。Task信息传递到Executor有多个渠道,DAG信息通过广播,依赖Jar通过文件下载,Action本身(字节码)通过Akka
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
// 附带loc和partition信息
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, stage.internalAccumulators)
}
case stage: ResultStage =>
val job = stage.activeJob.get
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id, stage.internalAccumulators)
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
runningStages -= stage
return
}
// 提交Task
if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
// 封装Task到TaskSet并提交,TaskSet基本就是Task的集合。控制移交到TaskScheduler
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// 没有待执行Task,标志Stage结束
markStageAsFinished(stage, None)
val debugString = stage match {
case stage: ShuffleMapStage =>
s"Stage ${stage} is actually done; " + s"(available: ${stage.isAvailable}," + s"available outputs: ${stage.numAvailableOutputs}," + s"partitions: ${stage.numPartitions})"
case stage : ResultStage =>
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)
}
}
private def getPreferredLocsInternal(rdd: RDD[_],partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
// If the partition has already been visited, no need to re-visit.
// This avoids exponential path exploration. SPARK-695
if (!visited.add((rdd, partition))) {
// Nil has already been returned for previously visited partitions.
return Nil
}
// 先拿缓存
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
return cached
}
// 如果有预设的preference,则根据预设返回。这是递归的终点,所有preference都是从input-rdd沿窄依赖传递的
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
// 递归寻找各个窄依赖
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
case _ =>
}
Nil
}
以上完成DagScheduler提交逻辑,后续逻辑由TaskScheduler继续实现,未完待续
相关推荐
1. **DataFrame/Dataset API强化**:Spark 2.2.0进一步提升了DataFrame和Dataset API的性能和易用性。DataFrame API提供了SQL-like查询接口,而Dataset API则支持类型安全和强类型编程。在这一版本中,API更加成熟,...
spark源码:spark-master.zip。方便不能登录GitHub的小伙伴下载。如果实在需要留言,可以私下给。
本压缩包“spark--bin-hadoop3-without-hive.tgz”提供了Spark二进制版本,针对Hadoop 3.1.3进行了编译和打包,这意味着它已经与Hadoop 3.x兼容,但不包含Hive组件。在CentOS 8操作系统上,这个版本的Spark已经被...
1. 解压压缩包:使用tar命令解压文件,例如`tar -xvf spark-3.1.3-bin-without-hadoop.tgz`。 2. 配置环境变量:在`~/.bashrc`或`~/.bash_profile`中设置SPARK_HOME,并将Spark的bin目录添加到PATH。 3. 如果在...
1. 下载:首先,你需要下载Spark 2.1.0与Hadoop 2.7兼容的二进制包,即`spark-2.1.0-bin-hadoop2.7.tgz`。 2. 解压:在Linux服务器上,使用`tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz`命令解压文件。 3. 配置环境...
此外,可以通过`spark-submit`脚本提交应用程序到Spark集群执行,或直接在Spark Shell中交互式探索数据。 总结来说,Spark 2.4.7是大数据处理领域的重要工具,它的高性能、易用性和丰富的功能使其在数据科学和工程...
开发者可以根据需求选择合适的语言编写应用程序,然后使用`spark-submit`脚本来提交任务到集群。 **6. 性能调优** Spark性能优化主要包括内存管理、任务调度和数据本地性等方面。可以通过调整`spark.executor....
spark-hive_2.11-2.3.0 spark-hive-thriftserver_2.11-2.3.0.jar log4j-2.15.0.jar slf4j-api-1.7.7.jar slf4j-log4j12-1.7.25.jar curator-client-2.4.0.jar curator-framework-2.4.0.jar curator-recipes-2.4.0....
北风网spark课程源码spark-study-scala.rar,
Spark-3.1.2.tgz和Spark-3.1.2-bin-hadoop2.7.tgz是两个不同格式的Spark发行版,分别以tar.gz和rar压缩格式提供。 1. Spark核心概念: - RDD(弹性分布式数据集):Spark的基础数据结构,是不可变、分区的数据集合...
在这个特定的压缩包"spark-3.1.3-bin-hadoop3.2.tgz"中,我们得到了Spark的3.1.3版本,它已经预编译为与Hadoop 3.2兼容。这个版本的Spark不仅提供了源码,还包含了预编译的二进制文件,使得在Linux环境下快速部署和...
在Ubuntu里安装spark,spark-2.1.0-bin-without-hadoop该版本直接下载到本地后解压即可使用。 Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。它使用内存中缓存和优化的查询执行方式,可针对任何规模...
对于应用程序开发,可以使用Scala、Java、Python或R编写代码,然后通过`spark-submit`脚本提交作业到集群。 6. **性能优化**: Spark提供了一系列性能优化手段,如Tungsten内存管理、Code Generation、Shuffle优化等...
在本场景中,我们讨论的是Spark的3.0.0版本,与Hadoop3.2相结合的二进制发行版——"spark-3.0.0-bin-hadoop3.2"。这个压缩包是为了在Windows操作系统下运行Spark而设计的,因此标签明确指出它是适用于Windows平台的...
该文件为2.4.7版本的spark包(spark-2.4.7-bin-hadoop2.7.tar.gz)
《Spark编程核心组件:spark-assembly-1.5.2-hadoop2.6.0.jar详解》 在大数据处理领域,Spark以其高效、易用和灵活性脱颖而出,成为了许多开发者的首选框架。Spark-assembly-1.5.2-hadoop2.6.0.jar是Spark中的一个...
这个名为"spark-3.2.1-bin-hadoop2.7.tgz"的压缩包是Spark的一个特定版本,即3.2.1,与Hadoop 2.7版本兼容。在Linux环境下,这样的打包方式方便用户下载、安装和运行Spark。 Spark的核心设计理念是快速数据处理,...
为了使用"spark-2.4.7-bin-without-hadoop",你需要首先下载并解压提供的spark-2.4.7-bin-without-hadoop.tgz文件。解压后,你可以找到包含Spark所有组件的目录结构,包括Spark的可执行文件、配置文件以及相关的库...
这个压缩包"spark-3.2.0-bin-hadoop3.2.tgz"包含了Spark 3.2.0版本的二进制文件,以及针对Hadoop 3.2的兼容构建。 Spark的核心组件包括:Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图...
1. `bin`:存放可执行脚本,如`spark-submit`用于提交Spark应用,`spark-shell`提供交互式Shell环境。 2. `conf`:配置文件夹,存放默认配置模板,如`spark-defaults.conf`,用户可以根据需求自定义配置。 3. `jars`...