- 浏览: 126220 次
- 性别:
- 来自: 杭州
文章分类
最新评论
以后 为了操作的便利性, 把逻辑都包装成 udf ,udaf .
写一个 包装接口, 对一份数据的操作 ,
直接 在 repl 给 hdfs 加上 meta desc
在 repl 直接 写 sql . 出来的结果,直接拿 save .
可以积累 业务逻辑。 重用
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}
import yunzhi.utils._
object wordCount {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("Spark SQL Data Soures Example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
val data = spark.sparkContext.textFile("testdata/c01/wc.txt") //读取文件
.flatMap(_.split(" "))
.map((_, 10))
data.cache()
// Rdd
data.reduceByKey(_ + _)
.collect()
.foreach(println) //word计数
// sql rdd => DataFrame 要加入implicits 转换
import spark.implicits._
val dataDF = data.toDF()
dataDF.createOrReplaceTempView("dataDF")
spark.sql("SELECT _1,sum(_2) as cnt FROM dataDF group by _1").show()
// register UDF
spark.sqlContext.udf.register("strLen", (s: String) => s.length())
spark.sql("SELECT strLen(_1),sum(_2) as cnt FROM dataDF group by _1").show()
//register UDAF wordCountUDAF(String)
spark.sqlContext.udf.register("wordCountUDAF", new wordCountUDAF)
spark.sql("SELECT strLen(_1),wordCountUDAF(_1) as cnt FROM dataDF group by _1").show()
//register UDAF sum(Int)
spark.sqlContext.udf.register("IntSumUDAF", new IntSumUDAF)
spark.sql("SELECT _1,wordCountUDAF(_1) as countcnt , IntSumUDAF(_2) as sumcnt FROM dataDF group by _1").show()
}
}
package yunzhi.utils
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
/**
* Created by l on 16-10-13.
*/
class UDAFUtil {
}
class wordCountUDAF extends UserDefinedAggregateFunction{ //ctrl+I实现复写方法
/**
* 该方法指定具体输入数据的类型 Array 类型可以输入 多个参数,定义多个 StructField , Array 格式 ,由于 sql 中,可能传入,多个列, 在 udaf 中 都是数组.
* @return
*/
override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true),StructField("input", StringType, true)))
/**
* 在进行聚合操作的时候要处理的数据的结果的类型 Array 可以定义多个 StructField Array 格式
* @return
*/
override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))
/**
* 指定UDAF函数计算后返回的结果类型
* @return
*/
override def dataType: DataType = IntegerType
override def deterministic: Boolean = true
/**
* 在Aggregate之前每组数据的初始化结果
* @param buffer
*/
override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0)=0}
/**
* 在进行聚合的时候有新的值进来,对分组后的聚合如何进行计算
* 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner(这里的Row跟DataFrame的Row无关) worker 里面先计算
* @param buffer
* @param input
*/
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Int](0) + 1
}
/**
* 最后在分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作 worker 之前的 计算
* @param buffer1
* @param buffer2
*/
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
}
/**
* 返回UDAF最后的计算结果
* @param buffer
* @return
*/
override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}
// 和 aggrerate 函数的参数的行为 很类似 . initialize update merge ......
class IntSumUDAF extends UserDefinedAggregateFunction{ //ctrl+I实现复写方法
/**
* 该方法指定具体输入数据的类型
* @return
*/
override def inputSchema: StructType = StructType(Array(StructField("input", IntegerType, true)))
/**
* 在进行聚合操作的时候要处理的数据的结果的类型
* @return
*/
override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))
/**
* 指定UDAF函数计算后返回的结果类型
* @return
*/
override def dataType: DataType = IntegerType
override def deterministic: Boolean = true
/**
* 在Aggregate之前每组数据的初始化结果
* @param buffer
*/
override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0)=0}
/**
* 在进行聚合的时候有新的值进来,对分组后的聚合如何进行计算
* 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner(这里的Row跟DataFrame的Row无关) worker 里面先计算
* @param buffer
* @param input
*/
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Int](0) + input.getAs[Int](0)
}
/**
* 最后在分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作 worker 之前的 计算
* @param buffer1
* @param buffer2
*/
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
}
/**
* 返回UDAF最后的计算结果
* @param buffer
* @return
*/
override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}
写一个 包装接口, 对一份数据的操作 ,
直接 在 repl 给 hdfs 加上 meta desc
在 repl 直接 写 sql . 出来的结果,直接拿 save .
可以积累 业务逻辑。 重用
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}
import yunzhi.utils._
object wordCount {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("Spark SQL Data Soures Example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
val data = spark.sparkContext.textFile("testdata/c01/wc.txt") //读取文件
.flatMap(_.split(" "))
.map((_, 10))
data.cache()
// Rdd
data.reduceByKey(_ + _)
.collect()
.foreach(println) //word计数
// sql rdd => DataFrame 要加入implicits 转换
import spark.implicits._
val dataDF = data.toDF()
dataDF.createOrReplaceTempView("dataDF")
spark.sql("SELECT _1,sum(_2) as cnt FROM dataDF group by _1").show()
// register UDF
spark.sqlContext.udf.register("strLen", (s: String) => s.length())
spark.sql("SELECT strLen(_1),sum(_2) as cnt FROM dataDF group by _1").show()
//register UDAF wordCountUDAF(String)
spark.sqlContext.udf.register("wordCountUDAF", new wordCountUDAF)
spark.sql("SELECT strLen(_1),wordCountUDAF(_1) as cnt FROM dataDF group by _1").show()
//register UDAF sum(Int)
spark.sqlContext.udf.register("IntSumUDAF", new IntSumUDAF)
spark.sql("SELECT _1,wordCountUDAF(_1) as countcnt , IntSumUDAF(_2) as sumcnt FROM dataDF group by _1").show()
}
}
package yunzhi.utils
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
/**
* Created by l on 16-10-13.
*/
class UDAFUtil {
}
class wordCountUDAF extends UserDefinedAggregateFunction{ //ctrl+I实现复写方法
/**
* 该方法指定具体输入数据的类型 Array 类型可以输入 多个参数,定义多个 StructField , Array 格式 ,由于 sql 中,可能传入,多个列, 在 udaf 中 都是数组.
* @return
*/
override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true),StructField("input", StringType, true)))
/**
* 在进行聚合操作的时候要处理的数据的结果的类型 Array 可以定义多个 StructField Array 格式
* @return
*/
override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))
/**
* 指定UDAF函数计算后返回的结果类型
* @return
*/
override def dataType: DataType = IntegerType
override def deterministic: Boolean = true
/**
* 在Aggregate之前每组数据的初始化结果
* @param buffer
*/
override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0)=0}
/**
* 在进行聚合的时候有新的值进来,对分组后的聚合如何进行计算
* 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner(这里的Row跟DataFrame的Row无关) worker 里面先计算
* @param buffer
* @param input
*/
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Int](0) + 1
}
/**
* 最后在分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作 worker 之前的 计算
* @param buffer1
* @param buffer2
*/
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
}
/**
* 返回UDAF最后的计算结果
* @param buffer
* @return
*/
override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}
// 和 aggrerate 函数的参数的行为 很类似 . initialize update merge ......
class IntSumUDAF extends UserDefinedAggregateFunction{ //ctrl+I实现复写方法
/**
* 该方法指定具体输入数据的类型
* @return
*/
override def inputSchema: StructType = StructType(Array(StructField("input", IntegerType, true)))
/**
* 在进行聚合操作的时候要处理的数据的结果的类型
* @return
*/
override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))
/**
* 指定UDAF函数计算后返回的结果类型
* @return
*/
override def dataType: DataType = IntegerType
override def deterministic: Boolean = true
/**
* 在Aggregate之前每组数据的初始化结果
* @param buffer
*/
override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0)=0}
/**
* 在进行聚合的时候有新的值进来,对分组后的聚合如何进行计算
* 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner(这里的Row跟DataFrame的Row无关) worker 里面先计算
* @param buffer
* @param input
*/
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Int](0) + input.getAs[Int](0)
}
/**
* 最后在分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作 worker 之前的 计算
* @param buffer1
* @param buffer2
*/
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
}
/**
* 返回UDAF最后的计算结果
* @param buffer
* @return
*/
override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}
发表评论
-
Spark SQL运行 过程 抄的别人的,记录 学习
2018-05-13 23:07 1038抄的别人的,觉得写的特别好 val FILESOURCE ... -
thriftserver log4j.properties 生效
2018-04-09 11:46 455/home/isuhadoop/spark2/sbin/sta ... -
udaf 返回的 子属性
2018-03-20 13:22 448udaf 返回的 子属性 spark.sql(" ... -
spark datasource
2018-03-16 16:36 673DataFrameWriter format val c ... -
如何 map 端 Join。
2018-03-04 19:31 635Hive 中 修改表的 rawDataSize = 1 1 ... -
spark thrift server 修改
2018-03-04 12:58 591org.apache.spark.sql.hive.thrif ... -
hive hbase thriftserver run
2018-03-03 15:13 418正确方法 : 0\ 拷贝对应目录到 spark2 jars ... -
scala package
2018-01-25 09:48 538#scala 打包 mvn clean scala:com ... -
SPARK SERVER
2018-01-23 22:15 557sbin/start-thriftserver.sh --dr ... -
driver class
2018-01-21 22:11 528sbin/start-thriftserver.sh -- ... -
spark thrift server 调试
2017-10-20 15:50 869spark-hive-thriftserver 本地调试 ... -
spark SQL conf
2017-10-18 14:36 648org.apache.spark.sql.internal.S ... -
java 死锁 ,内存问题 分析
2017-10-17 10:50 356jstack -l pid /opt/soft/jdk/ ... -
thriftServer proxy
2017-10-16 14:21 950sudo yum install haproxy 257 ... -
hive spark conf
2017-09-26 17:44 1303CREATE TABLE org_userbehavior_a ... -
get day
2017-09-19 08:41 578def timeDayNow() = { var ... -
thriftserver
2017-09-14 19:47 482export SPARK_CONF_DIR=/home/yun ... -
thriftserver dynamicallocation
2017-09-08 14:41 595./sbin/start-thriftserver.sh -- ... -
test code2
2017-09-03 13:45 497package org.test.udf import co ... -
test code
2017-08-24 17:52 293def taskcal(data:Array[(String, ...
相关推荐
在本文中,将详细介绍Spark的RDD API,以及如何在RDD、DataFrame和Dataset之间相互转换,以及如何使用Spark SQL进行操作。 首先来看RDD API的聚合操作aggregate。aggregate函数是RDD API中非常重要的一个动作操作,...
spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。
这是一个基于Scala语言开发的Spark RDD、Spark SQL、Spark Streaming相关Demo,包含35个文件。主要文件类型包括29个Scala源文件、2个Markdown文档、1个Reduced文件、1个XML文件、1个Java源文件和1个TXT文件。该项目...
Spark思维导图之Spark RDD.png
spark rdd相关操作详解;包括全部的操作说明和举例;
### Spark RDD论文中文版知识点详述 #### 一、引言 Spark RDD(Resilient Distributed Datasets)作为Apache Spark的核心组件之一,在大数据处理领域扮演着至关重要的角色。本论文旨在探讨Spark RDD的设计理念及其在...
本资源提供了一套基于Scala的Apache Spark相关RDD、SQL、Streaming Demos的设计源码,包含35个文件,其中包括29个Scala源代码文件,2个Markdown文档,1个Reduced文件,1个XML配置文件,1个Java源代码文件,以及1个...
【资源说明】 1、该资源内项目代码都是经过测试运行成功,功能正常的情况下才上传的,请放心下载使用。...电影评分数据汇总,(使用spark2.4+scala完成, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip
Spark RDD(弹性分布式数据集)是Apache Spark框架中的核心组件,它是Spark处理大规模数据的核心抽象。RDD代表了一种可容错、只读、分片的数据集合,这些分片可以分布在Spark集群的不同节点上。RDD的设计目标是提供...
此外,Spark 还提供了对 SQL 查询的支持以及机器学习库 MLlib。 - **用户案例与基准测试**:Spark 项目评估了多个用户应用程序和基准测试,证明了 RDD 在迭代算法和交互式数据挖掘方面的优势。例如,在 PageRank、K-...
课时3:Spark RDD操作 课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理...
Spark RDD 练习作业(选择部分数据(可以是自拟,可以是采集的,也可以是现有的),进行多角度数据统计及分析,并进行数据整合及展示(尽量多的运用 Spark RDD API)).zip Spark RDD 练习作业(选择部分数据(可以...
* intersection(otherDataset) 算子对源 RDD 和参数 RDD 求交集后返回一个新的 RDD。 * groupByKey([numTasks]) 算子将 RDD[key,value] 按照相同的 key 进行分组,形成 RDD[key,Iterable[value]] 的形式。 知识点三...
RDD(Resilient Distributed Dataset)是 Spark 中的核心概念。RDD 是一个只读、分区记录的集合,可以被理解为一个存储数据的数据结构。RDD 可以从以下几种方式创建: * 集合转换 * 从文件系统输入 * 从父 RDD 转换...
Spark RDD思维导图,xmind
spark RDD论文:Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing
SparkRDD 啊 啊啊
- **定义**:RDD(Resilient Distributed Dataset)即弹性分布式数据集,是Spark中最基本的数据抽象。它代表了一个不可变、可分区、且元素可以并行计算的数据集合。RDD具有以下特性:自动容错、位置感知性调度以及可...