`
bit1129
  • 浏览: 1072701 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark二六】Spark代码剖析

 
阅读更多

SparkEnv初始化的角色

 

 

	org.apache.spark.shuffle.sort.SortShuffleManager   ///shuffleManager
	org.apache.spark.MapOutputTrackerMaster
	org.apache.spark.shuffle.ShuffleMemoryManager
	org.apache.spark.network.netty.NettyBlockTransferService
	org.apache.spark.MapOutputTrackerMaster@25e45d
	org.apache.spark.serializer.JavaSerializer@dc42ab   ///closureSeirializer, serializer
	org.apache.spark.storage.BlockManager@16d5aa8
	org.apache.spark.storage.BlockManagerMaster@a62840
	org.apache.spark.network.netty.NettyBlockTransferService@148d5b2   //blockTransferService
	org.apache.spark.CacheManager@1ac9928
	org.apache.spark.HttpFileServer@131d67
	org.apache.spark.metrics.MetricsSystem@516ac3
	org.apache.spark.MapOutputTrackerMaster@25e45d
	org.apache.spark.broadcast.BroadcastManager@f8008d
	C:\Users\hadoop\AppData\Local\Temp\spark-7f0f46d9-28d0-4e8d-94d0-9a8f8f589d14   //sparkFilesDir


    new SparkEnv(
      executorId,
      actorSystem,
      serializer,
      closureSerializer,
      cacheManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockTransferService,
      blockManager,
      securityManager,
      httpFileServer,
      sparkFilesDir,
      metricsSystem,
      shuffleMemoryManager,
      conf)
  }

 

分析的源代码:

 

package spark.examples

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

object SparkWordCount {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2");
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val rdd1 = sc.textFile("file:///D:/word.in")
    println(rdd1.toDebugString)
    val rdd2 = rdd.flatMap(_.split(" "))
    println("rdd2:" + rdd2.toDebugString)
    val rdd3 = rdd2.map((_, 1))
    println("rdd3:" + rdd3.toDebugString)
    val rdd4 = rdd4.reduceByKey(_ + _);
    println("rdd4:" + rdd4.toDebugString)
    rdd3.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis());
    sc.stop
  }
}

 

 输出的RDD依赖图是:

RDD1

rdd1:(1) file:///D:/word.in MappedRDD[1] at textFile at SparkWordCount.scala:15 []
 |  file:///D:/word.in HadoopRDD[0] at textFile at SparkWordCount.scala:15 []

RDD2

rdd2:(1) FlatMappedRDD[2] at flatMap at SparkWordCount.scala:17 []
 |  file:///D:/word.in MappedRDD[1] at textFile at SparkWordCount.scala:15 []
 |  file:///D:/word.in HadoopRDD[0] at textFile at SparkWordCount.scala:15 []

RDD3

rdd3:(1) MappedRDD[3] at map at SparkWordCount.scala:19 []
 |  FlatMappedRDD[2] at flatMap at SparkWordCount.scala:17 []
 |  file:///D:/word.in MappedRDD[1] at textFile at SparkWordCount.scala:15 []
 |  file:///D:/word.in HadoopRDD[0] at textFile at SparkWordCount.scala:15 []

 

rdd4:(1) ShuffledRDD[4] at reduceByKey at SparkWordCount.scala:21 []
 +-(1) MappedRDD[3] at map at SparkWordCount.scala:19 []
    |  FlatMappedRDD[2] at flatMap at SparkWordCount.scala:17 []
    |  file:///D:/word.in MappedRDD[1] at textFile at SparkWordCount.scala:15 []
    |  file:///D:/word.in HadoopRDD[0] at textFile at SparkWordCount.scala:15 []
 

 ResultTask的runTask方法里的func方法调用PairRDDFunctions里的writeToFile函数完成写结果操作(saveAsTextFile)

 

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context)) //调用PairRDDFunctions里的writeToFile函数完成写结果操作(saveAsTextFile)
  }

 

 

PairRDDFunctions里的writeToFile函数完成写结果操作(saveAsTextFile)

 val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
      val config = wrappedConf.value
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt

      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)

      writer.setup(context.stageId, context.partitionId, attemptNumber)
      writer.open()
      try {
        var recordsWritten = 0L
        while (iter.hasNext) {
          val record = iter.next()
          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

          // Update bytes written metric every few records
          maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
          recordsWritten += 1
        }
      } finally {
        writer.close()
      }
      writer.commit()
      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
    }
 

 

 

 

 

分享到:
评论

相关推荐

    spark项目代码以及数据

    这个压缩包文件包含了Spark项目相关的代码和数据,这意味着我们可以深入探讨Spark在实际项目中的应用,以及如何处理和分析数据。 一、Spark概述 Spark的核心设计理念是提供内存计算,以加速大规模数据处理。相比...

    spark2官方示例源代码

    六、Spark任务调度和并行化 Spark的并行化能力是其性能的关键。通过示例,你可以了解如何配置Spark作业,设置并行度,以及理解任务调度过程。例如,DAG(有向无环图)的构建、Stage划分和Task调度等概念。 七、性能...

    IM, spark 分析代码

    本主题聚焦于"IM, spark 分析代码",这通常涉及到即时通讯(IM)系统的数据处理,以及如何利用Apache Spark进行高效分析。IMClient-master可能是一个包含IM客户端源代码的项目,而IMProject.userlibraries可能包含了...

    Spark源码剖析

    Spark源码剖析是一门深入探索Apache Spark底层实现原理的课程,其主要内容包括对Spark源码的详细解读和分析。Apache Spark是一个开源的分布式计算系统,提供了快速而通用的计算引擎。1.02版本作为课程的切入点,是一...

    基于spark电商用户可视化行为分析项目源码,以 Spark 框架为核心, 对电商网站的日志进行离线和实时分析

    项目概述 本项目来源于企业级电商网站的大数据统计分析平台, 该平台以 Spark 框架为核心, 对电商网站的日志进行离线和实时分析。 该大数据分析平台对电商网站的各种用户行为( 访问行为、购物行为、广告点击行为等...

    scala开发spark代码

    此外,Spark的API设计友好,使得它适合用于复杂的数据分析任务,包括机器学习、图计算和交互式查询。 总结来说,这个压缩包提供了Scala开发Spark应用程序的实践代码,涵盖了Spark Core的基本操作、Spark SQL的结构...

    spark2.7.7源代码

    Spark 2.7.7 源代码是 Apache Spark 的一个特定版本,它是一个用于大规模数据处理的开源框架。这个版本结合了Openfire,一个基于XMPP协议的即时通讯(IM)服务器,使得Spark能够支持实时通信功能。在这个源代码包中...

    Spark 实践-基于 Spark Streaming 的实时日志分析系统+源代码+文档说明

    1、资源内容:Spark 实践——基于 Spark Streaming 的实时日志分析系统+源代码+文档说明 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便更改、代码编程思路清晰、注释明细,都经过测试运行成功...

    spark部署和基础代码的编写

    - **为什么学Spark**:Spark的高性能、易于编程的特性使得它在大数据处理领域广泛应用,尤其适合实时分析、机器学习和图计算任务。 - **Spark特点**: - **快**:通过使用内存计算,Spark可以比Hadoop MapReduce...

    spark的样例代码

    这个"spark的样例代码"压缩包很可能是为了帮助初学者或开发者快速理解和掌握Spark的基本操作和编程模型。接下来,我们将分别针对Java、Scala和Python这三种编程语言的Spark编程进行详细的解释和探讨。 **Java Spark...

    基于spark的外卖大数据平台分析系统.zip

    《基于Spark的外卖大数据平台分析系统》 在当今数字化时代,大数据分析已成为各行各业的重要工具,尤其是在餐饮服务领域,外卖业务的兴起催生了对外卖大数据处理的强烈需求。Apache Spark作为一个高效、通用的大...

    Big Data Analytics with Spark and Hadoop(Spark与Hadoop大数据分析)代码code

    Big Data Analytics with Spark and Hadoop(Spark与Hadoop大数据分析)代码code

    Spark最全操作完整示例代码

    最全Spark操作完整示例代码-------是基于java的。 包含所有的spark常用算子操作和ml以及mlib、sparkstreaming、sparkSQL操作的示例DEMO。 内附有详细说明,由于内容过大删除了两个jar包,需要自己去下载,spark的安装包...

    基于spark的数据分析+源代码+文档说明

    该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! <项目介绍> 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! ...

    Scala和Spark大数据分析函数式编程、数据流和机器学习

    在大数据分析方面,Scala与Spark的结合使得开发者能够编写出高度并发且可扩展的代码。Spark的DataFrame和Dataset API与Scala紧密集成,提供了强大的数据操作和优化。DataFrame允许进行SQL式的表达式操作,而Dataset...

    深入理解Spark+核心思想与源码分析.pdf

    深入理解Sp深入理解SPARK:核心思想与源码分析》结合大量图和示例,对Spark的架构、部署模式和工作模块的设计理念、实现源码与使用技巧进行了深入的剖析与解读。 《深入理解SPARK:核心思想与源码分析》一书对Spark...

    ApacheSpark设计与实现.pdf+ApacheSpark源码剖析.pdf+Spark原著中文版.pdf

    《Apache Spark源码剖析》则会深入到Spark的源代码层面,帮助读者理解其实现细节: 1. **源码结构**:Spark源代码的主要模块和包划分,以及关键类的职责。 2. **调度机制**:DAGScheduler和TaskScheduler的工作流程...

    Spark大数据分析与实战课后练习答案.rar

    《Spark大数据分析与实战》课程是一门深入探讨Apache Spark在大数据处理领域的应用和技术的课程,其课后练习答案集提供了对课程所讲授知识的巩固和实践。这是一份珍贵的配套教学资源,旨在帮助学生更好地理解和掌握...

Global site tag (gtag.js) - Google Analytics