`
bit1129
  • 浏览: 1069827 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Scala四】分析Spark源代码总结的Scala语法二

 
阅读更多

1. Some操作

 

在下面的代码中,使用了Some操作:if (self.partitioner == Some(partitioner)),那么Some(partitioner)表示什么含义?首先partitioner是方法combineByKey传入的变量,

Some的文档说明:

 

/** Class `Some[A]` represents existing values of type
 *  `A`.
 *
 *  @author  Martin Odersky
 *  @version 1.0, 16/07/2003
 */
final case class Some[+A](x: A) extends Option[A] {
  def isEmpty = false
  def get = x
}

 

Some(partitioner)表达什么含义?它是Some类型的对象,包装了partitioner对象。self是RDD的partitioner属性,它是Option类型.

 

Some(partitioner) == Some(partitioner)为true

Option(partitioner) == Some(partitioner)为false

 

 

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
   * Note that V and C can be different -- for example, one might group an RDD of type
   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
   *
   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   * - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
   */
  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) { //对于array keys,不能使用 map-side combining
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {//对于array keys,不能使用默认的HashPartioner
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    if (self.partitioner == Some(partitioner)) { 
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

 

2. self变量

上面的代码中用了self变量,由于没看清楚,这个self不是想象中的关键字,而是PairRDDFunctions的构造属性(成为类属性),self是PairRDDFunctions隐式类对应的本尊类(RDD[(K,V)])对应的对象,这应该是Scala的命名规范吧

 

3. 继承

 

1. 如下ShuffledRDD继承自RDD,在类的继承中,ShuffleRDD可以接受构造参数,而且可以提供给父类的构造参数,比如prev.context,Nil,prev本是ShuffledRDD的构造参数,而后传递给了它的父类RDD

2. RDD[ _ <: Product2[K,V] ],其中的<:表示什么含义?

 

/**
 * :: DeveloperApi ::
 * The resulting RDD from a shuffle (e.g. repartitioning of data).
 * @param prev the parent RDD.
 * @param part the partitioner used to partition the RDD
 * @tparam K the key class.
 * @tparam V the value class.
 * @tparam C the combiner class.
 */
// TODO: Make this return RDD[Product2[K, C]] or have some way to configure mutable pairs
@DeveloperApi
class ShuffledRDD[K, V, C](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil){
  .....
}

  

 

 

4. Scala单例

object SparkHadoopUtil {

  private val hadoop = {
    val yarnMode = java.lang.Boolean.valueOf(
        System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
    if (yarnMode) {
      try {
        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
          .newInstance()
          .asInstanceOf[SparkHadoopUtil]
      } catch {
       case e: Exception => throw new SparkException("Unable to load YARN support", e)
      }
    } else {
      new SparkHadoopUtil
    }
  }

  def get: SparkHadoopUtil = {
    hadoop
  }
}

 

 5. 方法调用不写(),

在调用没有参数的方法时,可以不加().

    val outputFormatInstance = hadoopConf.getOutputFormat
    val keyClass = hadoopConf.getOutputKeyClass //方法调用
    val valueClass = hadoopConf.getOutputValueClass

 

 6. until表达式

 

  0 until rdd.partitions.size表达式返回的是一个集合,Seq[Int]。而for(i <= 0 until 10)表示的是0,9一共10次循环,即0 until 10先得到一个集合,使用for( i <= 集合)来遍历集合

  /**
   * Run a job on all partitions in an RDD and return the results in an array.
   */
  def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.size, false)
  }

 

 7.ClassTag的用法

下面这个方法定义中,可以分析的东西很多,首先,方法saveAsHadoopFile有两个参数,字符串类型的path以及ClassTag[F]类型的fm,所不同的是saveAsHadoopFile带有两个()()。

fm的类型是ClassTag[T],而fm.runtimeClass.asInstanceOf[Class[F]]的含义是什么?

 

 

  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
    saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
  }

 

 8.(index, res) => results(index) = res表示什么含义?

将results(index)复制位res

 

  /**
   * Run a function on a given set of partitions in an RDD and return the results as an array. The
   * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
   * than shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean
      ): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
    results
  }

 上面runJob方法的定义是:

 

  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark. The allowLocal
   * flag specifies whether the scheduler can run the computation on the driver rather than
   * shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (dagScheduler == null) {
      throw new SparkException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

 

 9.Any类型,

Scala中,Any表示任意类型,也就是它类似于Java的Object类型。

如下代码取自MapOutputTracker.scala

 

  /**
   * Send a message to the trackerActor and get its result within a default timeout, or
   * throw a SparkException if this fails.
   */
  protected def askTracker(message: Any): Any = {
    try {
      val future = trackerActor.ask(message)(timeout)
      Await.result(future, timeout)
    } catch {
      case e: Exception =>
        logError("Error communicating with MapOutputTracker", e)
        throw new SparkException("Error communicating with MapOutputTracker", e)
    }
  }

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    scala与spark基础

    1. Scala语法基础:包括变量声明、函数定义、类和对象、模式匹配、高阶函数等。 2. Spark核心概念:理解RDD、DataFrame/Dataset,以及如何进行转换和行动操作。 3. Spark SQL:学习如何使用DataFrame API进行数据...

    上手提示:使用新版IDEA+Maven+Scala编写Spark程序

    ### 使用新版IDEA+Maven+Scala编写Spark程序的关键步骤 #### 一、理解IDEA在2017版中的变化 2017版的IntelliJ IDEA(简称IDEA)相较于之前版本,在界面设计与操作流程上进行了较大的改进与优化。对于初次接触这个...

    Scala语法入门.pdf

    Scala的源代码会被编译成Java字节码,从而可以在JVM上运行。Scala支持静态类型检查,这有助于在编译期间捕获错误,类似于Java、C和C++等静态类型语言。同时,Scala也吸收了现代函数式编程语言的设计特点,包括不可变...

    spark项目代码以及数据

    - Scala/Java/Python/R源代码:实现Spark作业逻辑,包括数据读取、转换和写入操作。 - SQL查询:如果使用Spark SQL,可能包含创建DataFrame、执行SQL查询的代码。 - 测试用例:验证代码正确性的单元测试或集成测试。...

    scala sdk scala-2.12.3

    1. **编译器**:Scala编译器将Scala源代码转换为Java字节码,使得程序可以在JVM上运行。2.12.3版本的编译器支持最新的语言特性,并对错误报告和编译速度进行了改进。 2. **标准库**:Scala的标准库提供了大量的类和...

    windows版scala-2.11.12.zip

    这样,无论你在哪个目录下,都可以通过键入`scala`或`scalac`来启动Scala的交互式Shell或者编译Scala源代码。 Spark是一个用Scala编写的分布式计算框架,它利用Scala的简洁语法和强大的功能来构建大规模数据处理...

    scala核心编程总结

    3. **Packages(包)**:类似于Java中的包,Scala中的包用于组织源代码。通过使用包,开发者可以避免命名冲突,提高代码的组织性和可管理性。 4. **Data Structures(数据结构)**:Scala提供了一套丰富的内置数据...

    实时计算项目(Scala结合spark实现).zip

    1. **源代码**:项目的核心代码,使用Scala编写,可能包含了Spark Job的定义,以及数据处理逻辑。 2. **配置文件**:如`conf`目录下的配置文件,可能包含Spark的配置参数和环境变量。 3. **测试用例**:可能有单元...

    scala-2.11.12.rar

    2. **Spark SQL**:Spark SQL是用于处理结构化数据的模块,它可以与Hive、Parquet和其他数据源交互,并提供了DataFrame和DataSet API,使得SQL查询和函数式编程可以无缝集成。 3. **Spark Streaming**:Spark ...

    scala-intellij-bin-2019.1.9.zip

    1. **语法高亮**:提供Scala源代码的色彩标记,使得代码更易读,有助于快速识别语句类型。 2. **代码补全**:在编写代码时,自动提示可能的函数、变量、类等,提高编码效率。 3. **错误检测**:实时检查代码错误,...

    scala-2.9.3.tgz

    Scala的语法简洁,能够更好地支持函数式编程,这使得Spark的代码更加简洁、可读性强,同时也更容易编写出并发和分布式程序。 在Spark集群中,主要组件包括: 1. **Spark Core**:这是Spark的基础,提供了分布式任务...

    scala编写,以spark stream方式监听消费kafka的topic,同步写入elasticsearch

    在IT行业中,大数据处理与实时分析是至关重要的领域,而Scala、Kafka、Spark和Elasticsearch这四个组件是构建高效数据流系统的关键技术。在这个项目中,我们使用Scala编程语言,结合Spark Streaming来监听和消费...

    基于spark的外卖大数据平台分析系统.zip

    Spark SQL提供了与SQL兼容的接口,使得数据分析师可以使用熟悉的SQL语法进行复杂的数据查询和分析。例如,可以分析用户购买习惯,找出最受欢迎的菜品和商家,或者挖掘用户的消费模式,为精准营销提供数据支持。 ...

    IDEA的scala插件

    1. **语法高亮与代码提示**:Scala插件为Scala源代码提供语法高亮显示,使代码更易于阅读。同时,它还提供了智能代码补全和错误检测,帮助开发者快速编写和修正代码。 2. **代码导航**:插件支持类、方法、变量等...

    scala-2.12.8

    然后,他们可以开始学习Scala语法,创建Spark项目,并使用Maven或Sbt等构建工具管理依赖。对于初学者,了解Scala的基本概念如类、对象、特质、模式匹配、高阶函数和不可变数据结构非常重要。对于Spark开发,理解RDD...

    intellij idea2018的scala编程插件

    1. **语法高亮**:Scala插件为Scala源代码提供颜色编码,使得代码更易读,更容易识别不同的语法元素。 2. **代码完成**:当你在编写代码时,插件会提供智能的代码补全建议,包括类、方法、变量等,极大地减少了手动...

    scala-2.11.8.tgz

    1. **Scala编译器**:`scala-2.11.8/bin/scala`,这是一个命令行工具,用于将Scala源代码编译成JVM可以执行的字节码。 2. **Scala解释器**:`scala-2.11.8/bin/scalac`,允许用户直接在命令行运行Scala代码,进行...

    scala:scala代码

    在"scala: scala代码"的描述中,我们可以推测这是一个关于Scala编程语言的学习资源或者项目源代码。 1. **面向对象编程**:Scala是基于Java虚拟机(JVM)的语言,因此它兼容Java的所有类库。它支持类、对象和继承等...

    scala 2.11.7 安装包

    通过创建SparkSession,用户可以连接到数据源,执行SQL查询,或者进行复杂的数据分析和机器学习任务。 总的来说,这个Scala 2.11.7安装包是学习大数据处理和Spark的一个良好起点,通过深入理解和实践,开发者可以...

    scala-2.12.1 for Linux

    4. `src`目录(如果包含的话):源代码,对于学习Scala的内部工作原理非常有用。 Scala的多范式特性使其在处理复杂问题时特别有效。例如,面向对象编程允许我们通过类和对象来组织和抽象代码,而函数式编程则提供了...

Global site tag (gtag.js) - Google Analytics