Deploy模块整体架构
deploy模块主要包含3个子模块:master, worker, client。他们继承于Actor
,通过actor实现互相之间的通信。
- Master:master的主要功能是接收worker的注册并管理所有的worker,接收client提交的application,(FIFO)调度等待的application并向worker提交。
-
Worker:worker的主要功能是向master注册自己,根据master发送的application配置进程环境,并启动
StandaloneExecutorBackend
。 -
Client:client的主要功能是向master注册并监控application。当用户创建
SparkContext
时会实例化SparkDeploySchedulerBackend
,而实例化SparkDeploySchedulerBackend
的同时就会启动client,通过向client传递启动参数和application有关信息,client向master发送请求注册application并且在slave node上启动StandaloneExecutorBackend
。
下面来看一下deploy模块的类图:
Deploy模块通信消息
Deploy模块并不复杂,代码也不多,主要集中在各个子模块之间的消息传递和处理上,因此在这里列出了各个模块之间传递的主要消息:
-
client to master
-
RegisterApplication
(向master注册application)
-
-
master to client
-
RegisteredApplication
(作为注册application的reply,回复给client) -
ExecutorAdded
(通知client worker已经启动了Executor环境,当向worker发送LaunchExecutor
后通知client) -
ExecutorUpdated
(通知client Executor状态已经发生变化了,包括结束、异常退出等,当worker向master发送ExecutorStateChanged
后通知client)
-
-
master to worker
-
LaunchExecutor
(发送消息启动Executor环境) -
RegisteredWorker
(作为worker向master注册的reply) -
RegisterWorkerFailed
(作为worker向master注册失败的reply) -
KillExecutor
(发送给worker请求停止executor环境)
-
-
worker to master
-
RegisterWorker
(向master注册自己) -
Heartbeat
(定期向master发送心跳信息) -
ExecutorStateChanged
(向master发送Executor状态改变信息)
-
Deploy模块代码详解
Deploy模块相比于scheduler模块简单,因此对于deploy模块的代码并不做十分细节的分析,只针对application的提交和结束过程做一定的分析。
Client提交application
Client是由SparkDeploySchedulerBackend
创建被启动的,因此client是被嵌入在每一个application中,只为这个applicator所服务,在client启动时首先会先master注册application:
def start(){
// Just launch an actor; it will call back into the listener.
actor = actorSystem.actorOf(Props(newClientActor))
}
overridedef preStart(){
logInfo("Connecting to master "+ masterUrl)
try{
master = context.actorFor(Master.toAkkaUrl(masterUrl))
masterAddress = master.path.address
master !RegisterApplication(appDescription)//向master注册application
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master)// Doesn't work with remote actors, but useful for testing
}catch{
case e:Exception=>
logError("Failed to connect to master", e)
markDisconnected()
context.stop(self)
}
}
Master在收到RegisterApplication
请求后会把application加到等待队列中,等待调度:
caseRegisterApplication(description)=>{
logInfo("Registering app "+ description.name)
val app = addApplication(description, sender)
logInfo("Registered app "+ description.name +" with ID "+ app.id)
waitingApps += app
context.watch(sender)// This doesn't work with remote actors but helps for testing
sender !RegisteredApplication(app.id)
schedule()
}
Master会在每次操作后调用schedule()
函数,以确保等待的application能够被及时调度。
在前面提到deploy模块是资源管理模块,那么Spark的deploy管理的是什么资源,资源以什么单位进行调度的呢?在当前版本的Spark中,集群的cpu数量是Spark资源管理的一个标准,每个提交的application都会标明自己所需要的资源数(也就是cpu的core数),Master以FIFO的方式管理所有的application请求,当资源数量满足当前任务执行需求的时候该任务就会被调度,否则就继续等待,当然如果master能给予当前任务部分资源则也会启动该application。schedule()
函数实现的就是此功能。
def schedule(){
if(spreadOutApps){
for(app <- waitingApps if app.coresLeft >0){
val usableWorkers = workers.toArray.filter(_.state ==WorkerState.ALIVE)
.filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned =newArray[Int](numUsable)// Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos =0
while(toAssign >0){
if(usableWorkers(pos).coresFree - assigned(pos)>0){
toAssign -=1
assigned(pos)+=1
}
pos =(pos +1)% numUsable
}
// Now that we've decided how many cores to give on each node, let's actually give them
for(pos <-0until numUsable){
if(assigned(pos)>0){
val exec= app.addExecutor(usableWorkers(pos), assigned(pos))
launchExecutor(usableWorkers(pos),exec, app.desc.sparkHome)
app.state =ApplicationState.RUNNING
}
}
}
}else{
// Pack each app into as few nodes as possible until we've assigned all its cores
for(worker <- workers if worker.coresFree >0&& worker.state ==WorkerState.ALIVE){
for(app <- waitingApps if app.coresLeft >0){
if(canUse(app, worker)){
val coresToUse = math.min(worker.coresFree, app.coresLeft)
if(coresToUse >0){
val exec= app.addExecutor(worker, coresToUse)
launchExecutor(worker,exec, app.desc.sparkHome)
app.state =ApplicationState.RUNNING
}
}
}
}
}
}
当application得到调度后就会调用launchExecutor()
向worker发送请求,同时向client汇报状态:
def launchExecutor(worker:WorkerInfo,exec:ExecutorInfo, sparkHome:String){
worker.addExecutor(exec)
worker.actor !LaunchExecutor(exec.application.id,exec.id,exec.application.desc,exec.cores,exec.memory, sparkHome)
exec.application.driver !ExecutorAdded(exec.id, worker.id, worker.host,exec.cores,exec.memory)
}
至此client与master的交互已经转向了master与worker的交互,worker需要配置application启动环境
caseLaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_)=>
val manager =newExecutorRunner(
appId, execId, appDesc, cores_, memory_,self, workerId, ip,newFile(execSparkHome_), workDir)
executors(appId +"/"+ execId)= manager
manager.start()
coresUsed += cores_
memoryUsed += memory_
master !ExecutorStateChanged(appId, execId,ExecutorState.RUNNING,None,None)
Worker在接收到LaunchExecutor
消息后创建ExecutorRunner
实例,同时汇报master executor环境启动。
ExecutorRunner
在启动的过程中会创建线程,配置环境,启动新进程:
def start(){
workerThread =newThread("ExecutorRunner for "+ fullId){
overridedef run(){ fetchAndRunExecutor()}
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
...
}
def fetchAndRunExecutor(){
try{
// Create the executor's working directory
val executorDir =newFile(workDir, appId +"/"+ execId)
if(!executorDir.mkdirs()){
thrownewIOException("Failed to create directory "+ executorDir)
}
// Launch the process
val command = buildCommandSeq()
val builder =newProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for((key, value)<- appDesc.command.environment){
env.put(key, value)
}
env.put("SPARK_MEM", memory.toString +"m")
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
env.put("SPARK_LAUNCH_WITH_SCALA","0")
process = builder.start()
// Redirect its stdout and stderr to files
redirectStream(process.getInputStream,newFile(executorDir,"stdout"))
redirectStream(process.getErrorStream,newFile(executorDir,"stderr"))
// Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
// long-lived processes only. However, in the future, we might restart the executor a few
// times on the same machine.
val exitCode = process.waitFor()
val message ="Command exited with code "+ exitCode
worker !ExecutorStateChanged(appId, execId,ExecutorState.FAILED,Some(message),
Some(exitCode))
}catch{
case interrupted:InterruptedException=>
logInfo("Runner thread for executor "+ fullId +" interrupted")
case e:Exception=>{
logError("Error running executor", e)
if(process !=null){
process.destroy()
}
val message = e.getClass +": "+ e.getMessage
worker !ExecutorStateChanged(appId, execId,ExecutorState.FAILED,Some(message),None)
}
}
}
在ExecutorRunner
启动后worker向master汇报ExecutorStateChanged
,而master则将消息重新pack成为ExecutorUpdated
发送给client。
至此整个application提交过程基本结束,提交的过程并不复杂,主要涉及到的消息的传递。
Application的结束
由于各种原因(包括正常结束,异常返回等)会造成application的结束,我们现在就来看看applicatoin结束的整个流程。
application的结束往往会造成client的结束,而client的结束会被master通过Actor
检测到,master检测到后会调用removeApplication()
函数进行操作:
def removeApplication(app:ApplicationInfo){
if(apps.contains(app)){
logInfo("Removing app "+ app.id)
apps -= app
idToApp -= app.id
actorToApp -= app.driver
addressToWorker -= app.driver.path.address
completedApps += app // Remember it in our history
waitingApps -= app
for(exec<- app.executors.values){
exec.worker.removeExecutor(exec)
exec.worker.actor !KillExecutor(exec.application.id,exec.id)
}
app.markFinished(ApplicationState.FINISHED)// TODO: Mark it as FAILED if it failed
schedule()
}
}
removeApplicatoin()
首先会将application从master自身所管理的数据结构中删除,其次它会通知每一个work,请求其KillExecutor
。worker在收到KillExecutor
后调用ExecutorRunner
的kill()
函数:
caseKillExecutor(appId, execId)=>
val fullId = appId +"/"+ execId
executors.get(fullId) match {
caseSome(executor)=>
logInfo("Asked to kill executor "+ fullId)
executor.kill()
caseNone=>
logInfo("Asked to kill unknown executor "+ fullId)
}
在ExecutorRunner
内部,它会结束监控线程,同时结束监控线程所启动的进程,并且向worker汇报ExecutorStateChanged
:
def kill(){
if(workerThread !=null){
workerThread.interrupt()
workerThread =null
if(process !=null){
logInfo("Killing process!")
process.destroy()
process.waitFor()
}
worker !ExecutorStateChanged(appId, execId,ExecutorState.KILLED,None,None)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
}
}
Application结束的同时清理了master和worker上的关于该application的所有信息,这样关于application结束的整个流程就介绍完了,当然在这里我们对于许多异常处理分支没有细究,但这并不影响我们对主线的把握。
相关推荐
maven-deploy-plugin-2.8.2.jar
maven-deploy-plugin-2.7.jar
本文将深入探讨FIS3中的`deploy-replace`模块,这是FIS3部署过程中用于替换资源引用路径的组件。 ### 一、FIS3简介 FIS3是一个高效、灵活且强大的前端构建工具,旨在提高开发效率,降低维护成本。它支持多种语言,...
通过分析模块依赖关系,它能够智能地组织代码,确保正确执行。 2. **AMD到CommonJS转换**:虽然AMD在浏览器环境中很流行,但在服务器端或Node.js环境中,CommonJS规范更常见。"fis-deploy-amdpackage"可以将AMD格式...
在这种情况下,`ios-deploy` 工具就派上用场了。`ios-deploy` 是一个命令行工具,允许开发者将Swift或Objective-C编译的iOS应用安装到连接的设备上,并进行调试。 **一、ios-deploy简介** `ios-deploy` 是由GitHub...
文件名“go-deploy-master”表明这是该项目的主分支源代码,通常包含了项目的最新功能和改进。用户在获取并解压此压缩包后,可以按照项目文档进行编译和安装,然后根据提供的指南配置和使用Go-go-deploy。通过定制化...
**iOS部署:超越Xcode的调试利器——ios-deploy** 在iOS开发过程中,Xcode是开发者最常用的工具,它提供了完整的开发环境和设备调试功能。然而,有些情况下,开发者可能需要在不使用Xcode或者在未越狱的设备上安装...
ceph-deploy部署文档(2020最新)centos7 ceph.14 最详细的部署教程。系统:centos7,ceph版本为14.走过路过不要错过
.archanyconnect-win-3.1.00495-pre-deploy-k9.msi
webase-deploy.zip
spark-sql sql on yarn --deploy-mode cluster 改造为 可以cluster提交
maven-deploy-plugin-2.4.jar
maven-deploy-plugin-2.0.jar
maven-deploy-plugin-2.5.jar
Shipyard-deploy自动部署脚本,操作说明:sh shipyard-deploy 具体部署方法请参考:https://blog.csdn.net/qq_35175995
kubernetes-1.20.0-centos-7.9-deploy(New).zip,可以修改安装目录,并且可以安装在欧拉系kubernetes-1.20.0-centos-7.9-deploy(New).zip,可以修改安装目录,并且可以安装在欧拉系kubernetes-1.20.0-centos-7.9-...
《hook-deploy源码解析与应用实践》 在软件开发过程中,部署自动化是提升效率、减少错误的关键环节。"hook-deploy"项目提供了一种基于钩子(hook)的自动化部署解决方案,允许开发者自定义各个阶段的操作,实现灵活...
**标签“前端开源”**意味着fis3-deploy-zip是一个开放源码的项目,开发者可以查看其源代码,理解其工作原理,并且根据自身需求进行定制或贡献代码。这促进了社区的交流和共享,推动了前端开发工具的进步。 综上所...
ember-cli-deploy-build 一个ember-cli-deploy插件,用于构建您的ember-cli应用程序 该插件将构建您的ember-cli应用程序文件并将其输出到目录。 什么是ember-cli-deploy插件? 插件是一个插件,可以作为ember-cli...
ceph-deploy 是一个 Ceph 的简易部署工具,可以非常方便的部署 Ceph 集群存储系统。 标签:cephdeploy