`

Spark源码分析11-BlockManager

 
阅读更多

BlockManager主要在deriver和excutor构造。在deriver构造了一个BlockManagerMasterActor对象,主要负责收集block的info。在executor创建了BlockManagerMasterActor的ref,并且将ref封装到BlockManagerMaster中用于与BlockManagerMasterActor的通信。BlockManager封装了BlockManagerMaster,用于存储block,并调用BlockManagerMaster与master通信。

 

//BlockManagerMasterActor 处理的消息。updateBolckinfo主要是excutor向deriver报告block的信息。
def receive = {
    case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
      register(blockManagerId, maxMemSize, slaveActor)
      sender ! true

    case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
      // TODO: Ideally we want to handle all the message replies in receive instead of in the
      // individual private methods.
      updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)

    case GetLocations(blockId) =>
      sender ! getLocations(blockId)

    case GetLocationsMultipleBlockIds(blockIds) =>
      sender ! getLocationsMultipleBlockIds(blockIds)

    case GetPeers(blockManagerId, size) =>
      sender ! getPeers(blockManagerId, size)

    case GetMemoryStatus =>
      sender ! memoryStatus

    case GetStorageStatus =>
      sender ! storageStatus

    case RemoveRdd(rddId) =>
      sender ! removeRdd(rddId)

    case RemoveBlock(blockId) =>
      removeBlockFromWorkers(blockId)
      sender ! true

    case RemoveExecutor(execId) =>
      removeExecutor(execId)
      sender ! true

    case StopBlockManagerMaster =>
      logInfo("Stopping BlockManagerMaster")
      sender ! true
      if (timeoutCheckingTask != null) {
        timeoutCheckingTask.cancel()
      }
      context.stop(self)

    case ExpireDeadHosts =>
      expireDeadHosts()

    case HeartBeat(blockManagerId) =>
      sender ! heartBeat(blockManagerId)

    case other =>
      logWarning("Got unknown message: " + other)
  }
 
//BlockManagerSlaveActor处理的消息,主要用于master通知client删除block和RDD
  override def receive = {

    case RemoveBlock(blockId) =>
      blockManager.removeBlock(blockId)

    case RemoveRdd(rddId) =>
      val numBlocksRemoved = blockManager.removeRdd(rddId)
      sender ! numBlocksRemoved
  }
 

 

 

//blockmanager会调用BlockManagerWorker syncPutBlock和 syncGetBlock方法去远程拿数据或者写数据到远端
private[spark] object BlockManagerWorker extends Logging {
  private var blockManagerWorker: BlockManagerWorker = null

  def startBlockManagerWorker(manager: BlockManager) {
    blockManagerWorker = new BlockManagerWorker(manager)
  }
  //用于duplicate时往远端写数据
  def syncPutBlock(msg: PutBlock, toConnManagerId: ConnectionManagerId): Boolean = {
    val blockManager = blockManagerWorker.blockManager
    val connectionManager = blockManager.connectionManager
    val blockMessage = BlockMessage.fromPutBlock(msg)
    val blockMessageArray = new BlockMessageArray(blockMessage)
    val resultMessage = connectionManager.sendMessageReliablySync(
        toConnManagerId, blockMessageArray.toBufferMessage)
    resultMessage.isDefined
  }
   //用于 task运行时获取远端的数据
  def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
    val blockManager = blockManagerWorker.blockManager
    val connectionManager = blockManager.connectionManager
    val blockMessage = BlockMessage.fromGetBlock(msg)
    val blockMessageArray = new BlockMessageArray(blockMessage)
    val responseMessage = connectionManager.sendMessageReliablySync(
        toConnManagerId, blockMessageArray.toBufferMessage)
    responseMessage match {
      case Some(message) => {
        val bufferMessage = message.asInstanceOf[BufferMessage]
        logDebug("Response message received " + bufferMessage)
        BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {
            logDebug("Found " + blockMessage)
            return blockMessage.getData
          })
      }
      case None => logDebug("No response message received")
    }
    null
  }
}
 

 

    远端的BlockManagerWorker会调用onBlockMessageReceive方法用来处理TYPE_PUT_BLOCK和TYPE_GET_BLOCK 这些事件

 

//BlockManagerWorker中的方法用来处理block 的读取,然后通过connectionManager回复response
def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = {
    logDebug("Handling message " + msg)
    msg match {
      case bufferMessage: BufferMessage => {
        try {
          logDebug("Handling as a buffer message " + bufferMessage)
          val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
          logDebug("Parsed as a block message array")
          val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
          Some(new BlockMessageArray(responseMessages).toBufferMessage)
        } catch {
          case e: Exception => logError("Exception handling buffer message", e)
          None
        }
      }
      case otherMessage: Any => {
        logError("Unknown type message received: " + otherMessage)
        None
      }
    }
  }

 def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
    blockMessage.getType match {
      case BlockMessage.TYPE_PUT_BLOCK => {
        val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
        logDebug("Received [" + pB + "]")
        putBlock(pB.id, pB.data, pB.level)
        None
      }
      case BlockMessage.TYPE_GET_BLOCK => {
        val gB = new GetBlock(blockMessage.getId)
        logDebug("Received [" + gB + "]")
        val buffer = getBlock(gB.id)
        if (buffer == null) {
          return None
        }
        Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
      }
      case _ => None
    }
  }

  

ConnectonManager用于连接的建立,数据的传输和接收.主要用了nio socket

 

MemoryStore存储的结构是  private val entries = new LinkedHashMap[BlockId, Entry](32, 0.75f, true)

,在存数据前,先会查看是否有足够的memory,如果没有,会删除老的block。如果StorageLevel是useDisk,会将老的block写到disk

 

DiskStore会按照blockId中的name创建文件,并把数据写到文件中

 

ShuffleBlockManager是BlockManager的扩展,主要用于处理shuffle操作时,shufflewrite 数据的存储

分享到:
评论

相关推荐

    Spark源码分析3-The connect between driver,master and excutor

    《Spark源码分析3——驱动器、主节点与执行器之间的连接》 在Spark的分布式计算框架中,驱动器(Driver)、主节点(Master)和执行器(Executor)是核心组件,它们之间的通信和协作构成了Spark作业执行的基础。本文将深入...

    Spark源码分析.pdf

    《Spark源码分析》这本书是针对那些希望深入了解大数据处理框架Spark以及与其紧密相关的Hadoop技术的专业人士所编写的。Spark作为一个快速、通用且可扩展的数据处理引擎,已经在大数据领域占据了重要地位,而深入...

    Apache Spark源码走读之5 -- DStream处理的容错性分析

    ### Apache Spark源码走读之五:DStream处理的容错性分析 #### 环境搭建与背景 为了深入理解Apache Spark Streaming中DStream处理的容错机制,本文将从一个简单的Spark Streaming示例出发,逐步分析Spark如何确保...

    Apache Spark源码走读之3 -- Task运行期之函数调用关系分析

    ### Apache Spark源码走读之3 -- Task运行期之函数调用关系分析 #### 概述 Apache Spark作为一款高效的大数据处理框架,在其内部有着复杂的任务调度与执行机制。本文将深入探讨Spark中Task执行期间的具体流程以及...

    spark 源码解读迷你书

    《Spark源码解读迷你书》是一本专注于深入理解Apache Spark核心源码的书籍,适合对大数据处理技术有深厚兴趣并且想要探索Spark内部机制的读者。在阅读这本书之前,建议先搭建好Spark开发环境,比如使用Intelij IDEA...

    Spark core 源码解读与扩展

    在深入分析Spark Core的源码和架构时,我们可以更好地理解其内部工作机制,并能够在基础上进行扩展以满足特定的大数据处理需求。 ### Spark Core 源码解读 Spark Core源码的解读涉及到理解以下几个关键部分: 1. ...

    spark2.02源码

    源码分析还包括对Spark的存储系统的研究,如BlockManager如何管理内存和磁盘上的数据块,以及如何与其他组件如CacheManager协同工作实现数据缓存。 总的来说,Spark 2.0.2的源码包含了许多关键的改进和优化,对于...

    Spark2.2版本内核源码深度剖析.zip

    通过源码分析,我们可以了解到数据的分布策略以及BlockManager如何与其他组件如MemoryStore和DiskStore交互。 `CacheManager`(17-2,CacheManager源码剖析.html)则是Spark中的缓存系统,它负责数据的缓存策略和...

    Apache Spark源码剖析

    内存管理在Spark中至关重要,它采用了一种称为Tachyon的内存计算存储层,以及一个称为BlockManager的组件来管理数据在内存和磁盘之间的移动。 Spark Streaming是Spark对实时数据流处理的扩展,它通过微批处理将流...

    mesos-spark-源码.rar

    通过对Mesos和Spark源码的深入分析,我们可以了解到这两个项目如何高效地处理大规模数据,以及它们在分布式环境中的协同工作方式。这对于开发者来说,无论是为了优化性能,还是为了开发新的分布式应用,都是非常有...

    spark大数据商业实战三部曲源码及资料.zip

    - Shuffle过程是如何实现的,包括MapOutputTracker和BlockManager的角色。 - RDD(弹性分布式数据集)的创建、转换和行动操作的实现细节。 四、实战应用 1. 数据分析:通过Spark SQL进行ETL(提取、转换、加载)...

    Spark-dig-and-dig:Dig Spark的源代码-spark source code

    `BlockManager`是Spark内部存储系统的核心组件,负责管理内存和磁盘上的数据块。同时,`org.apache.spark.shuffle.ShuffleManager`则负责数据的重排和交换,这是MapReduce模型的重要环节。 在计算模型上,Spark引入...

    Spark源码倒腾

    源码中的`StorageLevel`类定义了多种缓存级别,`BlockManager`负责管理内存块,`MemoryStore`则负责数据的存储和检索。 Spark SQL是Spark处理结构化数据的重要模块,它通过DataFrame和Dataset提供了SQL接口。...

    spark0.6源码

    Spark 0.6.0源码分析 Spark是Apache软件基金会下的一个开源大数据处理框架,以其高效的内存计算和弹性分布式数据集(Resilient Distributed Datasets, RDDs)为核心特性,广泛应用于数据处理、机器学习和图形计算等...

    Spark原理及源码剖析1

    本文将深入探讨Spark的原理及源码分析,首先从Spark运行时的通用流程入手,然后介绍核心组件的角色与职责,以及Spark支持的不同集群部署模式。 在Spark的运行流程中,用户通过`spark-submit`提交应用程序。这个过程...

    spark-source-code-learn-note:火花学习笔记-spark source code

    3. BlockManager:存储RDD的分区数据,实现内存和磁盘的混合存储。 4. Shuffle:Spark中的数据重新分布过程,涉及网络通信和数据持久化。 三、Spark源码解析 源码学习是深入了解Spark工作机制的关键。以下是一些...

    开源力量spark公开课的ppt

    这份名为"SPARK源码分析 - Copy.pptx"的文件,很可能包含以下内容: 1. **Spark概述**:解释Spark的基本概念,包括它的设计目标、核心架构,以及与其他大数据处理框架(如Hadoop MapReduce)的区别。 2. **Spark...

    spark-java:java实现spark核心源代码

    此外,Spark的存储系统,如BlockManager和MemoryManager,也是核心部分,它们负责数据的缓存和存储策略。 2. **模块详解** - **数据读取与转换**:在Java实现中,数据读取可能通过`SparkContext`类的API来完成,...

    spark大数据实践

    4. **Block Manager**:块管理器负责管理执行器上的缓存数据,包括内存管理和磁盘管理。 #### 四、Spark高级特性 **Spark提供的高级特性包括:** 1. **Broadcast Variables**:广播变量是一种只读的变量,可以在...

    基于hadoop-2.6.0-cdh5.4.3版本的源码阅读,以注释及博客的形式记录阅读笔记.zip

    例如,通过结合 Mahout 或 Spark 等库,可以构建高效的大规模数据挖掘和分析平台。 总之,深入研究 Hadoop 2.6.0-cdh5.4.3 的源码,不仅可以提升对分布式计算的理解,也有助于优化大数据处理的效率和性能,从而在...

Global site tag (gtag.js) - Google Analytics