一:通过Spark-shell运行程序来观察TaskScheduler内幕
1,当我们启动Spark-shell本身的时候命令终端反馈回来的主要是ClientEndpoint和SparkDeploySchedulerBackend,这是因为此时还没有任何Job的触发,这是启动Application本身而已,所以主要就是实例化SparkContext并注册当前的应用程序给Master且从集群中获得ExecutorBackend计算资源;
2,DAGScheduler划分好Stage后会通过TaskSchedulerImpl中的TaskSetManager来管理当前要运行的Stage中的所有任务TaskSet,TaskSetManager会根据locality aware来为Task分配计算资源、监控Task的执行状态(例如重试、慢任务进行推测式执行等)
TaskSet.scala - L9 |
/**
只是一个简单的数据结构。 包含了高层调度器交给底层调度器的包含了哪些成员; val priority: Int, //Pool调度池中规定了Stage的优先级
|
TaskSetManager.scala |
private[spark] class TaskSetManager(
|
|
我们期望最好 是 PROCESS_LOCAL ,最大化利用内存资源 |
二:TaskScheduler与SchedulerBackend
<!--[if !supportLists]-->1, <!--[endif]-->总体的底层任务调度的过程如下:
<!--[if !supportLists]-->a) <!--[endif]-->TaskSchedulerImpl.submitTasks:主要的作用是将TaskSet加入到TaskSetManager中进行管理;
|
if (tasks.size > 0) {
|
|
|
override def submitTasks(taskSet: TaskSet) {
|
private[spark] class TaskSchedulerImpl( def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4)) //任务默认失败重试次数是 4次 |
<!--[if !supportLists]-->b) <!--[endif]-->SchedulableBuilder.addTaskSetManager:SchedulableBuilder会确定TaskSetManager的调度顺序,然后按照TaskSetManager的locality aware来确定每个Task具体运行在哪个ExecutorBackend中;
<!--[if !supportLists]-->c) <!--[endif]-->CoarseGrainedSchedulerBackend.reviveOffers:给DriverEndpoint发送ReviveOffers,ReviveOffers本身是一个空的case object对象,只是起到触发底层资源调度的作用,在有Task提交或者计算资源变动的时候会发送ReviveOffers这个消息作为触发器;
<!--[if !supportLists]-->d) <!--[endif]-->在DriverEndpoint接受ReviveOffers消息并路由到makeOffers具体的方法中:在makeOffers方法中首先准备好所有可以用于计算的workOffers(代表了所有可用ExecutorBackend中可以使用的Cores等信息)
<!--[if !supportLists]-->e) <!--[endif]-->TaskSchedulerImpl.resourceOffers:为每一个Task具体分配计算资源,输入是ExecutorBackend及其上可用的Cores,输出TaskDescription的二维数组,在其中确定了每个Task具体运行在哪个ExecutorBackend;resourceOffers到底是如何确定Task具体运行在哪个ExecutorBackend上的呢?算法的实现具体如下:
<!--[if !supportLists]-->i. <!--[endif]-->通过Random.shuffle方法重新洗牌所有的计算资源以寻求计算的负载均衡;
<!--[if !supportLists]-->ii. <!--[endif]-->根据每个ExecutorBackend的cores的个数声明类型为TaskDescription的ArrayBuffer数组;
<!--[if !supportLists]-->iii. <!--[endif]-->如果有新的ExecutorBackend分配给我们的Job,此时会调用executorAdded来获得最新的完整的可用计算计算资源;
<!--[if !supportLists]-->iv. <!--[endif]-->通过下述代码最求最高级别的优先级本地性:
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
}
<!--[if !supportLists]-->v. <!--[endif]-->通过调用TaskSetManager的resourceOffer最终确定每个Task具体运行在哪个ExecutorBackend的具体的Locality Level;
f)通过launchTasks把任务发送给ExecutorBackend去执行;
补充:
<!--[if !supportLists]-->1, <!--[endif]-->Task默认的最大重试次数是4次:
def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))
<!--[if !supportLists]-->2, <!--[endif]-->Spark应用程序目前支持两种调度器:FIFO、FAIR,可以通过spark-env.sh中spark.scheduler.mode进行具体的设置,默认情况下是FIFO的方式:
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode: SchedulingMode = try {
SchedulingMode.withName(schedulingModeConf.toUpperCase)
<!--[if !supportLists]-->3, <!--[endif]-->TaskScheduler中要负责为Task分配计算资源:此时程序已经具备集群中的计算资源了,根据计算本地性原则确定Task具体要运行在哪个ExecutorBackend中;
<!--[if !supportLists]-->4, <!--[endif]-->TaskDescription中已经确定好了Task具体要运行在哪个ExecutorBackend上:
private[spark] class TaskDescription(
val taskId: Long,
val attemptNumber: Int,
val executorId: String,
val name: String,
val index: Int, // Index within this task's TaskSet
_serializedTask: ByteBuffer)
extends Serializable {
而确定Task具体运行在哪个ExecutorBackend上的算法是由TaskSetManager的resourceOffer方法决定
<!--[if !supportLists]-->5, <!--[endif]-->数据本地优先级从高到底以此为:优先级高低排: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY,其中NO_PREF是指机器本地性
<!--[if !supportLists]-->6, <!--[endif]-->每个Task默认是采用一个线程进行计算的:
// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
<!--[if !supportLists]-->7, <!--[endif]-->DAGScheduler是从数据层面考虑preferedLocation的,而TaskScheduler是从具体计算Task角度考虑计算的本地性;
<!--[if !supportLists]-->8, <!--[endif]-->Task进行广播时候的AkkFrameSize大小是128MB,如果任务大于等于128MB-200K的话则Task会直接被丢弃掉;如果小于128MB-200K的话会通过CoarseGrainedSchedulerBackend去launchTask到具体的ExecutorBackend上;
相关推荐
Arduino-TaskScheduler.zip,Arduino、ESPX和STM32微控制器任务调度器的协同多任务处理,Arduino是一家开源软硬件公司和制造商社区。Arduino始于21世纪初,深受电子制造商的欢迎,Arduino通过开源系统提供了很多灵活性...
总结来说,`TaskScheduler`组件是.NET框架中用于计划任务的重要工具,它提供了丰富的功能和灵活性,可以帮助开发者构建高效、可维护的计划任务解决方案。通过深入理解和有效利用这个组件,可以优化应用程序的后台...
TaskScheduler_适用于Arduino+ESPx+STM32+nRF+其他微控制器的协作多任务处理器_优质嵌入式项目分享
TaskSchedulerTaskScheduler,它决定了task该如何被调度,而在.net framework中有两种系统定义Scheduler,第一个是Task默认的ThreadPoolTaskScheduler,还是一种SynchronizationContextTaskScheduler,以及这两种...
C#利用Interop.TaskScheduler.dll添加删除计划任务,可实现程序随Windows系统自动启动; 项目用VS2017打开,需要.net 2.0支持,需要管理员权限;支持win7 win10;不支持xp。
`TaskScheduler`是一个专为Android设计的轻量级、高效的异步任务处理库,它已经被广泛应用在拥有百万日活跃用户的线上项目中,充分证明了其稳定性和实用性。 **1. 异步编程的重要性** 在Android平台上,主线程(UI...
.archMicrosoft.Win32.TaskScheduler.dll
《Spark软件组件架构图及Task Scheduler架构解析》 Spark,作为大数据处理领域的明星框架,以其高效、易用和可扩展性赢得了广泛的认可。本文将深入探讨Spark的核心组件架构及其Task Scheduler的设计原理,帮助读者...
Basic class for using the Microsoft Task Scheduler(33KB)
Windows 8 SDK 此 SDK 于 2012 年 11 月发布,可用于创建适用于 Windows 8 或更早版本的 Windows 应用 () 使用 Web 技术、本机和托管代码;或使用本机或托管编程模型的桌面应用。
本文将深入探讨“proj.zip_algorithms_scheduler_task scheduler”所涉及的知识点,包括任务调度的基本概念、绝对优先级算法以及如何实现一个基于算法的任务调度器。 一、任务调度基本概念 任务调度器,也称为作业...
Spring Boot的`TaskScheduler`提供了灵活的方式来管理定时任务,无论是简单的周期性执行,还是基于复杂规则的调度。通过自定义配置和实现,你可以根据项目的具体需求定制任务调度策略,使得任务执行更加高效和可控。...
在“PSO-Task-scheduler-master”项目中,开发者已经实现了这样一个基于粒子群优化的调度器,通过分析其代码结构和算法实现,我们可以学习到如何将PSO理论应用于实际问题,解决复杂的资源调度优化挑战。 总结,粒子...
"今天不要延迟()使用Arduino Task Scheduler!-项目开发"这一标题暗示了在编程时应该尽早引入任务调度器,以避免代码混乱和效率低下,通常称为“意大利面代码”。 任务调度器是一种工具或库,它允许开发者定义多...
TaskScheduler库是一个强大的工具,专为Arduino、ESPx(包括ESP8266)和STM32微控制器设计,用于实现协作式多任务处理。在嵌入式系统中,多任务处理是必不可少的功能,它允许同时执行多个独立的任务,提高了系统的...
您所需要做的就是通过简单的配置定义参数组合,在我们的框架中编写测试代码,然后让GPU Task Scheduler为您并行运行测试。 如果您已经有一个测试代码,请不要担心。 将您的测试代码迁移到我们的框架非常容易。 ...
源码剖析对于理解其内部工作机制和优化应用性能至关重要。本篇文章将深入探讨Spark的核心组件、架构设计以及关键算法,帮助读者从源码层面了解Spark的强大功能。 1. **Spark架构概述** Spark基于弹性分布式数据集...
《深入理解dotnet-SharpTask:与TaskScheduler服务API交互及CobaltStrike兼容性》 在.NET开发领域,有时我们需要对系统任务进行自动化管理,例如定时执行某些操作或者与安全工具集成。dotnet-SharpTask就是这样一个...