`

关于Spark的Broadcast解析

 
阅读更多
本文重点关注 数据块切分方法以及P2P下载数据方法
Broadcast(广播)是相对较为常用方法功能,通常使用方式,包括共享配置文件,map数据集,树形数据结构等,为能够更好更快速为TASK任务使用相关变量。

期间,曾见过有童鞋用原始日志(log)进行广播,导致集群运行缓慢,诸位童鞋可以引此为戒,其与JAVA的ServletContext的作用近似,ServletContext存放过多数据也会内存溢出的,Broadcast虽然不会溢出(使用内存+硬盘方式),但是依然会影响运行。

基础使用:
val broadcastValue = sc.broadcast(存储值)  
broadcastValue.value


我们从三个方面叙述Broadcast,初始化、创建(写入)、使用

初始化

Broadcast是典型的建造者模式方法,相对内部设计相对较为简单,同时初始化并非直接创建Broadcast对象,作用有两个方面:

1. 依据配置属性(spark.broadcast.factory)创建BroadcastFactory对象 - 反射创建。

2. 将sparkConf对象注入Broadcast中,同时定义压缩编码


初始化入口sparkContext启动时创建,其调用顺序为(后续有时间调整为时序图):

    SparkContext#构造方法
    SparkEnv#create
    BroadcastManager#initialize()

    TorrentBroadcastFactoryr#initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager)
    TorrentBroadcast#initialize(_isDriver: Boolean, conf: SparkConf)



1. 创建过程中,初始化Env配置环境时创建BroadcastManager对象,相关代码为:

SparkContext:
 // Create the Spark execution environment (cache, map output tracker, etc)  
  // 创建spark的执行环境  
  private[spark] val env = SparkEnv.create(  
    conf, // spark配置文件  
    "<driver>",  
    conf.get("spark.driver.host"), // 主机名  
    conf.get("spark.driver.port").toInt, // 端口号  
    isDriver = true, // 默认启动SparkContext客户端,便是Driver  
    isLocal = isLocal,// 是否是本地运行,是通过master获取该值,如果是submit提交,请参考SparkSubmitArguments类,会将参数转换为master  
    listenerBus = listenerBus   
    /* spark监听总线(LiveListenerBus),他是负责监听spark事件,包括job启动和介绍、BlockManage的添加等等,简单理解UI能看到的变化都是这块监听的, 
     * 如果有时间,可以将这块与大家分享一下,底层使用队列实现,典型观察者模式实现,未使用akka实现 */  
    )  
  SparkEnv.set(env) // 注册SparkEnv对象


SparkEnv:
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,  
      serializer, conf, securityManager, mapOutputTracker, shuffleManager)  
  
val connectionManager = blockManager.connectionManager  
  
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)  
  
val cacheManager = new CacheManager(blockManager)


2.BroadcastManager构造函数调用initialize方法构建
// Called by SparkContext or Executor before using Broadcast  
  // 一个context仅初始化一次,默认是Torrent  
  private def initialize() {  
    // TODO 初始化BroadcastFactory  
    // 1.确定仅有第一次进入时,创建BroadcastFactory对象  
    // 2.初始化BroadcastFactory,并与BroadcastManager建立hook  
      
    synchronized {  
      // 1.0 如果第一次进入,初始化BroadcastFactory  
      if (!initialized) {  
        // 2.0 反射创建BroadcastFactory  
        val broadcastFactoryClass =  
          conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")  
        broadcastFactory =  
          Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]  
  
        // 2.1 初始化BroadcastFactory,并与BroadcastManager建立hook  
        broadcastFactory.initialize(isDriver, conf, securityManager)  
        // 2.2 表示第一次进入完毕  
        initialized = true  
      }  
    }  
  }


3. TorrentBroadcastFactory调用initialize方法
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {  
    TorrentBroadcast.initialize(isDriver, conf)  
}


4.最终将sparkConf对象注入Broadcast中,同时定义压缩方式
/** 初始化TorrentBroadcast属性 */  
  def initialize(_isDriver: Boolean, conf: SparkConf) {  
    TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests  
    synchronized {  
      if (!initialized) {  
        compress = conf.getBoolean("spark.broadcast.compress", true)  
        compressionCodec = CompressionCodec.createCodec(conf)  
        initialized = true  
      }  
    }  
  }

以上是sparkContext创建时完成broadcast,即便我们没有使用broadcast相关功能依然会进行创建,同时也表示broadcast类型,broadcast压缩方式是开始便定义,无法实现app中不同job使用不同的broadcast,TorrentBroadcast和HttpBroadcast无法并存。

spark默认使用TorrentBroadcast(并发),效率相对要比http要好,同时避免单机热点的产生,比较适合分布式系统的思想。

思想类似于迅雷BT下载,已使用的executor越多,及速度越快。

创建(写入):

driver首先要将值序列化到byteArray中,然后在按block大小进行分割(默认是4M),将信息存放在driver的blockmanage中,并通知blockmanage-master,完成注册,并可以让所有executor,存储方式内存+硬盘

使用write顺序:

1.   SparkContext#broadcast  外层方法,使用sc.broadcast 进行广播

2.   BroadcastManager#newBroadcast(value_ : T, isLocal: Boolean)

3.   TorrentBroadcastFactory#newBroadcast(value_ : T, isLocal: Boolean, id:Long)

4.   TorrentBroadcast#构造函数

5.   TorrentBroadcast#writeBlocks

6.   BlockManage#putBytes(

      blockId: BlockId,

      bytes: ByteBuffer,

      level: StorageLevel,

      tellMaster: Boolean = true,

      effectiveStorageLevel:Option[StorageLevel] = None)   最终存储


使用广播较为简单,如果sparkContext为长期有效执行多个job,则考虑注销广播,同时尽量广播要小(不然你会死的很惨^_^, 可以试试,嘿嘿)

代码如下:
val broadcastValue = sc.broadcast(存储值) // 注册  
broadcastValue.unpersist() // 注销方法一  
SparkEnv.get.broadcastManager.unbroadcast(id, false, false) // 注销方法二


创建时,使用SparkContext的broadcast方法,并将值一值传递至TorrentBroadcast,并构建TorrentBroadcast对象,同时完成将值交给BlockManage进行注册,并序列化在本地存储。(SparkEnv.get.blockManager.putBytes方法)
private[spark] class TorrentBroadcast[T: ClassTag](  
    obj : T,  
    @transient private val isLocal: Boolean,  
    id: Long)  
  extends Broadcast[T](id) with Logging with Serializable {  
  
  
 /** 1.driver是直接读取本地的值 
  *  2.其他executor是依靠blockManager读取(readObject) */  
  @transient private var _value: T = obj  
  
  /* 固定格式: 
  * broadcastId = broadcast_广播ID 
  * blockID = broadcast_广播ID_piece[1,2,3,4] */  
  private val broadcastId = BroadcastBlockId(id)  
  
  /** 1.广播值交给blockManager管理 
   *  2.广播转换为ByteArray,返回数据块的长度 */  
  private val numBlocks: Int = writeBlocks()  
  
  override protected def getValue() = _value


writeBlocks是主要执行方法,主要功能便是按照定义的广播块大小切分数据(默认是4M,spark.broadcast.blockSize),其后将块注册blockManage,并写入本地磁盘中。

writeBlocks(){

   1.blockifyObject  数据切分方法

   2.BlockManage.putBytes  数据存储方法

}

blockifyObject其实算是较为通用的方法,写法也蛮实用,可以作为项目工具类使用,此处也将注释后贴给大家分享:

/** 切分数据,方法较为实用,可作为工具类,已收藏 
   *  @param obj 切分数据对象 */  
  def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {  
    // TODO: Create a special ByteArrayOutputStream that splits the output directly into chunks  
    // so we don't need to do the extra memory copy.  
    // TODO 数据切块,按照默认的4M切分数据块,返回4MByteBuffer(数据体检变小)  
    //      数据 -> 压缩 -> 序列化 -> 分割  
    // 1. 声明输出流(定义压缩方式和序列化)  
    // 2. 压缩后数据按4M进行分割  
    // 3. 返回ByteBuffer字符  
      
    // 1.0 定义输出流  
    val bos = new ByteArrayOutputStream()  
    // 1.1 包装压缩方式  
    val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos  
    // 1.2 创建序列化对象  
    val ser = SparkEnv.get.serializer.newInstance()  
    // 1.3 包装序列化输出流(默认java序列化,不过一般推荐KryoSerializer,建议修改spark-defaults.conf)  
    val serOut = ser.serializeStream(out)  
    // 1.4 将value写至ByteArray中  
    serOut.writeObject[T](obj).close()  
    val byteArray = bos.toByteArray  
    // 2.0 将ByteArray转换为输入流  
    val bais = new ByteArrayInputStream(byteArray)  
    // 2.1 获取分割块数,ceil有余数+1  
    val numBlocks = math.ceil(byteArray.length.toDouble / BLOCK_SIZE).toInt  
    // 2.2 定义数据块集合  
    val blocks = new Array[ByteBuffer](numBlocks)  
    // 2.3 定义块ID  
    var blockId = 0  
    // 2.4 循环按4M分割数据块,步长为4M  
    for (i <- 0 until (byteArray.length, BLOCK_SIZE)) {  
      // 2.4.1 定义装载4M的byte的容器  
      val thisBlockSize = math.min(BLOCK_SIZE, byteArray.length - i)  
      val tempByteArray = new Array[Byte](thisBlockSize)  
      // 2.4.2 装载数据  
      bais.read(tempByteArray, 0, thisBlockSize)  
      blocks(blockId) = ByteBuffer.wrap(tempByteArray)  
      // 2.4.3 index加一  
      blockId += 1  
    }  
    // 3.0 切分结束,关闭流  
    bais.close()  
    // 3.1 返回流  
    blocks  
  }


读取:

broadcase写入是优先写入依据存储策略写入本地(BlockManage#putBytes方法),既然序列化数据是本地存储,由此而来的问题是读取问题,BlockManage存储数据并不似hdfs会依据备份策略存储多份数据放置不同节点(但是多提一句,spark的taskScheblue是拥有类似机架感知策略分配任务),如没有备份数据,那么必然产生一下数个问题:

   1.节点故障,无法访问节点数据

   2.数据热点,所有任务皆使用该数据

   3.网络传输,所有节点频繁访问单节点

那么解决该问题,spark并没有使用HDFS的思想,而选择是P2P点对点方式(BT下载)解决问题,是只要使用过broadcase数据,则在本接节点存储数据,由此变成新的数据源,随和数据源不断增加速度也会越来越快,刚开始传输则相对会慢一些,同时,以上不建议使用大文件broadcase,亦是如此,如果使用较为频繁的数据,他相当于每个节点都要存储一份,形成网状传输方式交换数据,因此建议存储配置文件或某种数据结构为上佳选择。



调用顺序:

TorrentBroadcast#readObject()

TorrentBroadcast#readBlocks()

BlockManage#getLocalBytes(blockId:BlockId) / getRemoteBytes(blockId: BlockId)

BlockManage#putBytes()



readObject是broadcase读取的主方法,管理整个读取策略
/** Used by the JVM when deserializing this object. */  
  private def readObject(in: ObjectInputStream) {  
    // TODO 读取广播变量,有便读取本地,没有则远程并存储在本地  
      
    // 1.0 可读取对象中静态变量  
    in.defaultReadObject()  
    // 2.0 读取广播变量(单个executor独享)  
    TorrentBroadcast.synchronized {  
      // 2.1 读取本地广播数据  
      SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {  
        // 2.2 获取本地数据成功  
        case Some(x) =>  
          _value = x.asInstanceOf[T]  
        // 2.3 获取本地数据失败  
        case None =>  
          // 2.4 获取Blocks,同时将块存储到本地  
          logInfo("启动读取 broadcast variable " + id)  
          val start = System.nanoTime()  
          val blocks = readBlocks()  
          val time = (System.nanoTime() - start) / 1e9  
          logInfo("Reading broadcast variable " + id + " took " + time + " s")  
  
          // 2.5 将数据块反序列化,并解压缩  
          _value = TorrentBroadcast.unBlockifyObject[T](blocks)  
          // Store the merged copy in BlockManager so other tasks on this executor don't  
          // need to re-fetch it.  
          SparkEnv.get.blockManager.putSingle(  
            broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)  
      }  
    }  
  }


readBlocks则是实现P2P思想的具体实现者
/** Fetch torrent blocks from the driver and/or other executors. */  
  private def readBlocks(): Array[ByteBuffer] = {  
    // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported  
    // to the driver, so other executors can pull these chunks from this executor as well.  
      
    // 1.0 定义数据块集合  
    val blocks = new Array[ByteBuffer](numBlocks)  
    // 1.1 引用blockManager  
    val bm = SparkEnv.get.blockManager  
      
    // 2.0 循环遍历所有块,避免访问热点,随机顺序读  
    for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {  
      // 2.1 组装块ID  
      val pieceId = BroadcastBlockId(id, "piece" + pid)  
  
      // First try getLocalBytes because  there is a chance that previous attempts to fetch the  
      // broadcast blocks have already fetched some of the blocks. In that case, some blocks  
      // would be available locally (on this executor).  
      // 2.2 他会先查本地,继而查询远程,但是前面已经查找的是广播,现在查找的是认数据块(区别)  
      var blockOpt = bm.getLocalBytes(pieceId)  
      // 2.3 如果本地为查询到结果,则通过blockManager远程获取,并将数据存储到本地  
      if (!blockOpt.isDefined) {  
        blockOpt = bm.getRemoteBytes(pieceId)  
        blockOpt match {  
          case Some(block) =>  
            // If we found the block from remote executors/driver's BlockManager, put the block  
            // in this executor's BlockManager.  
            SparkEnv.get.blockManager.putBytes(  
              pieceId,  
              block,  
              StorageLevel.MEMORY_AND_DISK_SER,  
              tellMaster = true)  
  
          case None =>  
            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)  
        }  
      }  
      // If we get here, the option is defined.  
      // 3.0 赋值数据块集合  
      blocks(pid) = blockOpt.get  
    }  
    // 3.1 返回数据块  
    blocks  
  }


相关配置属性说明:

spark.broadcast.factory 定义使用http或Torrent方式,默认是Torrent,无需修改

spark.broadcast.blockSize 数据库块大小,blockifyObject依据此属性切分数据块,默认4M

spark.broadcast.compress 是否压缩,默认是使用,sparkcontext初始化该属性,无需修改


分享到:
评论

相关推荐

    spark 调优解析 spark 企业调优

    - **策略:** 对于需要广播到所有task的大变量,使用`broadcast()`函数减少网络传输压力。 **原则八:使用Kryo优化序列化性能** - **策略:** 开启Kryo序列化支持,提高序列化/反序列化的速度。 **原则九:优化数据...

    Spark原理解析

    一个大牛写的Spark原理解析,中文的,主要包括以下几个方面: 1-Overview 概览 2-JobLogicalPlan Job逻辑执行图 3-JobPhysicalPlan Job物理执行图 4-shuffleDetails Shuffle过程 5-Architecture 架构 6-...

    精通Spark内核

    精通Spark内核:此阶段聚焦于Spark内核的设计、实现和核心源码解析,对内核中的实现架构、运行原理、性能调优和核心源码各个击破: 1, 通过源码精通Spark内核实现和任务调度; 2,精通RDD、DAGScheduler、Task...

    spark调优.rar

    以下是对Spark调优的详细解析: 1. **配置优化**:Spark的配置参数对性能有很大影响。例如,`spark.executor.instances`决定了Spark作业可以使用的Executor数量,`spark.executor.memory`定义了每个Executor的内存...

    spark-blockmanager基础及源码彻底解析

    - **Broadcast变量**:广播变量的缓存在BlockManager中,确保只在每个Executor上存储一份。 - **Spark Streaming**:Receiver模式下,接收到的数据存储在BlockManager,生成Job时从中读取。 - **RDD缓存**:cache...

    Spark_competion 数据集

    三、Spark Competition数据集解析 "Spark_competition"数据集可能包含了多种类型的文件,如: - "Accumulator":Spark中的累加器,用于在任务执行过程中进行累加计算,通常在并行计算中收集统计信息。 - "ads":可能...

    spark-广播变量基础及源码解析

    Spark 广播变量基础及源码解析 Spark 广播变量是一种高效的数据共享机制,在 Spark 中广泛应用于大数据处理和机器学习等领域。广播变量可以使我们在每台机器上都缓存一个变量,而不会每次都随着 task 的生成进行...

    Spark 1.0.0 API (java)

    **Spark 1.0.0 API (Java) 深度解析** Spark 是一个快速、通用且可扩展的大数据处理框架,它最初由加州大学伯克利分校AMPLab开发,并随后成为Apache软件基金会的顶级项目。Spark 1.0.0版本是其发展中的一个重要里程...

    Spark2.0新特性

    - **ANSI-SQL和HiveQL解析器**: 支持两种不同的SQL解析器,增加了灵活性。 - **DDL命令支持**: 支持创建、修改和删除数据库对象的DDL命令。 - **子查询支持**: 支持IN/NOT IN、EXISTS/NOT EXISTS等子查询,增加了...

    Spark SQL- Relational Data Processing in Spark(Paper).rar

    下面我们将详细解析Spark SQL的关键知识点。 1. **DataFrame API**: Spark SQL的核心是DataFrame API,它提供了一种声明式的编程模型,允许用户以结构化的方式处理数据。DataFrame可以看作是分布式的、带类型的...

    spark-2.4.0.zip

    《Spark 2.4.0 源码解析与实战指南》 Spark 2.4.0 是 Apache Spark 的一个重要版本,它在大数据处理领域扮演着核心角色,尤其在分布式计算、流处理和机器学习等方面提供了高效且易用的框架。这个版本包含了丰富的...

    Spark SQL优化器系统Catalyst的深入解析和应用.pdf

    Catalyst优化器通过分析不同Join策略(如Broadcast Hash Join、Sort Merge Join等)的代价来选择最优的Join方案。这种自动化选择Join策略的方式,不仅减轻了数据工程师的负担,也使得数据处理过程更加高效和精确。 ...

    Spark-Spark的开发调优.pdf

    ### Spark开发调优知识点解析 #### 一、前言与背景 随着大数据处理需求的日益增长,Apache Spark作为一款高效的数据处理框架,在业界得到了广泛的应用。Spark不仅支持批处理、实时流处理,还覆盖了机器学习和图形...

    Spark大数据处理:技术、应用与性能优化

    本文将详细解析其中的关键知识点。 一、Spark核心概念与架构 1. Resilient Distributed Datasets (RDDs):Spark的基础数据抽象,是不可变的、分区的数据集合,具备容错性。 2. DAG执行模型:Spark通过构建有向无环...

    Spark开发者的免费入门宝典:让你的数据处理更简单(上册).pdf

    - **Spark SQL 性能优化**:本书还提供了关于 Spark SQL 性能优化的实用指南。包括如何使用 AQE 提升查询性能、如何调整参数以优化内存管理和数据读取等。 - **Structured Streaming 生产化实践**:针对 Structured...

    Apache Spark RDD面试题

    ### Apache Spark RDD 相关知识点解析 #### 一、SparkContext 创建位置 - **知识点**:`SparkContext` 是 Spark 应用程序的入口点,它由用户在编写应用程序时手动创建。 - **解释**:在 Spark 应用程序中,`...

    spark-2.1.1:spark原始物走读注解解

    《Spark 2.1.1:深度解析与源码阅读笔记》 Spark作为一个开源的分布式计算框架,以其高效、易用和灵活性深受大数据处理领域的欢迎。Spark 2.1.1是其发展中的一个重要版本,它在性能优化、功能增强以及稳定性上都有...

    spark_source_code_breakdown:Spark学习与源码阅读

    《Spark学习与源码阅读深度解析》 Spark作为大数据处理领域的明星框架,以其高效、易用和可扩展性赢得了广大开发者和数据科学家的青睐。深入理解Spark的源码,不仅有助于提升开发技能,还能帮助我们优化性能,解决...

    spark-sql-2.3-source-code-interpretation:spark sql 2.3原始代码理解自己的阅读源码后的总结,欢迎大家阅读-spark source code

    本文将基于源码解析,对Spark SQL 2.3的关键特性进行深入探讨。 1. **DataFrame和Dataset API**:Spark SQL 2.3中,DataFrame和Dataset API进一步强化了类型安全和编程便利性。DataFrame是基于Schema的RDD,而...

Global site tag (gtag.js) - Google Analytics