`
bit1129
  • 浏览: 1069663 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark八十三】BlockManager在Spark中的使用场景

 
阅读更多

1. Broadcast变量的存储,在HttpBroadcast类中可以知道

2. RDD通过CacheManager存储RDD中的数据,CacheManager也是通过BlockManager进行存储的

3. ShuffleMapTask得到的结果数据,是通过FileShuffleBlockManager进行管理的,而FileShuffleBlockManager最终也是使用BlockManager来对数据块进行管理

4. ResultTask的结果如果非常大,那么使用IndirectTaskResult作为返回结果给Driver,Driver通过IndirectTaskResult反查BlockManager获取数据,通过Netty的数据传输

代码在TaskRunner的run方法中。

 

       // directSend = sending directly back to the driver
        val serializedResult = {
          if (maxResultSize > 0 && resultSize > maxResultSize) {
            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
          } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { //如果结果大于Akka传输的数据量(默认10M)
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes( //设置到BlockManager中,MEMORY_AND_DISK_SER表示
              blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(
              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }

 

 5. Spark Streaming将接收到的数据首先写入到BlockManager中

 

关于StorageLevel的设置:

 

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)

五个参数:

使用磁盘存储

使用内存存储

使用Tachyon存储

deserialzed表示存储的数据是否是原始数据(没有进行序列化)?

副本数

 

  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)

 

 

 

 

分享到:
评论

相关推荐

    spark大数据商业实战三部曲源码及资料.zip

    《Spark大数据商业实战三部曲》是一套深度探讨Apache Spark在商业应用中的实践教程,涵盖了Spark的核心技术及其在数据处理、分析和应用开发中的实际运用。这套资源包含了书中的源码和相关资料,旨在帮助读者深入理解...

    ApacheSpark设计与实现.pdf+ApacheSpark源码剖析.pdf+Spark原著中文版.pdf

    3. **存储系统**:BlockManager如何管理内存和磁盘上的数据,以及如何与Shuffle服务交互。 4. **网络通信**:Akka在Spark中的作用,以及如何实现高效的节点间通信。 5. **性能优化**:Tungsten项目如何提升DataFrame...

    Spark源码分析.pdf

    例如,DAGScheduler如何将作业拆分成Stage,TaskScheduler如何将任务分配到Executor,以及如何利用BlockManager和Storage层实现内存管理。此外,Shuffle服务、网络通信库Tachyon和Akka框架也是深入理解Spark的重要...

    spark 源码解读迷你书

    在阅读这本书之前,建议先搭建好Spark开发环境,比如使用Intelij IDEA这样的集成开发环境,以便在学习过程中能够进行实践操作,更好地理解和消化源码知识。 Spark作为一个分布式计算框架,其核心概念包括弹性分布式...

    spark2.02源码

    源码分析还包括对Spark的存储系统的研究,如BlockManager如何管理内存和磁盘上的数据块,以及如何与其他组件如CacheManager协同工作实现数据缓存。 总的来说,Spark 2.0.2的源码包含了许多关键的改进和优化,对于...

    Apache Spark源码走读之5 -- DStream处理的容错性分析

    3. **数据推送**: 在`BlockGenerator`中还有一个`BlockPushingThread`,它的任务是不断地将`blocksForPush`队列中的元素通过`pushArrayBuffer`函数传递给`BlockManager`。`BlockManager`随后将数据存储到`...

    spark-广播变量基础及源码解析

    HttpBroadcast 会在 Driver 端的 BlockManager 里面存储广播变量对象,并将该广播变量序列化写入文件中去。所有数据请求都在 Driver 端,所以存在单点故障和网络 Io 性能问题。 ### TorrentBroadcast ...

    Spark Standalone架构设计.docx

    在 Standalone 模式下,默认使用的是 FIFO 这种简单的调度策略,在进行调度的过程中,大概流程如下图所示:从用户提交 Spark 程序,最终生成 TaskSet,而在调度时,通过 TaskSetManager 来管理一个 TaskSet(包含一...

    spark:Executor分配详解

    Executor还可以通过BlockManager缓存中间结果,以提高后续任务的执行效率。 - **回收**:当任务执行完毕或资源不再需要时,Master可以决定回收特定的Executor,释放其占用的资源。 #### 三、数据管理与Shuffle过程 ...

    Spark Core介绍

    在存储方面,Spark Core利用BlockManager和CacheManager管理内存和磁盘上的数据块。BlockManager负责数据块的存储和检索,而DiskStore和MemoryStore分别处理磁盘和内存中的数据。此外,Block Transfer Service使用...

    开源力量spark公开课的ppt

    3. **源码解析**:深入Spark的源代码,讲解关键类和方法,如DAGScheduler、TaskScheduler、Storage层的BlockManager等,帮助理解Spark如何调度任务和管理数据。 4. **编程模型**:解释Spark的API使用,包括Scala、...

    spark shuffle简介

    2. **Spark 1.1.0**:引入了文件合并机制(file consolidation),即Hash-based Shuffle中的BlockManager开始合并输出文件,减少了磁盘I/O次数。但这个特性默认是关闭的,需要用户手动配置启用。 3. **Spark 1.4.0*...

    spark大数据实践

    **Spark** 是一款高性能的分布式计算框架,它能够在内存中处理大规模数据集,从而显著提高数据处理速度。相较于Hadoop MapReduce,Spark的主要优势在于: 1. **内存计算**:Spark支持数据在内存中的缓存,减少了...

    大数据 HCIA-Big Data H13-711考题.docx

    FusionInsight HD 系统中 HDFS DataNode 会周期性地将本节点的 Block 相关信息发送给 NameNode。 十六、大数据主要特征 大数据的主要特征包括数据来源多、格式多、数据增长速度快、处理速度快、数据量大、计算量...

    大数据各类性能调优

    例如,在大型集群中,可能需要增加Manager的内存分配,以便更好地支持更多的并发请求。 #### 12.3 HBase ##### 12.3.1 提升BulkLoad效率 - BulkLoad是一种将大量数据快速导入HBase的有效方法。为了进一步提高Bulk...

    公有云中的Cloudera作业数据库实践.pptx

    - **Spark Streaming与作业数据库**:在公有云中,可以创建独立的Spark Streaming集群和作业数据库集群,两者在相同的可用区内运行,以实现实时处理和服务架构。 6. **计费模式比较**: - **按需计费 (On-Demand)...

    hbase-1.2.0-cdh5.14.0.tar.gz

    HBase是Apache Hadoop生态系统中的一个...通过理解以上知识点,您可以开始在CDH 5.14.0环境中部署和使用HBase,实现高效的大数据存储和处理。在实际应用中,还需要结合具体的业务需求和场景来调整配置和优化性能。

    最新大数据Hadoop面试题!(附答案解析).pdf

    - 在大数据处理场景下,集群的主要瓶颈通常是磁盘IO,因为处理大量数据时,读写操作频繁,且需要冗余存储。 5. **集群管理和优化** - 集群管理工具有Puppet、Pdsh和Cloudera Manager等,这些工具可以帮助管理员...

    大数据高频面试题库.docx

    在大数据领域,面试通常会涵盖各种核心技术,如Linux与Shell、Hadoop、YARN和Zookeeper等。以下是对这些知识点的详细阐述: 1. **Linux&Shell相关**: - **Linux常用命令**:包括但不限于ls、cd、pwd、mkdir、rm、...

Global site tag (gtag.js) - Google Analytics