`
hongs_yang
  • 浏览: 61016 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

Task的执行过程分析

阅读更多

Task的执行过程分析

 

Task的执行通过Worker启动时生成的Executor实例进行,

 

case RegisteredExecutor(sparkProperties) =>

 

logInfo("Successfully registered with driver")

 

// Make this host instead of hostPort ?

 

executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)

 

 

 

通过executor实例的launchTask启动task的执行操作。

 

 

 

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {

 

valtr = new TaskRunner(context, taskId, serializedTask)

 

runningTasks.put(taskId, tr)

 

threadPool.execute(tr)

 

}

 

 

 

生成TaskRunner线程,把task与当前的Wroker的启动的executorBackend传入,

 

on yarn模式为CoarseGrainedExecutorBackend.

 

通过threadPool线程池执行生成TaskRunner线程。

 

 

 

TaskRunner.run函数:

 

用于执行task任务的线程

 

overridedef run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>

 

valstartTime = System.currentTimeMillis()

 

SparkEvn后面在进行分析。

 

SparkEnv.set(env)

 

Thread.currentThread.setContextClassLoader(replClassLoader)

 

valser = SparkEnv.get.closureSerializer.newInstance()

 

logInfo("Running task ID " + taskId)

 

通过execBackend更新此task的状态。设置task的状态为RUNNING.master发送StatusUpdate事件。

 

execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)

 

varattemptedTask: Option[Task[Any]] = None

 

vartaskStart: Long = 0

 

def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum

 

valstartGCTime = gcTime

 

 

 

try {

 

SparkEnv.set(env)

 

Accumulators.clear()

 

解析出task的资源信息。包括要执行的jar,其它资源,task定义信息

 

val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)

 

更新资源信息,并把task执行的jar更新到当前ThreadClassLoader中。

 

updateDependencies(taskFiles, taskJars)

 

通过SparkEnv中配置的Serialize实现对task定义进行反serialize,得到Task实例。

 

Task的具体实现为ShuffleMapTask或者ResultTask

 

task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

 

 

 

如果killed的值为true,不执行当前task任务,进入catch处理。

 

// If this task has been killed before we deserialized it, let's quit now. Otherwise,

 

// continue executing the task.

 

if (killed) {

 

// Throw an exception rather than returning, because returning within a try{} block

 

// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl

 

// exception will be caught by the catch block, leading to an incorrect ExceptionFailure

 

// for the task.

 

throw TaskKilledException

 

}

 

 

 

attemptedTask = Some(task)

 

logDebug("Task " + taskId +"'s epoch is " + task.epoch)

 

env.mapOutputTracker.updateEpoch(task.epoch)

 

生成TaskContext实例,通过Task.runTask执行task的任务,等待task执行完成。

 

// Run the actual task and measure its runtime.

 

taskStart = System.currentTimeMillis()

 

valvalue = task.run(taskId.toInt)

 

valtaskFinish = System.currentTimeMillis()

 

 

 

此时task执行结束,检查如果task是被killed的结果,进入catch处理。

 

// If the task has been killed, let's fail it.

 

if (task.killed) {

 

throw TaskKilledException

 

}

 

task执行的返回结果进行serialize操作。

 

valresultSer = SparkEnv.get.serializer.newInstance()

 

valbeforeSerialization = System.currentTimeMillis()

 

valvalueBytes = resultSer.serialize(value)

 

valafterSerialization = System.currentTimeMillis()

 

发送监控指标

 

for (m <- task.metrics) {

 

m.hostname = Utils.localHostName()

 

m.executorDeserializeTime = (taskStart - startTime).toInt

 

m.executorRunTime = (taskFinish - taskStart).toInt

 

m.jvmGCTime = gcTime - startGCTime

 

m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt

 

}

 

 

 

valaccumUpdates = Accumulators.values

 

Task的返回结果生成DirectTaskResult实例。并对其进行serialize操作。

 

valdirectResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))

 

valserializedDirectResult = ser.serialize(directResult)

 

logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)

 

检查task result的大小是否超过了akka的发送消息大小,

 

如果是通过BlockManager来管理结果,设置RDD的存储级别为MEMORYDISK,否则表示没有达到actor消息大小,

 

直接使用TaskResult,此部分信息主要是需要通过状态更新向Scheduler向送StatusUpdate事件调用。

 

valserializedResult = {

 

if (serializedDirectResult.limit >= akkaFrameSize - 1024) {

 

logInfo("Storing result for " + taskId + " in local BlockManager")

 

valblockId = TaskResultBlockId(taskId)

 

env.blockManager.putBytes(

 

blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)

 

ser.serialize(new IndirectTaskResult[Any](blockId))

 

} else {

 

logInfo("Sending result for " + taskId + " directly to driver")

 

serializedDirectResult

 

}

 

}

 

通过execBackend更新此task的状态。设置task的状态为FINISHED.master发送StatusUpdate事件。

 

execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

 

logInfo("Finished task ID " + taskId)

 

} catch {

 

出现异常,发送FAILED事件。

 

caseffe: FetchFailedException => {

 

valreason = ffe.toTaskEndReason

 

execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

 

}

 

 

 

case TaskKilledException => {

 

logInfo("Executor killed task " + taskId)

 

execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

 

}

 

 

 

caset: Throwable => {

 

valserviceTime = (System.currentTimeMillis() - taskStart).toInt

 

valmetrics = attemptedTask.flatMap(t => t.metrics)

 

for (m <- metrics) {

 

m.executorRunTime = serviceTime

 

m.jvmGCTime = gcTime - startGCTime

 

}

 

valreason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)

 

execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

 

 

 

// TODO: Should we exit the whole executor here? On the one hand, the failed task may

 

// have left some weird state around depending on when the exception was thrown, but on

 

// the other hand, maybe we could detect that when future tasks fail and exit then.

 

logError("Exception in task ID " + taskId, t)

 

//System.exit(1)

 

}

 

} finally {

 

shuffleMemoryMap中移出此线程对应的shuffle的内存空间

 

// TODO: Unregister shuffle memory only for ResultTask

 

valshuffleMemoryMap = env.shuffleMemoryMap

 

shuffleMemoryMap.synchronized {

 

shuffleMemoryMap.remove(Thread.currentThread().getId)

 

}

 

runningTasks中移出此task

 

runningTasks.remove(taskId)

 

}

 

}

 

}

 

 

 

Task执行过程的状态更新

 

ExecutorBackend.statusUpdate

 

on yarn模式实现类CoarseGrainedExecutorBackend,通过masteractor发送StatusUpdate事件。

 

overridedef statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {

 

driver ! StatusUpdate(executorId, taskId, state, data)

 

}

 

 

 

master 中的ExecutorBackend处理状态更新操作:

 

实现类:CoarseGrainedSchedulerBackend.DriverActor

 

case StatusUpdate(executorId, taskId, state, data) =>

 

通过TaskSchedulerImplstatusUpdate处理状态更新。

 

scheduler.statusUpdate(taskId, state, data.value)

 

如果Task状态为完成状态,完成状态包含(FINISHED, FAILED, KILLED, LOST)

 

if (TaskState.isFinished(state)) {

 

if (executorActor.contains(executorId)) {

 

每一个task占用一个cpu core,此时task完成,把可用的core值加一

 

freeCores(executorId) += 1

 

在此executor上接着执行其于的task任务,此部分可参见scheduler调度过程分析中的部分说明。

 

makeOffers(executorId)

 

} else {

 

// Ignoring the update since we don't know about the executor.

 

valmsg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"

 

logWarning(msg.format(taskId, state, sender, executorId))

 

}

 

}

 

 

 

TaskSchedulerImpl.statusUpdate函数处理流程

 

 

 

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {

 

varfailedExecutor: Option[String] = None

 

synchronized {

 

try {

 

如果Task的状态传入为Task的执行丢失,同时taskexecutor列表中存在

 

if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {

 

得到此task执行的worker所属的executorID

 

// We lost this entire executor, so remember that it's gone

 

valexecId = taskIdToExecutorId(tid)

 

如果此executoractiveExecutor,执行schedulerexecutorLost操作。

 

包含TaskSetManager,会执行TaskSetManager.executorLost操作.

 

设置当前的executorfailedExecutor,共函数最后使用。

 

if (activeExecutorIds.contains(execId)) {

 

removeExecutor(execId)

 

failedExecutor = Some(execId)

 

}

 

}

 

taskIdToTaskSetId.get(tid) match {

 

case Some(taskSetId) =>

 

如果task状态是完成状态,非RUNNING状态。移出对应的容器中的值

 

if (TaskState.isFinished(state)) {

 

taskIdToTaskSetId.remove(tid)

 

if (taskSetTaskIds.contains(taskSetId)) {

 

taskSetTaskIds(taskSetId) -= tid

 

}

 

taskIdToExecutorId.remove(tid)

 

}

 

activeTaskSets.get(taskSetId).foreach { taskSet =>

 

如果task是成功完成,从TaskSet中移出此task,同时通过TaskResultGetter获取数据。

 

if (state == TaskState.FINISHED) {

 

taskSet.removeRunningTask(tid)

 

taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)

 

} elseif (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {

 

task任务执行失败的处理部分:

 

taskSet.removeRunningTask(tid)

 

taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)

 

}

 

}

 

case None =>

 

logInfo("Ignoring update with state %s from TID %s because its task set is gone"

 

.format(state, tid))

 

}

 

} catch {

 

casee: Exception => logError("Exception in statusUpdate", e)

 

}

 

}

 

如果有failedworker,通过dagScheduler处理此executor.

 

// Update the DAGScheduler without holding a lock on this, since that can deadlock

 

if (failedExecutor != None) {

 

dagScheduler.executorLost(failedExecutor.get)

 

发起task执行的分配与任务执行操作。

 

backend.reviveOffers()

 

}

 

}

 

 

 

TaskStatus.LOST状态,同时executoractiveExecutorIds

 

TaskStatus的状态为LOST时,同时executor是活动的executor(也就是有过执行task的情况)

 

privatedef removeExecutor(executorId: String) {

 

activeExecutorIds中移出此executor

 

activeExecutorIds -= executorId

 

得到此executor对应的workerhost

 

valhost = executorIdToHost(executorId)

 

取出host对应的所有executor,并移出当前的executor

 

valexecs = executorsByHost.getOrElse(host, new HashSet)

 

execs -= executorId

 

if (execs.isEmpty) {

 

executorsByHost -= host

 

}

 

executor对应的host容器中移出此executor

 

executorIdToHost -= executorId

 

此处主要是去执行TaskSetManager.executorLost函数。

 

rootPool.executorLost(executorId, host)

 

}

 

 

 

TaskSetManager.executorLost函数:

 

此函数主要处理executor导致task丢失的情况,把executor上的task重新添加到pendingtasks列表中

 

overridedef executorLost(execId: String, host: String) {

 

logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)

 

 

 

// Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a

 

// task that used to have locations on only this host might now go to the no-prefs list. Note

 

// that it's okay if we add a task to the same queue twice (if it had multiple preferred

 

// locations), because findTaskFromList will skip already-running tasks.

 

重新生成此TaskSet中的pending队列,因为当前executor的实例被移出,需要重新生成。

 

for (index <- getPendingTasksForExecutor(execId)) {

 

addPendingTask(index, readding=true)

 

}

 

for (index <- getPendingTasksForHost(host)) {

 

addPendingTask(index, readding=true)

 

}

 

 

 

// Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage

 

如果当前的RDDshufflerdd,

 

if (tasks(0).isInstanceOf[ShuffleMapTask]) {

 

for ((tid, info) <- taskInfosifinfo.executorId == execId) {

 

valindex = taskInfos(tid).index

 

if (successful(index)) {

 

successful(index) = false

 

copiesRunning(index) -= 1

 

tasksSuccessful -= 1

 

addPendingTask(index)

 

// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our

 

// stage finishes when a total of tasks.size tasks finish.

 

通过DAGScheduler发送CompletionEvent处理事件,事件类型为Resubmitted,

 

sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)

 

}

 

}

 

}

 

如果task还处于running状态,同时此tasklostexecutor上运行,

 

// Also re-enqueue any tasks that were running on the node

 

for ((tid, info) <- taskInfosifinfo.running && info.executorId == execId) {

 

设置taskfailed值为true,移出此taskrunning列表中的值,重新添加taskpendingtasks队列中。

 

handleFailedTask(tid, TaskState.FAILED, None)

 

}

 

}

 

 

 

DAGScheduler处理CompletionEvent事件。

 

...........................

 

casecompletion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>

 

listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))

 

handleTaskCompletion(completion)

 

.........................

 

case Resubmitted =>

 

logInfo("Resubmitted " + task + ", so marking it as still running")

 

pendingTasks(stage) += task

 

 

 

(TaskState.FAILED, TaskState.KILLED, TaskState.LOST)状态

 

.........................

 

} elseif (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {

 

taskrunning容器中移出

 

taskSet.removeRunningTask(tid)

 

此函数主要是解析出出错的信息。并通过TaskSchedulerImpl.handleFailedTask处理exception

 

taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)

 

}

 

 

 

 

 

TaskSchedulerImpl.handleFailedTask函数:

 

def handleFailedTask(

 

taskSetManager: TaskSetManager,

 

tid: Long,

 

taskState: TaskState,

 

reason: Option[TaskEndReason]) = synchronized {

 

taskSetManager.handleFailedTask(tid, taskState, reason)

 

如果task不是被KILLED掉的task,重新发起task的分配与执行操作。

 

if (taskState != TaskState.KILLED) {

 

// Need to revive offers again now that the task set manager state has been updated to

 

// reflect failed tasks that need to be re-run.

 

backend.reviveOffers()

 

}

 

}

 

 

 

TaskSetManager.handleFailedTask函数流程

 

TaskSetManager.handleFailedTask,函数,处理task执行的exception信息。

 

def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {

 

valinfo = taskInfos(tid)

 

if (info.failed) {

 

return

 

}

 

removeRunningTask(tid)

 

valindex = info.index

 

info.markFailed()

 

varfailureReason = "unknown"

 

if (!successful(index)) {

 

logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))

 

copiesRunning(index) -= 1

 

如果是通过TaskSetManager.executorLost函数发起的此函数调用(Task.LOST),下面的case部分不会执行,

 

否则是task的执行exception情况,也就是状态更新中非Task.LOST状态时。

 

// Check if the problem is a map output fetch failure. In that case, this

 

// task will never succeed on any node, so tell the scheduler about it.

 

reason.foreach {

 

casefetchFailed: FetchFailed =>

 

读取失败,移出所有此tasksettask执行。并从scheduler中移出此taskset的调度,不再执行下面流程

 

logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)

 

sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null)

 

successful(index) = true

 

tasksSuccessful += 1

 

sched.taskSetFinished(this)

 

removeAllRunningTasks()

 

return

 

 

 

case TaskKilled =>

 

taskkill掉,移出此task,同时不再执行下面流程

 

logWarning("Task %d was killed.".format(tid))

 

sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null)

 

return

 

 

 

caseef: ExceptionFailure =>

 

sched.dagScheduler.taskEnded(

 

tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))

 

if (ef.className == classOf[NotSerializableException].getName()) {

 

// If the task result wasn't rerializable, there's no point in trying to re-execute it.

 

logError("Task %s:%s had a not serializable result: %s; not retrying".format(

 

taskSet.id, index, ef.description))

 

abort("Task %s:%s had a not serializable result: %s".format(

 

taskSet.id, index, ef.description))

 

return

 

}

 

valkey = ef.description

 

failureReason = "Exception failure: %s".format(ef.description)

 

valnow = clock.getTime()

 

val (printFull, dupCount) = {

 

if (recentExceptions.contains(key)) {

 

val (dupCount, printTime) = recentExceptions(key)

 

if (now - printTime > EXCEPTION_PRINT_INTERVAL) {

 

recentExceptions(key) = (0, now)

 

(true, 0)

 

} else {

 

recentExceptions(key) = (dupCount + 1, printTime)

 

(false, dupCount + 1)

 

}

 

} else {

 

recentExceptions(key) = (0, now)

 

(true, 0)

 

}

 

}

 

if (printFull) {

 

vallocs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))

 

logWarning("Loss was due to %s\n%s\n%s".format(

 

ef.className, ef.description, locs.mkString("\n")))

 

} else {

 

logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))

 

}

 

 

 

case TaskResultLost =>

 

failureReason = "Lost result for TID %s on host %s".format(tid, info.host)

 

logWarning(failureReason)

 

sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null)

 

 

 

case _ => {}

 

}

 

重新把task添加到pending的执行队列中,同时如果状态非KILLED的状态,设置并检查是否达到重试的最大次数

 

// On non-fetch failures, re-enqueue the task as pending for a max number of retries

 

addPendingTask(index)

 

if (state != TaskState.KILLED) {

 

numFailures(index) += 1

 

if (numFailures(index) >= maxTaskFailures) {

 

logError("Task %s:%d failed %d times; aborting job".format(

 

taskSet.id, index, maxTaskFailures))

 

abort("Task %s:%d failed %d times (most recent failure: %s)".format(

 

taskSet.id, index, maxTaskFailures, failureReason))

 

}

 

}

 

} else {

 

logInfo("Ignoring task-lost event for TID " + tid +

 

" because task " + index + " is already finished")

 

}

 

}

 

 

 

DAGScheduler处理taskEnded流程:

 

def taskEnded(

 

task: Task[_],

 

reason: TaskEndReason,

 

result: Any,

 

accumUpdates: Map[Long, Any],

 

taskInfo: TaskInfo,

 

taskMetrics: TaskMetrics) {

 

eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)

 

}

 

处理CompletionEvent事件:

 

casecompletion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>

 

listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))

 

handleTaskCompletion(completion)

 

 

 

DAGScheduler.handleTaskCompletion

 

读取失败的case,

 

case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>

 

// Mark the stage that the reducer was in as unrunnable

 

valfailedStage = stageIdToStage(task.stageId)

 

running -= failedStage

 

failed += failedStage

 

..............................

 

// Mark the map whose fetch failed as broken in the map stage

 

valmapStage = shuffleToMapStage(shuffleId)

 

if (mapId != -1) {

 

mapStage.removeOutputLoc(mapId, bmAddress)

 

mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)

 

}

 

...........................

 

failed += mapStage

 

// Remember that a fetch failed now; this is used to resubmit the broken

 

// stages later, after a small wait (to give other tasks the chance to fail)

 

lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock

 

// TODO: mark the executor as failed only if there were lots of fetch failures on it

 

if (bmAddress != null) {

 

stage中可执行的partition中对应的executoridlocation全部移出。

 

handleExecutorLost(bmAddress.executorId, Some(task.epoch))

 

}

 

 

 

case ExceptionFailure(className, description, stackTrace, metrics) =>

 

// Do nothing here, left up to the TaskScheduler to decide how to handle user failures

 

 

 

case TaskResultLost =>

 

// Do nothing here; the TaskScheduler handles these failures and resubmits the task.

 

 

 

 

 

TaskStatus.FINISHED状态

 

此状态表示task正常完成,

 

if (state == TaskState.FINISHED) {

 

移出taskSet中的running队列中移出此task

 

taskSet.removeRunningTask(tid)

 

获取task的响应数据。

 

taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)

 

 

 

TaskResultGetter.enqueueSuccessfulTask函数:

 

 

 

def enqueueSuccessfulTask(

 

taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {

 

getTaskResultExecutor.execute(new Runnable {

 

overridedef run() {

 

try {

 

从响应的结果中得到数据,需要先执行deserialize操作。

 

valresult = serializer.get().deserialize[TaskResult[_]](serializedData) match {

 

如果result的结果小于akkaactor传输的大小,直接返回task的执行结果

 

casedirectResult: DirectTaskResult[_] => directResult

 

case IndirectTaskResult(blockId) =>

 

否则,result结果太大,通过BlockManager管理,通过blockManager拿到result的数据

 

logDebug("Fetching indirect task result for TID %s".format(tid))

 

DAGScheduler发送GettingResultEvent事件处理,

 

见下面TaskSchedulerImpl.handleTaskGettingResult函数

 

scheduler.handleTaskGettingResult(taskSetManager, tid)

 

得到task的执行结果

 

valserializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)

 

task执行完成,并拿结果失败,见上面的错误处理中的TaskResultLost部分。

 

if (!serializedTaskResult.isDefined) {

 

/* We won't be able to get the task result if the machine that ran the task failed

 

* between when the task ended and when we tried to fetch the result, or if the

 

* block manager had to flush the result. */

 

scheduler.handleFailedTask(

 

taskSetManager, tid, TaskState.FINISHED, Some(TaskResultLost))

 

return

 

}

 

task的执行结果进行deserialized操作。

 

valdeserializedResult = serializer.get().deserialize[DirectTaskResult[_]](

 

serializedTaskResult.get)

 

拿到执行结果,移出对应的blockid

 

sparkEnv.blockManager.master.removeBlock(blockId)

 

deserializedResult

 

}

 

result.metrics.resultSize = serializedData.limit()

 

见下面的TaskSchedulerImpl.handleSuccessfulTask处理函数。

 

scheduler.handleSuccessfulTask(taskSetManager, tid, result)

 

} catch {

 

casecnf: ClassNotFoundException =>

 

valloader = Thread.currentThread.getContextClassLoader

 

taskSetManager.abort("ClassNotFound with classloader: " + loader)

 

caseex: Throwable =>

 

taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))

 

}

 

}

 

})

 

}

 

 

 

TaskSchedulerImpl.handleTaskGettingResult函数:

 

 

 

def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {

 

taskSetManager.handleTaskGettingResult(tid)

 

}

 

taskSetManager

 

def handleTaskGettingResult(tid: Long) = {

 

valinfo = taskInfos(tid)

 

info.markGettingResult()

 

sched.dagScheduler.taskGettingResult(tasks(info.index), info)

 

}

 

通过DAGScheduler发起GettingResultEvent事件。

 

def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {

 

eventProcessActor ! GettingResultEvent(task, taskInfo)

 

}

 

 

 

GettingResultEvent事件的处理:其实就是打个酱油,无实际处理操作。

 

case GettingResultEvent(task, taskInfo) =>

 

listenerBus.post(SparkListenerTaskGettingResult(task, taskInfo))

 

 

 

 

 

TaskSchedulerImpl.handleSuccessfulTask处理函数:

 

def handleSuccessfulTask(

 

taskSetManager: TaskSetManager,

 

tid: Long,

 

taskResult: DirectTaskResult[_]) = synchronized {

 

taskSetManager.handleSuccessfulTask(tid, taskResult)

 

}

 

TastSetManager

 

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {

 

valinfo = taskInfos(tid)

 

valindex = info.index

 

info.markSuccessful()

 

running队列中移出此task

 

removeRunningTask(tid)

 

if (!successful(index)) {

 

logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(

 

tid, info.duration, info.host, tasksSuccessful, numTasks))

 

dagscheduler发送success消息,

 

sched.dagScheduler.taskEnded(

 

tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)

 

设置成功完成的task个数加一,同时在successful容器中设置task对应的运行状态为true,表示成功。

 

// Mark successful and stop if all the tasks have succeeded.

 

tasksSuccessful += 1

 

successful(index) = true

 

如果完成的task个数,达到task的总个数,完成此taskset,也就相当于完成了一个rdd

 

if (tasksSuccessful == numTasks) {

 

sched.taskSetFinished(this)

 

}

 

} else {

 

logInfo("Ignorning task-finished event for TID " + tid + " because task " +

 

index + " has already completed successfully")

 

}

 

}

 

 

 

DAGScheduler处理CompletionEventSuccess,,,,

 

case Success =>

 

logInfo("Completed " + task)

 

if (event.accumUpdates != null) {

 

Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted

 

}

 

把等待执行队列中移出此task

 

pendingTasks(stage) -= task

 

stageToInfos(stage).taskInfos += event.taskInfo -> event.taskMetrics

 

根据task的执行类型,处理两个类型的Task

 

taskmatch {

 

如果taskResultTask,表示不需要shuffle操作

 

casert: ResultTask[_, _] =>

 

resultStageToJob.get(stage) match {

 

case Some(job) =>

 

如果此执行的stageActiveJob中对应此taskpartition存储的finished标志为false,

 

if (!job.finished(rt.outputId)) {

 

设置task的完成标志为true

 

job.finished(rt.outputId) = true

 

job中完成的task个数加一,同时检查是否所有的task都完成,如果所有task都完成,

 

从相关的容器中移出此job与对应的stage.

 

job.numFinished += 1

 

// If the whole job has finished, remove it

 

if (job.numFinished == job.numPartitions) {

 

idToActiveJob -= stage.jobId

 

activeJobs -= job

 

resultStageToJob -= stage

 

markStageAsFinished(stage)

 

jobIdToStageIdsRemove(job.jobId)

 

listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))

 

}

 

调用ActiveJob内的JobWaiter.taskSucceeded函数,更新此task为完成,同时把result传入进行输出处理。

 

job.listener.taskSucceeded(rt.outputId, event.result)

 

}

 

case None =>

 

logInfo("Ignoring result from " + rt + " because its job has finished")

 

}

 

针对shuffletask的执行完成,处理流程:

 

casesmt: ShuffleMapTask =>

 

valstatus = event.result.asInstanceOf[MapStatus]

 

valexecId = status.location.executorId

 

logDebug("ShuffleMapTask finished on " + execId)

 

if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {

 

logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)

 

} else {

 

shuffleresult(MapStatus)写入到stageoutputLoc中。每添加一个会把numAvailableOutputs的值加一,

 

numAvailableOutputs的值==numPartitions的值时,表示shufflemap执行完成。

 

stage.addOutputLoc(smt.partitionId, status)

 

}

 

如果此stage还处在running状态,同时pendingTasks中所有的task已经处理完成

 

if (running.contains(stage) && pendingTasks(stage).isEmpty) {

 

更新stage的状态

 

markStageAsFinished(stage)

 

.......................................

 

 

 

此处表示shufflestage处理完成,把shuffleidstageoutputLocs注册到mapOutputTracker中。

 

把每一个shuffle taks执行的executorhost等信息,每一个task执行完成的大小。注册到mapoutput中。

 

每一个taskshufflewriter都会有shuffleid的信息,注册成功后,

 

下一个stage会根据mapoutputtracker中此shuffleid的信息读取数据。

 

mapOutputTracker.registerMapOutputs(

 

stage.shuffleDep.get.shuffleId,

 

stage.outputLocs.map(list => if (list.isEmpty) nullelse list.head).toArray,

 

changeEpoch = true)

 

}

 

clearCacheLocs()

 

stage中每一个partitionoutputLoc默认值为Nil,如果发现有partition的值为Nil,表示有task处理失败,

 

重新提交此stage.此时会把没有成功的task重新执行。

 

if (stage.outputLocs.exists(_ == Nil)) {

 

.........................................

 

submitStage(stage)

 

} else {

 

valnewlyRunnable = new ArrayBuffer[Stage]

 

for (stage <- waiting) {

 

logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))

 

}

 

此处检查下面未执行的所有的stage,如果stage(RDD)的上级shuffle依赖完成,

 

或者后面所有的stage不再有shufflestage的所有stage,拿到这些个stage.

 

for (stage <- waitingif getMissingParentStages(stage) == Nil) {

 

newlyRunnable += stage

 

}

 

执行此stage后面的所有可执行的stage,waiting中移出要执行的stage,

 

waiting --= newlyRunnable

 

running队列中添加要执行的新的stage.

 

running ++= newlyRunnable

 

for {

 

stage <- newlyRunnable.sortBy(_.id)

 

jobId <- activeJobForStage(stage)

 

} {

 

提交下一个stagetask分配与执行。

 

logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")

 

submitMissingTasks(stage, jobId)

 

}

 

}

 

}

 

}

 

 

 

JobWaiter.taskSucceeded函数,

 

task完成后的处理函数。

 

override def taskSucceeded(index: Int, result: Any): Unit = synchronized {

 

if (_jobFinished) {

 

thrownew UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")

 

}

 

通过resultHandler函数把结果进行处理。此函数是生成JobWaiter时传入

 

resultHandler(index, result.asInstanceOf[T])

 

把完成的task值加一

 

finishedTasks += 1

 

if (finishedTasks == totalTasks) {

 

如果完成的task个数等于所有的task的个数时,设置job的完成状态为true,并设置状态为JobSucceeded

 

如果设置为true,表示job执行完成,前面的等待执行完成结束等待。

 

_jobFinished = true

 

jobResult = JobSucceeded

 

this.notifyAll()

 

}

 

}

 

 

 

 

 

Task.runTask函数实现

 

Task的实现分为两类,

 

需要进行shuffle操作的ShuffleMapTask,

 

不需要进行shuffle操作的ResultTask.

 

 

 

ResulitTask.runTask

 

override def runTask(context: TaskContext): U = {

 

metrics = Some(context.taskMetrics)

 

try {

 

此处通过生成task实例时也就是DAGSchedulerrunJob时传入的function进行处理

 

比如在PairRDDFunction.saveAsHadoopDataset中定义的writeToFile函数

 

rdd.iterator中会根据不现的RDD的实现,执行其compute函数,

 

compute函数具体执行通过业务代码中定义的如map函数传入的定义的function进行执行,

 

func(context, rdd.iterator(split, context))

 

} finally {

 

context.executeOnCompleteCallbacks()

 

}

 

}

 

 

 

ShuffleMapTask.runTask

 

 

 

override def runTask(context: TaskContext): MapStatus = {

 

valnumOutputSplits = dep.partitioner.numPartitions

 

metrics = Some(context.taskMetrics)

 

 

 

valblockManager = SparkEnv.get.blockManager

 

valshuffleBlockManager = blockManager.shuffleBlockManager

 

varshuffle: ShuffleWriterGroup = null

 

varsuccess = false

 

 

 

try {

 

通过shuffleId拿到一个shuffle的写入实例

 

// Obtain all the block writers for shuffle blocks.

 

valser = SparkEnv.get.serializerManager.get(dep.serializerClass, SparkEnv.get.conf)

 

shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)

 

执行rdd.iterator操作,生成Pair,也就是Product2,根据key重新shuffle到不同的文件中。

 

当所有的shuffletask完成后,会把此stage注册到 mapOutputTracker中,

 

等待下一个stage从中读取数据并执行其它操作,每一个shuffletask完成后会生成一个MapStatus实例,

 

此实例主要包含有shuffle执行的executorhost等信息,每一个task执行完成的大小。

 

具体的shuffle数据读取可参见后面的shufle分析.

 

// Write the map output to its associated buckets.

 

for (elem <- rdd.iterator(split, context)) {

 

valpair = elem.asInstanceOf[Product2[Any, Any]]

 

valbucketId = dep.partitioner.getPartition(pair._1)

 

shuffle.writers(bucketId).write(pair)

 

}

 

 

 

// Commit the writes. Get the size of each bucket block (total block size).

 

vartotalBytes = 0L

 

vartotalTime = 0L

 

valcompressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>

 

writer.commit()

 

writer.close()

 

valsize = writer.fileSegment().length

 

totalBytes += size

 

totalTime += writer.timeWriting()

 

MapOutputTracker.compressSize(size)

 

}

 

 

 

// Update shuffle metrics.

 

valshuffleMetrics = new ShuffleWriteMetrics

 

shuffleMetrics.shuffleBytesWritten = totalBytes

 

shuffleMetrics.shuffleWriteTime = totalTime

 

metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

 

 

 

success = true

 

new MapStatus(blockManager.blockManagerId, compressedSizes)

 

} catch { casee: Exception =>

 

// If there is an exception from running the task, revert the partial writes

 

// and throw the exception upstream to Spark.

 

if (shuffle != null && shuffle.writers != null) {

 

for (writer <- shuffle.writers) {

 

writer.revertPartialWrites()

 

writer.close()

 

}

 

}

 

throwe

 

} finally {

 

// Release the writers back to the shuffle block manager.

 

if (shuffle != null && shuffle.writers != null) {

 

shuffle.releaseWriters(success)

 

}

 

// Execute the callbacks on task completion.

 

context.executeOnCompleteCallbacks()

 

}

 

}

 

0
0
分享到:
评论

相关推荐

    Apache Spark源码走读之3 -- Task运行期之函数调用关系分析

    #### Task执行流程分析 当一个作业被提交时,DAGScheduler会根据RDD的依赖关系构建出一个有向无环图(DAG),并将这个DAG划分为一系列的Stage。每个Stage由一系列的Task组成,这些Task将会被分配到不同的Executor上...

    Hadoop源代码分析(MapTask)

    Hadoop源代码分析(MapTask) Hadoop的MapTask类是Hadoop MapReduce框架中的一部分,负责执行Map任务。MapTask类继承自Task类,是MapReduce框架中的一个重要组件。本文将对MapTask类的源代码进行分析,了解其内部...

    自定义AntTask简单实例

    一个自定义Task通常继承自`org.apache.tools.ant.Task`类,并重写`execute()`方法,这是Task执行时调用的核心方法。在这个方法中,你可以编写自定义的任务逻辑。 下面是一个名为`HelloWorldTask`的简单示例: ```...

    State Decision Task活动详解

    在流程执行过程中遇到 Decision 节点时,系统会根据预设的条件自动计算并选择合适的路径继续执行。 **特点:** 1. **条件判断**:每个 Decision 节点都可以设置多个条件路径。 2. **智能选择**:系统自动根据条件...

    在MTK中添加TASK与常用函数分析.rar

    在MTK平台中,添加TASK即创建一个新的进程或线程来执行特定功能。步骤通常包括: 1. 定义任务结构体:首先需要定义一个结构体,包含任务的优先级、堆栈大小和入口函数等信息。 2. 分配堆栈:为新任务分配内存空间,...

    ADF Taskflow传参例子

    在Azure Data Factory (ADF) 中,Taskflow是一个强大的特性,允许你创建复杂的、有顺序的任务执行流程。在处理大数据工作负载时,有时我们需要在不同任务之间传递参数,以实现更灵活和可重用的工作流。本篇文章将...

    通过程序调用TaskFlow

    在执行过程中,可以监听进度,处理异常,或者在需要时暂停和恢复。 4. **ADF TaskFlow_RemoteProducer**: - "ADF_TaskFlow_RemoteProducer"可能是指一个特定的TaskFlow实例,它表示一个远程执行的任务生产者。在...

    基于C++的task 类

    7. **错误处理**:考虑异常安全,确保即使在任务执行过程中抛出异常,也能正确地清理资源。 8. **性能优化**:考虑使用线程池来重用线程,避免频繁创建和销毁线程的开销。 9. **并行度控制**:根据系统资源和需求...

    Hadoop源代码分析(MapTask辅助类,III)

    MapTask执行过程中,键值对的输出是一个关键环节,它直接影响到后续的排序、合并等操作。输出缓冲区中涉及的关键变量包括`bufstart`、`bufend`、`bufmark`和`bufvoid`。这些变量共同协作,确保数据能够正确且高效...

    Hadoop源代码分析(类Task)

    【Hadoop 源代码分析:Task 类】 在Hadoop框架中,Task类是一个关键的抽象类,它是MapTask和ReduceTask的父类,分别对应Map阶段和Reduce阶段的执行单元。Task类定义了任务的基本行为和状态管理,是整个MapReduce...

    工作流中ServiceTask的各种方法

    ServiceTask 可以在执行过程中读取和修改流程变量。使用 `${variableName}` 表达式可以访问变量,而 `setVariable` 方法则用于设置变量值。 8. **边界事件与异常处理** ServiceTask 支持边界事件,例如错误、信号...

    Spark源码分析2-Driver generate jobs and launch task

    7. **结果收集**:所有的Task执行完成后,Driver会收集计算结果,并可能进行进一步的处理或存储。 文章中提到的两个.vsdx文件"spark_client_generateJob.vsd"和"spark_client_runJob.vsd"可能是流程图或架构图,...

    sonar-ant-task-2.1.jar

    其中,Sonar Ant Task就是用于集成SonarQube到Apache Ant构建系统中的关键组件,它使得在Ant构建流程中执行代码质量检查变得简单易行。 “sonar-ant-task-2.1.jar”是SonarQube Ant任务的二进制库,版本为2.1。这个...

    Hadoop源代码分析(MapTask辅助类 I)

    ### Hadoop MapTask辅助类源代码分析 #### 一、概述 Hadoop作为一个分布式计算框架,其核心组件之一是MapReduce。MapReduce负责处理大规模数据集的并行运算任务,而MapTask作为MapReduce的核心组成部分之一,其...

    透过源码看懂Flink核心框架的执行流程.pdf

    这是指在作业执行过程中周期性地保存检查点(Checkpoint),一旦发生故障,系统可以从最近的检查点恢复,确保数据处理的准确性(Exactly-Once语义)。 #### 5.1 Fault Tolerance演进之路 从Storm的...

    job task and task

    在软件工程中,构建(Build)是将源代码转换成可执行程序或库的过程。Soyatec可能是一家为华为提供服务的公司,他们提供的文档可能详细阐述了如何配置和执行华为项目的构建任务,包括编译、打包、测试等步骤。构建...

    Delmia human task simulation

    在Delmia软件中进行的人类任务模拟是一种先进的技术,它能够帮助用户在虚拟环境中精确地模拟人类操作员的工作流程和任务执行情况。通过这种模拟,可以有效地预测和分析工作环境中的各种问题,如工作效率、人体工程学...

    深入分析C# Task

    "深入分析C# Task" Task是C#中的一种异步编程模式,自.NET Framework 4中引入,主要用于实现多线程编程。Task类表示一个单个操作,不返回值,通常以异步方式执行。Task对象是一种基于任务的异步模式的中心思想,...

    Hadoop源代码分析(MapTask辅助类,II)

    在Hadoop中,MapTask是MapReduce框架的关键组件,负责执行Mapper阶段的工作。MapTask辅助类,特别是MapOutputBuffer,是Mapper输出数据管理的核心部分。本文将继续深入分析MapOutputBuffer的内部实现,以便理解...

    EditeTest_自定义Task工作流-等高线批量赋值_ae10.rar

    Task工作流是一种图形化设计的工作执行流程,常用于GIS应用程序中简化复杂任务的执行过程。用户可以通过拖放控件,设定条件和逻辑,实现一系列操作的自动化。 首先,我们需要理解等高线批量赋值的概念。等高线是...

Global site tag (gtag.js) - Google Analytics