`
Kevin12
  • 浏览: 234760 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Spark SQL内置函数应用

阅读更多
简单说明
    使用Spark SQL中的内置函数对数据进行分析,Spark SQL API不同的是,DataFrame中的内置函数操作的结果返回一个Column对象,而DataFrame天生就是“A distributed collection of data organized into named columns”,这就为数据的复杂分析建立了坚实的基础并提供了极大的方便性,例如说,我们在操作DataFrame的方法中可以随时调用内置函数进行业务需要的处理,这之于我们构建附件的业务逻辑而言是可以极大的减少不必须的时间消耗(基本上就是实际模型的映射),让我们聚焦在数据分析上,这对于提高工程师的生产力而言是非常有价值的;
Spark 1.5.x开始提供了大量的内置函数,例如agg:
def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
   groupBy().agg(aggExpr, aggExprs : _*)
}

还有max,mean,min,sum,ave,explode,size,sort_array,day,to_date,abs,acros,asin,atan等
总体而言,内置函数包含了五大基本类型:
1.聚合函数,例如countDistinct,sumDistinct等;
2.集合函数,例如sort_array,explode等;
3.日期,时间函数,例如hour,quarter,next_day等;
4.数学函数,例如asin,atan,sqrt,tan,round等;
5.开窗函数,例如rowNumber等;
6.字符串函数,例如concat,format_number,rexexp_extract(正则)等;
7.其他函数,isNaN,sha,randn,callUDF等;

package com.imf.spark.sql

import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{StructType, IntegerType, StringType, StructField}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
/**
  * Description:使用Spark SQL中的内置函数对数据进行分析
  * Author:lujinyong168
  * Date:2016/4/14 21:09
  */
object SparkSQLAgg {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
    conf.setAppName("SparkSQLAgg for scala")
    conf.setMaster("local")

val sc = new SparkContext(conf)
//    val sqlContext  = new HiveContext(sc)//构建上下文
val sqlContext  = new SQLContext(sc)//构建上下文

    //要使用Spark SQL 的内置函数,就一定要导入SQLContext下的隐式转换
import sqlContext.implicits._
/**
      * 简单模拟电商访问的数据
      */
val userData = Array(
"2016-04-15,1001,http://spark.apache.org,1000",
"2016-04-15,1001,http://hadoop.apache.org,1001",
"2016-04-15,1002,http://fink.apache.org,1002",
"2016-04-16,1003,http://kafka.apache.org,1020",
"2016-04-16,1004,http://spark.apache.org,1010",
"2016-04-16,1002,http://hive.apache.org,1200",
"2016-04-16,1001,http://parquet.apache.org,1500",
"2016-04-16,1001,http://spark.apache.org,1800"
)

val userDataRDD = sc.parallelize(userData)//生成RDD分布式集合对象


/**
      * 根据业务需要对数据进行预处理生成DataFrame,要想把RDD转换成DataFrame,需要先把RDD中的元素类型变成Row类型
      * 与此同时要提供DataFrame中的Columns的元素信息描述
      */
val userDataRDDRow = userDataRDD.map(row=> {val splited = row.split(",");Row(splited(0),splited(1).toInt,splited(2),splited(3).toInt)})
val structTypes = StructType(Array(
StructField("date",StringType,true),
StructField("id",IntegerType,true),
StructField("url",StringType,true),
StructField("amount",IntegerType,true)
    ))
val userDataDF = sqlContext.createDataFrame(userDataRDDRow,structTypes)

/**
      * 使用Spark SQL提供的内置函数对DataFrame进行操作,特别注意:内置函数生成的Column对象且自动进行CG(code generation)
      * 所有的内置函数操作结果都会返回具体的列
      * DataFrame中的列可以动态增长
      */
//userDataDF.groupBy("date").agg('date,countDistinct('id)).show()
//    userDataDF.groupBy("date").agg('date,countDistinct('id)).map(row => Row(row(1),row(2))).collect.foreach(println)//数据量比较大时不能用collect
userDataDF.groupBy("date").agg('date,sum('amount)).show()//对销售额统计

}
}


执行结果

userDataDF.groupBy("date").agg('date,countDistinct('id)).show()
返回的结果:
+----------+----------+---------+
|      date|      date|count(id)|
+----------+----------+---------+
|2016-04-15|2016-04-15|        2|
|2016-04-16|2016-04-16|        4|
+----------+----------+---------+
userDataDF.groupBy("date").agg('date,countDistinct('id)).map(row => Row(row(1),row(2))).collect.foreach(println)//数据量比较大时不能用collect
返回的结果:
[2016-04-15,2]
[2016-04-16,4]
userDataDF.groupBy("date").agg('date,sum('amount)).show()//对销售额统计
返回的结果:
+----------+----------+-----------+
|      date|      date|sum(amount)|
+----------+----------+-----------+
|2016-04-15|2016-04-15|       3003|
|2016-04-16|2016-04-16|       6530|
+----------+----------+-----------+



分享到:
评论

相关推荐

    SparkSQL内置函数.pdf

    本文将详细介绍 Spark SQL 内置函数列表的使用方法和示例。 一、算术运算符 Spark SQL 内置函数列表中包括了多个算术运算符,用于对数据进行基本的数学运算。这些运算符包括: * %:返回 expr1/expr2 的余数。...

    Spark SQL操作JSON字段的小技巧

    Spark SQL是一款强大的大数据处理工具,它提供了对JSON数据的内置支持,使得在处理JSON格式的数据时更加便捷。本文将详细介绍Spark SQL操作JSON字段的几个关键函数:get_json_object、from_json 和 to_json,以及...

    Spark_SQL大数据实例开发教程.pdf by Spark_SQL大数据实例开发教程.pdf (z-lib.org)1

    5. **Spark SQL内置函数与窗口函数**:深入讲解Spark SQL的内置函数,包括聚合、转换、统计等,以及如何使用窗口函数进行复杂的时间序列分析。 6. **Spark SQL UDF与UDAF**:用户定义的函数(UDF)和用户定义的聚合...

    Spark SQL操作大全.zip

    - ** Catalyst优化器**:Spark SQL内置了Catalyst优化器,能够自动优化查询计划,提高执行效率。 - **代码生成**:Spark SQL会将DataFrame操作转化为高效的执行计划,利用代码生成技术减少运行时的反射开销。 - *...

    spark SQL应用解析

    Spark SQL是Apache Spark的一个核心组件,它集成了SQL查询和Spark的分布式计算能力,使得开发人员可以使用SQL或者DataFrame和DataSet API对大规模数据进行查询和分析。Spark SQL不仅支持标准的SQL语句,还提供了与...

    Learning Spark SQL_source_code - Aurobindo Sarkar

    3. **SQL支持**:介绍如何注册DataFrame为临时视图,然后通过SQL查询这些视图,以及如何使用Spark SQL的内置函数。 4. **数据源**:讲解如何读写各种数据源,如Parquet、JSON、CSV、Hive表等,以及自定义数据源的...

    spark-sql数据.rar

    Spark SQL提供了丰富的内置函数和算子,可用于数据清洗、转换和聚合。例如,`filter`用于筛选满足条件的行,`groupBy`和`agg`用于分组和聚合操作,`join`用于合并多个DataFrame,`window`函数用于窗口操作等。此外...

    mastring-spark-sql

    DataFrames是拥有特定模式的RDDs,它们是更高级别的抽象,并且使用Spark SQL的内置函数进行处理时,可以实现更佳的性能优化。Datasets是DataFrames之上的一个更进一步的抽象,它在Spark 1.6中引入,与DataFrames相比...

    Spark SQL最佳实践.pdf

    优化建议包括尽量减少UDF的使用,或者将耗时的操作通过Spark的内置函数来实现,以减少性能损失。 2. Metastore:Metastore在Spark中用于存储表结构和相关统计信息,这使得Spark能够利用这些信息来优化执行计划。...

    mastering-apache-spark最好的spark教程

    10. Spark SQL内置函数 Spark SQL提供了一系列内置的标准函数和聚合函数,用户可以直接使用这些函数来执行常见的数据处理任务,如日期和时间的处理、窗口函数等。 11. 用户自定义函数(UDF) 用户可以定义自己的...

    Spark SQL 在字节跳动的优化实践-郭俊

    - **代码优化**:避免在UDF(用户自定义函数)中进行复杂的计算,尽可能使用Spark内置函数,提高执行效率。 2. **硬件资源优化** - **内存管理**:合理配置executor内存,平衡计算和存储的需求,防止OOM(Out Of ...

    Spark SQL 日志处理实战,日志文件20000行

    为了更好地理解分析结果,可以使用DataFrame的内置功能或者连接外部可视化库(如matplotlib或seaborn)进行数据可视化。 **步骤6:保存结果** 最后,我们可以将分析结果保存为其他格式,如Parquet或CSV,方便后续...

    Spark从入门到精通

    DataFrame与RDD的两种转换方式,Spark SQL的内置函数、开窗函数、UDF、UDAF,Spark Streaming的Kafka Direct API、updateStateByKey、transform、滑动窗口、foreachRDD性能优化、与Spark SQL整合使用、持久化、...

    实验七:Spark初级编程实践

    在 Spark Shell 中,可以使用内置函数读取文件,如 `sc.textFile()`,并进行简单的数据分析。实验中统计了 `/home/hadoop/test.txt` 和 `/user/hadoop/test.txt` 文件的行数,这展示了 Spark 对文本数据的基本操作。...

    Hive内置函数速查表.pdf

    Hive提供了SQL类查询语言HiveQL,它可以将SQL语句转换为MapReduce、Tez或者Spark任务运行。对于开发者来说,Hive内置函数是其数据分析能力的有力工具,通过这些函数可以方便地对数据进行处理和计算。 在Hive中,...

    SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)

    Spark SQL提供了丰富的内置函数来支持这些操作。此外,由于`DataFrame`和`Dataset`的统一,Spark SQL还支持用户自定义函数(UDF),使得处理更加灵活。 对于博文链接中提到的资源,你可能可以找到更多关于如何利用...

    spark学习笔记

    通过定义结构体StructType并使用Spark SQL内置的函数,可以将日志数据转换成相应的DataFrame格式。在转换过程中,可以使用Spark SQL提供的转换工具类,定义输出字段,并实现日志数据解析的逻辑,从而将原始日志信息...

    Spark自定义UDF分析Uber数据-内含源码以及设计说明书(可以自己运行复现).zip

    UDFs是用户在Spark SQL中定义的、具有特定功能的函数,它们可以扩展Spark的内置函数库,处理复杂的数据转换和业务逻辑。 2. **自定义UDF的创建**: 在Scala或Python中,你可以定义一个普通函数,然后使用`spark....

    spark2.3.1-with-hive

    5. **Spark SQL 和 HiveQL 混合使用**:用户可以在同一个 Spark 应用程序中混合使用 SQL 和 HiveQL 查询,无缝切换,提升了开发效率。 然而,值得注意的是,压缩包名称为 "spark-2.3.1-bin-hadoop2-without-hive",...

Global site tag (gtag.js) - Google Analytics