Analysis of the Task Execution Process
A task is executed by the Executor instance that is created when the Worker starts:
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
The task is then launched through the executor instance's launchTask method.
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
A TaskRunner is created, passing in the task and the ExecutorBackend started by the current Worker
(CoarseGrainedExecutorBackend in yarn mode).
The TaskRunner is then executed on the threadPool thread pool.
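As a rough, self-contained sketch of this launch pattern (the class and method names below are illustrative stand-ins, not Spark's own), each task becomes a Runnable that is tracked in a concurrent map and handed to a thread pool:
import java.util.concurrent.{ConcurrentHashMap, Executors}

class MiniTaskRunner(val taskId: Long, body: () => Unit, onDone: Long => Unit) extends Runnable {
  override def run(): Unit = {
    try body() finally onDone(taskId)            // always drop the task from the running map
  }
}

class MiniExecutor(numThreads: Int) {
  private val threadPool = Executors.newFixedThreadPool(numThreads)
  private val runningTasks = new ConcurrentHashMap[Long, MiniTaskRunner]()

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val tr = new MiniTaskRunner(taskId, body, id => { runningTasks.remove(id); () })
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)                       // a pool thread runs MiniTaskRunner.run
  }
}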
TaskRunner.run function:
The thread body that actually executes the task.
override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
val startTime = System.currentTimeMillis()
SparkEnv is analyzed in a later section.
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo("Running task ID " + taskId)
Update this task's state through execBackend: set it to RUNNING, which sends a StatusUpdate message to the master (driver).
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var attemptedTask: Option[Task[Any]] = None
var taskStart: Long = 0
def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
val startGCTime = gcTime
try {
SparkEnv.set(env)
Accumulators.clear()
Deserialize the task's dependency information: the jars it needs, other files, and the serialized task definition.
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
Refresh those dependencies and add the task's jars to the current thread's ClassLoader.
updateDependencies(taskFiles, taskJars)
Deserialize the task definition with the serializer configured in SparkEnv to obtain a Task instance.
The concrete Task implementation is either ShuffleMapTask or ResultTask.
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
If killed is already true, do not execute the task and fall through to the catch block.
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
if (killed) {
// Throw an exception rather than returning, because returning within a try{} block
// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
// exception will be caught by the catch block, leading to an incorrect ExceptionFailure
// for the task.
throw TaskKilledException
}
attemptedTask = Some(task)
logDebug("Task " + taskId +"'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)
Create a TaskContext and run the task through Task.run, waiting for it to complete.
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
val value = task.run(taskId.toInt)
val taskFinish = System.currentTimeMillis()
At this point the task has finished; if it was killed while running, go to the catch block as well.
// If the task has been killed, let's fail it.
if (task.killed) {
throw TaskKilledException
}
Serialize the task's return value.
val resultSer = SparkEnv.get.serializer.newInstance()
val beforeSerialization = System.currentTimeMillis()
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
Fill in the task metrics.
for (m <- task.metrics) {
m.hostname = Utils.localHostName()
m.executorDeserializeTime = (taskStart - startTime).toInt
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
}
val accumUpdates = Accumulators.values
Wrap the task's return value in a DirectTaskResult instance and serialize it.
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
Check whether the serialized result exceeds the Akka frame size.
If it does, hand the result to the BlockManager (stored with the MEMORY_AND_DISK_SER level) and send back only an IndirectTaskResult referencing the block; otherwise the result fits in an actor message and the DirectTaskResult is used as-is.
Either way, the bytes are delivered to the scheduler through the StatusUpdate call below.
val serializedResult = {
if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
logInfo("Storing result for " + taskId + " in local BlockManager")
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
ser.serialize(new IndirectTaskResult[Any](blockId))
} else {
logInfo("Sending result for " + taskId + " directly to driver")
serializedDirectResult
}
}
Update the task's state through execBackend again: set it to FINISHED, which sends a StatusUpdate message to the master (driver).
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
logInfo("Finished task ID " + taskId)
} catch {
If an exception occurs, a failure status update is sent instead:
case ffe: FetchFailedException => {
val reason = ffe.toTaskEndReason
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
}
case TaskKilledException => {
logInfo("Executor killed task " + taskId)
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
}
case t: Throwable => {
val serviceTime = (System.currentTimeMillis() - taskStart).toInt
val metrics = attemptedTask.flatMap(t => t.metrics)
for (m <- metrics) {
m.executorRunTime = serviceTime
m.jvmGCTime = gcTime - startGCTime
}
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
// TODO: Should we exit the whole executor here? On the one hand, the failed task may
// have left some weird state around depending on when the exception was thrown, but on
// the other hand, maybe we could detect that when future tasks fail and exit then.
logError("Exception in task ID " + taskId, t)
//System.exit(1)
}
} finally {
Remove this thread's shuffle memory reservation from shuffleMemoryMap.
// TODO: Unregister shuffle memory only for ResultTask
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
Remove this task from runningTasks.
runningTasks.remove(taskId)
}
}
}
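The akkaFrameSize branch above, which decides between a DirectTaskResult and an IndirectTaskResult, can be read on its own as the following sketch (the helper names and the putBlock parameter are assumptions for illustration, not Spark's API): oversized results go to a block store and only a small reference travels in the actor message, while small results are sent inline.
import java.nio.ByteBuffer

sealed trait PackagedResult
case class Direct(bytes: ByteBuffer) extends PackagedResult      // sent inside the StatusUpdate message
case class Indirect(blockId: String) extends PackagedResult      // only a reference is sent

def packageResult(serialized: ByteBuffer, taskId: Long, akkaFrameSize: Int,
                  putBlock: (String, ByteBuffer) => Unit): PackagedResult = {
  if (serialized.limit >= akkaFrameSize - 1024) {                // leave headroom for message overhead
    val blockId = "taskresult_" + taskId
    putBlock(blockId, serialized)                                // stands in for BlockManager.putBytes
    Indirect(blockId)
  } else {
    Direct(serialized)
  }
}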
State updates during task execution
ExecutorBackend.statusUpdate
In yarn mode the implementation class is CoarseGrainedExecutorBackend, which sends a StatusUpdate message to the driver actor.
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
driver ! StatusUpdate(executorId, taskId, state, data)
}
The status update is handled on the master (driver) side:
implementation class: CoarseGrainedSchedulerBackend.DriverActor
case StatusUpdate(executorId, taskId, state, data) =>
The state update is delegated to TaskSchedulerImpl.statusUpdate.
scheduler.statusUpdate(taskId, state, data.value)
If the task is in a terminal state, i.e. one of (FINISHED, FAILED, KILLED, LOST):
if (TaskState.isFinished(state)) {
if (executorActor.contains(executorId)) {
Each task occupies one CPU core; the task has finished, so add one back to the available core count.
freeCores(executorId) += 1
Continue launching the remaining tasks on this executor; see the scheduler scheduling analysis for details.
makeOffers(executorId)
} else {
// Ignoring the update since we don't know about the executor.
val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
logWarning(msg.format(taskId, state, sender, executorId))
}
}
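The core accounting behind freeCores and makeOffers can be sketched like this (illustrative names, not the actual CoarseGrainedSchedulerBackend fields): launching a task takes one core from its executor, a terminal status update gives it back, and a free core means the executor can be offered more work.
import scala.collection.mutable

class CoreBookkeeping {
  private val freeCores = mutable.Map[String, Int]()

  def registerExecutor(execId: String, cores: Int): Unit = freeCores(execId) = cores
  def taskLaunched(execId: String): Unit = freeCores(execId) -= 1   // one core per running task
  def taskFinished(execId: String): Unit = freeCores(execId) += 1   // returned on FINISHED/FAILED/KILLED/LOST
  def canOffer(execId: String): Boolean = freeCores.getOrElse(execId, 0) > 0
}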
TaskSchedulerImpl.statusUpdate processing flow
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
synchronized {
try {
If the reported state is LOST and the task is still mapped to an executor:
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
Look up the executorId of the worker this task was running on.
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)
If this executor is an active executor, run the scheduler's executorLost handling, which includes TaskSetManager.executorLost.
The executor is also recorded as failedExecutor for use at the end of this function.
if (activeExecutorIds.contains(execId)) {
removeExecutor(execId)
failedExecutor = Some(execId)
}
}
taskIdToTaskSetId.get(tid) match {
case Some(taskSetId) =>
If the task state is terminal (anything other than RUNNING), remove the task from the bookkeeping maps.
if (TaskState.isFinished(state)) {
taskIdToTaskSetId.remove(tid)
if (taskSetTaskIds.contains(taskSetId)) {
taskSetTaskIds(taskSetId) -= tid
}
taskIdToExecutorId.remove(tid)
}
activeTaskSets.get(taskSetId).foreach { taskSet =>
If the task finished successfully, remove it from the TaskSet's running list and fetch its result data through TaskResultGetter.
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
Handling for failed task executions:
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
}
case None =>
logInfo("Ignoring update with state %s from TID %s because its task set is gone"
.format(state, tid))
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
}
}
If an executor failed, let the dagScheduler handle the lost executor.
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor != None) {
dagScheduler.executorLost(failedExecutor.get)
Then trigger a new round of task allocation and execution.
backend.reviveOffers()
}
}
The TaskState.LOST case with the executor in activeExecutorIds
When the reported state is LOST and the executor is an active executor (that is, it has been running tasks):
private def removeExecutor(executorId: String) {
Remove this executor from activeExecutorIds.
activeExecutorIds -= executorId
Get the host of the worker this executor ran on.
val host = executorIdToHost(executorId)
Take all executors on that host and remove the current one; if none remain, drop the host entry.
val execs = executorsByHost.getOrElse(host, new HashSet)
execs -= executorId
if (execs.isEmpty) {
executorsByHost -= host
}
Remove this executor from the executor-to-host map.
executorIdToHost -= executorId
The main purpose here is to reach TaskSetManager.executorLost.
rootPool.executorLost(executorId, host)
}
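A simplified picture of the bookkeeping removeExecutor maintains (illustrative types, not Spark's internals): removing an executor also drops its host entry once no executors remain on that host.
import scala.collection.mutable

class ExecutorBook {
  val executorIdToHost = mutable.Map[String, String]()
  val executorsByHost  = mutable.Map[String, mutable.HashSet[String]]()

  def addExecutor(execId: String, host: String): Unit = {
    executorIdToHost(execId) = host
    executorsByHost.getOrElseUpdate(host, new mutable.HashSet) += execId
  }

  def removeExecutor(execId: String): Unit = {
    for (host <- executorIdToHost.remove(execId); execs <- executorsByHost.get(host)) {
      execs -= execId
      if (execs.isEmpty) executorsByHost -= host   // last executor on this host is gone
    }
  }
}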
TaskSetManager.executorLost function:
This function handles tasks lost because their executor is gone, re-adding that executor's tasks to the pending task lists.
override def executorLost(execId: String, host: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
// Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
// task that used to have locations on only this host might now go to the no-prefs list. Note
// that it's okay if we add a task to the same queue twice (if it had multiple preferred
// locations), because findTaskFromList will skip already-running tasks.
Rebuild this TaskSet's pending queues, since the current executor instance has been removed.
for (index <- getPendingTasksForExecutor(execId)) {
addPendingTask(index, readding=true)
}
for (index <- getPendingTasksForHost(host)) {
addPendingTask(index, readding=true)
}
// Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
If the tasks of this TaskSet are shuffle map tasks (the RDD is a shuffle RDD):
if (tasks(0).isInstanceOf[ShuffleMapTask]) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
if (successful(index)) {
successful(index) = false
copiesRunning(index) -= 1
tasksSuccessful -= 1
addPendingTask(index)
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
Send a CompletionEvent to the DAGScheduler with reason Resubmitted,
sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
}
}
}
Also, for any task still in the running state that was running on the lost executor,
// Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
mark the task as failed, remove it from the running list, and re-add it to the pending tasks queue.
handleFailedTask(tid, TaskState.FAILED, None)
}
}
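The effect of executorLost on a shuffle map stage can be summarized with this standalone sketch (hypothetical types, not TaskSetManager's own fields): tasks whose map output lived only on the lost executor are marked unsuccessful again and pushed back onto the pending queue.
import scala.collection.mutable

case class TaskRecord(index: Int, executorId: String, var successful: Boolean)

def requeueOnExecutorLost(lostExec: String,
                          records: Seq[TaskRecord],
                          pendingTasks: mutable.Queue[Int],
                          isShuffleMapStage: Boolean): Unit = {
  if (isShuffleMapStage) {
    for (r <- records if r.executorId == lostExec && r.successful) {
      r.successful = false           // its map output disappeared with the executor
      pendingTasks.enqueue(r.index)  // schedule the task again elsewhere
    }
  }
}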
DAGScheduler handling of the CompletionEvent:
...........................
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
handleTaskCompletion(completion)
.........................
case Resubmitted =>
logInfo("Resubmitted " + task + ", so marking it as still running")
pendingTasks(stage) += task
The (TaskState.FAILED, TaskState.KILLED, TaskState.LOST) states
.........................
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
Remove the task from the running set.
taskSet.removeRunningTask(tid)
enqueueFailedTask deserializes the failure reason and hands the exception to TaskSchedulerImpl.handleFailedTask.
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
TaskSchedulerImpl.handleFailedTask function:
def handleFailedTask(
taskSetManager: TaskSetManager,
tid: Long,
taskState: TaskState,
reason: Option[TaskEndReason]) = synchronized {
taskSetManager.handleFailedTask(tid, taskState, reason)
If the task was not KILLED, trigger a new round of task allocation and execution so the failed task can be rescheduled.
if (taskState != TaskState.KILLED) {
// Need to revive offers again now that the task set manager state has been updated to
// reflect failed tasks that need to be re-run.
backend.reviveOffers()
}
}
TaskSetManager.handleFailedTask flow
TaskSetManager.handleFailedTask processes the exception information from a failed task execution.
def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
val info = taskInfos(tid)
if (info.failed) {
return
}
removeRunningTask(tid)
val index = info.index
info.markFailed()
var failureReason = "unknown"
if (!successful(index)) {
logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
copiesRunning(index) -= 1
If this call was initiated from TaskSetManager.executorLost (the Task LOST case, reason = None), the case block below is not executed;
otherwise the task failed with an exception, i.e. the status update carried a state other than LOST.
// Check if the problem is a map output fetch failure. In that case, this
// task will never succeed on any node, so tell the scheduler about it.
reason.foreach {
case fetchFailed: FetchFailed =>
A fetch failure: mark the task set as finished, remove it from the scheduler, drop all of its running tasks, and skip the rest of this function.
logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null)
successful(index) = true
tasksSuccessful += 1
sched.taskSetFinished(this)
removeAllRunningTasks()
return
case TaskKilled =>
The task was killed: report it to the DAGScheduler and stop; the rest of the function is skipped.
logWarning("Task %d was killed.".format(tid))
sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null)
return
case ef: ExceptionFailure =>
sched.dagScheduler.taskEnded(
tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
if (ef.className == classOf[NotSerializableException].getName()) {
// If the task result wasn't serializable, there's no point in trying to re-execute it.
logError("Task %s:%s had a not serializable result: %s; not retrying".format(
taskSet.id, index, ef.description))
abort("Task %s:%s had a not serializable result: %s".format(
taskSet.id, index, ef.description))
return
}
val key = ef.description
failureReason = "Exception failure: %s".format(ef.description)
val now = clock.getTime()
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
val (dupCount, printTime) = recentExceptions(key)
if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
recentExceptions(key) = (0, now)
(true, 0)
} else {
recentExceptions(key) = (dupCount + 1, printTime)
(false, dupCount + 1)
}
} else {
recentExceptions(key) = (0, now)
(true, 0)
}
}
if (printFull) {
val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
logWarning("Loss was due to %s\n%s\n%s".format(
ef.className, ef.description, locs.mkString("\n")))
} else {
logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
}
case TaskResultLost =>
failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
logWarning(failureReason)
sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
case _ => {}
}
Re-add the task to the pending queue; if the state is not KILLED, increment its failure count and check whether the maximum number of retries has been reached.
// On non-fetch failures, re-enqueue the task as pending for a max number of retries
addPendingTask(index)
if (state != TaskState.KILLED) {
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
logError("Task %s:%d failed %d times; aborting job".format(
taskSet.id, index, maxTaskFailures))
abort("Task %s:%d failed %d times (most recent failure: %s)".format(
taskSet.id, index, maxTaskFailures, failureReason))
}
}
} else {
logInfo("Ignoring task-lost event for TID " + tid +
" because task " + index + " is already finished")
}
}
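The retry accounting at the end of handleFailedTask boils down to the following sketch (illustrative, with an assumed maxTaskFailures limit): every non-KILLED failure increments a per-task counter, and crossing the limit means the task set should be aborted.
import scala.collection.mutable

class RetryBudget(maxTaskFailures: Int) {
  private val numFailures = mutable.Map[Int, Int]().withDefaultValue(0)

  /** Returns true if the task may be retried, false if the task set should be aborted. */
  def recordFailure(index: Int, wasKilled: Boolean): Boolean = {
    if (wasKilled) true                       // a kill does not count against the retry budget
    else {
      numFailures(index) += 1
      numFailures(index) < maxTaskFailures
    }
  }
}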
DAGScheduler taskEnded flow:
def taskEnded(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Map[Long, Any],
taskInfo: TaskInfo,
taskMetrics: TaskMetrics) {
eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
}
Handling the CompletionEvent:
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
handleTaskCompletion(completion)
DAGScheduler.handleTaskCompletion
The fetch-failure case:
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable
val failedStage = stageIdToStage(task.stageId)
running -= failedStage
failed += failedStage
..............................
// Mark the map whose fetch failed as broken in the map stage
val mapStage = shuffleToMapStage(shuffleId)
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}
...........................
failed += mapStage
// Remember that a fetch failed now; this is used to resubmit the broken
// stages later, after a small wait (to give other tasks the chance to fail)
lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
handleExecutorLost removes every output location on this executor from the affected stages' partitions.
handleExecutorLost(bmAddress.executorId, Some(task.epoch))
}
case ExceptionFailure(className, description, stackTrace, metrics) =>
// Do nothing here, left up to the TaskScheduler to decide how to handle user failures
case TaskResultLost =>
// Do nothing here; the TaskScheduler handles these failures and resubmits the task.
The TaskState.FINISHED state
This state means the task completed normally.
if (state == TaskState.FINISHED) {
Remove this task from the taskSet's running list.
taskSet.removeRunningTask(tid)
Fetch the task's result data.
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
TaskResultGetter.enqueueSuccessfulTask function:
def enqueueSuccessfulTask(
taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
getTaskResultExecutor.execute(new Runnable {
override def run() {
try {
Deserialize the response to obtain the result.
val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
If the result was smaller than the Akka actor message limit, it is a DirectTaskResult and is returned directly.
case directResult: DirectTaskResult[_] => directResult
case IndirectTaskResult(blockId) =>
Otherwise the result was too large and is managed by the BlockManager; fetch the result data through the blockManager.
logDebug("Fetching indirect task result for TID %s".format(tid))
First notify the scheduler that the result is being fetched, which posts a GettingResultEvent to the DAGScheduler
(see TaskSchedulerImpl.handleTaskGettingResult below).
scheduler.handleTaskGettingResult(taskSetManager, tid)
Fetch the task's result bytes.
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
The task finished but its result could not be fetched; see the TaskResultLost branch in the failure handling above.
if (!serializedTaskResult.isDefined) {
/* We won't be able to get the task result if the machine that ran the task failed
* between when the task ended and when we tried to fetch the result, or if the
* block manager had to flush the result. */
scheduler.handleFailedTask(
taskSetManager, tid, TaskState.FINISHED, Some(TaskResultLost))
return
}
Deserialize the fetched result.
val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
serializedTaskResult.get)
The result is in hand, so remove the corresponding block.
sparkEnv.blockManager.master.removeBlock(blockId)
deserializedResult
}
result.metrics.resultSize = serializedData.limit()
See TaskSchedulerImpl.handleSuccessfulTask below.
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
case ex: Throwable =>
taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
}
}
})
}
TaskSchedulerImpl.handleTaskGettingResult function:
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
taskSetManager.handleTaskGettingResult(tid)
}
In TaskSetManager:
def handleTaskGettingResult(tid: Long) = {
val info = taskInfos(tid)
info.markGettingResult()
sched.dagScheduler.taskGettingResult(tasks(info.index), info)
}
This posts a GettingResultEvent through the DAGScheduler.
def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
eventProcessActor ! GettingResultEvent(task, taskInfo)
}
Handling of GettingResultEvent: it only posts to the listener bus; no further processing is done.
case GettingResultEvent(task, taskInfo) =>
listenerBus.post(SparkListenerTaskGettingResult(task, taskInfo))
TaskSchedulerImpl.handleSuccessfulTask function:
def handleSuccessfulTask(
taskSetManager: TaskSetManager,
tid: Long,
taskResult: DirectTaskResult[_]) = synchronized {
taskSetManager.handleSuccessfulTask(tid, taskResult)
}
In TaskSetManager:
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
val info = taskInfos(tid)
val index = info.index
info.markSuccessful()
Remove this task from the running queue.
removeRunningTask(tid)
if (!successful(index)) {
logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
tid, info.duration, info.host, tasksSuccessful, numTasks))
Send a Success taskEnded message to the dagScheduler,
sched.dagScheduler.taskEnded(
tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
increment the count of successfully completed tasks, and mark this task's slot in the successful array as true.
// Mark successful and stop if all the tasks have succeeded.
tasksSuccessful += 1
successful(index) = true
If the number of successful tasks reaches the total number of tasks, finish this TaskSet, which effectively completes one RDD (stage).
if (tasksSuccessful == numTasks) {
sched.taskSetFinished(this)
}
} else {
logInfo("Ignorning task-finished event for TID " + tid + " because task " +
index + " has already completed successfully")
}
}
DAGScheduler handling of a CompletionEvent with reason Success
case Success =>
logInfo("Completed " + task)
if (event.accumUpdates != null) {
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
}
Remove this task from the pending task queue.
pendingTasks(stage) -= task
stageToInfos(stage).taskInfos += event.taskInfo -> event.taskMetrics
Handle the two kinds of Task according to the task's type.
task match {
A ResultTask means no shuffle output is required.
case rt: ResultTask[_, _] =>
resultStageToJob.get(stage) match {
case Some(job) =>
If the ActiveJob of this stage has not yet marked this task's partition as finished,
if (!job.finished(rt.outputId)) {
set the finished flag for this task to true,
job.finished(rt.outputId) = true
increment the job's finished-task count, and check whether every task has finished;
if so, remove the job and its stage from the relevant bookkeeping structures.
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
idToActiveJob -= stage.jobId
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
jobIdToStageIdsRemove(job.jobId)
listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
}
Call JobWaiter.taskSucceeded (the job.listener) to mark this task finished and hand its result over for output handling.
job.listener.taskSucceeded(rt.outputId, event.result)
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
}
Completion handling for a shuffle task:
case smt: ShuffleMapTask =>
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
} else {
Record the shuffle result (a MapStatus) in the stage's outputLocs. Each added location increments numAvailableOutputs;
when numAvailableOutputs equals numPartitions, the map side of the shuffle is complete.
stage.addOutputLoc(smt.partitionId, status)
}
If this stage is still in the running set and all of its pendingTasks have been processed,
if (running.contains(stage) && pendingTasks(stage).isEmpty) {
mark the stage as finished.
markStageAsFinished(stage)
.......................................
The shuffle stage is now complete, so register its shuffleId and outputLocs with the mapOutputTracker:
for every shuffle task, the executor and host it ran on and the size of each output.
Every shuffle task's writers are tagged with this shuffleId; once registration succeeds,
the next stage reads its input by looking this shuffleId up in the mapOutputTracker.
mapOutputTracker.registerMapOutputs(
stage.shuffleDep.get.shuffleId,
stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
changeEpoch = true)
}
clearCacheLocs()
Each partition's outputLoc defaults to Nil; if any partition is still Nil, some task failed,
so resubmit this stage, which re-runs only the tasks that have not succeeded.
if (stage.outputLocs.exists(_ == Nil)) {
.........................................
submitStage(stage)
} else {
val newlyRunnable = new ArrayBuffer[Stage]
for (stage <- waiting) {
logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
}
Scan all waiting stages and collect those whose missing parent stages are now empty,
i.e. whose shuffle dependencies have completed; these stages are now runnable.
for (stage <- waiting if getMissingParentStages(stage) == Nil) {
newlyRunnable += stage
}
Remove the newly runnable stages from the waiting set,
waiting --= newlyRunnable
add them to the running set,
running ++= newlyRunnable
for {
stage <- newlyRunnable.sortBy(_.id)
jobId <- activeJobForStage(stage)
} {
and submit each stage's tasks for allocation and execution.
logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
submitMissingTasks(stage, jobId)
}
}
}
}
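The registerMapOutputs call above is what connects the two stages. A simplified stand-in for that registry (illustrative, not the real MapOutputTracker) looks like this: the shuffleId maps to one location record per map partition, which is what the reduce tasks of the next stage query to know where to fetch from.
import scala.collection.mutable

case class MapOutputLoc(executorId: String, host: String, compressedSizes: Array[Byte])

class MiniMapOutputTracker {
  private val outputs = mutable.Map[Int, Array[MapOutputLoc]]()

  def registerMapOutputs(shuffleId: Int, locs: Array[MapOutputLoc]): Unit =
    outputs(shuffleId) = locs                        // one entry per map partition

  def locationsFor(shuffleId: Int): Array[MapOutputLoc] =
    outputs.getOrElse(shuffleId, Array.empty)        // queried by the next stage's tasks
}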
JobWaiter.taskSucceeded function
The callback invoked after a task completes.
override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
if (_jobFinished) {
throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
}
Process the result with resultHandler, the function passed in when the JobWaiter was created.
resultHandler(index, result.asInstanceOf[T])
Increment the finished-task count.
finishedTasks += 1
if (finishedTasks == totalTasks) {
If the number of finished tasks equals the total number of tasks, mark the job finished and set the result to JobSucceeded;
setting _jobFinished to true lets the thread waiting on the job stop waiting.
_jobFinished = true
jobResult = JobSucceeded
this.notifyAll()
}
}
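The waiting side that taskSucceeded wakes up can be written as a small self-contained sketch (illustrative, not the real JobWaiter): the job-submitting thread blocks until every task has reported in.
class MiniJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private var finishedTasks = 0
  private var jobFinished = false

  def taskSucceeded(index: Int, result: T): Unit = synchronized {
    resultHandler(index, result)    // hand the task's result to the output handler
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      jobFinished = true
      notifyAll()                   // wake up the thread blocked in awaitResult()
    }
  }

  def awaitResult(): Unit = synchronized {
    while (!jobFinished) wait()
  }
}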
Task.runTask implementations
A Task comes in two flavors:
ShuffleMapTask, which produces shuffle output, and
ResultTask, which does not.
ResultTask.runTask
override def runTask(context: TaskContext): U = {
metrics = Some(context.taskMetrics)
try {
The func applied here is the function supplied when the task was created, i.e. passed into DAGScheduler.runJob,
for example the writeToFile function defined in PairRDDFunction.saveAsHadoopDataset.
rdd.iterator dispatches to the concrete RDD implementation's compute method,
which in turn runs the user-defined functions (such as those passed to map) from the application code.
func(context, rdd.iterator(split, context))
} finally {
context.executeOnCompleteCallbacks()
}
}
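As a minimal illustration of the kind of func a ResultTask applies (a hypothetical example, not the actual writeToFile from saveAsHadoopDataset), a count-style job would pass a function that simply consumes the partition iterator and returns a per-partition result:
// One partition's worth of work for a count-like job; the context parameter is unused here.
def countPartition(context: AnyRef, partition: Iterator[Any]): Long =
  partition.size.toLong
The call func(context, rdd.iterator(split, context)) would then invoke this function on the computed iterator of one split.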
ShuffleMapTask.runTask
override def runTask(context: TaskContext): MapStatus = {
val numOutputSplits = dep.partitioner.numPartitions
metrics = Some(context.taskMetrics)
val blockManager = SparkEnv.get.blockManager
val shuffleBlockManager = blockManager.shuffleBlockManager
var shuffle: ShuffleWriterGroup = null
var success = false
try {
Obtain the group of shuffle block writers for this shuffleId.
// Obtain all the block writers for shuffle blocks.
val ser = SparkEnv.get.serializerManager.get(dep.serializerClass, SparkEnv.get.conf)
shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
rdd.iterator is executed to produce the records (Product2 pairs), and each record is shuffled into a bucket file according to its key.
When all shuffle tasks of the stage have completed, the stage is registered with the mapOutputTracker,
and the next stage reads the data from there. Each completed shuffle task produces a MapStatus instance
containing the executor and host the shuffle ran on and the size of each output.
See the later shuffle analysis for how the data is read.
// Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, context)) {
val pair = elem.asInstanceOf[Product2[Any, Any]]
val bucketId = dep.partitioner.getPartition(pair._1)
shuffle.writers(bucketId).write(pair)
}
// Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
var totalTime = 0L
val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
MapOutputTracker.compressSize(size)
}
// Update shuffle metrics.
val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
shuffleMetrics.shuffleWriteTime = totalTime
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
success = true
new MapStatus(blockManager.blockManagerId, compressedSizes)
} catch { case e: Exception =>
// If there is an exception from running the task, revert the partial writes
// and throw the exception upstream to Spark.
if (shuffle != null && shuffle.writers != null) {
for (writer <- shuffle.writers) {
writer.revertPartialWrites()
writer.close()
}
}
throw e
} finally {
// Release the writers back to the shuffle block manager.
if (shuffle != null && shuffle.writers != null) {
shuffle.releaseWriters(success)
}
// Execute the callbacks on task completion.
context.executeOnCompleteCallbacks()
}
}
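The per-record routing in the write loop above reduces to hashing the key into one of numOutputSplits buckets. Here is a simplified in-memory stand-in for the shuffle writers (illustrative only, using a HashPartitioner-style non-negative modulo):
def shuffleIntoBuckets[K, V](records: Iterator[(K, V)], numBuckets: Int): Array[List[(K, V)]] = {
  val buckets = Array.fill(numBuckets)(List.empty[(K, V)])
  for ((k, v) <- records) {
    val bucketId = ((k.hashCode % numBuckets) + numBuckets) % numBuckets  // non-negative modulo
    buckets(bucketId) = (k, v) :: buckets(bucketId)  // stands in for shuffle.writers(bucketId).write(pair)
  }
  buckets
}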
在Hadoop中,MapTask是MapReduce框架的关键组件,负责执行Mapper阶段的工作。MapTask辅助类,特别是MapOutputBuffer,是Mapper输出数据管理的核心部分。本文将继续深入分析MapOutputBuffer的内部实现,以便理解...