
Spark Source Code Analysis 8 - How the client chooses which executor a task is submitted to


An important question in Spark is which executor a task is actually assigned to; a poor assignment wastes a lot of extra resources. For example: executor1 receives data through a Flume receiver and stores it in block1, while executor2 receives data through its Flume receiver and stores it in block2. The RDD then has two partitions, which produce two tasks: task1 processes block1 and task2 processes block2. If task1 is assigned to executor2, executor2 first has to fetch block1 from executor1 before it can compute, which adds data-transfer overhead. So how does Spark choose? Spark uses the RDD's getPreferredLocations to determine which executor (or host) each partition would prefer to run on. During task creation, getPreferredLocations() is called to obtain each partition's preferred locations, and addPendingTask() then pre-registers each task in the corresponding pending-task lists.
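As a minimal sketch of where these preferences come from: a custom RDD can override getPreferredLocations to report, for each partition, the host that holds its block. The class and field names below are made up for illustration; real receiver-backed RDDs (e.g. BlockRDD) do essentially the same thing.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition that remembers which host received its block.
case class BlockPartition(index: Int, host: String) extends Partition

// Sketch of an RDD whose partitions prefer the host holding their block, so the
// scheduler will try to run each task on that host's executor.
class ReceiverBlockRDD(sc: SparkContext, blockHosts: Seq[String])
  extends RDD[String](sc, Nil) {

  override def getPartitions: Array[Partition] =
    blockHosts.zipWithIndex.map { case (h, i) => BlockPartition(i, h): Partition }.toArray

  // This is the hook the scheduler consults when building its pending-task lists.
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[BlockPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(s"data of block ${split.index}")  // placeholder computation
}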



Below is the relevant code, followed by a concrete example.

 

// TaskSchedulerImpl.resourceOffers decides which tasks to submit on each worker offer.
// (Excerpt: the setup of tasks, availableCpus and sortedTaskSets, and the final
// return of tasks, are elided here.)
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    var launchedTask = false
    // TaskLocality.values is PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY.
    // For each task set, maxLocality is relaxed step by step; at each level the
    // do/while keeps offering until no more tasks can be launched on any offer.
    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
      do {
        launchedTask = false
        for (i <- 0 until offers.size) {
          val execId = offers(i).executorId
          val host = offers(i).host
          for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetId(tid) = taskSet.taskSet.id
            taskIdToExecutorId(tid) = execId
            activeExecutorIds += execId
            executorsByHost(host) += execId
            availableCpus(i) -= 1
            launchedTask = true
          }
        }
      } while (launchedTask)
    }
}
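A WorkerOffer in this method is simply (executorId, host, number of free cores). WorkerOffer is internal to the scheduler, so the snippet below only shows the shape of the input rather than user-facing code; the hostnames are made up, and the two offers mirror the two-executor example later in this post.

import org.apache.spark.scheduler.WorkerOffer

// Each executor currently has 2 free cores.
val offers = Seq(
  WorkerOffer("executor1", "host1", 2),
  WorkerOffer("executor2", "host2", 2)
)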

   

// TaskSetManager.resourceOffer: determine allowedLocality from the passed-in
// maxLocality and the allowed locality level for the current time.
def resourceOffer(
      execId: String,
      host: String,
      availableCpus: Int,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    if (!isZombie && availableCpus >= CPUS_PER_TASK) {
      val curTime = clock.getTime()
      // get the allowed locality level for current time
      var allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
      }

      findTask(execId, host, allowedLocality) match {
        case Some((index, taskLocality)) => {
          // Found a task; do some bookkeeping and return a task description
         … 
          return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
        }
        case _ =>
      }
    }
    None
  }
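The getAllowedLocalityLevel call is what makes allowedLocality relax over time: if no task has been launched at the current locality level for longer than that level's wait, the manager moves on to the next (less local) level. Below is a standalone toy version of that rule, not Spark's actual method, using the wait values from the example later in this post (3s, 3s, 0s):

// Toy, self-contained model of the locality escalation rule used by
// TaskSetManager.getAllowedLocalityLevel.
object LocalityEscalationDemo {
  val levels = Array("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
  val waits  = Array(3000L, 3000L, 0L)   // how long to wait at each level, in ms

  // Given the time of the last task launch and the current time, return the
  // most local level the scheduler still insists on.
  def allowedLevel(lastLaunchTime: Long, curTime: Long): String = {
    var idx  = 0
    var last = lastLaunchTime
    while (idx < levels.length - 1 && curTime - last >= waits(idx)) {
      last += waits(idx)   // credit the time already spent waiting at this level
      idx  += 1
    }
    levels(idx)
  }

  def main(args: Array[String]): Unit = {
    // With no launches since t=0: PROCESS_LOCAL at 1s, NODE_LOCAL at 3s, ANY at 6s,
    // which matches the timeline in the example below.
    Seq(1000L, 3000L, 6000L).foreach(t => println(s"t=${t}ms -> ${allowedLevel(0L, t)}"))
  }
}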

   

private def findTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value)] =
  {    
    for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
      for (index <- findTaskFromList(getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL))
      }
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- findTaskFromList(getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL))
      }
    }

    // Look for no-pref tasks after rack-local tasks since they can run anywhere.
    for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
      for (index <- findTaskFromList(allPendingTasks)) {
        return Some((index, TaskLocality.ANY))
      }
    }

    // Finally, if all else has failed, find a speculative task: a copy of a task
    // that is running slowly, which may be re-run on another executor or host.
    findSpeculativeTask(execId, host, locality)
  }
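findTask can only be this simple because addPendingTask has already indexed every task by its preferred locations when the TaskSetManager was created. The snippet below is a simplified, standalone model of that indexing, not the real method: the field names mirror the lists used by findTask above, the TaskLocation type is a stand-in for Spark's internal one, and the rack-level list is omitted for brevity.

import scala.collection.mutable.{ArrayBuffer, HashMap}

object PendingTaskListsDemo {
  case class TaskLocation(host: String, executorId: Option[String] = None)

  val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksForHost     = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
  val allPendingTasks         = new ArrayBuffer[Int]

  // The same task index is registered in every list that matches one of its
  // preferred locations, plus the catch-all allPendingTasks list, so that
  // findTask can pick it up at whatever locality level is currently allowed.
  def addPendingTask(index: Int, preferredLocations: Seq[TaskLocation]): Unit = {
    for (loc <- preferredLocations) {
      loc.executorId.foreach { execId =>
        pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer[Int]) += index
      }
      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer[Int]) += index
    }
    if (preferredLocations.isEmpty) pendingTasksWithNoPrefs += index
    allPendingTasks += index
  }

  def main(args: Array[String]): Unit = {
    // Tasks preferring executor2 (as in the example below), plus one with no preference.
    addPendingTask(1, Seq(TaskLocation("host2", Some("executor2"))))
    addPendingTask(2, Seq(TaskLocation("host2", Some("executor2"))))
    addPendingTask(3, Nil)
    println(pendingTasksForExecutor)   // e.g. Map(executor2 -> ArrayBuffer(1, 2))
    println(pendingTasksWithNoPrefs)   // ArrayBuffer(3)
  }
}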

 

 

A concrete example

Precondition

Tasks 1 – 50 all have preferred location executor2.

Two executors: executor1 (core1, core2)
               executor2 (core3, core4)

Resource offers are made (tasks are scheduled) every 1 second.

Procedure

TaskSetManager

       myLocalityLevels = PROCESS_LOCAL   NODE_LOCAL   ANY
       Locality Wait    =      3s             3s        0s

1s:   localityForCurrentTime = PROCESS_LOCAL

maxLocality = PROCESS_LOCAL   allowedLocality = PROCESS_LOCAL   executor1 = none   executor2 = task1
maxLocality = NODE_LOCAL      allowedLocality = PROCESS_LOCAL   executor1 = none   executor2 = task2
maxLocality = RACK_LOCAL      allowedLocality = PROCESS_LOCAL   executor1 = none   executor2 = none  (executor2's 2 cores are already in use)
maxLocality = ANY             allowedLocality = PROCESS_LOCAL   executor1 = none   executor2 = none  (executor2's 2 cores are already in use)

Assume all tasks assigned to executor2 have finished before the next round.

2s:   localityForCurrentTime = PROCESS_LOCAL

maxLocality = PROCESS_LOCAL   allowedLocality = PROCESS_LOCAL   executor1 = none   executor2 = task3
maxLocality = NODE_LOCAL      allowedLocality = PROCESS_LOCAL   executor1 = none   executor2 = task4
maxLocality = RACK_LOCAL      allowedLocality = PROCESS_LOCAL   executor1 = none   executor2 = none  (executor2's 2 cores are already in use)
maxLocality = ANY             allowedLocality = PROCESS_LOCAL   executor1 = none   executor2 = none  (executor2's 2 cores are already in use)

Assume all tasks assigned to executor2 have finished before the next round.

3s:   localityForCurrentTime = NODE_LOCAL (the 3s locality wait for PROCESS_LOCAL has expired)

maxLocality = PROCESS_LOCAL   allowedLocality = PROCESS_LOCAL   executor1 = none   executor2 = task5
maxLocality = NODE_LOCAL      allowedLocality = NODE_LOCAL      executor1 = none   executor2 = task6
maxLocality = RACK_LOCAL      allowedLocality = NODE_LOCAL      executor1 = none   executor2 = none  (executor2's 2 cores are already in use)
maxLocality = ANY             allowedLocality = NODE_LOCAL      executor1 = none   executor2 = none  (executor2's 2 cores are already in use)

6s:   localityForCurrentTime = ANY (the 3s locality wait for NODE_LOCAL has also expired)

maxLocality = PROCESS_LOCAL   allowedLocality = PROCESS_LOCAL   executor1 = none   executor2 = task11
maxLocality = NODE_LOCAL      allowedLocality = NODE_LOCAL      executor1 = none   executor2 = task12
maxLocality = RACK_LOCAL      allowedLocality = RACK_LOCAL      executor1 = none   executor2 = none

maxLocality = ANY             allowedLocality = ANY             executor1 = task13 executor2 = none
                              (allowedLocality has reached ANY, so a task for executor1 can now be taken from the allPendingTasks list)

Summary:

1. Which task is chosen for an offer depends mainly on allowedLocality and on each task's preferred locations.

2. A task is not necessarily assigned to the machine that holds its data: if an executor has had no runnable task for long enough, the allowed locality level relaxes until a task for it is finally taken from the allPendingTasks list.
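The per-level locality waits used in the example above (3s, 3s, 0s) come from configuration. Below is a minimal sketch of setting them when building the SparkContext; the property names are real Spark settings, but the values are just examples (older versions expect plain milliseconds, newer ones also accept forms like "3s"):

import org.apache.spark.{SparkConf, SparkContext}

// How long the scheduler waits at each locality level before relaxing to the next one.
val conf = new SparkConf()
  .setAppName("locality-wait-demo")
  .set("spark.locality.wait", "3000")          // default per-level wait, in ms
  .set("spark.locality.wait.process", "3000")  // wait at PROCESS_LOCAL
  .set("spark.locality.wait.node", "3000")     // wait at NODE_LOCAL
  .set("spark.locality.wait.rack", "0")        // wait at RACK_LOCAL
val sc = new SparkContext(conf)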
