`
bit1129
  • 浏览: 1068082 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark九十】Spark定义计算逻辑函数最佳实践

 
阅读更多

这里所谓的Spark定义的计算逻辑函数指的是在Spark中,任务执行的计算逻辑都是定义在Driver Program的函数中的,由于Scala定义函数的多样性,因此有必要总结下各种情况下的函数定义,对Spark将函数序列化到计算节点(Worker)的影响

 

Spark建议的三种做法+一种不推荐的做法

1.定义内部函数常量

package spark.examples.rddapi

import org.apache.spark.{SparkContext, SparkConf}

object ReduceTest_20 {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")
    val sc = new SparkContext(conf);
    val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3)

    /**
     * Reduces the elements of this RDD using the specified commutative and
     * associative binary operator.
     */
    //r是结果不是集合,直接不是RDD
    def func(k1: (Int, String), k2: (Int, String)) = {
      (k1._1 + k2._1, k1._2 + k2._2)
    }

    //对RDD的元素类型不要求,不需要是KV类型
    val r = z1.reduce(func)
    println(r) //结果:(149,AB1Z1EFYZX),对二元组的第一个元素和第二个元素分别做累加操作
  }

}

 在上面这个例子定义了一个函数func,并且将它放到了main函数中作为一个局部变量,其实也可以把func定义为和main平级(此时func是个全局函数),这种全局函数的定义跟下面第三种定义函数的方式道理一样。

 

2. 定义函数字面量直接传递到RDD定义的高阶函数中、

package spark.examples.rddapi

import org.apache.spark.{SparkContext, SparkConf}

object ReduceTest_21 {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")
    val sc = new SparkContext(conf);
    val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3)

    /**
     * Reduces the elements of this RDD using the specified commutative and
     * associative binary operator.
     */
    //r是结果不是集合,直接不是RDD
    //对RDD的元素类型不要求,不需要是KV类型
    val r = z1.reduce((k1: (Int, String), k2: (Int, String)) =>(k1._1 + k2._1, k1._2 + k2._2))
    println(r) //结果:(149,AB1Z1EFYZX)
  }

}

 

3. 将函数计算逻辑作为全局函数定义到Scala object中

 

Scala object函数定义:

package spark.examples.rddapi

object ReduceTestFunctions {
  def compute(k1: (Int, String), k2: (Int, String)) = {
    (k1._1 + k2._1, k1._2 + k2._2)
  }
}

  

 

Spark程序中引用Scala object函数定义

package spark.examples.rddapi

import org.apache.spark.{SparkContext, SparkConf}

object ReduceTestFunctions_20 {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")
    val sc = new SparkContext(conf);
    val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3)

    /**
     * Reduces the elements of this RDD using the specified commutative and
     * associative binary operator.
     */
    //r是结果不是集合,直接不是RDD
    //对RDD的元素类型不要求,不需要是KV类型
    val r = z1.reduce(ReduceTestFunctions.compute(_, _))
    println(r) //结果:(149,AB1Z1EFYZX)
  }

}

 

说明:

通过在Scala object中定义函数,因为Scala object是单例的,那么在序列化时就不需要序列化这个object,仅仅把function序列化到Worker节点即可

 

4.在普通Scala类中定义函数(不推荐)

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  • 大小: 42.3 KB
分享到:
评论

相关推荐

    spark期末复习题总结

    3. Spark的计算逻辑会被解析成DAG(有向无环图),这个解析操作由Driver完成。DAG是一种有向无环图,每个节点表示一个RDD操作,边表示操作之间的依赖关系。 4. Spark的优点包括基于内存计算,减少低效的磁盘交互、...

    spark大数据分析核心概念技术及实践

    通过阅读《Spark大数据分析核心概念技术及实践OCR》.pdf,你可以深入了解以上知识点,并结合实际案例进行学习,逐步掌握Spark在大数据分析中的应用。这个教程将帮助你快速上手,体验Spark带来的高效数据处理能力。

    spark考试练习题含答案.rar

    转换定义了数据处理逻辑,而动作触发实际计算。 二、Spark编程模型 1. **Spark Shell**:交互式的命令行工具,可用于测试和学习Spark。 2. **Spark Job**:一系列操作的集合,由SparkContext提交执行。 3. **...

    Spark开发指导文档

    2. 创建Spark应用:使用Scala编写Spark程序,创建SparkContext并定义数据处理逻辑。 3. 数据读写:使用Spark的DataFrame或RDD API读取HDFS、Hive、Cassandra等数据源,或者通过Spark SQL执行SQL查询。 4. 并行计算:...

    mastering-apache-spark最好的spark教程

    用户可以定义自己的函数(UDF),从而在SQL查询中使用自定义的逻辑。UDF可以增强Spark SQL的功能,使其能执行更复杂的计算。 12. Spark SQL中的数据聚合 数据聚合是指在一组数据上执行计算,如求和、平均等。Spark ...

    scala与spark基础

    在Spark中,Scala用于定义数据处理逻辑,通过RDD(弹性分布式数据集)或者DataFrame/Dataset API进行操作,这些API提供了丰富的转换和行动操作,如map、filter、reduce等,支持并行计算,极大地提高了处理速度。...

    spark core、spark sql以及spark streaming 的Scala、java项目混合框架搭建以及大数据案例

    在大数据处理领域,Spark作为一款高效、通用的计算框架,被广泛应用在数据分析、机器学习等多个场景。本项目涉及的核心知识点包括Spark Core、Spark SQL和Spark Streaming,同时结合了Scala和Java编程语言,以及...

    Spark python API 函数调用学习

    这些转换定义了数据处理逻辑,但不会立即执行,只有在触发行动操作时才会计算。 5. **行动(Actions)**:行动如`count()`, `collect()`, `save()`, `take()`等会触发计算并返回结果。`count()`返回RDD元素数量,`...

    spark学习文档.rar

    在学习这两个模块时,你需要理解DataFrame和Dataset API的基本操作,如创建、过滤、聚合,以及如何定义和使用窗口函数。同时,对于Spark Streaming,你需要掌握DStream的创建、转换和输出操作,以及如何设置和管理...

    Spark_SQL大数据实例开发教程.pdf by Spark_SQL大数据实例开发教程.pdf (z-lib.org)1

    6. **Spark SQL UDF与UDAF**:用户定义的函数(UDF)和用户定义的聚合函数(UDAF)让开发者可以自定义处理逻辑,这部分会介绍如何创建和使用它们。 7. **Thrift Server**:Thrift Server允许通过JDBC和ODBC接口访问...

    大数据spark交流SPARK 技术交流

    RDD的分区策略和依赖关系定义了任务的执行逻辑。 在Spark中,DAG(有向无环图)用于描述任务的执行流程。transformations产生的新RDD并不会立即计算,而是等到遇到actions时才会启动计算。依赖关系分为窄依赖和宽...

    spark 代码示例

    嵌套函数是 Scala 中的一种函数定义方式,允许在函数内部定义新的函数。嵌套函数可以提高代码的可读性和灵活性。 5.尾递归 尾递归是一种函数调用方式,允许函数调用自身,但不创建新的栈帧。尾递归可以提高代码的...

    Springboot 结合Apache Spark 2.4.4与Scala 2.12 集成示例

    在这个类中,我们将定义一个简单的Spark作业,例如读取一个文本文件,计算单词总数,然后输出结果。这里,我们利用Spark的`SparkSession`接口,它是Spark SQL和DataFrame API的入口点。 ```scala import org.apache...

    Spark机器学习模块源码解读

    总的来说,Spark机器学习模块的源码解读涉及了广泛的机器学习理论与实践,不仅涵盖了各种算法的实现,还包括了分布式系统的设计与优化。通过深入学习这些源码,开发者能够更好地理解Spark在机器学习中的工作方式,...

    Spark Ml 源码分析-LogistRegression.zip_Spark ML_spark_spark ML_spa

    在Spark MLlib中,逻辑回归的模型构建过程包括模型参数初始化、损失函数定义、优化算法选择以及模型训练。模型参数主要包括权重向量`weights`和截距`intercept`。损失函数通常采用对数似然损失,对于二分类问题,...

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    在操作的过程中,初始值会分别参与seqOp和combOp的计算,因此聚合结果会受到初始值以及操作函数定义的影响。 具体到示例中,首先创建了一个包含整数的RDD,并对这个RDD应用了aggregate函数。示例中的初始值为0,...

    spark新手上路之源码解析.pdf

    RDD 之间的依赖关系定义了计算的执行顺序。常见的依赖类型有窄依赖(如 map)和宽依赖(如 shuffle)。窄依赖允许多个任务并行执行,而宽依赖需要等待所有前驱任务完成。 4. **转换操作的依赖图** 转换操作构建...

    实时计算项目(Scala结合spark实现).zip

    1. **源代码**:项目的核心代码,使用Scala编写,可能包含了Spark Job的定义,以及数据处理逻辑。 2. **配置文件**:如`conf`目录下的配置文件,可能包含Spark的配置参数和环境变量。 3. **测试用例**:可能有单元...

Global site tag (gtag.js) - Google Analytics