`
zhangym195
  • 浏览: 123389 次
  • 性别: Icon_minigender_1
  • 来自: 黑龙江
社区版块
存档分类
最新评论

读源码剖析TaskScheduler运行内幕及本地性算法确定

阅读更多

一:通过Spark-shell运行程序来观察TaskScheduler内幕

1,当我们启动Spark-shell本身的时候命令终端反馈回来的主要是ClientEndpointSparkDeploySchedulerBackend,这是因为此时还没有任何Job的触发,这是启动Application本身而已,所以主要就是实例化SparkContext并注册当前的应用程序给Master且从集群中获得ExecutorBackend计算资源;

 

2DAGScheduler划分好Stage后会通过TaskSchedulerImpl中的TaskSetManager来管理当前要运行的Stage中的所有任务TaskSetTaskSetManager会根据locality aware来为Task分配计算资源、监控Task的执行状态(例如重试、慢任务进行推测式执行等)

TaskSet.scala - L9

/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int, //Pool调度池中规定了Stage的优先级
    val properties: Properties) 

 

只是一个简单的数据结构。 包含了高层调度器交给底层调度器的包含了哪些成员;

val priority: Int, //Pool调度池中规定了Stage的优先级

 

TaskSetManager.scala

private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int, //任务失败重试次数
    clock: Clock = new SystemClock())
  extends Schedulable with Logging {

 

 

 

我们期望最好 是 PROCESS_LOCAL ,最大化利用内存资源

 

二:TaskSchedulerSchedulerBackend

 
 

 

 

<!--[if !supportLists]-->1, <!--[endif]-->总体的底层任务调度的过程如下:

<!--[if !supportLists]-->a) <!--[endif]-->TaskSchedulerImpl.submitTasks:主要的作用是将TaskSet加入到TaskSetManager中进行管理;

 

if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingPartitions ++= tasks.map(_.partitionId)
  logDebug("New pending partitions: " + stage.pendingPartitions)
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArraystage.idstage.latestInfo.attemptIdjobIdproperties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
else {

 

 

 

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSetmaxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stagenew HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }

 

private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean false)
  extends TaskScheduler with Logging
{
  def this(sc: SparkContext) = this(scsc.conf.getInt("spark.task.maxFailures"4)) 

  val conf = sc.conf

  // How often to check for speculative tasks
  val SPECULATION_INTERVAL_MS conf.getTimeAsMs("spark.speculation.interval""100ms")

  private val speculationScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

 def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))  //任务默认失败重试次数是 4

 

<!--[if !supportLists]-->b) <!--[endif]-->SchedulableBuilder.addTaskSetManagerSchedulableBuilder会确定TaskSetManager的调度顺序,然后按照TaskSetManagerlocality aware来确定每个Task具体运行在哪个ExecutorBackend中;

<!--[if !supportLists]-->c) <!--[endif]-->CoarseGrainedSchedulerBackend.reviveOffers:DriverEndpoint发送ReviveOffersReviveOffers本身是一个空的case object对象,只是起到触发底层资源调度的作用,在有Task提交或者计算资源变动的时候会发送ReviveOffers这个消息作为触发器;

<!--[if !supportLists]-->d) <!--[endif]-->DriverEndpoint接受ReviveOffers消息并路由到makeOffers具体的方法中:在makeOffers方法中首先准备好所有可以用于计算的workOffers(代表了所有可用ExecutorBackend中可以使用的Cores等信息)

<!--[if !supportLists]-->e) <!--[endif]-->TaskSchedulerImpl.resourceOffers:为每一个Task具体分配计算资源,输入是ExecutorBackend及其上可用的Cores,输出TaskDescription的二维数组,在其中确定了每个Task具体运行在哪个ExecutorBackendresourceOffers到底是如何确定Task具体运行在哪个ExecutorBackend上的呢?算法的实现具体如下:

<!--[if !supportLists]-->i. <!--[endif]-->通过Random.shuffle方法重新洗牌所有的计算资源以寻求计算的负载均衡;

<!--[if !supportLists]-->ii. <!--[endif]-->根据每个ExecutorBackendcores的个数声明类型为TaskDescriptionArrayBuffer数组;

<!--[if !supportLists]-->iii. <!--[endif]-->如果有新的ExecutorBackend分配给我们的Job,此时会调用executorAdded来获得最新的完整的可用计算计算资源;

<!--[if !supportLists]-->iv. <!--[endif]-->通过下述代码最求最高级别的优先级本地性:

for (taskSet <- sortedTaskSetsmaxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
        taskSetmaxLocalityshuffledOffersavailableCpustasks)
  } while (launchedTask)
}

 

<!--[if !supportLists]-->v. <!--[endif]-->通过调用TaskSetManagerresourceOffer最终确定每个Task具体运行在哪个ExecutorBackend的具体的Locality Level

f)通过launchTasks把任务发送给ExecutorBackend去执行;

 

 

 

 

 

 

补充:

<!--[if !supportLists]-->1, <!--[endif]-->Task默认的最大重试次数是4次:

def this(sc: SparkContext) = this(scsc.conf.getInt("spark.task.maxFailures"4))

 

<!--[if !supportLists]-->2, <!--[endif]-->Spark应用程序目前支持两种调度器:FIFOFAIR,可以通过spark-env.shspark.scheduler.mode进行具体的设置,默认情况下是FIFO的方式:

private val schedulingModeConf conf.get("spark.scheduler.mode""FIFO")
val schedulingModeSchedulingMode try {
  SchedulingMode.withName(schedulingModeConf.toUpperCase)

<!--[if !supportLists]-->3, <!--[endif]-->TaskScheduler中要负责为Task分配计算资源:此时程序已经具备集群中的计算资源了,根据计算本地性原则确定Task具体要运行在哪个ExecutorBackend中; 

<!--[if !supportLists]-->4, <!--[endif]-->TaskDescription中已经确定好了Task具体要运行在哪个ExecutorBackend上:

private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task's TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {

 

而确定Task具体运行在哪个ExecutorBackend上的算法是由TaskSetManagerresourceOffer方法决定

 

<!--[if !supportLists]-->5, <!--[endif]-->数据本地优先级从高到底以此为:优先级高低排: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY,其中NO_PREF是指机器本地性

<!--[if !supportLists]-->6, <!--[endif]-->每个Task默认是采用一个线程进行计算的:

// CPUs to request per task
val CPUS_PER_TASK conf.getInt("spark.task.cpus"1)

 

<!--[if !supportLists]-->7, <!--[endif]-->DAGScheduler是从数据层面考虑preferedLocation的,而TaskScheduler是从具体计算Task角度考虑计算的本地性;

<!--[if !supportLists]-->8, <!--[endif]-->Task进行广播时候的AkkFrameSize大小是128MB,如果任务大于等于128MB-200K的话则Task会直接被丢弃掉;如果小于128MB-200K的话会通过CoarseGrainedSchedulerBackendlaunchTask到具体的ExecutorBackend上;

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    Arduino-TaskScheduler.zip

    Arduino-TaskScheduler.zip,Arduino、ESPX和STM32微控制器任务调度器的协同多任务处理,Arduino是一家开源软硬件公司和制造商社区。Arduino始于21世纪初,深受电子制造商的欢迎,Arduino通过开源系统提供了很多灵活性...

    使用编程方式添加计划任务的.net组件taskscheduler

    总结来说,`TaskScheduler`组件是.NET框架中用于计划任务的重要工具,它提供了丰富的功能和灵活性,可以帮助开发者构建高效、可维护的计划任务解决方案。通过深入理解和有效利用这个组件,可以优化应用程序的后台...

    TaskScheduler

    TaskSchedulerTaskScheduler,它决定了task该如何被调度,而在.net framework中有两种系统定义Scheduler,第一个是Task默认的ThreadPoolTaskScheduler,还是一种SynchronizationContextTaskScheduler,以及这两种...

    C#利用Interop.TaskScheduler.dll添加删除计划任务

    C#利用Interop.TaskScheduler.dll添加删除计划任务,可实现程序随Windows系统自动启动; 项目用VS2017打开,需要.net 2.0支持,需要管理员权限;支持win7 win10;不支持xp。

    Android-TaskScheduler一个简洁实用方便的Android异步处理库

    `TaskScheduler`是一个专为Android设计的轻量级、高效的异步任务处理库,它已经被广泛应用在拥有百万日活跃用户的线上项目中,充分证明了其稳定性和实用性。 **1. 异步编程的重要性** 在Android平台上,主线程(UI...

    .archMicrosoft.Win32.TaskScheduler.dll

    .archMicrosoft.Win32.TaskScheduler.dll

    spark Software Components架构图及Task Scheduler架构

    《Spark软件组件架构图及Task Scheduler架构解析》 Spark,作为大数据处理领域的明星框架,以其高效、易用和可扩展性赢得了广泛的认可。本文将深入探讨Spark的核心组件架构及其Task Scheduler的设计原理,帮助读者...

    Basic class for using the Microsoft Task Scheduler(33KB)

    Basic class for using the Microsoft Task Scheduler(33KB)

    proj.zip_algorithms_scheduler_task scheduler

    本文将深入探讨“proj.zip_algorithms_scheduler_task scheduler”所涉及的知识点,包括任务调度的基本概念、绝对优先级算法以及如何实现一个基于算法的任务调度器。 一、任务调度基本概念 任务调度器,也称为作业...

    TaskScheduler:使用Spring Boot的TaskScheduler

    Spring Boot的`TaskScheduler`提供了灵活的方式来管理定时任务,无论是简单的周期性执行,还是基于复杂规则的调度。通过自定义配置和实现,你可以根据项目的具体需求定制任务调度策略,使得任务执行更加高效和可控。...

    PSO-Task-scheduler-master_粒子群算法_粒子群调度_PSO_资源调度优化_优化调度_

    在“PSO-Task-scheduler-master”项目中,开发者已经实现了这样一个基于粒子群优化的调度器,通过分析其代码结构和算法实现,我们可以学习到如何将PSO理论应用于实际问题,解决复杂的资源调度优化挑战。 总结,粒子...

    interop.taskscheduler.dll

    Windows 8 SDK 此 SDK 于 2012 年 11 月发布,可用于创建适用于 Windows 8 或更早版本的 Windows 应用 () 使用 Web 技术、本机和托管代码;或使用本机或托管编程模型的桌面应用。

    TaskScheduler:适用于Arduino,ESPx和STM32微控制器的协作式多任务处理

    TaskScheduler库是一个强大的工具,专为Arduino、ESPx(包括ESP8266)和STM32微控制器设计,用于实现协作式多任务处理。在嵌入式系统中,多任务处理是必不可少的功能,它允许同时执行多个独立的任务,提高了系统的...

    今天不要延迟()使用Arduino Task Scheduler!-项目开发

    "今天不要延迟()使用Arduino Task Scheduler!-项目开发"这一标题暗示了在编程时应该尽早引入任务调度器,以避免代码混乱和效率低下,通常称为“意大利面代码”。 任务调度器是一种工具或库,它允许开发者定义多...

    GPUTaskScheduler:GPU Task Scheduler(Python库)

    您所需要做的就是通过简单的配置定义参数组合,在我们的框架中编写测试代码,然后让GPU Task Scheduler为您并行运行测试。 如果您已经有一个测试代码,请不要担心。 将您的测试代码迁移到我们的框架非常容易。 ...

    Apache Spark源码剖析

    源码剖析对于理解其内部工作机制和优化应用性能至关重要。本篇文章将深入探讨Spark的核心组件、架构设计以及关键算法,帮助读者从源码层面了解Spark的强大功能。 1. **Spark架构概述** Spark基于弹性分布式数据集...

    Calamus.TaskScheduler:基于Asp.Net Core 5.0采用Quartz.Net编写的开源任务调度Web管理平台

    Calamus.TaskScheduler 基于Asp.Net Core 5.0采用Quartz.Net编写的开源任务调度Web管理平台 部署步骤 1,创建持久化数据库(以MySQL为例) -创建数据库石英,Charset为utf8mb4 -根据数据库/表/表_mysql_innodb.sql...

    dotnet-SharpTask是一个简单的代码集可与TaskScheduler服务api交互并与CobaltStrike兼容

    《深入理解dotnet-SharpTask:与TaskScheduler服务API交互及CobaltStrike兼容性》 在.NET开发领域,有时我们需要对系统任务进行自动化管理,例如定时执行某些操作或者与安全工具集成。dotnet-SharpTask就是这样一个...

Global site tag (gtag.js) - Google Analytics