`
zhangym195
  • 浏览: 124256 次
  • 性别: Icon_minigender_1
  • 来自: 黑龙江
社区版块
存档分类
最新评论

Spark 缓存管理-CacheManger彻底解密源码

阅读更多
  Spark之所以非常出色是基于RDD构成了一体化、多元化计算核心,所以就需要在处理多范式的计算时不需要部署多个框架,只需要一个团队一个技术堆栈就可以了解决所有大数据的计算问题,相对来说在软件、硬件上团队的投入都会降低,产出确又会很高。
    作为商业的本质属性来说:更低的成本,更高的产出永远都是对的,而且就目前来看当前Spark产能来说,虽然目前基于RDD上面有五大子框架,但其实Spark上面5%的产能都未发挥出来,未来将会有极大的提高空间。
    有些人一直以为Spark都会有只能基于内存进行计算的错误想法,其实1.2版本之前确有内存一些问题,但之后其实DAG才是他的性能的核心,好的调度和天然可以进行多步骤的迭代是其真正的核心能量。
    这其中CacheManger多步骤迭代型的算法 数据交互式的数据仓库的使用中位置至关重要、起着举足轻重的作用,管理的是内存中的数据。
 
、CacheManger分析:
    1,CacheManger管理的缓存可以是基于内存的缓存,也可以是基于磁盘的缓存;
    2,CacheManager需要通过BlockManager来操作数据;
    3,每当Task运行的时候会调用RDD的compute方法进行计算,而compute方法调用iterator方法:    
override def compute(split: Partition, context: TaskContext): Iterator[U] = 
 f(context, split.index, firstParent[T].iterator(split, context))
/**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}
    iterator方法先看是否已经对数据进行了缓存,如果有则先取缓存,没有才会去进行计算,而且自定义RDD时也可以复写它。
   if( storageLevel != StorageLevel.NONE ) 意味着RDD要本身的Storage Level要设置存储,默认是基于内存的,也可以放在磁盘或者Tachyon中,见下面的源码:        
class StorageLevel private(
private var _useDisk: Boolean, //磁盘
private var _useMemory: Boolean, //内存
private var _useOffHeap: Boolean, //Tachyon
Spark的缓存有可能在内存中、磁盘上或者是Tachyon上面。
二:CacheMaanger源码详解
 
    1,Cache在工作的时候会最大化的保留数据,但是数据不一定完整。
      如何理解这句话呢?因为当前的计算如果需要内存空间的话,那么Cache在内存中的数据必须让出空间,此时如果在RDD持久化的时候同时指定了可以把数据放在Disk上,那么部分Cache的数据就可以从内存转入磁盘(会drop到磁盘中否则的话就会丢失掉已经计算的并缓存的数据),否则的话数据就会丢失(当然丢失需要重要计算)!!!    
/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {

val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
existingMetrics.incBytesRead(blockResult.bytes)

val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
case None =>
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish return its results
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) {
return new InterruptibleIterator[T](context, storedValues.get)
}

// Otherwise, we have to load the partition ourselves
try {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
val cachedValues = putInBlockManager(key, computedValues, storageLevel)
new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
}
}
这样我们可以看到,Cache并不是可靠的。
 
    2,CacheManager在获得缓存数据的时候,会通过BlockManger来抓到数据,进行Cache后BlockManager进行管理,通过这个Key就能够获得缓存的数据。
logInfo(s"Finished waiting for $id")
val values = blockManager.get(id)
if (!values.isDefined) {
/**
* Get a block from the block manager (either local or remote).
*/
def get(blockId: BlockId): Option[BlockResult] = {
val local = getLocal(blockId)
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
return local
}
val remote = getRemote(blockId)
if (remote.isDefined) {
logInfo(s"Found block $blockId remotely")
return remote
}
None
}
    本地有的话,数据本地性原则,先去本地获取,通过blockId 无论是在本地 或者是在远程都会获得回来。
val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
 
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
.....
}
case None =>
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish return its results
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) {
return new InterruptibleIterator[T](context, storedValues.get)
}
 
   如果是None的话,那么说明缓存已经丢失了,那么为什么还要acquireLockForPartition呢?因为 还可能有其他线程在操作,为什么一个Partition还可能有其他线程在操作呢?
    那是因为Spark有一个慢任务(straggle task)的推测的功能,当启动这个推测功能时候,对一个Partition就会启动两个任务在两台机器上,这样在当前机器上和远程上都没有发现这个内容,可能说明你在返回时这个任务已经计算完了。

 

3,如果 CacheManager没有通过BlockManger获得缓存内容的话,此时会通过RDD的如下方法:  val computedValues = rdd.computeOrReadCheckpoint(partition, context)
来获得数据;见CacheManager.scala 中67行的代码:
// Otherwise, we have to load the partition ourselves
try {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
val cachedValues = putInBlockManager(key, computedValues, storageLevel)
new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
    上述方法首先查看当前的RDD是否进行了CheckPoint,不会马上进行计算的,如果做了的话就会直接读取checkPoint的数据(所以说Checkpointt很重要,这样作业级别的迭代是非常有用的,不用重复计算),否则的话就必须进行计算;
    计算之后通过putInBlockManager会把数据按照StorageLevel重新缓存起来。下次就更有机率读到(为什么是有机率呢,因为有可能又丢了,内存够大很重要啊!!!)。
/**
* Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
*/
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
 
    如果内存不够的话,会使用方法blockmanager.dropfromMemory先整理出一部分内存的空间出来 ,然后基于整理出来的内存空间放入我们要放入的最新数据。
   综上总结CacheManager流程:


 
 
笔记来源 DT大数据梦工厂 王家林老师 (微博:http://weibo.com/ilovepains)Spark课程,
学习整理:张玉明 ymzhang@foxmail.com Spark学习笔记。
 
  • 大小: 80 KB
3
2
分享到:
评论

相关推荐

    spark-3.1.3-bin-without-hadoop.tgz

    这个"spark-3.1.3-bin-without-hadoop.tgz"压缩包是Spark的3.1.3版本,不含Hadoop依赖的二进制发行版。这意味着在部署时,你需要自行配置Hadoop环境,或者在不依赖Hadoop的环境中运行Spark。 Spark的核心特性包括...

    spark-2.1.0-bin-without-hadoop版本的压缩包,直接下载到本地解压后即可使用

    在Ubuntu里安装spark,spark-2.1.0-bin-without-hadoop该版本直接下载到本地后解压即可使用。 Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。它使用内存中缓存和优化的查询执行方式,可针对任何规模...

    spark资源 spark-2.3.2-bin-hadoop2.7 tgz文件

    spark资源 spark-2.3.2-bin-hadoop2.7 tgz文件

    spark-assembly-1.5.2-hadoop2.6.0.jar

    Spark-assembly-1.5.2-hadoop2.6.0.jar中的优化包括RDD(弹性分布式数据集)的缓存策略、Task调度优化、内存管理优化等,以确保在大数据处理中实现高效的性能。 7. 开发和调试: 开发者在本地开发时,可以直接...

    spark-3.1.2.tgz & spark-3.1.2-bin-hadoop2.7.tgz.rar

    Spark-3.1.2.tgz和Spark-3.1.2-bin-hadoop2.7.tgz是两个不同格式的Spark发行版,分别以tar.gz和rar压缩格式提供。 1. Spark核心概念: - RDD(弹性分布式数据集):Spark的基础数据结构,是不可变、分区的数据集合...

    spark-2.4.7-bin-hadoop2.6.tgz

    - `bin/`:包含可执行文件,如`spark-submit`,`pyspark`,`spark-shell`等,用于启动和管理Spark作业。 - `conf/`:存放配置文件,如`spark-defaults.conf`,用户可以在此自定义Spark的默认配置。 - `jars/`:包含...

    编译的spark-hive_2.11-2.3.0和 spark-hive-thriftserver_2.11-2.3.0.jar

    spark-hive_2.11-2.3.0 spark-hive-thriftserver_2.11-2.3.0.jar log4j-2.15.0.jar slf4j-api-1.7.7.jar slf4j-log4j12-1.7.25.jar curator-client-2.4.0.jar curator-framework-2.4.0.jar curator-recipes-2.4.0....

    spark-1.6.0-bin-hadoop2.6.tgz

    Spark-1.6.0-bin-hadoop2.6.tgz 是针对Linux系统的Spark安装包,包含了Spark 1.6.0版本以及与Hadoop 2.6版本兼容的构建。这个安装包为在Linux环境中搭建Spark集群提供了必要的组件和库。 **1. Spark基础知识** ...

    spark-2.0.0-bin-hadoop2.6.tgz

    本资源是spark-2.0.0-bin-hadoop2.6.tgz百度网盘资源下载,本资源是spark-2.0.0-bin-hadoop2.6.tgz百度网盘资源下载

    spark-3.1.3-bin-hadoop3.2.tgz

    在这个特定的压缩包"spark-3.1.3-bin-hadoop3.2.tgz"中,我们得到了Spark的3.1.3版本,它已经预编译为与Hadoop 3.2兼容。这个版本的Spark不仅提供了源码,还包含了预编译的二进制文件,使得在Linux环境下快速部署和...

    spark-3.2.1-bin-hadoop2.7.tgz

    这个名为"spark-3.2.1-bin-hadoop2.7.tgz"的压缩包是Spark的一个特定版本,即3.2.1,与Hadoop 2.7版本兼容。在Linux环境下,这样的打包方式方便用户下载、安装和运行Spark。 Spark的核心设计理念是快速数据处理,...

    apache-doris-spark-connector-2.3_2.11-1.0.1

    Spark Doris Connector(apache-doris-spark-connector-2.3_2.11-1.0.1-incubating-src.tar.gz) Spark Doris Connector Version:1.0.1 Spark Version:2.x Scala Version:2.11 Apache Doris是一个现代MPP分析...

    spark-2.4.7-bin-without-hadoop

    为了使用"spark-2.4.7-bin-without-hadoop",你需要首先下载并解压提供的spark-2.4.7-bin-without-hadoop.tgz文件。解压后,你可以找到包含Spark所有组件的目录结构,包括Spark的可执行文件、配置文件以及相关的库...

    spark-3.2.0-bin-hadoop3.2.tgz

    这个压缩包"spark-3.2.0-bin-hadoop3.2.tgz"包含了Spark 3.2.0版本的二进制文件,以及针对Hadoop 3.2的兼容构建。 Spark的核心组件包括:Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图...

    spark-3.1.2-bin-hadoop2.7.tgz

    这个"spark-3.1.2-bin-hadoop2.7.tgz"是一个压缩包,包含了Spark 3.1.2版本,针对Hadoop 2.7优化的二进制发行版。在Linux环境下,这个版本的Spark可以与Hadoop生态系统无缝集成,用于大数据分析和处理任务。 Spark...

    spark-3.0.0-bin-hadoop3.2

    在本场景中,我们讨论的是Spark的3.0.0版本,与Hadoop3.2相结合的二进制发行版——"spark-3.0.0-bin-hadoop3.2"。这个压缩包是为了在Windows操作系统下运行Spark而设计的,因此标签明确指出它是适用于Windows平台的...

    spark-3.0.0-bin-hadoop2.7.tgz

    Spark-3.0.0-bin-hadoop2.7.tgz 是Spark 3.0.0版本的预编译二进制包,其中包含了针对Hadoop 2.7版本的兼容性构建。这个版本的发布对于数据科学家和大数据工程师来说至关重要,因为它提供了许多性能优化和新功能。 1...

    spark-hive-thriftserver_2.11-2.1.3-SNAPSHOT-123456.jar

    spark-hive-thriftserver_2.11-2.1.spark-hive-thrift

    spark-3.2.4-bin-hadoop3.2-scala2.13 安装包

    在本安装包“spark-3.2.4-bin-hadoop3.2-scala2.13”中,包含了用于运行Spark的核心组件以及依赖的Hadoop版本和Scala编程语言支持。以下是对这些关键组成部分的详细解释: 1. **Spark**: Spark的核心在于它的弹性...

    spark-streaming-kafka-0-8_2.11-2.4.0.jar

    spark-streaming-kafka-0-8_2.11-2.4.0.jar

Global site tag (gtag.js) - Google Analytics