
[spark-src-core] 2.2 job submitted flow for local mode-part II

 

  in this section, we will verify how Spark hands the data produced by the previous stage (the ShuffleMapTask output) over to the next stage's ResultTask

 



 
  figure: flow after a ShuffleMapTask finishes its computation (i.e. post-processing). Note: the last method, reviveOffers(), is redundant in this mode, since step 13 already sets up the next stage (ResultTask).

 

 

    preparation for the ResultTask (pre-processing on the reduce side)

 



   figure: running the ResultTask (reduce side)

 

=====

 

-ResultTask core method

override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  //-this class does not extend Logging, so print to the console
  println("-func %s in this task %s,serializer %s,rdd %s".format(func.getClass, toString, ser, rdd))
  metrics = Some(context.taskMetrics)
  //-e.g. for ScalaWordCount's collect(), func comes from SparkContext#runJob(), i.e. iterator.toArray
  func(context, rdd.iterator(partition, context))
}
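To make the last comment concrete: for an action like collect(), the deserialized `func` is essentially "drain this partition's iterator into an array". A minimal sketch (no Spark required; `collectFunc` is a hypothetical stand-in for the closure built by SparkContext#runJob, with the TaskContext parameter dropped):

```scala
import scala.reflect.ClassTag

object FuncSketch {
  // Stand-in for the deserialized (TaskContext, Iterator[T]) => U closure.
  // For collect(), U is Array[T] and the body is just iter.toArray.
  def collectFunc[T: ClassTag](iter: Iterator[T]): Array[T] = iter.toArray

  def main(args: Array[String]): Unit = {
    val partition = Iterator("a" -> 1, "b" -> 2) // one partition's records
    println(collectFunc(partition).toList)       // List((a,1), (b,2))
  }
}
```

The per-partition arrays returned by each ResultTask are then concatenated on the driver side to form the final collect() result.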

 

*redundant code path:
  LocalEndpoint#receive()
case StatusUpdate(taskId, state, serializedData) =>
  scheduler.statusUpdate(taskId, state, serializedData) //-async op
  if (TaskState.isFinished(state)) {
    freeCores += scheduler.CPUS_PER_TASK //-release the cpu resource assigned in reviveOffers()
    logInfo("**startup next offers..")
    //-the effect is the same if the call below is commented out
    reviveOffers() //-TBD: redundant code path
  }
      

  no tasks are issued within the code path below:
def reviveOffers() {
  //-construct one executor with several cores, so tasks can be issued concurrently
  val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) //-offer one executor
  logInfo(s"-launching tasks,executor size ${offers.size}")
  for (task <- scheduler.resourceOffers(offers).flatten) {
    logInfo("-task:" + task + ",freeCores:" + freeCores)
    freeCores -= scheduler.CPUS_PER_TASK //-in local mode, no limit is enforced
    //-the processed result will be written back via Executor.TaskRunner#run()
    executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
      task.name, task.serializedTask) //-note: different modes deliver tasks differently
  }
  logInfo(s"-finished launching tasks!")
}
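The bookkeeping in this loop is simple core accounting: one offer carries the current freeCores, and every launched task consumes CPUS_PER_TASK. A minimal sketch of just that accounting (the names `resourceOffers` and `launch` here are simplified stand-ins, not Spark's actual scheduler logic):

```scala
object OfferSketch {
  val CPUS_PER_TASK = 1
  case class WorkerOffer(executorId: String, host: String, cores: Int)

  // Pretend the scheduler assigns at most offer.cores of the pending tasks.
  def resourceOffers(offer: WorkerOffer, pendingTasks: Int): Seq[Int] =
    (0 until math.min(offer.cores, pendingTasks)).toList

  // Returns (launched task ids, remaining free cores).
  def launch(freeCoresStart: Int, pendingTasks: Int): (Seq[Int], Int) = {
    var freeCores = freeCoresStart
    val launched = resourceOffers(WorkerOffer("driver", "localhost", freeCores), pendingTasks)
    launched.foreach(_ => freeCores -= CPUS_PER_TASK) // one core per launched task
    (launched, freeCores)
  }

  def main(args: Array[String]): Unit =
    println(launch(freeCoresStart = 2, pendingTasks = 3)) // 2 tasks launch, 0 cores left
}
```

This is why the log above shows `launchedTask false` with no tasks issued: after the ResultTask set is already running, resourceOffers finds nothing pending, so the loop body never executes and freeCores is untouched.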
detailed logs:
2016-08-10 16:21:03,016 INFO  [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - **startup next offers..
2016-08-10 16:21:03,017 INFO  [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - -launching tasks,executor size 1
2016-08-10 16:21:03,017 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - parentName: , name: TaskSet_0, runningTasks: 1
2016-08-10 16:21:03,018 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - -max locality NO_PREF,launchedTask false,taskset org.apache.spark.scheduler.TaskSetManager@432cf6ba
2016-08-10 16:21:03,019 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - -max locality ANY,launchedTask false,taskset org.apache.spark.scheduler.TaskSetManager@432cf6ba
2016-08-10 16:21:03,020 INFO  [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - -finished launching tasks!
2016-08-10 16:21:03,021 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - -removing task id 0,parent org.apache.spark.scheduler.Pool@2061b97c
2016-08-10 16:21:03,021 INFO  [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - **startup next offers..
2016-08-10 16:21:03,022 INFO  [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - -launching tasks,executor size 1
2016-08-10 16:21:03,022 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - parentName: , name: TaskSet_0, runningTasks: 0
2016-08-10 16:21:03,021 INFO  [task-result-getter-0] scheduler.TaskResultGetter (Logging.scala:logInfo(59)) - -deserialized result from tid 1
2016-08-10 16:21:03,022 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - -max locality NO_PREF,launchedTask false,taskset org.apache.spark.scheduler.TaskSetManager@432cf6ba
2016-08-10 16:21:03,023 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - -max locality ANY,launchedTask false,taskset org.apache.spark.scheduler.TaskSetManager@432cf6ba
2016-08-10 16:21:03,023 INFO  [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - -finished launching tasks!
2016-08-10 16:21:03,023 INFO  [task-result-getter-1] scheduler.TaskResultGetter (Logging.scala:logInfo(59)) - -deserialized result from tid 0	//-note: the earlier task's (tid 0) result is only deserialized now, via TaskResultGetter#enqueueSuccessfulTask()
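The out-of-order lines above (tid 1's result deserialized before tid 0's) come from TaskResultGetter handing each finished task's result to a worker thread pool, so deserialization completes in whatever order the pool gets to each payload, independent of task ids. A minimal sketch of that pattern with plain java.util.concurrent (the 50ms sleep is an artificial stand-in for tid 0 having a slower-to-deserialize payload):

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import scala.collection.JavaConverters._

object ResultGetterSketch {
  // Returns the order in which the two simulated results finished "deserializing".
  def order(): List[Long] = {
    val pool = Executors.newFixedThreadPool(2) // like the task-result-getter pool
    val done = new ConcurrentLinkedQueue[Long]()
    for (tid <- Seq(0L, 1L)) {
      pool.execute(new Runnable {
        def run(): Unit = {
          if (tid == 0L) Thread.sleep(50) // pretend tid 0's payload is slower
          done.add(tid)                   // "deserialized result from tid ..."
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    done.asScala.toList
  }

  def main(args: Array[String]): Unit =
    println(order()) // typically List(1, 0): tid 1 finishes first
}
```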

 

ref:

[spark-src-core] 2.2 job submitted flow for local mode-part I
