`

spark rdd dataset sql udf udaf

阅读更多
以后 为了操作的便利性, 把逻辑都包装成  udf ,udaf .  
写一个  包装接口, 对一份数据的操作  ,
直接 在 repl   给 hdfs 加上 meta  desc
在 repl  直接   写  sql .   出来的结果,直接拿 save .  
可以积累 业务逻辑。 重用






import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}
import yunzhi.utils._


object wordCount {
  def main(args: Array[String]) {


    val spark = SparkSession
      .builder()
      .appName("Spark SQL Data Soures Example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()




    val data = spark.sparkContext.textFile("testdata/c01/wc.txt") //读取文件
.flatMap(_.split(" "))
      .map((_, 10))

    data.cache()


    // Rdd
data.reduceByKey(_ + _)
      .collect()
      .foreach(println) //word计数
    // sql  rdd => DataFrame  要加入implicits 转换
import spark.implicits._

    val dataDF =   data.toDF()
    dataDF.createOrReplaceTempView("dataDF")
    spark.sql("SELECT _1,sum(_2) as cnt  FROM dataDF group by _1").show()


    // register UDF
spark.sqlContext.udf.register("strLen", (s: String) => s.length())
    spark.sql("SELECT strLen(_1),sum(_2) as cnt  FROM dataDF group by _1").show()





    //register UDAF  wordCountUDAF(String)
spark.sqlContext.udf.register("wordCountUDAF", new wordCountUDAF)

    spark.sql("SELECT strLen(_1),wordCountUDAF(_1) as cnt  FROM dataDF group by _1").show()

    //register UDAF  sum(Int)
spark.sqlContext.udf.register("IntSumUDAF", new IntSumUDAF)
    spark.sql("SELECT _1,wordCountUDAF(_1) as countcnt , IntSumUDAF(_2) as sumcnt   FROM dataDF group by _1").show()



  }
}



package yunzhi.utils

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
  * Created by l on 16-10-13.
  */
class UDAFUtil {

}



class wordCountUDAF extends UserDefinedAggregateFunction{ //ctrl+I实现复写方法
/**
    * 该方法指定具体输入数据的类型   Array 类型可以输入 多个参数,定义多个 StructField   , Array 格式 ,由于 sql 中,可能传入,多个列, 在 udaf 中 都是数组.
    * @return
*/
override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true),StructField("input", StringType, true)))
  /**
    * 在进行聚合操作的时候要处理的数据的结果的类型 Array 可以定义多个 StructField  Array 格式
    * @return
*/
override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))
  /**
    * 指定UDAF函数计算后返回的结果类型
    * @return
*/
override def dataType: DataType = IntegerType
  override def deterministic: Boolean = true
/**
    * 在Aggregate之前每组数据的初始化结果
    * @param buffer
*/
override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0)=0}
  /**
    * 在进行聚合的时候有新的值进来,对分组后的聚合如何进行计算
    * 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner(这里的Row跟DataFrame的Row无关)   worker 里面先计算
    * @param buffer
* @param input
*/
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + 1
}
  /**
    * 最后在分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作   worker 之前的 计算
    * @param buffer1
* @param buffer2
*/
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }
  /**
    *  返回UDAF最后的计算结果
    * @param buffer
* @return
*/
override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}






// 和  aggrerate 函数的参数的行为 很类似   .   initialize update   merge ......
class IntSumUDAF extends UserDefinedAggregateFunction{ //ctrl+I实现复写方法
/**
    * 该方法指定具体输入数据的类型
    * @return
*/
override def inputSchema: StructType = StructType(Array(StructField("input", IntegerType, true)))
  /**
    * 在进行聚合操作的时候要处理的数据的结果的类型
    * @return
*/
override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))
  /**
    * 指定UDAF函数计算后返回的结果类型
    * @return
*/
override def dataType: DataType = IntegerType
  override def deterministic: Boolean = true
/**
    * 在Aggregate之前每组数据的初始化结果
    * @param buffer
*/
override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0)=0}
  /**
    * 在进行聚合的时候有新的值进来,对分组后的聚合如何进行计算
    * 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner(这里的Row跟DataFrame的Row无关)   worker 里面先计算
    * @param buffer
* @param input
*/
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + input.getAs[Int](0)
  }
  /**
    * 最后在分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作   worker 之前的 计算
    * @param buffer1
* @param buffer2
*/
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }
  /**
    *  返回UDAF最后的计算结果
    * @param buffer
* @return
*/
override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)



}




分享到:
评论

相关推荐

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    在本文中,将详细介绍Spark的RDD API,以及如何在RDD、DataFrame和Dataset之间相互转换,以及如何使用Spark SQL进行操作。 首先来看RDD API的聚合操作aggregate。aggregate函数是RDD API中非常重要的一个动作操作,...

    sparkRDD函数大全

    spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。

    基于Scala的Spark RDD、Spark SQL、Spark Streaming相关Demo设计源码

    这是一个基于Scala语言开发的Spark RDD、Spark SQL、Spark Streaming相关Demo,包含35个文件。主要文件类型包括29个Scala源文件、2个Markdown文档、1个Reduced文件、1个XML文件、1个Java源文件和1个TXT文件。该项目...

    Spark思维导图之Spark RDD.png

    Spark思维导图之Spark RDD.png

    spark rdd 操作详解

    spark rdd相关操作详解;包括全部的操作说明和举例;

    spark RDD 论文 中文版

    ### Spark RDD论文中文版知识点详述 #### 一、引言 Spark RDD(Resilient Distributed Datasets)作为Apache Spark的核心组件之一,在大数据处理领域扮演着至关重要的角色。本论文旨在探讨Spark RDD的设计理念及其在...

    spark RDD操作详解

    ### Spark RDD操作详解 #### 一、RDD概念与特性 **RDD(弹性分布式数据集)**是Apache Spark的核心抽象,代表一种不可变的、可分区的、能够进行并行操作的数据集合。它提供了丰富的API来支持高效的大规模数据处理...

    基于Scala的Apache Spark相关RDD、SQL、Streaming Demos设计源码

    本资源提供了一套基于Scala的Apache Spark相关RDD、SQL、Streaming Demos的设计源码,包含35个文件,其中包括29个Scala源代码文件,2个Markdown文档,1个Reduced文件,1个XML配置文件,1个Java源代码文件,以及1个...

    电影评分数据汇总(使用spark2.4+scala, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip

    【资源说明】 1、该资源内项目代码都是经过测试运行成功,功能正常的情况下才上传的,请放心下载使用。...电影评分数据汇总,(使用spark2.4+scala完成, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip

    spark rdd 论文翻译_中文_spark老汤

    Spark RDD(弹性分布式数据集)是Apache Spark框架中的核心组件,它是Spark处理大规模数据的核心抽象。RDD代表了一种可容错、只读、分片的数据集合,这些分片可以分布在Spark集群的不同节点上。RDD的设计目标是提供...

    spark RDD 论文

    此外,Spark 还提供了对 SQL 查询的支持以及机器学习库 MLlib。 - **用户案例与基准测试**:Spark 项目评估了多个用户应用程序和基准测试,证明了 RDD 在迭代算法和交互式数据挖掘方面的优势。例如,在 PageRank、K-...

    Spark RDD 练习作业(选择部分数据(可以是自拟,可以是采集的,也可以是现有的),进行多角度数据统计及分析).zip

    Spark RDD 练习作业(选择部分数据(可以是自拟,可以是采集的,也可以是现有的),进行多角度数据统计及分析,并进行数据整合及展示(尽量多的运用 Spark RDD API)).zip Spark RDD 练习作业(选择部分数据(可以...

    大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc

    * intersection(otherDataset) 算子对源 RDD 和参数 RDD 求交集后返回一个新的 RDD。 * groupByKey([numTasks]) 算子将 RDD[key,value] 按照相同的 key 进行分组,形成 RDD[key,Iterable[value]] 的形式。 知识点三...

    spark rdd 实战 ,基本语法

    RDD(Resilient Distributed Dataset)是 Spark 中的核心概念。RDD 是一个只读、分区记录的集合,可以被理解为一个存储数据的数据结构。RDD 可以从以下几种方式创建: * 集合转换 * 从文件系统输入 * 从父 RDD 转换...

    SparkRDD.xmind

    Spark RDD思维导图,xmind

    Spark RDD 论文原文

    spark RDD论文:Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing

    SparkRDD 啊 啊啊

    SparkRDD 啊 啊啊

    sparkrdd的讲解

    - **定义**:RDD(Resilient Distributed Dataset)即弹性分布式数据集,是Spark中最基本的数据抽象。它代表了一个不可变、可分区、且元素可以并行计算的数据集合。RDD具有以下特性:自动容错、位置感知性调度以及可...

    Spark RDD以及其特性.rar_RDD_Spark!_parallelbwz_spark_特性

    Spark的弹性分布式数据集(Resilient Distributed Datasets, 简称RDD)是其核心的数据抽象,也是Spark实现高效并行计算的关键所在。在理解RDD及其特性之前,我们需要先了解Spark的基本工作原理。Spark是一种基于内存...

Global site tag (gtag.js) - Google Analytics