A very important part of Spark is task scheduling; the overall flow is shown in the flow chart below.
There are two kinds of SchedulableBuilder: FairSchedulableBuilder and FIFOSchedulableBuilder. The key difference is that Pool's getSortedTaskSetQueue method uses a different taskSetSchedulingAlgorithm to sort the schedulableQueue.
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
  var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
  for (schedulable <- sortedSchedulableQueue) {
    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
  }
  sortedTaskSetQueue
}
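For reference, here is a simplified sketch of what the two SchedulingAlgorithm implementations compare. This is paraphrased rather than copied from the Spark source: FIFO orders by priority (job id) and then stage id, while Fair first favors pools that are below their minShare and then compares share ratios and weighted running-task ratios.

// Simplified sketch of the two comparators; the fields used (priority, stageId,
// minShare, runningTasks, weight) come from the Schedulable trait, but the bodies
// are a paraphrase, not the exact source.
class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priorityDiff = s1.priority - s2.priority   // lower priority value (earlier job) first
    if (priorityDiff != 0) priorityDiff < 0
    else s1.stageId < s2.stageId                   // break ties by stage id
  }
}

class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare    // is s1 below its guaranteed share?
    val s2Needy = s2.runningTasks < s2.minShare
    if (s1Needy && !s2Needy) true
    else if (!s1Needy && s2Needy) false
    else if (s1Needy && s2Needy)
      s1.runningTasks.toDouble / math.max(s1.minShare, 1.0) <
        s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
    else
      s1.runningTasks.toDouble / s1.weight < s2.runningTasks.toDouble / s2.weight
  }
}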
Another important class in scheduling is SchedulerBackend. It has four subclasses: MesosSchedulerBackend, CoarseMesosSchedulerBackend, SimrSchedulerBackend, and SparkDeploySchedulerBackend. MesosSchedulerBackend and CoarseMesosSchedulerBackend are used when deploying on Mesos, SimrSchedulerBackend is used for running Spark inside Hadoop MapReduce (SIMR), and SparkDeploySchedulerBackend is used for the Spark standalone deployment.
SparkDeploySchedulerBackend is a subclass of CoarseGrainedSchedulerBackend. Its start method calls the parent class's start method and creates an AppClient instance; client.start() registers the application with the Master, and the Master then tells the Workers to launch CoarseGrainedExecutorBackend processes using the Command built below.
override def start() {
  super.start()

  // The endpoint for executors to talk to us
  val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
    conf.get("spark.driver.host"), conf.get("spark.driver.port"),
    CoarseGrainedSchedulerBackend.ACTOR_NAME)
  val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
  val command = Command(
    "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
  val sparkHome = sc.getSparkHome().getOrElse(null)
  val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command,
    sparkHome, "http://" + sc.ui.appUIAddress)

  client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
  client.start()
}
The messages handled by the client actor are listed below.
override def receive = {
  case RegisteredApplication(appId_, masterUrl) =>
    appId = appId_
    registered = true
    changeMaster(masterUrl)
    listener.connected(appId)

  case ApplicationRemoved(message) =>
    logError("Master removed our application: %s; stopping client".format(message))
    markDisconnected()
    context.stop(self)

  case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
    val fullId = appId + "/" + id
    logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
    listener.executorAdded(fullId, workerId, hostPort, cores, memory)

  case ExecutorUpdated(id, state, message, exitStatus) =>
    val fullId = appId + "/" + id
    val messageText = message.map(s => " (" + s + ")").getOrElse("")
    logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
    if (ExecutorState.isFinished(state)) {
      listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
    }

  case MasterChanged(masterUrl, masterWebUiUrl) =>
    logInfo("Master has changed, new master is at " + masterUrl)
    changeMaster(masterUrl)
    alreadyDisconnected = false
    sender ! MasterChangeAcknowledged(appId)

  case DisassociatedEvent(_, address, _) if address == masterAddress =>
    logWarning(s"Connection to $address failed; waiting for master to reconnect...")
    markDisconnected()

  case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
    logWarning(s"Could not connect to $address: $cause")

  case StopAppClient =>
    markDead()
    sender ! true
    context.stop(self)
}
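The listener that these callbacks (connected, executorAdded, executorRemoved) are delivered to is the SparkDeploySchedulerBackend itself, since it passed `this` when constructing the AppClient. A rough sketch of that callback interface, paraphrased from memory rather than quoted, looks like:

// Rough sketch of the callback interface the AppClient reports to; the backend that
// created the AppClient (SparkDeploySchedulerBackend) supplies the implementation.
trait AppClientListener {
  def connected(appId: String): Unit
  def disconnected(): Unit                 // connection to the master lost; may reconnect
  def dead(): Unit                         // application is irrecoverably dead
  def executorAdded(fullId: String, workerId: String, hostPort: String,
                    cores: Int, memory: Int): Unit
  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}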
CoarseGrainedSchedulerBackend's start method creates a DriverActor. Its preStart method schedules a timer with period reviveInterval, so the actor sends itself a ReviveOffers message every interval; handling ReviveOffers is what launches tasks.
override def preStart() {
  // Listen for remote client disconnection events, since they don't go through Akka's watch()
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

  // Periodically revive offers to allow delay scheduling to work
  val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
  import context.dispatcher
  context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
}
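Since the interval is read from spark.scheduler.revive.interval (defaulting to 1000 ms above), it can be tuned when building the SparkConf. A minimal, hypothetical example (master URL and app name are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical tuning example: shorten the revive interval to 500 ms so pending
// tasks are re-offered to executors more frequently.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")
  .setAppName("revive-interval-demo")
  .set("spark.scheduler.revive.interval", "500")
val sc = new SparkContext(conf)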
The messages handled by DriverActor:
def receive = {
  case RegisterExecutor(executorId, hostPort, cores) =>
    Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
    if (executorActor.contains(executorId)) {
      sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
    } else {
      logInfo("Registered executor: " + sender + " with ID " + executorId)
      sender ! RegisteredExecutor(sparkProperties)
      executorActor(executorId) = sender
      executorHost(executorId) = Utils.parseHostPort(hostPort)._1
      freeCores(executorId) = cores
      executorAddress(executorId) = sender.path.address
      addressToExecutorId(sender.path.address) = executorId
      totalCoreCount.addAndGet(cores)
      makeOffers()
    }

  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      if (executorActor.contains(executorId)) {
        freeCores(executorId) += 1
        makeOffers(executorId)
      } else {
        // Ignoring the update since we don't know about the executor.
        val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
        logWarning(msg.format(taskId, state, sender, executorId))
      }
    }

  case ReviveOffers =>
    makeOffers()

  case KillTask(taskId, executorId) =>
    executorActor(executorId) ! KillTask(taskId, executorId)

  case StopDriver =>
    sender ! true
    context.stop(self)

  case StopExecutors =>
    logInfo("Asking each executor to shut down")
    for (executor <- executorActor.values) {
      executor ! StopExecutor
    }
    sender ! true

  case RemoveExecutor(executorId, reason) =>
    removeExecutor(executorId, reason)
    sender ! true

  case DisassociatedEvent(_, address, _) =>
    addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
}
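makeOffers() is what turns free cores into running tasks: it wraps each registered executor's free cores in a WorkerOffer, asks the TaskSchedulerImpl for task descriptions, and sends each one to the owning executor actor as a LaunchTask message. A simplified sketch of that path, paraphrased rather than quoted from the source:

// Simplified sketch of the offer/launch path inside DriverActor (paraphrased).
def makeOffers() {
  // Offer every executor's currently free cores to the scheduler
  launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) }))
}

def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    freeCores(task.executorId) -= 1                    // one core per task in this version
    executorActor(task.executorId) ! LaunchTask(task)  // executor deserializes and runs it
  }
}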
Deployment with MesosSchedulerBackend
Its start() method launches a MesosSchedulerDriver.
override def start() {
  synchronized {
    classLoader = Thread.currentThread.getContextClassLoader

    new Thread("MesosSchedulerBackend driver") {
      setDaemon(true)
      override def run() {
        val scheduler = MesosSchedulerBackend.this
        val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
        driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
        try {
          val ret = driver.run()
          logInfo("driver.run() returned with code " + ret)
        } catch {
          case e: Exception => logError("driver.run() failed", e)
        }
      }
    }.start()

    waitForRegister()
  }
}
When TaskSchedulerImpl calls submitTasks(), it invokes MesosSchedulerBackend's reviveOffers(), which tells the MesosSchedulerDriver to revive offers. Mesos then calls back into the backend's resourceOffers method, which builds the Mesos tasks and submits them to Mesos via launchTasks. Below is the resourceOffers function.
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
  val oldClassLoader = setClassLoader()
  try {
    synchronized {
      // Build a big list of the offerable workers, and remember their indices so that we can
      // figure out which Offer to reply to for each worker
      val offerableIndices = new ArrayBuffer[Int]
      val offerableWorkers = new ArrayBuffer[WorkerOffer]

      def enoughMemory(o: Offer) = {
        val mem = getResource(o.getResourcesList, "mem")
        val slaveId = o.getSlaveId.getValue
        mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
      }

      for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
        offerableIndices += index
        offerableWorkers += new WorkerOffer(
          offer.getSlaveId.getValue,
          offer.getHostname,
          getResource(offer.getResourcesList, "cpus").toInt)
      }

      // Call into the ClusterScheduler
      val taskLists = scheduler.resourceOffers(offerableWorkers)

      // Build a list of Mesos tasks for each slave
      val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
      for ((taskList, index) <- taskLists.zipWithIndex) {
        if (!taskList.isEmpty) {
          val offerNum = offerableIndices(index)
          val slaveId = offers(offerNum).getSlaveId.getValue
          slaveIdsWithExecutors += slaveId
          mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
          for (taskDesc <- taskList) {
            taskIdToSlaveId(taskDesc.taskId) = slaveId
            mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
          }
        }
      }

      // Reply to the offers
      val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
      for (i <- 0 until offers.size) {
        d.launchTasks(offers(i).getId, mesosTasks(i), filters)
      }
    }
  } finally {
    restoreClassLoader(oldClassLoader)
  }
}
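Both enoughMemory and the WorkerOffer construction rely on a small getResource helper that pulls a named scalar resource ("mem" or "cpus") out of a Mesos offer. Roughly, paraphrased:

// Rough sketch of the helper used above: find a scalar resource by name in a Mesos offer.
// Assumes scala.collection.JavaConversions._ is in scope, as it is in the surrounding code.
def getResource(res: JList[Resource], name: String): Double = {
  for (r <- res if r.getName == name) {
    return r.getScalar.getValue
  }
  0.0 // resource not present in this offer
}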
CoarseMesosSchedulerBackend
CoarseMesosSchedulerBackend is the other way to deploy on Mesos. It differs from MesosSchedulerBackend in that CoarseMesosSchedulerBackend starts a long-running CoarseGrainedExecutorBackend on Mesos and then submits tasks to that CoarseGrainedExecutorBackend for execution, whereas MesosSchedulerBackend converts each Spark task directly into a Mesos task and submits it to Mesos to run. Below is the createCommand code; the rest is similar to MesosSchedulerBackend.
def createCommand(offer: Offer, numCores: Int): CommandInfo = {
  val environment = Environment.newBuilder()
  sc.executorEnvs.foreach { case (key, value) =>
    environment.addVariables(Environment.Variable.newBuilder()
      .setName(key)
      .setValue(value)
      .build())
  }
  val command = CommandInfo.newBuilder()
    .setEnvironment(environment)
  val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port"),
    CoarseGrainedSchedulerBackend.ACTOR_NAME)
  val uri = conf.get("spark.executor.uri", null)
  if (uri == null) {
    val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
    command.setValue(
      "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
        runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
  } else {
    // Grab everything to the first '.'. We'll use that and '*' to
    // glob the directory "correctly".
    val basename = uri.split('/').last.split('.').head
    command.setValue(
      "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
        .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
    command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
  }
  command.build()
}
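To actually select the coarse-grained Mesos mode and give it an executor package to download via spark.executor.uri, the application configures something like the following. The Mesos master URL and HDFS path are placeholders, and spark.mesos.coarse is, as I understand this version, the switch between the two Mesos backends:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical configuration; URLs and paths below are placeholders, not real clusters.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181,zk2:2181/mesos")        // assumed Mesos master URL
  .setAppName("coarse-mesos-demo")
  .set("spark.mesos.coarse", "true")                        // pick CoarseMesosSchedulerBackend
  .set("spark.executor.uri", "hdfs://namenode:9000/pkgs/spark-0.9.0.tgz") // consumed by createCommand above
val sc = new SparkContext(conf)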
SimrSchedulerBackend
I have not figured this one out yet.