`
bit1129
  • 浏览: 1069816 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Scala三】分析Spark源代码总结的Scala语法一

 
阅读更多

Scala语法

1. classOf运算符

Scala中的classOf[T]是一个class对象,等价于Java的T.class,比如classOf[TextInputFormat]等价于TextInputFormat.class

 

 2. 方法默认值

defaultMinPartitions就是一个默认值,类似C++的方法默认值

 

 

  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

 

3. 函数常量

如下map方法的参数pair => pair._2.toString表示一个函数常量,pair是调用map方法的集合的每个元素,

 

 

  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

 

4. Tuple类型

上例中,调用map方法的对象,即hadoopFile的返回值是Tuple类型,它是一个元素类型可以不同的元组,比如val t: Tuple = (A,2,Three),访问Three,则使用t._3进行操作

 

5. 范型

如下方法所示例子,RDD[(K,V)]是方法返回值类型,它的类型是RDD类,而这个RDD类包含的范型使用[]声明,而(K,V)是元组Tuple类,因此,RDD[(K,V)]表达的类型是RDD<K,V>

 

  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
    reduceByKey(defaultPartitioner(self), func)
  }

 

 

6. 函数类型

下面这个例子,func是个函数类型的对象引用,func也是对象,也有类型,在Scala中,func的类型使用参数输入和输出来指定的。

(V,V)=>V,表示func接受两个参数,类型为V,而输出也是类型为V,因此,(a:Int,b:Int)=>a*b,(a:Int,b:Int)=>a+b都属于这个类型的对象

  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
    reduceByKey(defaultPartitioner(self), func)
  }

 

 7. 隐式函数以实现类型转换

 

7.1什么是隐式函数?

 

SparkContext中定义了如下的隐式函数,它的含义是将K,V类型的RDD(注:入参rdd是RDD[(K,V)])转换为PairRDDFunctions类型,它接受K,V类型的RDD作为构造入参

 

 

  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
    new PairRDDFunctions(rdd)
  }
 

 

7.2 隐式函数何时被调用?

 

 

 val rdd = sc.textFile("file:///home/hadoop/spark1.2.0/word.txt")
 val rdd1 =  rdd.flatMap(_.split(" "))
 val rdd2 = rdd1.map((_, 1))
 //reduceByKey is not a member of org.apache.spark.rdd.RDD[(String, Int)],则需要import org.apache.spark.SparkContext._
 val rdd3 = rdd2.reduceByKey(_ + _)
 

 

在上面的SparkWordCount例子中,reduceByKey方法并不是RDD的方法,而rdd2是RDD类型的对象,所以Scala必须提供一种机制,将这个方法的调用进行转换,这里就是隐式函数发挥作用的情况,

那么哪个隐式函数会用来做这个转换?在上面的注释中清楚的写到,为了使用reduceByKey方法,必须引入import org.apache.spark.SparkContext._,这里的意思就是通过这个import语句,将rdd2进行类型转换的函数引进来,更具体的是,也可以使用如下的import语句进行导入

 

 

import org.apache.spark.SparkContext.rddToPairRDDFunctions
 

 

在SparkContext中,将RDD[(K,V)]转换为其它类型的隐式函数只有rddToPairRDDFunctions,因此,此方法会自动调用,

  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
    new PairRDDFunctions(rdd)
  }

rddToPairRDDFunctions函数将rdd转为为new PairRDDFunctions对象,那么rdd2.reduceByKey调用最终转换到了PairRDDFunctions的reduceByKey函数的调用。如下方法定义于PairRDDFunctions类中。

reduceByKey(_ + _)匹配到func:(V,V)=>V

 

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
    reduceByKey(defaultPartitioner(self), func)
  }

 

 8. T*定义变长参数和定义_作为placeholder

在下面的例子中,RDD[ _ ]*同时使用了两个符号_和*,*表示变长参数,也就是说,others是个变长的RDD集合。而_表示在那个位置上的占位符,RDD[ _ ]中的_表示类型占位符,只是类型不定,此处类似于Java的Object,实现万能类型匹配

 

  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if r.partitioner.isDefined) {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
  }
}

 

9.Seq()操作

在上面的代码中,有Seq(rdd)方法调用,它的含义是将参数rdd集合化,即Seq(rdd)表示仅有一个元素rdd的集合,这个集合有个方法++,Seq(rdd) ++ others是将两个集合中的元素进行合并成一个集合,++也是两个集合的取集合并集的操作

 

10.带if条件判断的for循环

 在8中,有个for循环语句,bySize是个集合,也就是for循环在遍历bySize(存入到r变量中),但是是否每次遍历都进入代码块可以在for循环中使用if语句进行指定。

 

    for (r <- bySize if r.partitioner.isDefined) {
      return r.partitioner.get
    }

 

分享到:
评论

相关推荐

    scala与spark基础

    1. Scala语法基础:包括变量声明、函数定义、类和对象、模式匹配、高阶函数等。 2. Spark核心概念:理解RDD、DataFrame/Dataset,以及如何进行转换和行动操作。 3. Spark SQL:学习如何使用DataFrame API进行数据...

    上手提示:使用新版IDEA+Maven+Scala编写Spark程序

    - **自动下载依赖的源代码**:IDEA的一个强大之处在于它能够自动下载依赖库的源代码,这对于调试和理解第三方库的工作原理非常有用。这一功能可以通过IDEA右下角的Maven配置界面来启用。 - **创建Scala Class**:在...

    Scala语法入门.pdf

    Scala的源代码会被编译成Java字节码,从而可以在JVM上运行。Scala支持静态类型检查,这有助于在编译期间捕获错误,类似于Java、C和C++等静态类型语言。同时,Scala也吸收了现代函数式编程语言的设计特点,包括不可变...

    spark项目代码以及数据

    - Scala/Java/Python/R源代码:实现Spark作业逻辑,包括数据读取、转换和写入操作。 - SQL查询:如果使用Spark SQL,可能包含创建DataFrame、执行SQL查询的代码。 - 测试用例:验证代码正确性的单元测试或集成测试。...

    scala sdk scala-2.12.3

    1. **编译器**:Scala编译器将Scala源代码转换为Java字节码,使得程序可以在JVM上运行。2.12.3版本的编译器支持最新的语言特性,并对错误报告和编译速度进行了改进。 2. **标准库**:Scala的标准库提供了大量的类和...

    windows版scala-2.11.12.zip

    这样,无论你在哪个目录下,都可以通过键入`scala`或`scalac`来启动Scala的交互式Shell或者编译Scala源代码。 Spark是一个用Scala编写的分布式计算框架,它利用Scala的简洁语法和强大的功能来构建大规模数据处理...

    scala-intellij-bin-2019.1.9.zip

    1. **语法高亮**:提供Scala源代码的色彩标记,使得代码更易读,有助于快速识别语句类型。 2. **代码补全**:在编写代码时,自动提示可能的函数、变量、类等,提高编码效率。 3. **错误检测**:实时检查代码错误,...

    scala核心编程总结

    3. **Packages(包)**:类似于Java中的包,Scala中的包用于组织源代码。通过使用包,开发者可以避免命名冲突,提高代码的组织性和可管理性。 4. **Data Structures(数据结构)**:Scala提供了一套丰富的内置数据...

    实时计算项目(Scala结合spark实现).zip

    1. **源代码**:项目的核心代码,使用Scala编写,可能包含了Spark Job的定义,以及数据处理逻辑。 2. **配置文件**:如`conf`目录下的配置文件,可能包含Spark的配置参数和环境变量。 3. **测试用例**:可能有单元...

    scala-2.11.12.rar

    这个版本的Scala对Spark开发者来说尤其重要,因为Spark是用Scala编写的,并且通常会指定一个兼容的Scala版本来确保代码的稳定性和性能。 Spark作为一个大数据处理框架,它主要由以下几个核心组件构成: 1. **Spark...

    scala-2.9.3.tgz

    Scala的语法简洁,能够更好地支持函数式编程,这使得Spark的代码更加简洁、可读性强,同时也更容易编写出并发和分布式程序。 在Spark集群中,主要组件包括: 1. **Spark Core**:这是Spark的基础,提供了分布式任务...

    scala编写,以spark stream方式监听消费kafka的topic,同步写入elasticsearch

    在IT行业中,大数据处理与实时分析是至关重要的领域,而Scala、Kafka、Spark和Elasticsearch这四个组件是构建高效数据流系统的关键技术。在这个项目中,我们使用Scala编程语言,结合Spark Streaming来监听和消费...

    基于spark的外卖大数据平台分析系统.zip

    Spark SQL提供了与SQL兼容的接口,使得数据分析师可以使用熟悉的SQL语法进行复杂的数据查询和分析。例如,可以分析用户购买习惯,找出最受欢迎的菜品和商家,或者挖掘用户的消费模式,为精准营销提供数据支持。 ...

    IDEA的scala插件

    1. **语法高亮与代码提示**:Scala插件为Scala源代码提供语法高亮显示,使代码更易于阅读。同时,它还提供了智能代码补全和错误检测,帮助开发者快速编写和修正代码。 2. **代码导航**:插件支持类、方法、变量等...

    intellij idea2018的scala编程插件

    1. **语法高亮**:Scala插件为Scala源代码提供颜色编码,使得代码更易读,更容易识别不同的语法元素。 2. **代码完成**:当你在编写代码时,插件会提供智能的代码补全建议,包括类、方法、变量等,极大地减少了手动...

    scala-2.12.8

    然后,他们可以开始学习Scala语法,创建Spark项目,并使用Maven或Sbt等构建工具管理依赖。对于初学者,了解Scala的基本概念如类、对象、特质、模式匹配、高阶函数和不可变数据结构非常重要。对于Spark开发,理解RDD...

    scala-2.11.8.tgz

    1. **Scala编译器**:`scala-2.11.8/bin/scala`,这是一个命令行工具,用于将Scala源代码编译成JVM可以执行的字节码。 2. **Scala解释器**:`scala-2.11.8/bin/scalac`,允许用户直接在命令行运行Scala代码,进行...

    scala 2.11.7 安装包

    总的来说,这个Scala 2.11.7安装包是学习大数据处理和Spark的一个良好起点,通过深入理解和实践,开发者可以掌握一种强大的工具,以应对日益增长的数据挑战。在学习过程中,建议结合实际案例和项目练习,以更好地...

    《scala编程》第3版英文版&第3版源码

    资源包含《Scala编程》第3版英文版和第3版源代码,Scala编程第3版,目前是最新版,支持Scala 2.11以上。《Scala编程》是Scala语言的创始人参与编写的,涵盖的语法特性非常全面,并且作者解释了为什么这么设计,有...

    scala:scala代码

    在"scala: scala代码"的描述中,我们可以推测这是一个关于Scala编程语言的学习资源或者项目源代码。 1. **面向对象编程**:Scala是基于Java虚拟机(JVM)的语言,因此它兼容Java的所有类库。它支持类、对象和继承等...

Global site tag (gtag.js) - Google Analytics