
Tracing the source: how an executor writes data to the BlockManager, and how data is read back out of it

In earlier posts we looked at how a job is submitted and eventually run, and at how the BlockManager itself works. The next step is to see how Spark actually reads and writes data through the BlockManager.

Every computation runs in a task on some executor, so the write side starts on the executor as well. From the earlier analysis we know a Task ultimately executes inside the executor's TaskRunner. Once the computation finishes, if resultSize is larger than what Akka can ship in a single frame, the result is stored as a block, and the driver-side TaskScheduler later pulls that block's contents from the executor's BlockManager. In TaskRunner, this is the code that decides how to hand the result back:
val serializedResult: ByteBuffer = {
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
      s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
      s"dropping it.")
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
  } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(
      blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
    logInfo(
      s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
  } else {
    logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
    serializedDirectResult
  }
}

execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
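
Two driver-side settings drive this three-way branch. A minimal sketch of where they would be set; the keys are real Spark 1.x configuration options, the values are illustrative only:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Results larger than this are dropped outright (first branch above).
  .set("spark.driver.maxResultSize", "1g")
  // Akka frame size in MB; a result that does not fit into one frame (minus
  // reserved overhead) takes the BlockManager route (second branch above).
  .set("spark.akka.frameSize", "128")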


So a small result is sent straight back to the driver, while a large one goes through

env.blockManager.putBytes(
    blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)

into a block, where it sits until the driver comes to fetch it. This is a direct call to the BlockManager's putBytes (the env here is the executor-side SparkEnv, so the BlockManager is the executor-side one too). Let's look at putBytes:
  def putBytes(
      blockId: BlockId,
      bytes: ByteBuffer,
      level: StorageLevel,
      tellMaster: Boolean = true,
      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
    require(bytes != null, "Bytes is null")
    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
  }
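
putBytes wraps the buffer in ByteBufferValues before handing it to doPut. For context, these are the three shapes of input doPut accepts (abridged from BlockManager.scala):

private[spark] sealed trait BlockValues
private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues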

putBytes delegates straight to doPut. That method is fairly long and worth reading in full; here are the key parts:
private def doPut(
    blockId: BlockId,
    data: BlockValues,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None)
  : Seq[(BlockId, BlockStatus)] = {

  ...

  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  // Reuse the BlockInfo if one already exists for this block; otherwise register a new one
  val putBlockInfo = {
    val tinfo = new BlockInfo(level, tellMaster)
    // Do atomically !
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
    if (oldBlockOpt.isDefined) {
      if (oldBlockOpt.get.waitForReady()) {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        return updatedBlocks
      }
      // TODO: So the block info exists - but previous attempt to load it (?) failed.
      // What do we do now ? Retry on it ?
      oldBlockOpt.get
    } else {
      tinfo
    }
  }

  ...

  putBlockInfo.synchronized {

    ...

    // Pick the backing store from the storage level: memoryStore, diskStore, or externalBlockStore
    val (returnValues, blockStore: BlockStore) = {
      if (putLevel.useMemory) {
        // Put it in memory first, even if it also has useDisk set to true;
        // We will drop it to disk later if the memory store can't hold it.
        (true, memoryStore)
      } else if (putLevel.useOffHeap) {
        // Use external block store
        (false, externalBlockStore)
      } else if (putLevel.useDisk) {
        // Don't get back the bytes from put unless we replicate them
        (putLevel.replication > 1, diskStore)
      } else {
        assert(putLevel == StorageLevel.NONE)
        throw new BlockException(
          blockId, s"Attempted to put block $blockId without specifying storage level!")
      }
    }

    ...

    // Dispatch on the shape of the data: iterator, array, or raw bytes
    val result = data match {
      case IteratorValues(iterator) =>
        blockStore.putIterator(blockId, iterator, putLevel, returnValues)
      case ArrayValues(array) =>
        blockStore.putArray(blockId, array, putLevel, returnValues)
      case ByteBufferValues(bytes) =>
        bytes.rewind()
        blockStore.putBytes(blockId, bytes, putLevel)
    }

    ...

    val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
    if (putBlockStatus.storageLevel != StorageLevel.NONE) {
      // Now that the block is in either the memory, externalBlockStore, or disk store,
      // let other threads read it, and tell the master about it.
      marked = true
      putBlockInfo.markReady(size)
      if (tellMaster) {
        reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
      }
      updatedBlocks += ((blockId, putBlockStatus))
    }

    ...

  }

  ...

}
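
A note on the putIfAbsent dance at the top: it is what makes concurrent puts of the same block safe. Whichever thread registers the BlockInfo first does the write; everyone else parks in waitForReady until the writer wakes them. A simplified sketch of the idea, not line-for-line source:

// Readers block until the writer marks the block ready (true) or failed (false).
def waitForReady(): Boolean = synchronized {
  while (pending) {
    this.wait()
  }
  !failed
}

// The writer calls this once the put succeeds, waking any waiting readers.
def markReady(sizeInBytes: Long): Unit = synchronized {
  size = sizeInBytes
  pending = false
  this.notifyAll()
}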

Back in the main flow: doPut ends up calling the putBytes (or putIterator / putArray) method of whichever store was selected.
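
All three stores implement the same small interface, which is why doPut can stay agnostic about where the bytes land (abridged from BlockStore.scala):

private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {

  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult

  def putIterator(
      blockId: BlockId,
      values: Iterator[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult

  def putArray(
      blockId: BlockId,
      values: Array[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult

  ...
}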

Let's take MemoryStore as the concrete example and look at its putBytes:
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
  // Work on a duplicate - since the original input might be used elsewhere.
  val bytes = _bytes.duplicate()
  bytes.rewind()

  // deserialized is false for the serialized levels; from the StorageLevel definitions:
  // val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  if (level.deserialized) {
    val values = blockManager.dataDeserialize(blockId, bytes)

    // store the data as deserialized Java objects
    putIterator(blockId, values, level, returnValues = true)
  } else {
    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

    // store the data as raw serialized bytes
    tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
    PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
  }
}
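
For reference, the relevant definitions from the StorageLevel companion object; the constructor arguments are (useDisk, useMemory, useOffHeap, deserialized):

val MEMORY_ONLY         = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_SER     = new StorageLevel(false, true, false, false)
val MEMORY_AND_DISK     = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)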



Our bytes were stored with a serialized level (MEMORY_AND_DISK_SER above; MEMORY_ONLY_SER behaves the same way), so level.deserialized is false and the else branch runs: tryToPut stores the raw bytes. Let's see how that method works:
private def tryToPut(
    blockId: BlockId,
    value: Any,
    size: Long,
    deserialized: Boolean,
    droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
  tryToPut(blockId, () => value, size, deserialized, droppedBlocks)
}

private def tryToPut(
    blockId: BlockId,
    value: () => Any,
    size: Long,
    deserialized: Boolean,
    droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {

  /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
   * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
   * been released, it must be ensured that those to-be-dropped blocks are not double counted
   * for freeing up more space for another block that needs to be put. Only then the actually
   * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */

  memoryManager.synchronized {
    // Note: if we have previously unrolled this block successfully, then pending unroll
    // memory should be non-zero. This is the amount that we already reserved during the
    // unrolling process. In this case, we can just reuse this space to cache our block.
    // The synchronization on `memoryManager` here guarantees that the release and acquire
    // happen atomically. This relies on the assumption that all memory acquisitions are
    // synchronized on the same lock.
    releasePendingUnrollMemoryForThisTask()

    // Check whether enough storage memory can be acquired for this block
    val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
    if (enoughMemory) {
      // We acquired enough memory for the block, so go ahead and put it.
      // "Storing it in memory" simply means inserting an entry into the entries map.
      val entry = new MemoryEntry(value(), size, deserialized)
      entries.synchronized {
        entries.put(blockId, entry)
      }
      val valuesOrBytes = if (deserialized) "values" else "bytes"
      logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
        blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
    } else {
      // Tell the block manager that we couldn't put it in memory so that it can drop it to
      // disk if the block allows disk storage.
      // (For the serialized bytes we are storing here, deserialized is false,
      // so data takes the Right branch.)
      lazy val data = if (deserialized) {
        Left(value().asInstanceOf[Array[Any]])
      } else {
        Right(value().asInstanceOf[ByteBuffer].duplicate())
      }
      val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
      droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
    }
    enoughMemory
  }
}
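
And what does putting the block into entries actually buy us? entries is an access-ordered LinkedHashMap, so its iteration order is LRU order, which is exactly what the eviction code walks when storage memory runs short. From MemoryStore:

private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)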



So far, as long as there is enough memory, the data ends up in entries. Now for the read side: how does the driver get the result back out of the executor's BlockManager? On the driver, TaskSchedulerImpl.statusUpdate handles task status updates, and for a finished task it calls:
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)

That method is what actually goes and fetches the data; let's take a look:
def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        try {
          val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
            case directResult: DirectTaskResult[_] =>
              if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
                return
              }
              // deserialize "value" without holding any lock so that it won't block other threads.
              // We should call it here, so that when it's called again in
              // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
              directResult.value()
              (directResult, serializedData.limit())
            case IndirectTaskResult(blockId, size) =>
              if (!taskSetManager.canFetchMoreResults(size)) {
                // dropped by executor if size is larger than maxResultSize
                sparkEnv.blockManager.master.removeBlock(blockId)
                return
              }
              logDebug("Fetching indirect task result for TID %s".format(tid))
              scheduler.handleTaskGettingResult(taskSetManager, tid)
              val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
              if (!serializedTaskResult.isDefined) {
                /* We won't be able to get the task result if the machine that ran the task failed
                 * between when the task ended and when we tried to fetch the result, or if the
                 * block manager had to flush the result. */
                scheduler.handleFailedTask(
                  taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
                return
              }
              val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                serializedTaskResult.get)
              sparkEnv.blockManager.master.removeBlock(blockId)
              (deserializedResult, size)
          }

          result.metrics.setResultSize(size)
          scheduler.handleSuccessfulTask(taskSetManager, tid, result)
        } catch {
          case cnf: ClassNotFoundException =>
            val loader = Thread.currentThread.getContextClassLoader
            taskSetManager.abort("ClassNotFound with classloader: " + loader)
          // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
          case NonFatal(ex) =>
            logError("Exception while getting task result", ex)
            taskSetManager.abort("Exception while getting task result: %s".format(ex))
        }
      }
    })
  }
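
One detail worth noting: the whole fetch runs on getTaskResultExecutor, a small daemon thread pool, so deserializing or remotely fetching a big result never blocks the scheduler's event loop. Abridged from TaskResultGetter:

private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool(
  THREADS, "task-result-getter")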


As you can see, when an IndirectTaskResult comes back, the driver uses the blockId inside it to fetch the real result from the BlockManager:
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)

The blockManager here should be the driver-side BlockManager. Let's look at getRemoteBytes:
  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
    logDebug(s"Getting remote block $blockId as bytes")
    doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
  }


It delegates straight to doGetRemote:
  private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
    require(blockId != null, "BlockId is null")
    val locations = Random.shuffle(master.getLocations(blockId))
    var numFetchFailures = 0
    for (loc <- locations) {
      logDebug(s"Getting remote block $blockId from $loc")
      val data = try {
        blockTransferService.fetchBlockSync(
          loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
      } catch {
        case NonFatal(e) =>
          numFetchFailures += 1
          if (numFetchFailures == locations.size) {
            // An exception is thrown while fetching this block from all locations
            throw new BlockFetchException(s"Failed to fetch block from" +
              s" ${locations.size} locations. Most recent failure cause:", e)
          } else {
            // This location failed, so we retry fetch from a different one by returning null here
            logWarning(s"Failed to fetch remote block $blockId " +
              s"from $loc (failed attempt $numFetchFailures)", e)
            null
          }
      }

      if (data != null) {
        if (asBlockResult) {
          return Some(new BlockResult(
            dataDeserialize(blockId, data),
            DataReadMethod.Network,
            data.limit()))
        } else {
          return Some(data)
        }
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }


First it asks the master for every location of the block:

val locations = Random.shuffle(master.getLocations(blockId))

then tries those locations one at a time, fetching with:

blockTransferService.fetchBlockSync(
          loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()

This blockTransferService was created together with the BlockManager and initialized along with it.
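
Abridged (and slightly simplified; the exact constructor arguments vary across versions) from SparkEnv.create, which shows that wiring:

val blockTransferService =
  new NettyBlockTransferService(conf, securityManager, numUsableCores)

val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)

// Later, BlockManager.initialize(appId) calls blockTransferService.init(this),
// which starts the server side that answers fetchBlockSync requests.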

That fetch pulls the bytes back from the executor over the network (NIO). So that is the rough round trip for storing a result and getting it back. The memory path served as the example here; the other storage levels can be traced the same way with this flow in hand.
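
To drive the same write path from user code rather than the task-result path, here is a tiny hypothetical spark-shell session (sc is the usual SparkContext); persisting an RDD with a serialized level exercises exactly the MemoryStore.putBytes / tryToPut route traced above:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 100000).persist(StorageLevel.MEMORY_ONLY_SER)

rdd.count()  // first action: partitions are computed, serialized, and put into
             // the executor-side MemoryStore through the BlockManager
rdd.count()  // second action: blocks are read back from the BlockManager
             // instead of being recomputed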




