`
zhangym195
  • 浏览: 123666 次
  • 性别: Icon_minigender_1
  • 来自: 黑龙江
社区版块
存档分类
最新评论

读源码剖析TaskScheduler运行内幕及本地性算法确定

阅读更多

一:通过Spark-shell运行程序来观察TaskScheduler内幕

1,当我们启动Spark-shell本身的时候命令终端反馈回来的主要是ClientEndpointSparkDeploySchedulerBackend,这是因为此时还没有任何Job的触发,这是启动Application本身而已,所以主要就是实例化SparkContext并注册当前的应用程序给Master且从集群中获得ExecutorBackend计算资源;

 

2DAGScheduler划分好Stage后会通过TaskSchedulerImpl中的TaskSetManager来管理当前要运行的Stage中的所有任务TaskSetTaskSetManager会根据locality aware来为Task分配计算资源、监控Task的执行状态(例如重试、慢任务进行推测式执行等)

TaskSet.scala - L9

/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int, //Pool调度池中规定了Stage的优先级
    val properties: Properties) 

 

只是一个简单的数据结构。 包含了高层调度器交给底层调度器的包含了哪些成员;

val priority: Int, //Pool调度池中规定了Stage的优先级

 

TaskSetManager.scala

private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int, //任务失败重试次数
    clock: Clock = new SystemClock())
  extends Schedulable with Logging {

 

 

 

我们期望最好 是 PROCESS_LOCAL ,最大化利用内存资源

 

二:TaskSchedulerSchedulerBackend

 
 

 

 

<!--[if !supportLists]-->1, <!--[endif]-->总体的底层任务调度的过程如下:

<!--[if !supportLists]-->a) <!--[endif]-->TaskSchedulerImpl.submitTasks:主要的作用是将TaskSet加入到TaskSetManager中进行管理;

 

if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingPartitions ++= tasks.map(_.partitionId)
  logDebug("New pending partitions: " + stage.pendingPartitions)
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArraystage.idstage.latestInfo.attemptIdjobIdproperties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
else {

 

 

 

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSetmaxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stagenew HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }

 

private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean false)
  extends TaskScheduler with Logging
{
  def this(sc: SparkContext) = this(scsc.conf.getInt("spark.task.maxFailures"4)) 

  val conf = sc.conf

  // How often to check for speculative tasks
  val SPECULATION_INTERVAL_MS conf.getTimeAsMs("spark.speculation.interval""100ms")

  private val speculationScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

 def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))  //任务默认失败重试次数是 4

 

<!--[if !supportLists]-->b) <!--[endif]-->SchedulableBuilder.addTaskSetManagerSchedulableBuilder会确定TaskSetManager的调度顺序,然后按照TaskSetManagerlocality aware来确定每个Task具体运行在哪个ExecutorBackend中;

<!--[if !supportLists]-->c) <!--[endif]-->CoarseGrainedSchedulerBackend.reviveOffers:DriverEndpoint发送ReviveOffersReviveOffers本身是一个空的case object对象,只是起到触发底层资源调度的作用,在有Task提交或者计算资源变动的时候会发送ReviveOffers这个消息作为触发器;

<!--[if !supportLists]-->d) <!--[endif]-->DriverEndpoint接受ReviveOffers消息并路由到makeOffers具体的方法中:在makeOffers方法中首先准备好所有可以用于计算的workOffers(代表了所有可用ExecutorBackend中可以使用的Cores等信息)

<!--[if !supportLists]-->e) <!--[endif]-->TaskSchedulerImpl.resourceOffers:为每一个Task具体分配计算资源,输入是ExecutorBackend及其上可用的Cores,输出TaskDescription的二维数组,在其中确定了每个Task具体运行在哪个ExecutorBackendresourceOffers到底是如何确定Task具体运行在哪个ExecutorBackend上的呢?算法的实现具体如下:

<!--[if !supportLists]-->i. <!--[endif]-->通过Random.shuffle方法重新洗牌所有的计算资源以寻求计算的负载均衡;

<!--[if !supportLists]-->ii. <!--[endif]-->根据每个ExecutorBackendcores的个数声明类型为TaskDescriptionArrayBuffer数组;

<!--[if !supportLists]-->iii. <!--[endif]-->如果有新的ExecutorBackend分配给我们的Job,此时会调用executorAdded来获得最新的完整的可用计算计算资源;

<!--[if !supportLists]-->iv. <!--[endif]-->通过下述代码最求最高级别的优先级本地性:

for (taskSet <- sortedTaskSetsmaxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
        taskSetmaxLocalityshuffledOffersavailableCpustasks)
  } while (launchedTask)
}

 

<!--[if !supportLists]-->v. <!--[endif]-->通过调用TaskSetManagerresourceOffer最终确定每个Task具体运行在哪个ExecutorBackend的具体的Locality Level

f)通过launchTasks把任务发送给ExecutorBackend去执行;

 

 

 

 

 

 

补充:

<!--[if !supportLists]-->1, <!--[endif]-->Task默认的最大重试次数是4次:

def this(sc: SparkContext) = this(scsc.conf.getInt("spark.task.maxFailures"4))

 

<!--[if !supportLists]-->2, <!--[endif]-->Spark应用程序目前支持两种调度器:FIFOFAIR,可以通过spark-env.shspark.scheduler.mode进行具体的设置,默认情况下是FIFO的方式:

private val schedulingModeConf conf.get("spark.scheduler.mode""FIFO")
val schedulingModeSchedulingMode try {
  SchedulingMode.withName(schedulingModeConf.toUpperCase)

<!--[if !supportLists]-->3, <!--[endif]-->TaskScheduler中要负责为Task分配计算资源:此时程序已经具备集群中的计算资源了,根据计算本地性原则确定Task具体要运行在哪个ExecutorBackend中; 

<!--[if !supportLists]-->4, <!--[endif]-->TaskDescription中已经确定好了Task具体要运行在哪个ExecutorBackend上:

private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task's TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {

 

而确定Task具体运行在哪个ExecutorBackend上的算法是由TaskSetManagerresourceOffer方法决定

 

<!--[if !supportLists]-->5, <!--[endif]-->数据本地优先级从高到底以此为:优先级高低排: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY,其中NO_PREF是指机器本地性

<!--[if !supportLists]-->6, <!--[endif]-->每个Task默认是采用一个线程进行计算的:

// CPUs to request per task
val CPUS_PER_TASK conf.getInt("spark.task.cpus"1)

 

<!--[if !supportLists]-->7, <!--[endif]-->DAGScheduler是从数据层面考虑preferedLocation的,而TaskScheduler是从具体计算Task角度考虑计算的本地性;

<!--[if !supportLists]-->8, <!--[endif]-->Task进行广播时候的AkkFrameSize大小是128MB,如果任务大于等于128MB-200K的话则Task会直接被丢弃掉;如果小于128MB-200K的话会通过CoarseGrainedSchedulerBackendlaunchTask到具体的ExecutorBackend上;

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics