cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间。
cache和persist的区别
基于Spark 1.4.1 的源码,可以看到
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
说明是cache()调用了persist(), 想要知道二者的不同还需要看一下persist函数:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
可以看到persist()内部调用了persist(StorageLevel.MEMORY_ONLY),继续深入:
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet..
*/
def persist(newLevel: StorageLevel): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
sc.persistRDD(this)
// Register the RDD with the ContextCleaner for automatic GC-based cleanup
sc.cleaner.foreach(_.registerRDDForCleanup(this))
storageLevel = newLevel
this
}
可以看出来persist有一个 StorageLevel 类型的参数,这个表示的是RDD的缓存级别。
至此便可得出cache和persist的区别了:cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。
RDD的缓存级别
顺便看一下RDD都有哪些缓存级别,查看 StorageLevel 类的源码:
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
......
}
可以看到这里列出了12种缓存级别,但这些有什么区别呢?可以看到每个缓存级别后面都跟了一个StorageLevel的构造函数,里面包含了4个或5个参数,如下
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
查看其构造函数
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {
......
def useDisk: Boolean = _useDisk
def useMemory: Boolean = _useMemory
def useOffHeap: Boolean = _useOffHeap
def deserialized: Boolean = _deserialized
def replication: Int = _replication
......
}
可以看到StorageLevel类的主构造器包含了5个参数:
- useDisk:使用硬盘(外存)
- useMemory:使用内存
- useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
- deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
- replication:备份数(在多个节点上备份)
理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)
另外还注意到有一种特殊的缓存级别
val OFF_HEAP = new StorageLevel(false, false, true, false)
使用了堆外内存,StorageLevel 类的源码中有一段代码可以看出这个的特殊性,它不能和其它几个参数共存。
if (useOffHeap) {
require(!useDisk, "Off-heap storage level does not support using disk")
require(!useMemory, "Off-heap storage level does not support using heap memory")
require(!deserialized, "Off-heap storage level does not support deserialized storage")
require(replication == 1, "Off-heap storage level does not support multiple replication")
}
http://blog.csdn.net/houmou/article/details/52491419
相关推荐
Spark允许我们使用persist()或cache()将数据存储在内存中,以加快后续操作的速度。同时,可以设置不同的持久化级别,如MEMORY_ONLY, MEMORY_AND_DISK等。 12. **故障恢复**: Spark的容错机制主要基于检查点和RDD...
- **Persistence**: 通过`persist()`或`cache()`方法可以将RDD的数据持久化在内存中,以加速后续的计算过程。 ##### 3. Driver Program - **初始化SparkContext**: 创建`SparkContext`实例,初始化Spark应用程序的...
- **持久化**:通过.cache()或.persist()方法,可以将RDD存储在内存或磁盘上,提高重用效率。 - **错误恢复**:Spark的容错机制基于检查点和数据复制,当任务失败时,可以从已保存的状态重新计算。 - **窄依赖和宽...
8. **持久化策略**:RDD可以通过`cache`或`persist`方法缓存到内存或磁盘,以加速后续的重复计算。 9. **错误恢复**:Spark通过检查点(Checkpointing)和故障恢复机制来应对节点失败,确保数据处理的可靠性。 10....
Eclipse中可以通过`persist`或`cache`方法标记需要缓存的RDD。 通过以上内容,我们可以理解Spark Core在Eclipse IDE中的开发过程,以及如何利用其特性进行高效的数据处理。实践中,开发者还需要根据具体业务需求,...
- 使用`.persist()`或`.cache()`方法对数据集进行缓存。 - **自包含的应用程序** - Spark支持构建自包含的应用程序,这些应用程序可以在不同的环境中运行而无需额外配置。 - **从这里往哪儿走** - 掌握基础后,...
Spark是Apache Hadoop生态系统中的一个快速、通用且可扩展的大数据处理框架,它以其高效的内存计算和DAG(有向无环图)执行模型而闻名。Spark调优是提升Spark应用性能的关键步骤,涉及到多个层面,包括配置优化、...
2. Caching:使用`.cache()`或`.persist()`缓存数据,减少重复计算。 3. 调整Executor和Driver资源:根据任务需求配置内存和CPU。 4. 动态资源调度:启用动态分配,根据任务需求自动调整资源。 八、Spark MLlib 1. ...
Persist和Cache是RDD持久化策略,用于优化性能和资源利用。 在实时处理方面,Spark Streaming提供了DStream API,支持对实时数据流进行处理。DStream是一系列连续的RDD,可以处理包括文件系统、消息队列和实时数据...
- **缓存中间结果**:使用 `persist` 或 `cache` 保留中间结果。 - **合理设置并行度**:根据集群资源调整任务的并行程度。 #### 四、总结 通过上述步骤,我们成功地在 Spark 环境下实现了 WordCount。这个过程...
此外,也会讨论Spark的缓存机制,通过`cache()`或`persist()`方法,将数据存储在内存中,实现快速重用。 3. **SparkSQL** SparkSQL是Spark的一个模块,它将SQL与DataFrame和Dataset API集成,使得开发者可以用SQL...
为了提高性能,Spark提供了RDD持久化机制,即通过调用`persist`或`cache`方法可以将RDD保存在内存中,避免重复计算。这种策略对于多次访问相同数据的情况特别有用,能够显著提升执行效率。 ### Spark的...
另外,使用缓存机制(如persist或cache)可以将常用数据驻留在内存,减少重复计算。 执行计划优化是另一个重要环节。通过DAG(有向无环图)调度,Spark能够智能地规划任务执行顺序。用户还可以通过调整Shuffle操作...
- **Value 型 Transformation 算子** 包括 `map`, `flatMap`, `mapPartitions`, `union`, `groupBy`, `filter`, `distinct`, `sample`, `cache`, 和 `persist`。 - **Key-Value 型 Transformation 算子** 包括 `...
13. **持久化**: 使用`cache()`或`persist()`方法可以缓存RDD,提高重用效率。不同级别的持久化策略如MEMORY_ONLY, MEMORY_AND_DISK等可以根据需求选择。 14. **错误处理与日志记录**: Spark提供了异常处理机制,并...
本篇文章主要探讨Spark中的RDD缓存和checkpoint机制。 首先,RDD缓存是Spark提升性能的关键特性。通过调用`persist()`或`cache()`方法,我们可以将一个RDD的计算结果存储在内存中,以便后续的操作可以快速访问,...
- `cache()`:默认为MEMORY_ONLY存储级别,即只缓存在内存中。 #### 五、总结 RDD作为Spark的核心数据结构,提供了一种高度灵活的方式来处理大规模数据集。通过利用其丰富的转换和行动操作,开发者可以轻松地实现...
1. DAGScheduler 划分 Stage(TaskSet),记录哪个 RDD 或者 Stage 输出被物化(缓存),通常在一个复杂的 shuffle 之后,通常物化一下(cache、persist),方便之后的计算。 2. 重新提交出错/失败的 Stage(shuffle...
使用`cache`或`persist`将中间结果存储在内存中,避免重复计算;还可以设置合适的executor数量、内存大小和核心数来适应不同的计算资源。 7. 结果可视化: 在分析完成后,可以将结果导出到支持Spark SQL的BI工具,...