Spark围绕着BlockManager构建了存储模块,包括RDD,Shuffle,Broadcast的存储都使用了BlockManager。而BlockManager在实现上是一个针对每个应用的Master/Executor结构,即Driver上BlockManager充当了Master角色,而各个Slave上(具体到应用范围,就是Executor)的BlockManager充当了Slave角色。
因此,BlockManager在Driver和应用的各个Executor之间各有一份,Driver上的BlockManager不具备实际存储的能力,它记录了各个Executor的BlockManager的状态(通过查看BlockManagerMaster和BlockManagerMasterActor的源代码,BlockManagerMaster和BlockManagerMasterActor并没有持有一个BlockManager对象,那么每个Executor BlockManager的状态存储在什么地方?通过查看BlockManager的类注释,发现BlockManager确实运行在Driver上)。Master BlockManager和ExecutorBlockManager之间的通信也是基于Akka,消息格式定义于BlockManagerMessages类中。
上面的描述并不准确,事实上在Driver端,同Executor一样,各有一个BlockManager。除此之外,Driver上还有一个BlockManager Master,它的实现类是BlockManagerManager,因此,对于BlockManager而言,Driver既是Master也是Slave
0.BlockManager类注释:
/** * Manager running on every node (driver and executors) which provides interfaces for putting and * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap). * * Note that #initialize() must be called before the BlockManager is usable. */ private[spark] class BlockManager(
1. Master/Slave的通信内容
Master BlockManager向Executor BlockManager可以发送的消息包括:
sealed trait ToBlockManagerSlave // Remove a block from the slaves that have it. This can only be used to remove // blocks that the master knows about. case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave // Remove all blocks belonging to a specific RDD. case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave // Remove all blocks belonging to a specific shuffle. case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave // Remove all blocks belonging to a specific broadcast. case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true) extends ToBlockManagerSlave
Executor BlockManager向Master BlockManager可以发送的消息包括:
sealed trait ToBlockManagerMaster case class RegisterBlockManager( blockManagerId: BlockManagerId, maxMemSize: Long, sender: ActorRef) extends ToBlockManagerMaster //获取某个Block在哪些Executor的BlockManager上 case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster //获取一组Block在哪些Executor的BlockManager上 case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster //删除Executor case class RemoveExecutor(execId: String) extends ToBlockManagerMaster case object StopBlockManagerMaster extends ToBlockManagerMaster case object GetMemoryStatus extends ToBlockManagerMaster case object GetStorageStatus extends ToBlockManagerMaster case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true) extends ToBlockManagerMaster case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true) extends ToBlockManagerMaster case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster case object ExpireDeadHosts extends ToBlockManagerMaster //更新Block信息 case class UpdateBlockInfo( var blockManagerId: BlockManagerId, var blockId: BlockId, var storageLevel: StorageLevel, var memSize: Long, var diskSize: Long, var tachyonSize: Long) extends ToBlockManagerMaster with Externalizable { def this() = this(null, null, null, 0, 0, 0) // For deserialization only override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { blockManagerId.writeExternal(out) out.writeUTF(blockId.name) storageLevel.writeExternal(out) out.writeLong(memSize) out.writeLong(diskSize) out.writeLong(tachyonSize) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { blockManagerId = BlockManagerId(in) blockId = BlockId(in.readUTF()) storageLevel = StorageLevel(in) memSize = in.readLong() diskSize = in.readLong() tachyonSize = in.readLong() } }
2. BlockManagerMasterActor说明
/** * BlockManagerMasterActor is an actor on the master node to track statuses of * all slaves' block managers. */包含的数据结构:
// Mapping from block manager id to the block manager's information. ///BlockManagerId与BlockManagerInfo之间的对应,每个Executor对应一个BlockManagerId private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo] // Mapping from executor ID to block manager ID. //Executor ID与BlockManagerID之间的对应关系 private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId] // Mapping from block id to the set of block managers that have the block. //BlockId与包含这个Block的Location(由BlockManagerId表示) private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
BlockManagerId是一个由host,port和executorId表示的数据结构,从这里也可以看出来BlockManager是Executor范围的数据结构
3. BlockManagerSlaveActor说明
/** * An actor to take commands from the master to execute options. For example, * this is used to remove blocks from the slave's BlockManager. */
BlockManagerSlaveActor包含的数据结构都体现在构造函数中了,如下所示,BlockManagerSlaveActor包含了本Executor对应的BlockManager以及该Executor的MapOutputTracker用于记录Map Shuffle输出
private[storage] class BlockManagerSlaveActor( blockManager: BlockManager, mapOutputTracker: MapOutputTracker) extends Actor with ActorLogReceive with Logging
弄清楚BlockManager的通信机制,发现要比分析BlockManager的读写数据(依赖于DiskStore和MemoryStore实现,而DiskStore又依赖于DiskBlockManager实现)复杂一些,主要是头脑中没有清晰的picuture:关于BlockManager,Driver有什么,Executor上有什么,它们之间如何通信,这个继续分析吧。
相关推荐
3. **存储系统**:BlockManager如何管理内存和磁盘上的数据,以及如何与Shuffle服务交互。 4. **网络通信**:Akka在Spark中的作用,以及如何实现高效的节点间通信。 5. **性能优化**:Tungsten项目如何提升DataFrame...
4. **分布式存储**:包括BlockManager、ShuffleManager等组件,管理任务执行中的数据存储和传输。其中,BlockManager管理内存和磁盘上的数据块,ShuffleManager则负责shuffle过程中的数据交换。 5. **资源管理**:...
内存管理在Spark中至关重要,它采用了一种称为Tachyon的内存计算存储层,以及一个称为BlockManager的组件来管理数据在内存和磁盘之间的移动。 Spark Streaming是Spark对实时数据流处理的扩展,它通过微批处理将流...
4. **BlockManager**:负责内存管理和数据存储,为RDD提供内存空间。 5. **Resilient Distributed Datasets (RDDs)**:弹性分布式数据集,提供了容错机制,当数据丢失时可以从源重新计算。 Spark SQL是Spark针对...
源码分析还包括对Spark的存储系统的研究,如BlockManager如何管理内存和磁盘上的数据块,以及如何与其他组件如CacheManager协同工作实现数据缓存。 总的来说,Spark 2.0.2的源码包含了许多关键的改进和优化,对于...
在存储方面,Spark Core利用BlockManager和CacheManager管理内存和磁盘上的数据块。BlockManager负责数据块的存储和检索,而DiskStore和MemoryStore分别处理磁盘和内存中的数据。此外,Block Transfer Service使用...
2. **内存管理**:Spark通过`Storage`模块实现内存缓存,源码中的`BlockManager`负责数据存储和回收,`MemoryManager`则管理内存分配。 3. **DAG执行**:Spark的工作流程由DAGScheduler分解为Stage,再由Task...
此外,Spark的存储系统,如BlockManager和MemoryManager,也是核心部分,它们负责数据的缓存和存储策略。 2. **模块详解** - **数据读取与转换**:在Java实现中,数据读取可能通过`SparkContext`类的API来完成,...
源码中的`StorageLevel`类定义了多种缓存级别,`BlockManager`负责管理内存块,`MemoryStore`则负责数据的存储和检索。 Spark SQL是Spark处理结构化数据的重要模块,它通过DataFrame和Dataset提供了SQL接口。...
- **RDD概述**:RDD是由Hadoop中的BlockManager管理的一组分片(Partition),这些分片分布在不同的工作节点上。RDD的主要特点包括它的只读性、分区、依赖和缓存能力。 2. **RDD的属性** - **什么是RDD**:RDD是...
`BlockManager`是Spark内部存储系统的核心组件,负责管理内存和磁盘上的数据块。同时,`org.apache.spark.shuffle.ShuffleManager`则负责数据的重排和交换,这是MapReduce模型的重要环节。 在计算模型上,Spark引入...
3. BlockManager:存储RDD的分区数据,实现内存和磁盘的混合存储。 4. Shuffle:Spark中的数据重新分布过程,涉及网络通信和数据持久化。 三、Spark源码解析 源码学习是深入了解Spark工作机制的关键。以下是一些...