`
bit1129
  • 浏览: 1069507 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark三十七】Spark Cache机制

 
阅读更多

今天状态很差,很困,无精打采。学到的Spark知识,没有连贯起来,很多知识点有印象但是很模糊,说不出个123来。本来今天要看看cache,checkpoint和broadcast,结果今天到现在为止已经是5点了,还没有任何的进展。开始硬着头皮把Spark的Cache机制搞一搞吧,发现,cache机制比想象中的难驾驭。

 



 

 调用reduceByKey对应的ShuffledRDD对应的cache

 

cache不起作用

 

package spark.examples

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

object SparkWordCountCache {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2");
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount")
    conf.setMaster("local[3]")
    conf.set("spark.shuffle.manager", "hash"); ///hash是否有影响?
    val sc = new SparkContext(conf)
    val rdd1 = sc.textFile("file:///D:/word.in.3");
    val rdd2 = rdd1.flatMap(_.split(" "))
    val rdd3 = rdd2.map((_, 1))
    val rdd4 = rdd3.reduceByKey(_ + _, 3);
    rdd4.cache();
    rdd4.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis());
    val result = rdd4.collect; ///没有触发ShuffleMapTask执行,但是依然需要从ShuffleMapTask产生的结果拉取数据
    result.foreach(println(_));
    sc.stop
  }
}

 

以上代码调用rdd3.cache(),而rdd3是一个ShuffleMapRDD,也就是说,保存的是Stage2里面的RDD结果。此时调用cache.collect时,产生的Task都是ResultTask,也就是说,由于cache作用,最后一个Job并没有从前面从头计算?

感觉不对,即使不用cache,也应该不会从头计算吧

 

经验证,感觉是对的,将上面的代码做如下修改,结果一样,最后也不会调用ShuffleMapTask,但是在执行ResultTask时,还是会从MapTask的输出中拉取数据,所以并没有对Shuffle读过程进行简化。

 

    rdd3.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis());
    val result = rdd3.collect;
    result.foreach(println(_));

 

上来就踩了个cache的坑!Spark是不支持ShuffleMapRDD的cache的,虽然上面不需要ShuffleMapTask,但是ResultTask运行时,依然需要从MapTask的结果中拉取数据

 

 

调用groupByKey对应的ShuffledRDD对应的cache

 

结果rdd.cache起作用了

 

package spark.examples

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkGroupByExample {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("GroupByKey").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/checkpoint/" + System.currentTimeMillis())

    val data = Array[(Int, Char)]((1, 'a'), (2, 'b'),
      (3, 'c'), (4, 'd'),
      (5, 'e'), (3, 'f'),
      (2, 'g'), (1, 'h')
    )
    val pairs = sc.parallelize(data)
    val rdd = pairs.groupByKey(2)
    rdd.cache
    rdd.count;
    rdd.collect.foreach(println(_));
  }
}

 

 

 

调用textFile对应的MappedRDD对应的cache操作

 

基本流程:假如在一个程序中有两个Job。第一个Job运行时,,对于调用了cache的RDD首先计算它的数据,然后写入cache。第二个job在运行时,会直接从cache中读取。

这对于迭代计算的Job,会非常适合,将上个任务的结果缓存,供第二个任务使用,然后依次类推

 

 

package spark.examples

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

object SparkWordCountCache {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2");
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount")
    conf.setMaster("local")
    //Hash based Shuffle;
    conf.set("spark.shuffle.manager", "hash");
    val sc = new SparkContext(conf)
    val rdd1 = sc.textFile("file:///D:/word.in.3");
    rdd1.cache() ///数据读取后即做cache,第一个job运行后,就会缓存
    val rdd2 = rdd1.flatMap(_.split(" "))
    val rdd3 = rdd2.map((_, 1))
    val result = rdd3.collect; ///打印rdd3的内容
    result.foreach(println(_));
    val rdd4 = rdd3.reduceByKey(_ + _); ///对rdd3做reduceByKey操作
    rdd4.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis());
    sc.stop
  }
}

 

 

源代码基本流程:

 

  • 调用RDD的iterator方法,计算RDD的数据集合(得到的是一个可迭代的集合)
  • 在RDD的iterator方法中,检查RDD的storage level,如果设置了storage level,那么调用SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  • 在CacheManager的getOrCompute方法中,

           a.首先判断是否存在于cache中,如果存在则直接返回,

           b.如果不存在,则调用  val computedValues = rdd.computeOrReadCheckpoint(partition, context)进行计算。

           c.计算结束后,调用CacheManager自身的putInBlockManager将计算得到的数据缓存

           d. 数据放入BlockManager后,还需要更新这个RDD和BlockManager之间的对应关系,以便下次再计算这个RDD时,检查RDD数据是否已经缓存

 

主要源代码

 

 1. getOrCompute方法

 

/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
  def getOrCompute[T](
      rdd: RDD[T],
      partition: Partition,
      context: TaskContext,
      storageLevel: StorageLevel): Iterator[T] = {

    val key = RDDBlockId(rdd.id, partition.index) //RDD的id和partition的index构造RDDBlockId,一个RDD可以有多个partition
    logDebug(s"Looking for partition $key")
    blockManager.get(key) match { ///从blockManger中根据key查找,key最后会存入BlockManager么吗?BlockManager管理Spark的块信息
      case Some(blockResult) =>
        // Partition is already materialized, so just return its values
        context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

      case None =>
        // Acquire a lock for loading this partition
        // If another thread already holds the lock, wait for it to finish return its results
        val storedValues = acquireLockForPartition[T](key) ///根据Key获取缓存的数据,acquireLockForPartition名字起得不好
        if (storedValues.isDefined) { ///找到数据
          return new InterruptibleIterator[T](context, storedValues.get)
        }

        // Otherwise, we have to load the partition ourselves
        ///为找到缓存的数据,表明是job第一次运行
        try { 
          logInfo(s"Partition $key not found, computing it")
          val computedValues = rdd.computeOrReadCheckpoint(partition, context) ///计算RDD数据

          // If the task is running locally, do not persist the result
          if (context.isRunningLocally) { ///如果数据在本地,就不需要缓存了?
            return computedValues
          }

          // Otherwise, cache the values and keep track of any updates in block statuses
          ///缓存数据
          val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
          
          ///将数据存入BlockManager,注意四个参数
          val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
          
          ///这是什么意思?任务的metrics,任务的
          val metrics = context.taskMetrics
          val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
          metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
          new InterruptibleIterator(context, cachedValues)

        } finally {
          loading.synchronized {
            loading.remove(key)
            loading.notifyAll()
          }
        }
    }
  }

 

2. putInBlockManager方法

/**
   * Cache the values of a partition, keeping track of any updates in the storage statuses of
   * other blocks along the way.
   *
   * The effective storage level refers to the level that actually specifies BlockManager put
   * behavior, not the level originally specified by the user. This is mainly for forcing a
   * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
   * while preserving the the original semantics of the RDD as specified by the application.
   */
  private def putInBlockManager[T](
      key: BlockId,
      values: Iterator[T],
      level: StorageLevel,
      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
      effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

    val putLevel = effectiveStorageLevel.getOrElse(level)
    if (!putLevel.useMemory) {
      /*
       * This RDD is not to be cached in memory, so we can just pass the computed values as an
       * iterator directly to the BlockManager rather than first fully unrolling it in memory.
       */
      updatedBlocks ++=
        blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
      blockManager.get(key) match {
        case Some(v) => v.data.asInstanceOf[Iterator[T]]
        case None =>
          logInfo(s"Failure to store $key")
          throw new BlockException(key, s"Block manager failed to return cached value for $key!")
      }
    } else {
      /*
       * This RDD is to be cached in memory. In this case we cannot pass the computed values
       * to the BlockManager as an iterator and expect to read it back later. This is because
       * we may end up dropping a partition from memory store before getting it back.
       *
       * In addition, we must be careful to not unroll the entire partition in memory at once.
       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
       * single partition. Instead, we unroll the values cautiously, potentially aborting and
       * dropping the partition to disk if applicable.
       */
      blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
        case Left(arr) =>
          // We have successfully unrolled the entire partition, so cache it in memory
          updatedBlocks ++=
            blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
          arr.iterator.asInstanceOf[Iterator[T]]
        case Right(it) =>
          // There is not enough space to cache this partition in memory
          val returnValues = it.asInstanceOf[Iterator[T]]
          if (putLevel.useDisk) {
            logWarning(s"Persisting partition $key to disk instead.")
            val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
              useOffHeap = false, deserialized = false, putLevel.replication)
            putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
          } else {
            returnValues
          }
      }
    }
  }

 

 

  • 大小: 76.9 KB
分享到:
评论
1 楼 jchubby 2017-02-20  
关于第一个reduceByKey对应的cache,shuffle之后的rdd,spark会自动保存该结果,直到这个rdd短时间内不会再被使用或者被垃圾回收了,所以用不用cache,collect这个job的第一个map stage都会被skip

相关推荐

    七个pdf理解spark系列_6-CacheAndCheckpoint

    GitHub上某位大牛JerryLead对Spark的理解,大量图示,生动形象,总共7个pdf,看完对spark的原理,运行机制以及后续性能调优有很大的帮助,这是第六个pdf,讲述了cache、checkpoint的实现、使用等常见问题

    spark下实现wordcount

    #### 三、深入理解 Spark WordCount ##### 1. RDD 概念 - **RDD(Resilient Distributed Dataset)**:弹性分布式数据集,是 Spark 的核心抽象。 - **Transformation**:如 `map`, `filter`, `flatMap`, `reduceBy...

    spark-scala-api

    1. **Spark编程模型**:Spark基于DAG(有向无环图)的任务调度机制,使得数据处理任务可以被分解为多个阶段,每个阶段由一系列的算子(如map、filter、reduce等)组成。这种模型提高了任务执行的效率,因为重复计算...

    spark 2.0.1 JavaAPI

    - **错误恢复**:Spark的容错机制基于检查点和数据复制,当任务失败时,可以从已保存的状态重新计算。 - **窄依赖和宽依赖**:理解这两种依赖关系对优化Spark作业的性能至关重要,窄依赖可以在并行化时最大限度地...

    spark example 2.2.0版本 maven项目

    Spark允许我们使用persist()或cache()将数据存储在内存中,以加快后续操作的速度。同时,可以设置不同的持久化级别,如MEMORY_ONLY, MEMORY_AND_DISK等。 12. **故障恢复**: Spark的容错机制主要基于检查点和RDD...

    Apache Spark的设计与实现 PDF中文版

    本文档面向的是希望对 Spark 设计与实现机制,以及大数据分布式处理框架深入了解的 Geeks。 因为 Spark 社区很活跃,更新速度很快,本文档也会尽量保持同步,文档号的命名与 Spark 版本一致,只是多了一位,最后一...

    Spark-core核心部分的用Elipse IDE软件编写得内容

    Spark Core通过检查点和血统机制实现容错。当某一分区数据丢失时,可以通过重做已知转换获取数据,或者从持久化的检查点恢复。 9. **交互式Shell** Spark提供了`spark-shell`,在Eclipse中,开发者可以借助`spark...

    spark2.1.0 JAVA API

    14. **错误处理与日志记录**: Spark提供了异常处理机制,并通过log4j进行日志记录,方便调试和监控应用状态。 以上就是Spark 2.1.0 Java API的主要知识点,通过掌握这些,开发者能够有效地利用Java编写Spark应用,...

    code: spark for python developer

    七、Spark性能优化 1. Partitioning:合理设置分区数量可以提高并行度,优化数据分布。 2. Caching:使用`.cache()`或`.persist()`缓存数据,减少重复计算。 3. 调整Executor和Driver资源:根据任务需求配置内存和...

    spark调优.rar

    10. **监控与调优工具**:利用Spark自带的Web UI监控任务执行情况,以及使用第三方工具如Ganglia、Prometheus等进行性能监控,帮助定位性能瓶颈。 通过以上调优策略,我们可以显著提升Spark应用的运行效率,减少...

    Spark内存使用机制分析.pptx

    Spark内存使用机制是Apache Spark高性能计算的关键组成部分,它在处理大规模数据时的高效性与内存管理密切相关。在Spark中,内存被分为多个部分,用于不同的功能,包括存储、执行和其他用途。以下是对Spark内存使用...

    A-Deeper-Understanding-of-Spark-Internals-Aaron-Davidson

    ### 深入理解Spark内部机制 #### 一、引言 随着大数据处理需求的日益增长,Apache Spark作为一款高效的大数据处理框架,受到了越来越多的关注。为了更好地掌握Spark的使用技巧,了解其内部运行机制至关重要。本文...

    apache spark的设计与实现

    Broadcast变量是在各Task间广播大而不变的数据的一种机制,它可以减少网络传输开销,提高程序执行效率。 #### 八、总结 通过对Apache Spark的设计与实现进行深入探讨,我们可以更好地理解其背后的技术原理和架构...

    编程指南快速入门 - Spark 2.4.0文档.pdf

    - 在Spark中,可以通过缓存机制来加速多次访问同一数据集的操作。 - 使用`.persist()`或`.cache()`方法对数据集进行缓存。 - **自包含的应用程序** - Spark支持构建自包含的应用程序,这些应用程序可以在不同的...

    spark RDD 论文 中文版

    #### 三、Spark编程接口 ##### 3.1 Spark中RDD的操作 - **转换操作**:包括`map`, `flatMap`, `filter`, `reduceByKey`等。 - **行动操作**:如`count`, `first`, `take`, `saveAsTextFile`等。 - **缓存操作**:...

    spark培训ppt

    - **容错机制**:Spark 通过记录 RDD 的转换过程来实现数据的自动恢复,而 Hadoop MapReduce 主要依赖于 HDFS 的数据冗余来实现容错。 ### Spark 编程模型 #### RDD 的特性 - **不可变性**:一旦创建,RDD 就不能...

    精通Spark内核

    精通Spark内核:此阶段聚焦于Spark内核的设计、实现和核心源码解析,对内核中的实现架构、运行原理、性能调优和核心源码各个击破: 1, 通过源码精通Spark内核实现和任务调度; 2,精通RDD、DAGScheduler、Task...

    Spark相关知识PPT

    此外,也会讨论Spark的缓存机制,通过`cache()`或`persist()`方法,将数据存储在内存中,实现快速重用。 3. **SparkSQL** SparkSQL是Spark的一个模块,它将SQL与DataFrame和Dataset API集成,使得开发者可以用SQL...

    Spark大数据处理数据性能优化学习

    另外,使用缓存机制(如persist或cache)可以将常用数据驻留在内存,减少重复计算。 执行计划优化是另一个重要环节。通过DAG(有向无环图)调度,Spark能够智能地规划任务执行顺序。用户还可以通过调整Shuffle操作...

    【Spark调优篇04】Spark之JVM调优1

    本文档主要讲解了 Spark 之 JVM 调优的相关知识点,涉及到 JVM 调优的重要性、缓存操作的内存占比、统一内存管理机制、Executor 堆外内存的调节、连接等待时长的设置等内容。 JVM 调优的重要性 在 Spark 中,JVM ...

Global site tag (gtag.js) - Google Analytics