`

Spark中Blockmanager相关代码解析

阅读更多
前一段时间看了如何划分stage以及如何提交Job, 最后把结果返回到Driver端的过程, 中间也涉及到了通过blockManager来获取Data等过程。 这两天花了点时间看了一下blockmanager是如何工作的, 在这里记录一下。

看了一下源代码, 这里有几个主要的对象:
1.BlockManager
2.BlockManagerMaster
3.BlockManagerMasterEndpoint
4.BlockManagerSlaveEndpoint
5.SparkEnv

BlockManagerMaster是在Driver端, 用来管理整个集群的BlockManager的, 当BlockManager初始化的时候, 会向Driver端的Master注册。

BlockManagerSlaveEndpoint是在executor端执行Master的操作命令用的。

比如说BlockManagerMaster要做removeBlock的时候 会执行BlockManagerMasterEndpoint中的removeBlockFromWorkers, 继而在executor端的BlockManagerSlaveEndpoint中执行RemoveBlock, 完成任务

那么先看一下SparkContext (driver端)中的_env创建:

_env = createSparkEnv(_conf, isLocal, listenerBus)


  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
  }


可以看到直接调用了SparkEnv类的createDriverEnv方法:
  private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
    val hostname = conf.get("spark.driver.host")
    val port = conf.get("spark.driver.port").toInt
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER,
      hostname,
      port,
      isDriver = true,
      isLocal = isLocal,
      numUsableCores = numCores,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }



继续调用create方法:
 private def create(
      conf: SparkConf,
      executorId: String,
      hostname: String,
      port: Int,
      isDriver: Boolean,
      isLocal: Boolean,
      numUsableCores: Int,
      listenerBus: LiveListenerBus = null,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
	 
	...

	 
	  // Create the ActorSystem for Akka and get the port it binds to.
    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
      clientMode = !isDriver)
	  
	  val actorSystem: ActorSystem =
      if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
        rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
      } else {
        val actorSystemPort =
          if (port == 0 || rpcEnv.address == null) {
            port
          } else {
            rpcEnv.address.port + 1
          }
        // Create a ActorSystem for legacy codes
        AkkaUtils.createActorSystem(
          actorSystemName + "ActorSystem",
          hostname,
          actorSystemPort,
          conf,
          securityManager
        )._1
      }
	  
	  ...
	  
	   def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }
	...
	  
	 val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

	   val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)

    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

    val cacheManager = new CacheManager(blockManager)
		
		
		...
		
		 val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      actorSystem,
      serializer,
      closureSerializer,
      cacheManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockTransferService,
      blockManager,
      securityManager,
      sparkFilesDir,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)

    // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
    // called, and we only need to do it for driver. Because driver may run as a service, and if we
    // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
    if (isDriver) {
      envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
    }

    envInstance
	
	  }


整个类很长, 截取了一部分比较重要的内容来说:
首先创建了一个server端的RpcEnv。
然后创建了一个registerOrLookupEndpoint函数。 再创建了blockTransferService (传输block data用)

继续创建一个blockManagerMaster并且endpoint类是BlockManagerMasterEndpoint (通过registerOrLookupEndpoint判断是Driver)

再创建了一个blockManager, 这个是在Driver端的blockManager。

后面陆续创建了broadcastManager, cacheManager

最后new了一个SparkEnv返回。

到这里为止, 几个重要的对象都被创建了, 但是这些都是在Driver端的。

那么对应的executor端是在哪里创建的呢, 我们来看一下CoarseGrainedExecutorBackend的创建executor方法:

private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

...

    case RegisteredExecutor(hostname) =>
      logInfo("Successfully registered with driver")
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

...

}


看到参数env了吧, 这个是在实例化CoarseGrainedExecutorBackend就传入进来的,那么看一下实例化这个类的地方:
private[spark] object CoarseGrainedExecutorBackend extends Logging {

  private def run(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: Seq[URL]) {

    SignalLogger.register(log)

    SparkHadoopUtil.get.runAsSparkUser { () =>
      // Debug code
      Utils.checkHost(hostname)

      // Bootstrap to fetch the driver's Spark properties.
      val executorConf = new SparkConf
      val port = executorConf.getInt("spark.executor.port", 0)
      val fetcher = RpcEnv.create(
        "driverPropsFetcher",
        hostname,
        port,
        executorConf,
        new SecurityManager(executorConf),
        clientMode = true)
      val driver = fetcher.setupEndpointRefByURI(driverUrl)
      val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
        Seq[(String, String)](("spark.app.id", appId))
      fetcher.shutdown()

      // Create SparkEnv using properties we fetched from the driver.
      val driverConf = new SparkConf()
      for ((key, value) <- props) {
        // this is required for SSL in standalone mode
        if (SparkConf.isExecutorStartupConf(key)) {
          driverConf.setIfMissing(key, value)
        } else {
          driverConf.set(key, value)
        }
      }
      if (driverConf.contains("spark.yarn.credentials.file")) {
        logInfo("Will periodically update credentials from: " +
          driverConf.get("spark.yarn.credentials.file"))
        SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
      }

      val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, port, cores, isLocal = false)

      // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
      // connections (e.g., if it's using akka). Otherwise, the executor is running in
      // client mode only, and does not accept incoming connections.
      val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
          hostname + ":" + port
        }.orNull
      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
      workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
      }
      env.rpcEnv.awaitTermination()
      SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
    }
  }


中间的env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env)) 创建的, 而这里的env确是通过
  val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, port, cores, isLocal = false)

创建的。

这里直接调用了createExecutorEnv, 那么我们看一下SparkEnv里面这个方法是怎么写的:
  private[spark] def createExecutorEnv(
      conf: SparkConf,
      executorId: String,
      hostname: String,
      port: Int,
      numCores: Int,
      isLocal: Boolean): SparkEnv = {
    val env = create(
      conf,
      executorId,
      hostname,
      port,
      isDriver = false,
      isLocal = isLocal,
      numUsableCores = numCores
    )
    SparkEnv.set(env)
    env
  }


还是调用create方法, 但是传入的isDriver却是false, 当为false的时候在创建过程中这几个点会被影响
1.blocManagerMaster

executor端master创建的时候由于isDriver是false, 那么就会调用
RpcUtils.makeDriverRef(name, conf, rpcEnv)
从而返回DriverRef,一会看blockManager初始化的时候会向这里创建的master注册, 由于这里isDriver是false, 所以executor端的blockManager都是向driver端的master注册的。

好了, 到这里后Driver和executor端的各种object都已经创建了, 那么我们来看一下blockManager是怎么初始化的, 来看executor的创建:

private[spark] class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false)
  extends Logging {

  ...

  if (!isLocal) {
    // Setup an uncaught exception handler for non-local mode.
    // Make any thread terminations due to uncaught exceptions kill the entire
    // executor process to avoid surprising stalls.
    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
  }

  // Start worker thread pool
  private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
  private val executorSource = new ExecutorSource(threadPool, executorId)

  if (!isLocal) {
    env.metricsSystem.registerSource(executorSource)
    env.blockManager.initialize(conf.getAppId)
  }

  ...
  }

看到了把, blockManager是在executor创建的时候一并被初始化的, 我们来看一下初始化的方法:

  def initialize(appId: String): Unit = {
    blockTransferService.init(this)
    shuffleClient.init(appId)

    blockManagerId = BlockManagerId(
      executorId, blockTransferService.hostName, blockTransferService.port)

    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }

    master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

    // Register Executors' configuration with the local shuffle service, if one should exist.
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }
  }


可以看到这里创建了一个blockManagerId, 它是由hostname port和executorid来创建的, 理论上是不会重复的。

后面就是注册的动作了
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

这里的slaveEndpoint是BlockManager里面这样定义的:
关联到BlockManagerSlaveEndpoint
  private val slaveEndpoint = rpcEnv.setupEndpoint(
    "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))


由于上面说的, executor的master的endpoint其实是Driver的enpoint, 那么这里registerBlockManager就是远程执行Driver (BlockManagerMasterEndpointer)里面的registerBlockManager方法:

master类中:
  def registerBlockManager(
      blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
    logInfo("Trying to register BlockManager")
    tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
    logInfo("Registered BlockManager")
  }


调用endpointer中:
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
      register(blockManagerId, maxMemSize, slaveEndpoint)
      context.reply(true)
...


调用其register方法:

  private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
    val time = System.currentTimeMillis()
    if (!blockManagerInfo.contains(id)) {
      blockManagerIdByExecutor.get(id.executorId) match {
        case Some(oldId) =>
          // A block manager of the same executor already exists, so remove it (assumed dead)
          logError("Got two different block manager registrations on same executor - "
              + s" will replace old one $oldId with new one $id")
          removeExecutor(id.executorId)
        case None =>
      }
      logInfo("Registering block manager %s with %s RAM, %s".format(
        id.hostPort, Utils.bytesToString(maxMemSize), id))

      blockManagerIdByExecutor(id.executorId) = id

      blockManagerInfo(id) = new BlockManagerInfo(
        id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
    }
    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
  }


这里主要做的就是如果这个executor id已经有存在的blockmanager了, 那么就删除, 再重新将blockmanagerID添加到blockManagerIdByExecutor里面, 创建一个BlockManagerInfo, info里面存了id memory和 slaveEndpoint。 其中slaveEndpoint会在后面master要删block的时候调用, 从而在executor端执行。


ok, 到现在blockmanager和master以及两个engpointor之间已经相互关联起来了, 举个列子, 如果 blockManagerMaster要删除block, 那么会做如下动作:
blockManagerMaster中的removeBlock:

  def removeBlock(blockId: BlockId) {
    driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId))
  }

他会找自己的endpoint类(BlockManagerMasterEndpoint)去执行RemoveBlock方法:
    case RemoveBlock(blockId) =>
      removeBlockFromWorkers(blockId)
      context.reply(true)


endpoint 里面执行removeBlockFromWorkers方法:
 private def removeBlockFromWorkers(blockId: BlockId) {
    val locations = blockLocations.get(blockId)
    if (locations != null) {
      locations.foreach { blockManagerId: BlockManagerId =>
        val blockManager = blockManagerInfo.get(blockManagerId)
        if (blockManager.isDefined) {
          // Remove the block from the slave's BlockManager.
          // Doesn't actually wait for a confirmation and the message might get lost.
          // If message loss becomes frequent, we should add retry logic here.
          blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
        }
      }
    }
  }


可以看到里面会先从blockManagerInfo里面拿出对应的blockmanagerinfo:
val blockManager = blockManagerInfo.get(blockManagerId)

然后执行:
blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))

这样就去slaveendpoint去执行RemoveBlock方法了, 而slaveendpoint就是BlockManagerSlaveEndpoint,我们看一下他的removeBlock方法:
    case RemoveBlock(blockId) =>
      doAsync[Boolean]("removing block " + blockId, context) {
        blockManager.removeBlock(blockId)
        true
      }


他会从executor端把block删除掉。

可以看到本地执行的方法有很多, 这些都是通过master来调用slave的方法:
 override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RemoveBlock(blockId) =>
      doAsync[Boolean]("removing block " + blockId, context) {
        blockManager.removeBlock(blockId)
        true
      }

    case RemoveRdd(rddId) =>
      doAsync[Int]("removing RDD " + rddId, context) {
        blockManager.removeRdd(rddId)
      }

    case RemoveShuffle(shuffleId) =>
      doAsync[Boolean]("removing shuffle " + shuffleId, context) {
        if (mapOutputTracker != null) {
          mapOutputTracker.unregisterShuffle(shuffleId)
        }
        SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
      }

    case RemoveBroadcast(broadcastId, _) =>
      doAsync[Int]("removing broadcast " + broadcastId, context) {
        blockManager.removeBroadcast(broadcastId, tellMaster = true)
      }

    case GetBlockStatus(blockId, _) =>
      context.reply(blockManager.getStatus(blockId))

    case GetMatchingBlockIds(filter, _) =>
      context.reply(blockManager.getMatchingBlockIds(filter))

    case TriggerThreadDump =>
      context.reply(Utils.getThreadDump())
  }


好了, 到这里为止, 基本上理清楚了
1.BlockManager
2.BlockManagerMaster
3.BlockManagerMasterEndpoint
4.BlockManagerSlaveEndpoint
5.SparkEnv
这些对象的关系, 和执行方式, 下回有时间再写一下blockmanager中对数据处理方法
0
0
分享到:
评论

相关推荐

    Spark内核解析.docx

    《Spark内核解析》 Spark作为大数据处理领域的明星框架,其内核设计的高效和灵活性使其在处理大规模数据时展现出卓越性能。Spark的核心运行机制主要包括Spark核心组件、任务调度、内存管理和核心功能的运行原理。...

    spark-广播变量基础及源码解析

    HttpBroadcast 会在 Driver 端的 BlockManager 里面存储广播变量对象,并将该广播变量序列化写入文件中去。所有数据请求都在 Driver 端,所以存在单点故障和网络 Io 性能问题。 ### TorrentBroadcast ...

    SPARK源代码

    下面,我们将从多个角度详细解析Spark源码中的关键知识点。 1. **RDD(弹性分布式数据集)**:Spark的核心数据抽象是RDD,它是一种不可变、分区的记录集合。RDD支持并行操作,并通过血统关系(lineage)实现容错。...

    ApacheSpark设计与实现.pdf+ApacheSpark源码剖析.pdf+Spark原著中文版.pdf

    3. **存储系统**:BlockManager如何管理内存和磁盘上的数据,以及如何与Shuffle服务交互。 4. **网络通信**:Akka在Spark中的作用,以及如何实现高效的节点间通信。 5. **性能优化**:Tungsten项目如何提升DataFrame...

    spark大数据商业实战三部曲源码及资料.zip

    这套资源包含了书中的源码和相关资料,旨在帮助读者深入理解Spark并提升在实际项目中的应用能力。 一、Spark概述 Spark是一个分布式计算框架,以其高效、易用和多模态处理能力而闻名。它支持批处理、交互式查询(如...

    Spark2.2版本内核源码深度剖析.zip

    在数据存储方面,`BlockManager`(16-2,BlockManager源码剖析中.html、16-2,BlockManager源码剖析下.html)是关键组件,负责在内存和磁盘之间管理数据块。BlockManager不仅实现了数据的存储,还提供了数据共享和...

    Apache Spark源码走读之3 -- Task运行期之函数调用关系分析

    本文将深入探讨Spark中Task执行期间的具体流程以及相关函数调用的关系。 #### 准备工作 在开始之前,请确保满足以下条件: 1. **Spark已安装**:确保您的系统中已正确安装Spark。 2. **运行模式**:Spark可以在...

    spark-java:java实现spark核心源代码

    Java实现中,BlockManager和MemoryManager会处理数据在内存中的分配和回收,可能包括LRU淘汰策略的实现。 - **容错与恢复**:Spark通过检查点和RDD lineage实现容错。在Java版本中,可能会有对错误检测、任务重试...

    开源力量spark公开课的ppt

    3. **源码解析**:深入Spark的源代码,讲解关键类和方法,如DAGScheduler、TaskScheduler、Storage层的BlockManager等,帮助理解Spark如何调度任务和管理数据。 4. **编程模型**:解释Spark的API使用,包括Scala、...

    Spark原理及源码剖析1

    Executor还负责通过Block Manager为应用程序缓存RDD。 Spark支持三种集群管理模式: 1. Standalone:Spark的原生集群管理器,可单独部署,无需依赖其他资源管理系统。 2. Hadoop YARN:统一资源管理机制,可运行...

    Spark源码倒腾

    在`org.apache.spark.sql`包下,可以找到SQL查询的解析、优化和执行的相关代码。 Spark Streaming处理实时数据流,它将时间窗口内的数据分批处理,形成DStream(Discretized Stream)。DStream是由一系列RDD组成的...

    Spark-dig-and-dig:Dig Spark的源代码-spark source code

    `BlockManager`是Spark内部存储系统的核心组件,负责管理内存和磁盘上的数据块。同时,`org.apache.spark.shuffle.ShuffleManager`则负责数据的重排和交换,这是MapReduce模型的重要环节。 在计算模型上,Spark引入...

    Spark-source-code-description-spark source code

    在`org.apache.spark.storage`和`org.apache.spark.scheduler`包下,可以看到关于BlockManager、BlockReplicationManager和TaskFailureListener的实现,它们协同工作以应对节点故障。 七、调度策略 Spark的调度...

Global site tag (gtag.js) - Google Analytics