yep from [1] we know that spark will divide jobs into two steps to be executed:a.launches executors and b.assigns tasks to that executors by driver.so how do executors are assigned to workers by master is very important!
for standalone mode,when we dive into the src in Master#receiveWithLogging() for case RequestSubmitDriver you will figure out it.
1.what
while you step more ,u will see the details in the code path below:
/**相比hadoop中的mr slots,spark分配executors显得智能了:后者是按照cores,mem总体要求进行全集群分配, * 并且资源多的workers分配更多exers.很明显,这里不是类似hadoop那样按照splits数量进行; 另外也比hadoop slots更智能些. * Schedule executors to be launched on the workers.-note:here will not clear out the assigned app. * vip==> spread out purpose: * There are two modes of launching executors. The first attempts to spread out an application's * executors on as many workers as possible, while the second does the opposite (i.e. launch them * on as few workers as possible). The former is usually better for data locality purposes and is * the default.<== * * The number of cores assigned to each executor is configurable. When this is explicitly set, * multiple executors from the same application may be launched on the same worker if the worker * has enough cores and memory. Otherwise, each executor grabs all the cores available on the * worker by default, in which case only one executor may be launched on each worker. */ private def startExecutorsOnWorkers(): Unit = { // Right now this is a very simple <<FIFO scheduler>>. We keep trying to fit in the first app // in the queue, then the second app, etc. if (spreadOutApps) { //- how to present the meaning of 'spread out'? see loop 'while()' // Try to spread out each app among all the workers, until it has all its cores for (app <- waitingApps if app.coresLeft > 0) { //深度置后 //1 //-workers satisfied the need of a executor;reverse order to balance worker's load //-this filters limit thats a worker's mem and free cores must satisfy at least one executor's mem and cpus. val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1)) .sortBy(_.coresFree).reverse //-the more free cores of worker the more priority it has //-2 balance the resources asked to spread out to cluster as far as possible val numUsable = usableWorkers.length val assigned = new Array[Int](numUsable) // Number of cores to give on each node //-here means if app.coresLeft > sum(workers'cores),more than one exectors will be reassigned in one worker // in next round.app.coresLeft can be thinked as the property spark.cores.max per app var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) //-can't determine which one is less var pos = 0 ///-spread out the target cpus(spark.cores.max) across cluster for load balance. while (toAssign > 0) { //-coresFree is never changed,so one work's cpus will be all assigned as far as possible if (usableWorkers(pos).coresFree - assigned(pos) > 0) {//-app.coresLeft is a multiple of one exer core?needless toAssign -= 1 assigned(pos) += 1 } pos = (pos + 1) % numUsable } //3 Now that we've decided how many cores to give on each node, let's actually give them //严格按照worker本身的cpus and mem资源来分配exers,若果不够一个executor资源要求则不分配 for (pos <- 0 until numUsable if assigned(pos) > 0) { //广度优先(横向) //-worker free mem(mainly) and coresFree and app.coresLeft both will be decreased below allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos)) } } } else { //-spark.deploy.spreadOut=false will launch 25 executores(ie.50 cores which same as specified) // Pack each app into as few workers as possible until we've assigned all its cores.先逐个worker分配资源.不够再下一个 //-worker.coresFree will be decreased in allocateWorkerResourceToExecutors(); // 实际上每次分配exers到worker时只考虑mem,而cpus是在下一轮分配时再考虑!这意味着stpreadOut=false时单个worker上 // exers占用的cpus可能超过 单个worker配额. for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { //广度置后 for (app <- waitingApps if app.coresLeft > 0) { //深度优先(垂直),若果一个worker资源不满足,进入一个worker继续分配 allocateWorkerResourceToExecutors(app, app.coresLeft, worker) } } } } /**-以worker.memoryFree和参数corsToAllocate为原则生成executors,对于spreadOut=true,cores分配是严格按照worker实际数量进行的. * assign cores and mem to executor by it's reqeusts(core and mem unit). * *对于spreadOut=false,实际上这里分配executors时只考虑了mem而cpus并未考虑, * 只有在下一轮分配exers到worker时才考虑,see startExecutorsOnWorkers().但spreadOut=true时严格按照mem and cpus来分配 * Allocate a worker's resources to one or more executors.-ie several exers may be run on same worker * @param app the info of the application which the executors belong to * @param coresToAllocate cores on this worker to be allocated to this application(-total cores to be assigned to this * worker) * @param worker the worker info */ private def allocateWorkerResourceToExecutors( app: ApplicationInfo, coresToAllocate: Int, worker: WorkerInfo): Unit = { val memoryPerExecutor = app.desc.memoryPerExecutorMB val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate) var coresLeft = coresToAllocate ///-stop whichever meet the cpus or mem conditions while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) { val exec = app.addExecutor(worker, coresPerExecutor) //-here will decrease the app.coresGranted,ie coresLeft coresLeft -= coresPerExecutor launchExecutor(worker, exec) //-here will decrease the number of free core and mem of worker app.state = ApplicationState.RUNNING } }
its meaning by below figure:
2.how about
annotation refered from spark src,it said thtat 'spark.cores.max' is the # cores to be allocated to one app as many as possible.that means there will be a computation bug in spark,ie.(spreadOut=true):
case | spark.cores.max | #workers | #worker cores | #worker mem | coresPerExecutor | memPerExecutor | result |
1 | 10 | 10 | 16 | 16g | 2 | 2g |
failed:no executors be allocated,ie 10/10=1 < 2coresPerExecutor |
2 | 20 |
10 executors allocated at one wave,ie a.20/10=2>=coresPerExecutor,2/2=1 b.10 cores>=2x1 c.16g>=2gx1 |
|||||
3 | 40 | 20 executors at one wave | |||||
4 | 40 | 2 | 16g | 10 exers at one wave,10 exers at othe wave,total is 20 | |||
5 | 40 | 16 | 2g | similar as above | |||
6 | 40 | 20 |
failed, #worker cpus < 20 |
||||
7 | 40 | 2 | 20g |
failed, #worker mem < 20g |
|||
8 | 15 | 10 | 16 | 16g | 2 | 2g |
only 5 executors allocated,ie 15/10=1wave,then15-10=5 that is only 10 cores to be assigned. |
so from case 1 ,8 we know that the cluster has enough resources to allocate exers but in fact no any executors (or no reasonable # executors) to be launched.then you will see something weird occurs:
16/11/18 14:07:10 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 16/11/18 14:07:25 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 16/11/18 14:07:40 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
3.workarounds
a.use a reasonable # of cores ,ie
natural number= cores.max % (#workers x # coresPerExecutor)
b.appends a embedding code block to check cores.max
ie checks this # max whether can be collapsed with prevous wave computations,no matter more or less then coresPerExecutor after assigning cores to workers and before allocating executors.
4.conclusion
no doubt the property 'spark.cores.max' maybe arise certain misunderstands,but u can aovid this case if adopt the solutions above.
in general speaking this property will let spark more intelligent to allocate executors dynamically compared to other yarn computation framework etc.
ref:
[1] [spark-src-core] 4.2 communications b/t certain kernal components
[2] spark调度系列----1. spark stanalone模式下Master对worker上各个executor资源的分配
相关推荐
TFTP(Trivial File Transfer Protocol,简单文件传输协议)是一种轻量级的文件传输协议,常用于设备初始化、固件更新、网络诊断等场景。在RHEL5(Red Hat Enterprise Linux 5)系统中,TFTP服务器的实现主要依赖于`...
描述中的“tftp-server flash:/7942G/apps42.9-4-2ES22.sbn”是一个TFTP服务器上的文件路径,说明在进行设备升级或配置时,会通过TFTP(Trivial File Transfer Protocol)协议从服务器上下载文件“7942G/apps42.9-4-...
然后,通过TFTP(Trivial File Transfer Protocol)服务器或其他方式将.bin文件上传到路由器的闪存中。在路由器上执行升级命令,如“copy tftp flash”来安装新的IOS。 升级过程需要注意以下几点: 1. 在进行升级前...
通过WEB升级通常是指通过设备的网页界面进行,而TFTP(Trivial File Transfer Protocol)升级则通常涉及网络设备之间直接的文件传输。 "如果正常使用中,不建议追新!"这是一个普遍的建议,因为固件更新可能会引入...
"可通过web或者tftp升级"表明用户有两种方式来更新接入点的固件:通过Web界面进行图形化操作,或者使用TFTP (Trivial File Transfer Protocol) 这种简单文件传输协议进行命令行操作。TFTP通常在没有图形界面的环境下...
这种文件通常通过TFTP(Trivial File Transfer Protocol)或FTP服务器上传到路由器,然后通过命令行接口(CLI)进行安装和激活。 标签 "ios_c3640" 明确指出了这个文件与Cisco 3640路由器的IOS相关,这使得该文件...
"unzip-c3725-adventerprisek9-mz.124-15.T5.bin"是实际的文件名,它是一个二进制文件,通常用于通过TFTP(Trivial File Transfer Protocol)或其他网络协议上传到路由器进行安装。在更新或恢复Cisco设备的软件时,...
TFTP(Trivial File Transfer Protocol),简单文件传输协议,是一种轻量级的文件传输协议,主要用于在设备之间进行简单的文件传输,尤其是用于网络配置、软件更新和操作系统映像的加载等场景。在本文中,我们将深入...
4. 工具和脚本:用于自动化设置过程,例如配置TFTP(Trivial File Transfer Protocol)服务器,或者用于创建和管理Windows安装映像的工具。 5. 文档和指南:解释如何部署和使用RIS-Linux 0.4,以及解决常见问题的...
在netkit-tftp-0.172.zip这个压缩包中,包含了构建和运行一个针对arm5718平台的TFTP(Trivial File Transfer Protocol)客户端所需的全部资源。TFTP虽然功能比标准FTP简单,但因其轻量级和易于实现的特性,常在配置...
- 上传固件:通过TFTP(Trivial File Transfer Protocol)或其他方式,将"initramfs-kernel.bin"上传到设备。 - 启动新内核:设备加载新的Initramfs和内核,进入临时环境。 - 上传并应用系统升级:使用sysupgrade...
TFTP(Trivial File Transfer Protocol,简单文件传输协议)是一种轻量级的文件传输协议,主要用于在设备之间进行简单的文件交换,尤其适用于低带宽或简化的网络环境。tftp-hpa-0.29.tar.gz是一个包含了TFTP服务器源...
标题中的"CISCO 无线AP 3802ME模式固件AIR-AP3800-K9-ME-8-10-130-0"指的是Cisco公司的无线接入点(Access Point,简称AP)型号为3802ME的特定固件版本。此固件是针对3800系列AP的,它运行在ME(Managed Enterprise...
压缩包子文件 "c7200-adventerprisek9-mz.124-11T.bin" 是实际的IOS映像文件,解压后将通过TFTP(Trivial File Transfer Protocol)或其他网络传输方式加载到路由器的闪存中,然后通过命令行界面(CLI)进行安装和...
"tap"通常是“Trivial Application Programming Interface”(简单应用编程接口)的缩写,有时也被用于“Tap”的全称"Transport Agnostic Protocol",在数据集成领域,它表示一种可以从数据源导出数据的规范。...
这个名为“vivi-tftp-0.4.tar.gz”的压缩包包含了VIVI的一个更新版本,特别加入了TFTP(Trivial File Transfer Protocol)的支持。TFTP是一种简单且常用的文件传输协议,尤其在网络配置和远程系统初始化时非常实用,...
在信息技术领域,TFTP(Trivial File Transfer Protocol,简单文件传输协议)是一种轻量级的文件传输协议,尤其在配置网络设备、远程更新固件或者在资源有限的环境中进行数据交换时,它的作用显得尤为重要。...
#### 三、TFTP (Trivial File Transfer Protocol) 的使用 #### 一、TFTP 简介 TFTP(简单文件传输协议)是一种轻量级的文件传输协议,主要用于无盘工作站的启动、路由器等网络设备的软件更新。TFTP基于UDP协议,...
2. TFTP(Trivial File Transfer Protocol):这是一个简单的文件传输协议,适用于小型设备或低带宽环境。在`TFTPClient`类中,我们可以看到对TFTP协议的实现。 3. Telnet(远程登录协议):`TelnetClient`类提供了...
使用这个文件通常需要通过TFTP(Trivial File Transfer Protocol)服务器进行传输,然后通过命令行界面(CLI)或网络设备管理工具如Cisco的SDM(Secure Device Manager)或CLI命令来执行升级。 在部署或升级此IOS时...