- 浏览: 73878 次
之前看了Job怎么submit 以及最后run的, 然后也看了blockmanager是怎么工作的, 那么接下来就是要看spark是如何从blockManager中读写数据的。
首先每个计算都是在对应机器的executor的task上面运行的, 那么计算完后也是从executor端开始写入的, 根据之前文章的解析, 我们知道最后Task是在executor的TaskRunner中执行的, 其中在数据操作端, 计算完成后如果resultSize大于Akka可以传输的size的话, 就会存储到block中, 然后通过Driver这边的taskscheduler来从executor端的blockmanager中获取对应block的信息, executor的taskrunner中是通过这段代码来存数据的:
可以看到如果数据小的话直接就返回给Driver了, 如果数据大的话, 那么就通过
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
放到对应的block里面, 等Driver端来取。 这里直接调用了blockManager的putBytes方法 (这个env是slave端的, 所以blockmanager也是slave端的), 看下putbytes:
直接调用了doPut方法, 这个方法蛮长的, 具体可以慢慢看, 挑几个重点的地方写一下:
可以看到代码里面是调用了各种store的putBytes方法 (或者putIterator, putArray)
那么我们拿memorystore来看一下, putBytes方法:
可以看到如果是选择MemoryOnly的话就会去执行tryToPut方法存放数据, 看一下这个方法怎么做的:
到目前为止, 如果内存足够, 这个data就放到entries里面了, 那么接下来如果要从blockmanager里面获取数据呢, 比如说Driver端要从executor这边把数据拿回去, 我们看一下, 在taskschedulerimpl的statusUpdate是Driver端获取数据的方法, 里面调用了:
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
这个方法是实际去取数据的, 我们看一下
看到了把, 如果返回的是IndirectTaskResult, 那么就会根据blockID去blockManager去拿:
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
这里的blockManager应该还是Driver端的blockmanager, 我们看一下getRemoteBytes方法:
直接调用doGetRemote:
首先 会通过:
val locations = Random.shuffle(master.getLocations(blockId))
从master那边获取到blockId的所有location, 然后一个一个location这边取回, 取回的方式是通过:
blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
这个blockTransferService是我们在创建BlockManager的时候一起创建的然后和BlockManager一起初始化的。
这里通过NIO的方式从executor这边取回了数据。 总的存结果取结果的大致路径就是这个样子的, 拿了MemoryOnly做了一个列子, 其他方式就按这个流程跟踪一遍肯定明白。
首先每个计算都是在对应机器的executor的task上面运行的, 那么计算完后也是从executor端开始写入的, 根据之前文章的解析, 我们知道最后Task是在executor的TaskRunner中执行的, 其中在数据操作端, 计算完成后如果resultSize大于Akka可以传输的size的话, 就会存储到block中, 然后通过Driver这边的taskscheduler来从executor端的blockmanager中获取对应block的信息, executor的taskrunner中是通过这段代码来存数据的:
val serializedResult: ByteBuffer = { if (maxResultSize > 0 && resultSize > maxResultSize) { logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " + s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " + s"dropping it.") ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize)) } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) logInfo( s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)") ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) } else { logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver") serializedDirectResult } } execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
可以看到如果数据小的话直接就返回给Driver了, 如果数据大的话, 那么就通过
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
放到对应的block里面, 等Driver端来取。 这里直接调用了blockManager的putBytes方法 (这个env是slave端的, 所以blockmanager也是slave端的), 看下putbytes:
def putBytes( blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true, effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { require(bytes != null, "Bytes is null") doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel) }
直接调用了doPut方法, 这个方法蛮长的, 具体可以慢慢看, 挑几个重点的地方写一下:
private def doPut( blockId: BlockId, data: BlockValues, level: StorageLevel, tellMaster: Boolean = true, effectiveStorageLevel: Option[StorageLevel] = None) : Seq[(BlockId, BlockStatus)] = { ... val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] //获取BlockInfo如果已经存在则获取, 不然创建新的 val putBlockInfo = { val tinfo = new BlockInfo(level, tellMaster) // Do atomically ! val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) if (oldBlockOpt.isDefined) { if (oldBlockOpt.get.waitForReady()) { logWarning(s"Block $blockId already exists on this machine; not re-adding it") return updatedBlocks } // TODO: So the block info exists - but previous attempt to load it (?) failed. // What do we do now ? Retry on it ? oldBlockOpt.get } else { tinfo } } ... putBlockInfo.synchronized { ... //根据配置, 来确定是用memorystore还是diskstore或者externalBlockStore val (returnValues, blockStore: BlockStore) = { if (putLevel.useMemory) { // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. (true, memoryStore) } else if (putLevel.useOffHeap) { // Use external block store (false, externalBlockStore) } else if (putLevel.useDisk) { // Don't get back the bytes from put unless we replicate them (putLevel.replication > 1, diskStore) } else { assert(putLevel == StorageLevel.NONE) throw new BlockException( blockId, s"Attempted to put block $blockId without specifying storage level!") } } ... //根据不一样的类型, 存放数据到memory中或者放到磁盘上 val result = data match { case IteratorValues(iterator) => blockStore.putIterator(blockId, iterator, putLevel, returnValues) case ArrayValues(array) => blockStore.putArray(blockId, array, putLevel, returnValues) case ByteBufferValues(bytes) => bytes.rewind() blockStore.putBytes(blockId, bytes, putLevel) ... val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) if (putBlockStatus.storageLevel != StorageLevel.NONE) { // Now that the block is in either the memory, externalBlockStore, or disk store, // let other threads read it, and tell the master about it. marked = true putBlockInfo.markReady(size) if (tellMaster) { reportBlockStatus(blockId, putBlockInfo, putBlockStatus) } updatedBlocks += ((blockId, putBlockStatus)) } ... } ... }
可以看到代码里面是调用了各种store的putBytes方法 (或者putIterator, putArray)
那么我们拿memorystore来看一下, putBytes方法:
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = { // Work on a duplicate - since the original input might be used elsewhere. val bytes = _bytes.duplicate() bytes.rewind() //如果选择memoryonly, 则值为false, 来源StorageLevel定义: //val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) if (level.deserialized) { val values = blockManager.dataDeserialize(blockId, bytes) 存放数据 putIterator(blockId, values, level, returnValues = true) } else { val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] //存放数据 tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) } }
可以看到如果是选择MemoryOnly的话就会去执行tryToPut方法存放数据, 看一下这个方法怎么做的:
private def tryToPut( blockId: BlockId, value: Any, size: Long, deserialized: Boolean, droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { tryToPut(blockId, () => value, size, deserialized, droppedBlocks) } private def tryToPut( blockId: BlockId, value: () => Any, size: Long, deserialized: Boolean, droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has * been released, it must be ensured that those to-be-dropped blocks are not double counted * for freeing up more space for another block that needs to be put. Only then the actually * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */ memoryManager.synchronized { // Note: if we have previously unrolled this block successfully, then pending unroll // memory should be non-zero. This is the amount that we already reserved during the // unrolling process. In this case, we can just reuse this space to cache our block. // The synchronization on `memoryManager` here guarantees that the release and acquire // happen atomically. This relies on the assumption that all memory acquisitions are // synchronized on the same lock. releasePendingUnrollMemoryForThisTask() //判断是否有足够内存 val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks) if (enoughMemory) { // We acquired enough memory for the block, so go ahead and put it //如果有足够内存, 那么直接存到内存里, 其实就是放到entries里面就算放到内存中了 val entry = new MemoryEntry(value(), size, deserialized) entries.synchronized { entries.put(blockId, entry) } val valuesOrBytes = if (deserialized) "values" else "bytes" logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed))) } else { // Tell the block manager that we couldn't put it in memory so that it can drop it to // disk if the block allows disk storage. //如果没有足够内存, 那么就看我们是否允许将数据放到磁盘上, 我们选的MemoryOnly的话deserialized是false lazy val data = if (deserialized) { Left(value().asInstanceOf[Array[Any]]) } else { deserialized } val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data) droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } } enoughMemory } }
到目前为止, 如果内存足够, 这个data就放到entries里面了, 那么接下来如果要从blockmanager里面获取数据呢, 比如说Driver端要从executor这边把数据拿回去, 我们看一下, 在taskschedulerimpl的statusUpdate是Driver端获取数据的方法, 里面调用了:
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
这个方法是实际去取数据的, 我们看一下
def enqueueSuccessfulTask( taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) { getTaskResultExecutor.execute(new Runnable { override def run(): Unit = Utils.logUncaughtExceptions { try { val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match { case directResult: DirectTaskResult[_] => if (!taskSetManager.canFetchMoreResults(serializedData.limit())) { return } // deserialize "value" without holding any lock so that it won't block other threads. // We should call it here, so that when it's called again in // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value. directResult.value() (directResult, serializedData.limit()) case IndirectTaskResult(blockId, size) => if (!taskSetManager.canFetchMoreResults(size)) { // dropped by executor if size is larger than maxResultSize sparkEnv.blockManager.master.removeBlock(blockId) return } logDebug("Fetching indirect task result for TID %s".format(tid)) scheduler.handleTaskGettingResult(taskSetManager, tid) val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId) if (!serializedTaskResult.isDefined) { /* We won't be able to get the task result if the machine that ran the task failed * between when the task ended and when we tried to fetch the result, or if the * block manager had to flush the result. */ scheduler.handleFailedTask( taskSetManager, tid, TaskState.FINISHED, TaskResultLost) return } val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]]( serializedTaskResult.get) sparkEnv.blockManager.master.removeBlock(blockId) (deserializedResult, size) } result.metrics.setResultSize(size) scheduler.handleSuccessfulTask(taskSetManager, tid, result) } catch { case cnf: ClassNotFoundException => val loader = Thread.currentThread.getContextClassLoader taskSetManager.abort("ClassNotFound with classloader: " + loader) // Matching NonFatal so we don't catch the ControlThrowable from the "return" above. case NonFatal(ex) => logError("Exception while getting task result", ex) taskSetManager.abort("Exception while getting task result: %s".format(ex)) } } }) }
看到了把, 如果返回的是IndirectTaskResult, 那么就会根据blockID去blockManager去拿:
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
这里的blockManager应该还是Driver端的blockmanager, 我们看一下getRemoteBytes方法:
def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { logDebug(s"Getting remote block $blockId as bytes") doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] }
直接调用doGetRemote:
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") val locations = Random.shuffle(master.getLocations(blockId)) var numFetchFailures = 0 for (loc <- locations) { logDebug(s"Getting remote block $blockId from $loc") val data = try { blockTransferService.fetchBlockSync( loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() } catch { case NonFatal(e) => numFetchFailures += 1 if (numFetchFailures == locations.size) { // An exception is thrown while fetching this block from all locations throw new BlockFetchException(s"Failed to fetch block from" + s" ${locations.size} locations. Most recent failure cause:", e) } else { // This location failed, so we retry fetch from a different one by returning null here logWarning(s"Failed to fetch remote block $blockId " + s"from $loc (failed attempt $numFetchFailures)", e) null } } if (data != null) { if (asBlockResult) { return Some(new BlockResult( dataDeserialize(blockId, data), DataReadMethod.Network, data.limit())) } else { return Some(data) } } logDebug(s"The value of block $blockId is null") } logDebug(s"Block $blockId not found") None }
首先 会通过:
val locations = Random.shuffle(master.getLocations(blockId))
从master那边获取到blockId的所有location, 然后一个一个location这边取回, 取回的方式是通过:
blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
这个blockTransferService是我们在创建BlockManager的时候一起创建的然后和BlockManager一起初始化的。
这里通过NIO的方式从executor这边取回了数据。 总的存结果取结果的大致路径就是这个样子的, 拿了MemoryOnly做了一个列子, 其他方式就按这个流程跟踪一遍肯定明白。
发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1105最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
Spark中Blockmanager相关代码解析
2016-08-04 19:47 1843前一段时间看了如何划分stage以及如何提交Job, 最后把结 ... -
Spark在submitStage后如何通过clustermanager调度执行task到Driver接收计算结果的代码解析
2016-08-01 14:08 1472前文: http://humingminghz.iteye.c ... -
Spark中saveAsTextFile至stage划分和job提交的源代码分析
2016-07-29 14:20 3360之前看了Spark Streaming和Spark SQL, ... -
SparkSQL DF.agg 执行过程解析
2016-07-19 10:21 4120在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或 ... -
SparkSQL SQL语句解析过程源代码浅析
2016-07-15 19:34 6641前两天一直在忙本职工 ... -
SparkSQL SQL语句解析过程浅析
2016-07-15 19:06 0前两天一直在忙本职工 ... -
SparkStreaming从启动Receiver到收取数据生成RDD的代码浅析
2016-07-08 17:54 2233前面一片文章介绍了SocketTextStream 是如何从b ... -
Sparkstreaming是如何获取数据组成Dstream的源码浅析
2016-07-08 11:23 1472前面一篇文章介绍了SparkStreaming是如何不停的循环 ... -
SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)
2016-07-06 11:24 10149前两天开始研究SparkSQL, 其主要分为HiveConte ... -
SparkStreaming是如何完成不停的循环处理的代码浅析
2016-07-02 12:26 4654一直很好奇Sparkstreaming的ssc.start是怎 ... -
SparkStreaming 对Window的reduce的方法解析
2016-06-30 11:57 4727在sparkstreaming中对窗口 ... -
Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析
2016-06-29 19:50 8790最近在玩spark streaming, 感觉到了他的强大。 ... -
关于Eclipse开发环境下 Spark+Kafka 获取topic的时候连接出错
2016-06-28 17:20 7403林林总总玩了Spark快一个月了, 打算试一下kafka的消息 ...
相关推荐
- **OneForOneBlockFetcher**:用于从远程BlockManager获取数据块,实现跨Executor的数据传输。 - **OneForOneStreamManager**:管理数据流状态,通过ConcurrentHashMap存储chunks,确保数据传输的正确性。 5. **...
通过源码分析,我们可以了解到数据的分布策略以及BlockManager如何与其他组件如MemoryStore和DiskStore交互。 `CacheManager`(17-2,CacheManager源码剖析.html)则是Spark中的缓存系统,它负责数据的缓存策略和...
Executor之间通过`BlockManager`进行数据交换,使用`RDD`的分区信息在Executor之间进行数据传输。 5. **源码分析**: 在源码层面,`SparkContext`的初始化过程中,会创建`SparkDeploySchedulerBackend`,它实现了`...
请求数据的时候会先获取 block 的所有存储位置信息请求服务是随机的在所有存储了该 Executor 的 BlockManager 去获取,避免了数据请求服务集中于一点。 结论 ---------- 广播变量是 Spark 中的一种高效的数据共享...
例如,DAGScheduler如何将作业拆分成Stage,TaskScheduler如何将任务分配到Executor,以及如何利用BlockManager和Storage层实现内存管理。此外,Shuffle服务、网络通信库Tachyon和Akka框架也是深入理解Spark的重要...
其中,BlockManager管理内存和磁盘上的数据块,ShuffleManager则负责shuffle过程中的数据交换。 5. **资源管理**:Spark支持多种资源管理方式,包括Standalone模式、Yarn、Mesos以及本地模式,不同的模式适用于不同...
Executor还负责通过Block Manager为应用程序缓存RDD。 Spark支持三种集群管理模式: 1. Standalone:Spark的原生集群管理器,可单独部署,无需依赖其他资源管理系统。 2. Hadoop YARN:统一资源管理机制,可运行...
2. **内存管理**:Spark通过`Storage`模块实现内存缓存,源码中的`BlockManager`负责数据存储和回收,`MemoryManager`则管理内存分配。 3. **DAG执行**:Spark的工作流程由DAGScheduler分解为Stage,再由Task...
内存管理在Spark中至关重要,它采用了一种称为Tachyon的内存计算存储层,以及一个称为BlockManager的组件来管理数据在内存和磁盘之间的移动。 Spark Streaming是Spark对实时数据流处理的扩展,它通过微批处理将流...
`Executor`通过`BlockManager`管理数据块,实现数据在内存和磁盘之间的移动,这是Spark高效缓存策略的基础。 `master.scala`则包含了主节点(Master Node)的实现,它是Spark集群的管理者,负责调度任务、监控...
3. **存储系统**:BlockManager如何管理内存和磁盘上的数据,以及如何与Shuffle服务交互。 4. **网络通信**:Akka在Spark中的作用,以及如何实现高效的节点间通信。 5. **性能优化**:Tungsten项目如何提升DataFrame...
- Shuffle过程是如何实现的,包括MapOutputTracker和BlockManager的角色。 - RDD(弹性分布式数据集)的创建、转换和行动操作的实现细节。 四、实战应用 1. 数据分析:通过Spark SQL进行ETL(提取、转换、加载)...
`org.apache.spark.shuffle`包下的`ShuffleManager`和`BlockManager`协同工作,确保数据正确地在Executor之间传递。 6. **Executor**:Executor是Spark运行任务的进程,负责执行计算并存储结果。`org.apache.spark....
此外,Task在执行过程中会利用BlockManager进行数据的缓存与读取,通过ShuffleFetcher进行Shuffle数据的获取等。 #### 结论 通过对Spark中Task执行期间的函数调用关系分析,我们可以更加深入地理解Spark的工作原理...
3. **源码解析**:深入Spark的源代码,讲解关键类和方法,如DAGScheduler、TaskScheduler、Storage层的BlockManager等,帮助理解Spark如何调度任务和管理数据。 4. **编程模型**:解释Spark的API使用,包括Scala、...
4. **Block Manager**:块管理器负责管理执行器上的缓存数据,包括内存管理和磁盘管理。 #### 四、Spark高级特性 **Spark提供的高级特性包括:** 1. **Broadcast Variables**:广播变量是一种只读的变量,可以在...
源码揭示了Shuffle过程中MapSide与ReduceSide的操作,以及如何通过BlockManager进行数据传输。 五、Spark SQL与DataFrame/Dataset API Spark SQL是Spark处理结构化数据的组件,其DataFrame和Dataset API提供了强大...
《Spark源码学习笔记》是针对大数据处理框架Spark进行深入研究的资料,主要涵盖了Spark的核心设计理念、架构原理以及源码解析。本笔记旨在帮助读者理解Spark如何高效地处理大规模数据,提升数据分析和处理的能力。 ...
`BlockManager`是Spark内部存储系统的核心组件,负责管理内存和磁盘上的数据块。同时,`org.apache.spark.shuffle.ShuffleManager`则负责数据的重排和交换,这是MapReduce模型的重要环节。 在计算模型上,Spark引入...