- 浏览: 73879 次
前一段时间看了如何划分stage以及如何提交Job, 最后把结果返回到Driver端的过程, 中间也涉及到了通过blockManager来获取Data等过程。 这两天花了点时间看了一下blockmanager是如何工作的, 在这里记录一下。
看了一下源代码, 这里有几个主要的对象:
1.BlockManager
2.BlockManagerMaster
3.BlockManagerMasterEndpoint
4.BlockManagerSlaveEndpoint
5.SparkEnv
BlockManagerMaster是在Driver端, 用来管理整个集群的BlockManager的, 当BlockManager初始化的时候, 会向Driver端的Master注册。
BlockManagerSlaveEndpoint是在executor端执行Master的操作命令用的。
比如说BlockManagerMaster要做removeBlock的时候 会执行BlockManagerMasterEndpoint中的removeBlockFromWorkers, 继而在executor端的BlockManagerSlaveEndpoint中执行RemoveBlock, 完成任务
那么先看一下SparkContext (driver端)中的_env创建:
_env = createSparkEnv(_conf, isLocal, listenerBus)
可以看到直接调用了SparkEnv类的createDriverEnv方法:
继续调用create方法:
整个类很长, 截取了一部分比较重要的内容来说:
首先创建了一个server端的RpcEnv。
然后创建了一个registerOrLookupEndpoint函数。 再创建了blockTransferService (传输block data用)
继续创建一个blockManagerMaster并且endpoint类是BlockManagerMasterEndpoint (通过registerOrLookupEndpoint判断是Driver)
再创建了一个blockManager, 这个是在Driver端的blockManager。
后面陆续创建了broadcastManager, cacheManager
最后new了一个SparkEnv返回。
到这里为止, 几个重要的对象都被创建了, 但是这些都是在Driver端的。
那么对应的executor端是在哪里创建的呢, 我们来看一下CoarseGrainedExecutorBackend的创建executor方法:
看到参数env了吧, 这个是在实例化CoarseGrainedExecutorBackend就传入进来的,那么看一下实例化这个类的地方:
中间的env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env)) 创建的, 而这里的env确是通过
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
创建的。
这里直接调用了createExecutorEnv, 那么我们看一下SparkEnv里面这个方法是怎么写的:
还是调用create方法, 但是传入的isDriver却是false, 当为false的时候在创建过程中这几个点会被影响
1.blocManagerMaster
executor端master创建的时候由于isDriver是false, 那么就会调用
RpcUtils.makeDriverRef(name, conf, rpcEnv)
从而返回DriverRef,一会看blockManager初始化的时候会向这里创建的master注册, 由于这里isDriver是false, 所以executor端的blockManager都是向driver端的master注册的。
好了, 到这里后Driver和executor端的各种object都已经创建了, 那么我们来看一下blockManager是怎么初始化的, 来看executor的创建:
看了一下源代码, 这里有几个主要的对象:
1.BlockManager
2.BlockManagerMaster
3.BlockManagerMasterEndpoint
4.BlockManagerSlaveEndpoint
5.SparkEnv
BlockManagerMaster是在Driver端, 用来管理整个集群的BlockManager的, 当BlockManager初始化的时候, 会向Driver端的Master注册。
BlockManagerSlaveEndpoint是在executor端执行Master的操作命令用的。
比如说BlockManagerMaster要做removeBlock的时候 会执行BlockManagerMasterEndpoint中的removeBlockFromWorkers, 继而在executor端的BlockManagerSlaveEndpoint中执行RemoveBlock, 完成任务
那么先看一下SparkContext (driver端)中的_env创建:
_env = createSparkEnv(_conf, isLocal, listenerBus)
private[spark] def createSparkEnv( conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus): SparkEnv = { SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master)) }
可以看到直接调用了SparkEnv类的createDriverEnv方法:
private[spark] def createDriverEnv( conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus, numCores: Int, mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!") assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!") val hostname = conf.get("spark.driver.host") val port = conf.get("spark.driver.port").toInt create( conf, SparkContext.DRIVER_IDENTIFIER, hostname, port, isDriver = true, isLocal = isLocal, numUsableCores = numCores, listenerBus = listenerBus, mockOutputCommitCoordinator = mockOutputCommitCoordinator ) }
继续调用create方法:
private def create( conf: SparkConf, executorId: String, hostname: String, port: Int, isDriver: Boolean, isLocal: Boolean, numUsableCores: Int, listenerBus: LiveListenerBus = null, mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { ... // Create the ActorSystem for Akka and get the port it binds to. val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager, clientMode = !isDriver) val actorSystem: ActorSystem = if (rpcEnv.isInstanceOf[AkkaRpcEnv]) { rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem } else { val actorSystemPort = if (port == 0 || rpcEnv.address == null) { port } else { rpcEnv.address.port + 1 } // Create a ActorSystem for legacy codes AkkaUtils.createActorSystem( actorSystemName + "ActorSystem", hostname, actorSystemPort, conf, securityManager )._1 } ... def registerOrLookupEndpoint( name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = { if (isDriver) { logInfo("Registering " + name) rpcEnv.setupEndpoint(name, endpointCreator) } else { RpcUtils.makeDriverRef(name, conf, rpcEnv) } } ... val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores) val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint( BlockManagerMaster.DRIVER_ENDPOINT_NAME, new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)), conf, isDriver) val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster, serializer, conf, memoryManager, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores) val broadcastManager = new BroadcastManager(isDriver, conf, securityManager) val cacheManager = new CacheManager(blockManager) ... val envInstance = new SparkEnv( executorId, rpcEnv, actorSystem, serializer, closureSerializer, cacheManager, mapOutputTracker, shuffleManager, broadcastManager, blockTransferService, blockManager, securityManager, sparkFilesDir, metricsSystem, memoryManager, outputCommitCoordinator, conf) // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is // called, and we only need to do it for driver. Because driver may run as a service, and if we // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs. if (isDriver) { envInstance.driverTmpDirToDelete = Some(sparkFilesDir) } envInstance }
整个类很长, 截取了一部分比较重要的内容来说:
首先创建了一个server端的RpcEnv。
然后创建了一个registerOrLookupEndpoint函数。 再创建了blockTransferService (传输block data用)
继续创建一个blockManagerMaster并且endpoint类是BlockManagerMasterEndpoint (通过registerOrLookupEndpoint判断是Driver)
再创建了一个blockManager, 这个是在Driver端的blockManager。
后面陆续创建了broadcastManager, cacheManager
最后new了一个SparkEnv返回。
到这里为止, 几个重要的对象都被创建了, 但是这些都是在Driver端的。
那么对应的executor端是在哪里创建的呢, 我们来看一下CoarseGrainedExecutorBackend的创建executor方法:
private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, driverUrl: String, executorId: String, hostPort: String, cores: Int, userClassPath: Seq[URL], env: SparkEnv) extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { ... case RegisteredExecutor(hostname) => logInfo("Successfully registered with driver") executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) ... }
看到参数env了吧, 这个是在实例化CoarseGrainedExecutorBackend就传入进来的,那么看一下实例化这个类的地方:
private[spark] object CoarseGrainedExecutorBackend extends Logging { private def run( driverUrl: String, executorId: String, hostname: String, cores: Int, appId: String, workerUrl: Option[String], userClassPath: Seq[URL]) { SignalLogger.register(log) SparkHadoopUtil.get.runAsSparkUser { () => // Debug code Utils.checkHost(hostname) // Bootstrap to fetch the driver's Spark properties. val executorConf = new SparkConf val port = executorConf.getInt("spark.executor.port", 0) val fetcher = RpcEnv.create( "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf), clientMode = true) val driver = fetcher.setupEndpointRefByURI(driverUrl) val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++ Seq[(String, String)](("spark.app.id", appId)) fetcher.shutdown() // Create SparkEnv using properties we fetched from the driver. val driverConf = new SparkConf() for ((key, value) <- props) { // this is required for SSL in standalone mode if (SparkConf.isExecutorStartupConf(key)) { driverConf.setIfMissing(key, value) } else { driverConf.set(key, value) } } if (driverConf.contains("spark.yarn.credentials.file")) { logInfo("Will periodically update credentials from: " + driverConf.get("spark.yarn.credentials.file")) SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf) } val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, port, cores, isLocal = false) // SparkEnv will set spark.executor.port if the rpc env is listening for incoming // connections (e.g., if it's using akka). Otherwise, the executor is running in // client mode only, and does not accept incoming connections. val sparkHostPort = env.conf.getOption("spark.executor.port").map { port => hostname + ":" + port }.orNull env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env)) workerUrl.foreach { url => env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url)) } env.rpcEnv.awaitTermination() SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer() } }
中间的env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env)) 创建的, 而这里的env确是通过
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
创建的。
这里直接调用了createExecutorEnv, 那么我们看一下SparkEnv里面这个方法是怎么写的:
private[spark] def createExecutorEnv( conf: SparkConf, executorId: String, hostname: String, port: Int, numCores: Int, isLocal: Boolean): SparkEnv = { val env = create( conf, executorId, hostname, port, isDriver = false, isLocal = isLocal, numUsableCores = numCores ) SparkEnv.set(env) env }
还是调用create方法, 但是传入的isDriver却是false, 当为false的时候在创建过程中这几个点会被影响
1.blocManagerMaster
executor端master创建的时候由于isDriver是false, 那么就会调用
RpcUtils.makeDriverRef(name, conf, rpcEnv)
从而返回DriverRef,一会看blockManager初始化的时候会向这里创建的master注册, 由于这里isDriver是false, 所以executor端的blockManager都是向driver端的master注册的。
好了, 到这里后Driver和executor端的各种object都已经创建了, 那么我们来看一下blockManager是怎么初始化的, 来看executor的创建:
private[spark] class Executor( executorId: String, executorHostname: String, env: SparkEnv, userClassPath: Seq[URL] = Nil, isLocal: Boolean = false) extends Logging { ... if (!isLocal) { // Setup an uncaught exception handler for non-local mode. // Make any thread terminations due to uncaught exceptions kill the entire // executor process to avoid surprising stalls. Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler) } // Start worker thread pool private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker") private val executorSource = new ExecutorSource(threadPool, executorId) if (!isLocal) { env.metricsSystem.registerSource(executorSource) env.blockManager.initialize(conf.getAppId) } ... } 看到了把, blockManager是在executor创建的时候一并被初始化的, 我们来看一下初始化的方法:def initialize(appId: String): Unit = { blockTransferService.init(this) shuffleClient.init(appId) blockManagerId = BlockManagerId( executorId, blockTransferService.hostName, blockTransferService.port) shuffleServerId = if (externalShuffleServiceEnabled) { logInfo(s"external shuffle service port = $externalShuffleServicePort") BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort) } else { blockManagerId } master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint) // Register Executors' configuration with the local shuffle service, if one should exist. if (externalShuffleServiceEnabled && !blockManagerId.isDriver) { registerWithExternalShuffleServer() } }
可以看到这里创建了一个blockManagerId, 它是由hostname port和executorid来创建的, 理论上是不会重复的。
后面就是注册的动作了
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
这里的slaveEndpoint是BlockManager里面这样定义的:
关联到BlockManagerSlaveEndpointprivate val slaveEndpoint = rpcEnv.setupEndpoint( "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next, new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
由于上面说的, executor的master的endpoint其实是Driver的enpoint, 那么这里registerBlockManager就是远程执行Driver (BlockManagerMasterEndpointer)里面的registerBlockManager方法:
master类中:def registerBlockManager( blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = { logInfo("Trying to register BlockManager") tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint)) logInfo("Registered BlockManager") }
调用endpointer中:override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) => register(blockManagerId, maxMemSize, slaveEndpoint) context.reply(true) ...
调用其register方法:private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) { val time = System.currentTimeMillis() if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { case Some(oldId) => // A block manager of the same executor already exists, so remove it (assumed dead) logError("Got two different block manager registrations on same executor - " + s" will replace old one $oldId with new one $id") removeExecutor(id.executorId) case None => } logInfo("Registering block manager %s with %s RAM, %s".format( id.hostPort, Utils.bytesToString(maxMemSize), id)) blockManagerIdByExecutor(id.executorId) = id blockManagerInfo(id) = new BlockManagerInfo( id, System.currentTimeMillis(), maxMemSize, slaveEndpoint) } listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize)) }
这里主要做的就是如果这个executor id已经有存在的blockmanager了, 那么就删除, 再重新将blockmanagerID添加到blockManagerIdByExecutor里面, 创建一个BlockManagerInfo, info里面存了id memory和 slaveEndpoint。 其中slaveEndpoint会在后面master要删block的时候调用, 从而在executor端执行。
ok, 到现在blockmanager和master以及两个engpointor之间已经相互关联起来了, 举个列子, 如果 blockManagerMaster要删除block, 那么会做如下动作:
blockManagerMaster中的removeBlock:
def removeBlock(blockId: BlockId) { driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId)) }
他会找自己的endpoint类(BlockManagerMasterEndpoint)去执行RemoveBlock方法:case RemoveBlock(blockId) => removeBlockFromWorkers(blockId) context.reply(true)
endpoint 里面执行removeBlockFromWorkers方法:private def removeBlockFromWorkers(blockId: BlockId) { val locations = blockLocations.get(blockId) if (locations != null) { locations.foreach { blockManagerId: BlockManagerId => val blockManager = blockManagerInfo.get(blockManagerId) if (blockManager.isDefined) { // Remove the block from the slave's BlockManager. // Doesn't actually wait for a confirmation and the message might get lost. // If message loss becomes frequent, we should add retry logic here. blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId)) } } } }
可以看到里面会先从blockManagerInfo里面拿出对应的blockmanagerinfo:
val blockManager = blockManagerInfo.get(blockManagerId)
然后执行:
blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
这样就去slaveendpoint去执行RemoveBlock方法了, 而slaveendpoint就是BlockManagerSlaveEndpoint,我们看一下他的removeBlock方法:case RemoveBlock(blockId) => doAsync[Boolean]("removing block " + blockId, context) { blockManager.removeBlock(blockId) true }
他会从executor端把block删除掉。
可以看到本地执行的方法有很多, 这些都是通过master来调用slave的方法:override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RemoveBlock(blockId) => doAsync[Boolean]("removing block " + blockId, context) { blockManager.removeBlock(blockId) true } case RemoveRdd(rddId) => doAsync[Int]("removing RDD " + rddId, context) { blockManager.removeRdd(rddId) } case RemoveShuffle(shuffleId) => doAsync[Boolean]("removing shuffle " + shuffleId, context) { if (mapOutputTracker != null) { mapOutputTracker.unregisterShuffle(shuffleId) } SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId) } case RemoveBroadcast(broadcastId, _) => doAsync[Int]("removing broadcast " + broadcastId, context) { blockManager.removeBroadcast(broadcastId, tellMaster = true) } case GetBlockStatus(blockId, _) => context.reply(blockManager.getStatus(blockId)) case GetMatchingBlockIds(filter, _) => context.reply(blockManager.getMatchingBlockIds(filter)) case TriggerThreadDump => context.reply(Utils.getThreadDump()) }
好了, 到这里为止, 基本上理清楚了
1.BlockManager
2.BlockManagerMaster
3.BlockManagerMasterEndpoint
4.BlockManagerSlaveEndpoint
5.SparkEnv
这些对象的关系, 和执行方式, 下回有时间再写一下blockmanager中对数据处理方法发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1105最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
源码跟踪executor如何写数据到blockmanager, 以及如何从blockmanager读数据
2016-08-10 19:41 1415之前看了Job怎么submit 以 ... -
Spark在submitStage后如何通过clustermanager调度执行task到Driver接收计算结果的代码解析
2016-08-01 14:08 1472前文: http://humingminghz.iteye.c ... -
Spark中saveAsTextFile至stage划分和job提交的源代码分析
2016-07-29 14:20 3360之前看了Spark Streaming和Spark SQL, ... -
SparkSQL DF.agg 执行过程解析
2016-07-19 10:21 4120在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或 ... -
SparkSQL SQL语句解析过程源代码浅析
2016-07-15 19:34 6641前两天一直在忙本职工 ... -
SparkSQL SQL语句解析过程浅析
2016-07-15 19:06 0前两天一直在忙本职工 ... -
SparkStreaming从启动Receiver到收取数据生成RDD的代码浅析
2016-07-08 17:54 2233前面一片文章介绍了SocketTextStream 是如何从b ... -
Sparkstreaming是如何获取数据组成Dstream的源码浅析
2016-07-08 11:23 1472前面一篇文章介绍了SparkStreaming是如何不停的循环 ... -
SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)
2016-07-06 11:24 10149前两天开始研究SparkSQL, 其主要分为HiveConte ... -
SparkStreaming是如何完成不停的循环处理的代码浅析
2016-07-02 12:26 4654一直很好奇Sparkstreaming的ssc.start是怎 ... -
SparkStreaming 对Window的reduce的方法解析
2016-06-30 11:57 4727在sparkstreaming中对窗口 ... -
Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析
2016-06-29 19:50 8790最近在玩spark streaming, 感觉到了他的强大。 ... -
关于Eclipse开发环境下 Spark+Kafka 获取topic的时候连接出错
2016-06-28 17:20 7403林林总总玩了Spark快一个月了, 打算试一下kafka的消息 ...
相关推荐
《Spark内核解析》 Spark作为大数据处理领域的明星框架,其内核设计的高效和灵活性使其在处理大规模数据时展现出卓越性能。Spark的核心运行机制主要包括Spark核心组件、任务调度、内存管理和核心功能的运行原理。...
HttpBroadcast 会在 Driver 端的 BlockManager 里面存储广播变量对象,并将该广播变量序列化写入文件中去。所有数据请求都在 Driver 端,所以存在单点故障和网络 Io 性能问题。 ### TorrentBroadcast ...
下面,我们将从多个角度详细解析Spark源码中的关键知识点。 1. **RDD(弹性分布式数据集)**:Spark的核心数据抽象是RDD,它是一种不可变、分区的记录集合。RDD支持并行操作,并通过血统关系(lineage)实现容错。...
3. **存储系统**:BlockManager如何管理内存和磁盘上的数据,以及如何与Shuffle服务交互。 4. **网络通信**:Akka在Spark中的作用,以及如何实现高效的节点间通信。 5. **性能优化**:Tungsten项目如何提升DataFrame...
在数据存储方面,`BlockManager`(16-2,BlockManager源码剖析中.html、16-2,BlockManager源码剖析下.html)是关键组件,负责在内存和磁盘之间管理数据块。BlockManager不仅实现了数据的存储,还提供了数据共享和...
这套资源包含了书中的源码和相关资料,旨在帮助读者深入理解Spark并提升在实际项目中的应用能力。 一、Spark概述 Spark是一个分布式计算框架,以其高效、易用和多模态处理能力而闻名。它支持批处理、交互式查询(如...
本文将深入探讨Spark中Task执行期间的具体流程以及相关函数调用的关系。 #### 准备工作 在开始之前,请确保满足以下条件: 1. **Spark已安装**:确保您的系统中已正确安装Spark。 2. **运行模式**:Spark可以在...
Java实现中,BlockManager和MemoryManager会处理数据在内存中的分配和回收,可能包括LRU淘汰策略的实现。 - **容错与恢复**:Spark通过检查点和RDD lineage实现容错。在Java版本中,可能会有对错误检测、任务重试...
3. **源码解析**:深入Spark的源代码,讲解关键类和方法,如DAGScheduler、TaskScheduler、Storage层的BlockManager等,帮助理解Spark如何调度任务和管理数据。 4. **编程模型**:解释Spark的API使用,包括Scala、...
Executor还负责通过Block Manager为应用程序缓存RDD。 Spark支持三种集群管理模式: 1. Standalone:Spark的原生集群管理器,可单独部署,无需依赖其他资源管理系统。 2. Hadoop YARN:统一资源管理机制,可运行...
在`org.apache.spark.sql`包下,可以找到SQL查询的解析、优化和执行的相关代码。 Spark Streaming处理实时数据流,它将时间窗口内的数据分批处理,形成DStream(Discretized Stream)。DStream是由一系列RDD组成的...
`BlockManager`是Spark内部存储系统的核心组件,负责管理内存和磁盘上的数据块。同时,`org.apache.spark.shuffle.ShuffleManager`则负责数据的重排和交换,这是MapReduce模型的重要环节。 在计算模型上,Spark引入...
在`org.apache.spark.storage`和`org.apache.spark.scheduler`包下,可以看到关于BlockManager、BlockReplicationManager和TaskFailureListener的实现,它们协同工作以应对节点故障。 七、调度策略 Spark的调度...