`
zhangym195
  • 浏览: 123996 次
  • 性别: Icon_minigender_1
  • 来自: 黑龙江
社区版块
存档分类
最新评论

Spark 缓存管理-CacheManger彻底解密源码

阅读更多
  Spark之所以非常出色是基于RDD构成了一体化、多元化计算核心,所以就需要在处理多范式的计算时不需要部署多个框架,只需要一个团队一个技术堆栈就可以了解决所有大数据的计算问题,相对来说在软件、硬件上团队的投入都会降低,产出确又会很高。
    作为商业的本质属性来说:更低的成本,更高的产出永远都是对的,而且就目前来看当前Spark产能来说,虽然目前基于RDD上面有五大子框架,但其实Spark上面5%的产能都未发挥出来,未来将会有极大的提高空间。
    有些人一直以为Spark都会有只能基于内存进行计算的错误想法,其实1.2版本之前确有内存一些问题,但之后其实DAG才是他的性能的核心,好的调度和天然可以进行多步骤的迭代是其真正的核心能量。
    这其中CacheManger多步骤迭代型的算法 数据交互式的数据仓库的使用中位置至关重要、起着举足轻重的作用,管理的是内存中的数据。
 
、CacheManger分析:
    1,CacheManger管理的缓存可以是基于内存的缓存,也可以是基于磁盘的缓存;
    2,CacheManager需要通过BlockManager来操作数据;
    3,每当Task运行的时候会调用RDD的compute方法进行计算,而compute方法调用iterator方法:    
override def compute(split: Partition, context: TaskContext): Iterator[U] = 
 f(context, split.index, firstParent[T].iterator(split, context))
/**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}
    iterator方法先看是否已经对数据进行了缓存,如果有则先取缓存,没有才会去进行计算,而且自定义RDD时也可以复写它。
   if( storageLevel != StorageLevel.NONE ) 意味着RDD要本身的Storage Level要设置存储,默认是基于内存的,也可以放在磁盘或者Tachyon中,见下面的源码:        
class StorageLevel private(
private var _useDisk: Boolean, //磁盘
private var _useMemory: Boolean, //内存
private var _useOffHeap: Boolean, //Tachyon
Spark的缓存有可能在内存中、磁盘上或者是Tachyon上面。
二:CacheMaanger源码详解
 
    1,Cache在工作的时候会最大化的保留数据,但是数据不一定完整。
      如何理解这句话呢?因为当前的计算如果需要内存空间的话,那么Cache在内存中的数据必须让出空间,此时如果在RDD持久化的时候同时指定了可以把数据放在Disk上,那么部分Cache的数据就可以从内存转入磁盘(会drop到磁盘中否则的话就会丢失掉已经计算的并缓存的数据),否则的话数据就会丢失(当然丢失需要重要计算)!!!    
/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {

val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
existingMetrics.incBytesRead(blockResult.bytes)

val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
case None =>
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish return its results
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) {
return new InterruptibleIterator[T](context, storedValues.get)
}

// Otherwise, we have to load the partition ourselves
try {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
val cachedValues = putInBlockManager(key, computedValues, storageLevel)
new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
}
}
这样我们可以看到,Cache并不是可靠的。
 
    2,CacheManager在获得缓存数据的时候,会通过BlockManger来抓到数据,进行Cache后BlockManager进行管理,通过这个Key就能够获得缓存的数据。
logInfo(s"Finished waiting for $id")
val values = blockManager.get(id)
if (!values.isDefined) {
/**
* Get a block from the block manager (either local or remote).
*/
def get(blockId: BlockId): Option[BlockResult] = {
val local = getLocal(blockId)
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
return local
}
val remote = getRemote(blockId)
if (remote.isDefined) {
logInfo(s"Found block $blockId remotely")
return remote
}
None
}
    本地有的话,数据本地性原则,先去本地获取,通过blockId 无论是在本地 或者是在远程都会获得回来。
val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
 
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
.....
}
case None =>
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish return its results
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) {
return new InterruptibleIterator[T](context, storedValues.get)
}
 
   如果是None的话,那么说明缓存已经丢失了,那么为什么还要acquireLockForPartition呢?因为 还可能有其他线程在操作,为什么一个Partition还可能有其他线程在操作呢?
    那是因为Spark有一个慢任务(straggle task)的推测的功能,当启动这个推测功能时候,对一个Partition就会启动两个任务在两台机器上,这样在当前机器上和远程上都没有发现这个内容,可能说明你在返回时这个任务已经计算完了。

 

3,如果 CacheManager没有通过BlockManger获得缓存内容的话,此时会通过RDD的如下方法:  val computedValues = rdd.computeOrReadCheckpoint(partition, context)
来获得数据;见CacheManager.scala 中67行的代码:
// Otherwise, we have to load the partition ourselves
try {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
val cachedValues = putInBlockManager(key, computedValues, storageLevel)
new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
    上述方法首先查看当前的RDD是否进行了CheckPoint,不会马上进行计算的,如果做了的话就会直接读取checkPoint的数据(所以说Checkpointt很重要,这样作业级别的迭代是非常有用的,不用重复计算),否则的话就必须进行计算;
    计算之后通过putInBlockManager会把数据按照StorageLevel重新缓存起来。下次就更有机率读到(为什么是有机率呢,因为有可能又丢了,内存够大很重要啊!!!)。
/**
* Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
*/
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
 
    如果内存不够的话,会使用方法blockmanager.dropfromMemory先整理出一部分内存的空间出来 ,然后基于整理出来的内存空间放入我们要放入的最新数据。
   综上总结CacheManager流程:


 
 
笔记来源 DT大数据梦工厂 王家林老师 (微博:http://weibo.com/ilovepains)Spark课程,
学习整理:张玉明 ymzhang@foxmail.com Spark学习笔记。
 
  • 大小: 80 KB
3
2
分享到:
评论

相关推荐

    ysoserial-master.zip

    ysoserial是一个用于生成利用不安全的Java对象反序列化的有效负载的概念验证工具。它包含一系列在常见Java库中发现的"gadget chains",可以在特定条件下利用执行不安全的反序列化操作的Java应用程序。ysoserial项目最初在2015年AppSecCali会议上提出,包含针对Apache Commons Collections(3.x和4.x版本)、Spring Beans/Core(4.x版本)和Groovy(2.3.x版本)的利用链

    zigbee CC2530无线自组网协议栈系统代码实现协调器与终端的TI Sensor实验和Monitor使用.zip

    1、嵌入式物联网单片机项目开发例程,简单、方便、好用,节省开发时间。 2、代码使用IAR软件开发,当前在CC2530上运行,如果是其他型号芯片,请自行移植。 3、软件下载时,请注意接上硬件,并确认烧录器连接正常。 4、有偿指导v:wulianjishu666; 5、如果接入其他传感器,请查看账号发布的其他资料。 6、单片机与模块的接线,在代码当中均有定义,请自行对照。 7、若硬件有差异,请根据自身情况调整代码,程序仅供参考学习。 8、代码有注释说明,请耐心阅读。 9、例程具有一定专业性,非专业人士请谨慎操作。

    YOLO算法-自卸卡车-挖掘机-轮式装载机数据集-2644张图像带标签-自卸卡车-挖掘机-轮式装载机.zip

    YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;

    Oracle10gDBA学习手册中文PDF清晰版最新版本

    **Oracle 10g DBA学习手册:安装Oracle和构建数据库** **目的:** 本章节旨在指导您完成Oracle数据库软件的安装和数据库的创建。您将通过Oracle Universal Installer (OUI)了解软件安装过程,并学习如何利用Database Configuration Assistant (DBCA)创建附加数据库。 **主题概览:** 1. 利用Oracle Universal Installer (OUI)安装软件 2. 利用Database Configuration Assistant (DBCA)创建数据库 **第2章:Oracle软件的安装与数据库构建** **Oracle Universal Installer (OUI)的运用:** Oracle Universal Installer (OUI)是一个图形用户界面(GUI)工具,它允许您查看、安装和卸载机器上的Oracle软件。通过OUI,您可以轻松地管理Oracle软件的安装和维护。 **安装步骤:** 以下是使用OUI安装Oracle软件并创建数据库的具体步骤:

    消防验收过程服务--现场记录表.doc

    消防验收过程服务--现场记录表.doc

    (4655036)数据库 管理与应用 期末考试题 数据库试题

    数据库管理\09-10年第1学期数据库期末考试试卷A(改卷参考).doc。内容来源于网络分享,如有侵权请联系我删除。另外如果没有积分的同学需要下载,请私信我。

    YOLO算法-瓶纸盒合并数据集-3161张图像带标签-纸张-纸箱-瓶子.zip

    YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;

    职业暴露后的处理流程.docx

    职业暴露后的处理流程.docx

    Java Web开发短消息系统

    Java Web开发短消息系统

    java毕设项目之ssm基于java和mysql的多角色学生管理系统+jsp(完整前后端+说明文档+mysql+lw).zip

    项目包含完整前后端源码和数据库文件 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7

    批量导出多项目核心目录工具

    这是一款可以配置过滤目录及过滤的文件后缀的工具,并且支持多个项目同时输出导出,并过滤指定不需要导出的目录及文件后缀。 导出后将会保留原有的路径,并在新的文件夹中体现。

    【图像压缩】基于matlab GUI DCT图像压缩(含MAX MED MIN NONE)【含Matlab源码 9946期】.zip

    Matlab领域上传的视频均有对应的完整代码,皆可运行,亲测可用,适合小白; 1、代码压缩包内容 主函数:main.m; 调用函数:其他m文件;无需运行 运行结果效果图; 2、代码运行版本 Matlab 2019b;若运行有误,根据提示修改;若不会,私信博主; 3、运行操作步骤 步骤一:将所有文件放到Matlab的当前文件夹中; 步骤二:双击打开main.m文件; 步骤三:点击运行,等程序运行完得到结果; 4、仿真咨询 如需其他服务,可私信博主; 4.1 博客或资源的完整代码提供 4.2 期刊或参考文献复现 4.3 Matlab程序定制 4.4 科研合作

    YOLO算法-挖掘机与火焰数据集-7735张图像带标签-挖掘机.zip

    YOLO算法-挖掘机与火焰数据集-7735张图像带标签-挖掘机.zip

    操作系统实验 Ucore lab5

    操作系统实验 Ucore lab5

    IMG_5950.jpg

    IMG_5950.jpg

    竞选报价评分表.docx

    竞选报价评分表.docx

    java系统,mysql、springboot等框架

    java系统,mysql、springboot等框架

    zigbee CC2530网关+4节点无线通讯实现温湿度、光敏、LED、继电器等传感节点数据的采集上传,网关通过ESP8266上传远程服务器及下发控制.zip

    1、嵌入式物联网单片机项目开发例程,简单、方便、好用,节省开发时间。 2、代码使用IAR软件开发,当前在CC2530上运行,如果是其他型号芯片,请自行移植。 3、软件下载时,请注意接上硬件,并确认烧录器连接正常。 4、有偿指导v:wulianjishu666; 5、如果接入其他传感器,请查看账号发布的其他资料。 6、单片机与模块的接线,在代码当中均有定义,请自行对照。 7、若硬件有差异,请根据自身情况调整代码,程序仅供参考学习。 8、代码有注释说明,请耐心阅读。 9、例程具有一定专业性,非专业人士请谨慎操作。

    YOLO算法-快递衣物数据集-496张图像带标签.zip

    YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;

    搜索引擎lucen的相关介绍 从事搜索行业程序研发、人工智能、存储等技术人员和企业

    内容概要:本文详细讲解了搜索引擎的基础原理,特别是索引机制、优化 like 前缀模糊查询的方法、建立索引的标准以及针对中文的分词处理。文章进一步深入探讨了Lucene,包括它的使用场景、特性、框架结构、Maven引入方法,尤其是Analyzer及其TokenStream的实现细节,以及自定义Analyzer的具体步骤和示例代码。 适合人群:数据库管理员、后端开发者以及希望深入了解搜索引擎底层实现的技术人员。 使用场景及目标:适用于那些需要优化数据库查询性能、实施或改进搜索引擎技术的场景。主要目标在于提高数据库的访问效率,实现高效的数据检索。 阅读建议:由于文章涉及大量的技术术语和实现细节,建议在阅读过程中对照实际开发项目,结合示例代码进行实践操作,有助于更好地理解和吸收知识点。

Global site tag (gtag.js) - Google Analytics