BlockManager主要在deriver和excutor构造。在deriver构造了一个BlockManagerMasterActor对象,主要负责收集block的info。在executor创建了BlockManagerMasterActor的ref,并且将ref封装到BlockManagerMaster中用于与BlockManagerMasterActor的通信。BlockManager封装了BlockManagerMaster,用于存储block,并调用BlockManagerMaster与master通信。
//BlockManagerMasterActor 处理的消息。updateBolckinfo主要是excutor向deriver报告block的信息。 def receive = { case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) => register(blockManagerId, maxMemSize, slaveActor) sender ! true case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => // TODO: Ideally we want to handle all the message replies in receive instead of in the // individual private methods. updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) case GetLocations(blockId) => sender ! getLocations(blockId) case GetLocationsMultipleBlockIds(blockIds) => sender ! getLocationsMultipleBlockIds(blockIds) case GetPeers(blockManagerId, size) => sender ! getPeers(blockManagerId, size) case GetMemoryStatus => sender ! memoryStatus case GetStorageStatus => sender ! storageStatus case RemoveRdd(rddId) => sender ! removeRdd(rddId) case RemoveBlock(blockId) => removeBlockFromWorkers(blockId) sender ! true case RemoveExecutor(execId) => removeExecutor(execId) sender ! true case StopBlockManagerMaster => logInfo("Stopping BlockManagerMaster") sender ! true if (timeoutCheckingTask != null) { timeoutCheckingTask.cancel() } context.stop(self) case ExpireDeadHosts => expireDeadHosts() case HeartBeat(blockManagerId) => sender ! heartBeat(blockManagerId) case other => logWarning("Got unknown message: " + other) }
//BlockManagerSlaveActor处理的消息,主要用于master通知client删除block和RDD override def receive = { case RemoveBlock(blockId) => blockManager.removeBlock(blockId) case RemoveRdd(rddId) => val numBlocksRemoved = blockManager.removeRdd(rddId) sender ! numBlocksRemoved }
//blockmanager会调用BlockManagerWorker syncPutBlock和 syncGetBlock方法去远程拿数据或者写数据到远端 private[spark] object BlockManagerWorker extends Logging { private var blockManagerWorker: BlockManagerWorker = null def startBlockManagerWorker(manager: BlockManager) { blockManagerWorker = new BlockManagerWorker(manager) } //用于duplicate时往远端写数据 def syncPutBlock(msg: PutBlock, toConnManagerId: ConnectionManagerId): Boolean = { val blockManager = blockManagerWorker.blockManager val connectionManager = blockManager.connectionManager val blockMessage = BlockMessage.fromPutBlock(msg) val blockMessageArray = new BlockMessageArray(blockMessage) val resultMessage = connectionManager.sendMessageReliablySync( toConnManagerId, blockMessageArray.toBufferMessage) resultMessage.isDefined } //用于 task运行时获取远端的数据 def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = { val blockManager = blockManagerWorker.blockManager val connectionManager = blockManager.connectionManager val blockMessage = BlockMessage.fromGetBlock(msg) val blockMessageArray = new BlockMessageArray(blockMessage) val responseMessage = connectionManager.sendMessageReliablySync( toConnManagerId, blockMessageArray.toBufferMessage) responseMessage match { case Some(message) => { val bufferMessage = message.asInstanceOf[BufferMessage] logDebug("Response message received " + bufferMessage) BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => { logDebug("Found " + blockMessage) return blockMessage.getData }) } case None => logDebug("No response message received") } null } }
远端的BlockManagerWorker会调用onBlockMessageReceive方法用来处理TYPE_PUT_BLOCK和TYPE_GET_BLOCK 这些事件
//BlockManagerWorker中的方法用来处理block 的读取,然后通过connectionManager回复response def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = { logDebug("Handling message " + msg) msg match { case bufferMessage: BufferMessage => { try { logDebug("Handling as a buffer message " + bufferMessage) val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage) logDebug("Parsed as a block message array") val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get) Some(new BlockMessageArray(responseMessages).toBufferMessage) } catch { case e: Exception => logError("Exception handling buffer message", e) None } } case otherMessage: Any => { logError("Unknown type message received: " + otherMessage) None } } } def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = { blockMessage.getType match { case BlockMessage.TYPE_PUT_BLOCK => { val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel) logDebug("Received [" + pB + "]") putBlock(pB.id, pB.data, pB.level) None } case BlockMessage.TYPE_GET_BLOCK => { val gB = new GetBlock(blockMessage.getId) logDebug("Received [" + gB + "]") val buffer = getBlock(gB.id) if (buffer == null) { return None } Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer))) } case _ => None } }
ConnectonManager用于连接的建立,数据的传输和接收.主要用了nio socket
MemoryStore存储的结构是 private val entries = new LinkedHashMap[BlockId, Entry](32, 0.75f, true)
,在存数据前,先会查看是否有足够的memory,如果没有,会删除老的block。如果StorageLevel是useDisk,会将老的block写到disk
DiskStore会按照blockId中的name创建文件,并把数据写到文件中
ShuffleBlockManager是BlockManager的扩展,主要用于处理shuffle操作时,shufflewrite 数据的存储
相关推荐
《Spark源码分析3——驱动器、主节点与执行器之间的连接》 在Spark的分布式计算框架中,驱动器(Driver)、主节点(Master)和执行器(Executor)是核心组件,它们之间的通信和协作构成了Spark作业执行的基础。本文将深入...
《Spark源码分析》这本书是针对那些希望深入了解大数据处理框架Spark以及与其紧密相关的Hadoop技术的专业人士所编写的。Spark作为一个快速、通用且可扩展的数据处理引擎,已经在大数据领域占据了重要地位,而深入...
### Apache Spark源码走读之五:DStream处理的容错性分析 #### 环境搭建与背景 为了深入理解Apache Spark Streaming中DStream处理的容错机制,本文将从一个简单的Spark Streaming示例出发,逐步分析Spark如何确保...
### Apache Spark源码走读之3 -- Task运行期之函数调用关系分析 #### 概述 Apache Spark作为一款高效的大数据处理框架,在其内部有着复杂的任务调度与执行机制。本文将深入探讨Spark中Task执行期间的具体流程以及...
《Spark源码解读迷你书》是一本专注于深入理解Apache Spark核心源码的书籍,适合对大数据处理技术有深厚兴趣并且想要探索Spark内部机制的读者。在阅读这本书之前,建议先搭建好Spark开发环境,比如使用Intelij IDEA...
在深入分析Spark Core的源码和架构时,我们可以更好地理解其内部工作机制,并能够在基础上进行扩展以满足特定的大数据处理需求。 ### Spark Core 源码解读 Spark Core源码的解读涉及到理解以下几个关键部分: 1. ...
源码分析还包括对Spark的存储系统的研究,如BlockManager如何管理内存和磁盘上的数据块,以及如何与其他组件如CacheManager协同工作实现数据缓存。 总的来说,Spark 2.0.2的源码包含了许多关键的改进和优化,对于...
通过源码分析,我们可以了解到数据的分布策略以及BlockManager如何与其他组件如MemoryStore和DiskStore交互。 `CacheManager`(17-2,CacheManager源码剖析.html)则是Spark中的缓存系统,它负责数据的缓存策略和...
内存管理在Spark中至关重要,它采用了一种称为Tachyon的内存计算存储层,以及一个称为BlockManager的组件来管理数据在内存和磁盘之间的移动。 Spark Streaming是Spark对实时数据流处理的扩展,它通过微批处理将流...
通过对Mesos和Spark源码的深入分析,我们可以了解到这两个项目如何高效地处理大规模数据,以及它们在分布式环境中的协同工作方式。这对于开发者来说,无论是为了优化性能,还是为了开发新的分布式应用,都是非常有...
- Shuffle过程是如何实现的,包括MapOutputTracker和BlockManager的角色。 - RDD(弹性分布式数据集)的创建、转换和行动操作的实现细节。 四、实战应用 1. 数据分析:通过Spark SQL进行ETL(提取、转换、加载)...
`BlockManager`是Spark内部存储系统的核心组件,负责管理内存和磁盘上的数据块。同时,`org.apache.spark.shuffle.ShuffleManager`则负责数据的重排和交换,这是MapReduce模型的重要环节。 在计算模型上,Spark引入...
源码中的`StorageLevel`类定义了多种缓存级别,`BlockManager`负责管理内存块,`MemoryStore`则负责数据的存储和检索。 Spark SQL是Spark处理结构化数据的重要模块,它通过DataFrame和Dataset提供了SQL接口。...
Spark 0.6.0源码分析 Spark是Apache软件基金会下的一个开源大数据处理框架,以其高效的内存计算和弹性分布式数据集(Resilient Distributed Datasets, RDDs)为核心特性,广泛应用于数据处理、机器学习和图形计算等...
本文将深入探讨Spark的原理及源码分析,首先从Spark运行时的通用流程入手,然后介绍核心组件的角色与职责,以及Spark支持的不同集群部署模式。 在Spark的运行流程中,用户通过`spark-submit`提交应用程序。这个过程...
3. BlockManager:存储RDD的分区数据,实现内存和磁盘的混合存储。 4. Shuffle:Spark中的数据重新分布过程,涉及网络通信和数据持久化。 三、Spark源码解析 源码学习是深入了解Spark工作机制的关键。以下是一些...
这份名为"SPARK源码分析 - Copy.pptx"的文件,很可能包含以下内容: 1. **Spark概述**:解释Spark的基本概念,包括它的设计目标、核心架构,以及与其他大数据处理框架(如Hadoop MapReduce)的区别。 2. **Spark...
此外,Spark的存储系统,如BlockManager和MemoryManager,也是核心部分,它们负责数据的缓存和存储策略。 2. **模块详解** - **数据读取与转换**:在Java实现中,数据读取可能通过`SparkContext`类的API来完成,...
4. **Block Manager**:块管理器负责管理执行器上的缓存数据,包括内存管理和磁盘管理。 #### 四、Spark高级特性 **Spark提供的高级特性包括:** 1. **Broadcast Variables**:广播变量是一种只读的变量,可以在...
例如,通过结合 Mahout 或 Spark 等库,可以构建高效的大规模数据挖掘和分析平台。 总之,深入研究 Hadoop 2.6.0-cdh5.4.3 的源码,不仅可以提升对分布式计算的理解,也有助于优化大数据处理的效率和性能,从而在...