`
bit1129
  • 浏览: 1069962 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark102】Spark存储模块BlockManager剖析

 
阅读更多

Spark围绕着BlockManager构建了存储模块,包括RDD,Shuffle,Broadcast的存储都使用了BlockManager。而BlockManager在实现上是一个针对每个应用的Master/Executor结构,即Driver上BlockManager充当了Master角色,而各个Slave上(具体到应用范围,就是Executor)的BlockManager充当了Slave角色。

因此,BlockManager在Driver和应用的各个Executor之间各有一份,Driver上的BlockManager不具备实际存储的能力,它记录了各个Executor的BlockManager的状态(通过查看BlockManagerMaster和BlockManagerMasterActor的源代码,BlockManagerMaster和BlockManagerMasterActor并没有持有一个BlockManager对象,那么每个Executor BlockManager的状态存储在什么地方?通过查看BlockManager的类注释,发现BlockManager确实运行在Driver上)。Master BlockManager和ExecutorBlockManager之间的通信也是基于Akka,消息格式定义于BlockManagerMessages类中。

 

上面的描述并不准确,事实上在Driver端,同Executor一样,各有一个BlockManager。除此之外,Driver上还有一个BlockManager Master,它的实现类是BlockManagerManager,因此,对于BlockManager而言,Driver既是Master也是Slave

 

0.BlockManager类注释:

 

/**
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
 *
 * Note that #initialize() must be called before the BlockManager is usable.
 */
private[spark] class BlockManager(

 

 

1. Master/Slave的通信内容

Master BlockManager向Executor BlockManager可以发送的消息包括:

 

 sealed trait ToBlockManagerSlave

  // Remove a block from the slaves that have it. This can only be used to remove
  // blocks that the master knows about.
  case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave

  // Remove all blocks belonging to a specific RDD.
  case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave

  // Remove all blocks belonging to a specific shuffle.
  case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave

  // Remove all blocks belonging to a specific broadcast.
  case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
    extends ToBlockManagerSlave

 

Executor BlockManager向Master BlockManager可以发送的消息包括:

 

 

 sealed trait ToBlockManagerMaster

  case class RegisterBlockManager(
      blockManagerId: BlockManagerId,
      maxMemSize: Long,
      sender: ActorRef)
    extends ToBlockManagerMaster

  //获取某个Block在哪些Executor的BlockManager上  
  case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster

  //获取一组Block在哪些Executor的BlockManager上
  case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
 
  case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

  case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster

  //删除Executor
  case class RemoveExecutor(execId: String) extends ToBlockManagerMaster

  
  case object StopBlockManagerMaster extends ToBlockManagerMaster

  case object GetMemoryStatus extends ToBlockManagerMaster

  case object GetStorageStatus extends ToBlockManagerMaster

  case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
    extends ToBlockManagerMaster

  case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true)
    extends ToBlockManagerMaster

  case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

  case object ExpireDeadHosts extends ToBlockManagerMaster

  //更新Block信息
  case class UpdateBlockInfo(
      var blockManagerId: BlockManagerId,
      var blockId: BlockId,
      var storageLevel: StorageLevel,
      var memSize: Long,
      var diskSize: Long,
      var tachyonSize: Long)
    extends ToBlockManagerMaster
    with Externalizable {

    def this() = this(null, null, null, 0, 0, 0)  // For deserialization only

    override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
      blockManagerId.writeExternal(out)
      out.writeUTF(blockId.name)
      storageLevel.writeExternal(out)
      out.writeLong(memSize)
      out.writeLong(diskSize)
      out.writeLong(tachyonSize)
    }

    override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
      blockManagerId = BlockManagerId(in)
      blockId = BlockId(in.readUTF())
      storageLevel = StorageLevel(in)
      memSize = in.readLong()
      diskSize = in.readLong()
      tachyonSize = in.readLong()
    }
  }

 

 

2. BlockManagerMasterActor说明

 

/**
 * BlockManagerMasterActor is an actor on the master node to track statuses of
 * all slaves' block managers.
 */
 包含的数据结构:

 

 

 

  // Mapping from block manager id to the block manager's information.
  ///BlockManagerId与BlockManagerInfo之间的对应,每个Executor对应一个BlockManagerId
  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

  // Mapping from executor ID to block manager ID.
  //Executor ID与BlockManagerID之间的对应关系
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

  // Mapping from block id to the set of block managers that have the block.
  //BlockId与包含这个Block的Location(由BlockManagerId表示)
  private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 

 

BlockManagerId是一个由host,port和executorId表示的数据结构,从这里也可以看出来BlockManager是Executor范围的数据结构

 

 

3. BlockManagerSlaveActor说明

 

/**
 * An actor to take commands from the master to execute options. For example,
 * this is used to remove blocks from the slave's BlockManager.
 */

BlockManagerSlaveActor包含的数据结构都体现在构造函数中了,如下所示,BlockManagerSlaveActor包含了本Executor对应的BlockManager以及该Executor的MapOutputTracker用于记录Map Shuffle输出

private[storage]
class BlockManagerSlaveActor(
    blockManager: BlockManager,
    mapOutputTracker: MapOutputTracker)
  extends Actor with ActorLogReceive with Logging

 

弄清楚BlockManager的通信机制,发现要比分析BlockManager的读写数据(依赖于DiskStore和MemoryStore实现,而DiskStore又依赖于DiskBlockManager实现)复杂一些,主要是头脑中没有清晰的picuture:关于BlockManager,Driver有什么,Executor上有什么,它们之间如何通信,这个继续分析吧。

 

 

 

 

 

分享到:
评论

相关推荐

    ApacheSpark设计与实现.pdf+ApacheSpark源码剖析.pdf+Spark原著中文版.pdf

    3. **存储系统**:BlockManager如何管理内存和磁盘上的数据,以及如何与Shuffle服务交互。 4. **网络通信**:Akka在Spark中的作用,以及如何实现高效的节点间通信。 5. **性能优化**:Tungsten项目如何提升DataFrame...

    Spark core 源码解读与扩展

    4. **分布式存储**:包括BlockManager、ShuffleManager等组件,管理任务执行中的数据存储和传输。其中,BlockManager管理内存和磁盘上的数据块,ShuffleManager则负责shuffle过程中的数据交换。 5. **资源管理**:...

    Apache Spark源码剖析

    内存管理在Spark中至关重要,它采用了一种称为Tachyon的内存计算存储层,以及一个称为BlockManager的组件来管理数据在内存和磁盘之间的移动。 Spark Streaming是Spark对实时数据流处理的扩展,它通过微批处理将流...

    spark大数据处理技术

    4. **BlockManager**:负责内存管理和数据存储,为RDD提供内存空间。 5. **Resilient Distributed Datasets (RDDs)**:弹性分布式数据集,提供了容错机制,当数据丢失时可以从源重新计算。 Spark SQL是Spark针对...

    spark2.02源码

    源码分析还包括对Spark的存储系统的研究,如BlockManager如何管理内存和磁盘上的数据块,以及如何与其他组件如CacheManager协同工作实现数据缓存。 总的来说,Spark 2.0.2的源码包含了许多关键的改进和优化,对于...

    Spark Core介绍

    在存储方面,Spark Core利用BlockManager和CacheManager管理内存和磁盘上的数据块。BlockManager负责数据块的存储和检索,而DiskStore和MemoryStore分别处理磁盘和内存中的数据。此外,Block Transfer Service使用...

    mesos-spark-源码.rar

    2. **内存管理**:Spark通过`Storage`模块实现内存缓存,源码中的`BlockManager`负责数据存储和回收,`MemoryManager`则管理内存分配。 3. **DAG执行**:Spark的工作流程由DAGScheduler分解为Stage,再由Task...

    spark-java:java实现spark核心源代码

    此外,Spark的存储系统,如BlockManager和MemoryManager,也是核心部分,它们负责数据的缓存和存储策略。 2. **模块详解** - **数据读取与转换**:在Java实现中,数据读取可能通过`SparkContext`类的API来完成,...

    Spark源码倒腾

    源码中的`StorageLevel`类定义了多种缓存级别,`BlockManager`负责管理内存块,`MemoryStore`则负责数据的存储和检索。 Spark SQL是Spark处理结构化数据的重要模块,它通过DataFrame和Dataset提供了SQL接口。...

    Spark Core 应用解析

    - **RDD概述**:RDD是由Hadoop中的BlockManager管理的一组分片(Partition),这些分片分布在不同的工作节点上。RDD的主要特点包括它的只读性、分区、依赖和缓存能力。 2. **RDD的属性** - **什么是RDD**:RDD是...

    Spark-dig-and-dig:Dig Spark的源代码-spark source code

    `BlockManager`是Spark内部存储系统的核心组件,负责管理内存和磁盘上的数据块。同时,`org.apache.spark.shuffle.ShuffleManager`则负责数据的重排和交换,这是MapReduce模型的重要环节。 在计算模型上,Spark引入...

    spark-source-code-learn-note:火花学习笔记-spark source code

    3. BlockManager:存储RDD的分区数据,实现内存和磁盘的混合存储。 4. Shuffle:Spark中的数据重新分布过程,涉及网络通信和数据持久化。 三、Spark源码解析 源码学习是深入了解Spark工作机制的关键。以下是一些...

Global site tag (gtag.js) - Google Analytics