In this section, we will verify how Spark collects data from the previous stage and hands it to the next stage (the ResultTask).
Figure: after the ShuffleMapTask computation finishes (i.e. post-processing). Note: the last method, reviveOffers(), is redundant in this mode, since step 13 will set up the next stage (ResultTask) there.
Preparation for the ResultTask (pre-processing on the reduce side)
Figure: running the ResultTask (reduce side)
=====
-ResultTask core method
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  //-no Logging trait is mixed into this class, so use the console
  println("-func %s in this task %s,serializer %s,rdd %s".format(func.getClass, toString, ser, rdd))
  metrics = Some(context.taskMetrics)
  //-e.g. func comes from ScalaWordCount#collect() > SparkContext#runJob(), i.e. iterator.toArray
  func(context, rdd.iterator(partition, context))
}
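The last line of runTask() is the whole point of a ResultTask: apply the deserialized func to the partition's iterator. A minimal sketch of that step, using toy names and no real Spark types (for collect(), the shipped func is essentially the iterator materialized into an array; the TaskContext argument is dropped here for brevity):

```scala
object ResultFuncSketch {
  //-stand-in for the (TaskContext, Iterator[T]) => U carried in taskBinary
  def collectFunc[T](it: Iterator[T]): List[T] = it.toList

  def main(args: Array[String]): Unit = {
    //-stand-in for rdd.iterator(partition, context)
    val partitionIter = Iterator("a" -> 1, "b" -> 2)
    //-the per-partition result that is sent back to the driver
    println(collectFunc(partitionIter)) //-prints List((a,1), (b,2))
  }
}
```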
*redundant code path: LocalEndpoint#receive()

case StatusUpdate(taskId, state, serializedData) =>
  scheduler.statusUpdate(taskId, state, serializedData) //-async op
  if (TaskState.isFinished(state)) {
    freeCores += scheduler.CPUS_PER_TASK //-release the cpu resource assigned in reviveOffers()
    logInfo("**startup next offers..")
    //-same effect if the call below is commented out; TBD: redundant code path,
    //-no tasks are issued within the code path below
    reviveOffers()
  }

def reviveOffers() {
  //-construct one executor with several cores, so tasks can be issued concurrently
  val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) //-offer one executor
  logInfo(s"-launching tasks,executor size ${offers.size}")
  for (task <- scheduler.resourceOffers(offers).flatten) {
    logInfo("-task:" + task + ",freeCores:" + freeCores)
    freeCores -= scheduler.CPUS_PER_TASK //-when running locally, no limit is enforced
    //-the processed result will be written back via Executor.TaskRunner#run()
    executor.launchTask(executorBackend, taskId = task.taskId,
      attemptNumber = task.attemptNumber, task.name,
      task.serializedTask) //-note: different modes deliver tasks differently
  }
  logInfo(s"-finished launching tasks!")
}

detailed logs==
2016-08-10 16:21:03,016 INFO [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - **startup next offers..
2016-08-10 16:21:03,017 INFO [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - -launching tasks,executor size 1
2016-08-10 16:21:03,017 INFO [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - parentName: , name: TaskSet_0, runningTasks: 1
2016-08-10 16:21:03,018 INFO [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - -max locality NO_PREF,launchedTask false,taskset org.apache.spark.scheduler.TaskSetManager@432cf6ba
2016-08-10 16:21:03,019 INFO [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - -max locality ANY,launchedTask false,taskset org.apache.spark.scheduler.TaskSetManager@432cf6ba
2016-08-10 16:21:03,020 INFO [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - -finished launching tasks!
2016-08-10 16:21:03,021 INFO [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - -removing task id 0,parent org.apache.spark.scheduler.Pool@2061b97c
2016-08-10 16:21:03,021 INFO [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - **startup next offers..
2016-08-10 16:21:03,022 INFO [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - -launching tasks,executor size 1
2016-08-10 16:21:03,022 INFO [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - parentName: , name: TaskSet_0, runningTasks: 0
2016-08-10 16:21:03,021 INFO [task-result-getter-0] scheduler.TaskResultGetter (Logging.scala:logInfo(59)) - -deserialized result from tid 1
2016-08-10 16:21:03,022 INFO [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - -max locality NO_PREF,launchedTask false,taskset org.apache.spark.scheduler.TaskSetManager@432cf6ba
2016-08-10 16:21:03,023 INFO [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - -max locality ANY,launchedTask false,taskset org.apache.spark.scheduler.TaskSetManager@432cf6ba
2016-08-10 16:21:03,023 INFO [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - -finished launching tasks!
2016-08-10 16:21:03,023 INFO [task-result-getter-1] scheduler.TaskResultGetter (Logging.scala:logInfo(59)) - -deserialized result from tid 0
//-note: the earlier task's (tid 0) result deserialization only starts now, in TaskResultGetter#enqueueSuccessfulTask()
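The freeCores accounting that drives the reviveOffers()/StatusUpdate cycle above can be sketched as a toy model (hypothetical names, not the real TaskSchedulerImpl): an offer round launches as many pending tasks as the free cores allow, and each finished task returns CPUS_PER_TASK cores.

```scala
object CoreBookkeeping {
  val CPUS_PER_TASK = 1
  var freeCores = 2 //-one local executor with two cores

  //-like reviveOffers(): launch as many pending tasks as freeCores permits
  def reviveOffers(pending: List[String]): List[String] = {
    val launched = pending.take(freeCores / CPUS_PER_TASK)
    freeCores -= launched.size * CPUS_PER_TASK
    launched
  }

  //-like the StatusUpdate branch: a finished task gives its cores back
  def statusUpdateFinished(): Unit = freeCores += CPUS_PER_TASK
}
```

With two free cores, a first round launches two of three pending tasks and leaves zero free cores; only after a StatusUpdate for a finished task can the next round launch the third.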
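The interleaved log timestamps show why tid 1's result can be deserialized before tid 0's: each finished task's serialized result is handed to a small thread pool (the "task-result-getter-N" threads) and deserialized there, off the scheduler's dispatcher thread, so completion order is not guaranteed. A simplified sketch of that hand-off, with hypothetical names and a placeholder in place of real deserialization:

```scala
import java.util.concurrent.{Callable, Executors, Future, TimeUnit}

object ResultGetterSketch {
  //-a small pool, like the task-result-getter-N threads in the logs
  private val pool = Executors.newFixedThreadPool(2)

  //-stand-in for enqueueSuccessfulTask(): deserialization happens on a pool
  //-thread, asynchronously from the caller
  def enqueueSuccessfulTask(tid: Long, serialized: String): Future[String] =
    pool.submit(new Callable[String] {
      def call(): String = s"deserialized result from tid $tid: $serialized"
    })

  def shutdown(): Unit = {
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.SECONDS)
  }
}
```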