- 浏览: 125376 次
- 性别:
- 来自: 杭州
文章分类
最新评论
spark aggregator
class HllcdistinctByte extends Aggregator[Row, HLLCounter, Array[Byte]] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: HLLCounter = new HLLCounter(14)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: HLLCounter, employee: Row): HLLCounter = {
buffer.add(employee.getString(2))
buffer
}
// Merge two intermediate values
def merge(b1: HLLCounter, b2: HLLCounter): HLLCounter = {
b1.merge(b2)
b1
}
// Transform the output of the reduction
def finish(reduction: HLLCounter): Array[Byte] = {
val out1 = ByteBuffer.allocate(reduction.maxLength())
reduction.writeRegisters(out1)
out1.array()
}
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[HLLCounter] = Encoders.javaSerialization
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY
}
使用:
val uvbytes = new HllcdistinctByte().toColumn
val uvb = wordsDataFrame.where("event_id = '2001'").groupByKey(_.getString(0)).agg(uvbytes)
但是 UDaf 的 Byte 的类型是固定的
class HLLCUDAFByte extends UserDefinedAggregateFunction{ //ctrl+I实现复写方法
override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))
override def bufferSchema: StructType = StructType(Array(StructField("hllcbyte",BinaryType , true)))
override def dataType: DataType = BinaryType
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0)= {
val hllc = new HLLCounter(14)
val bytes1 = ByteBuffer.allocate(hllc.maxLength())
hllc.writeRegisters(bytes1)
bytes1.array
}}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val hllc = new HLLCounter(14)
hllc.readRegisters(ByteBuffer.wrap(buffer.getAs[Array[Byte]](0)))
hllc.add(input.getAs[String](0))
val bytes1 = ByteBuffer.allocate(hllc.maxLength())
hllc.writeRegisters(bytes1)
buffer(0) = bytes1.array
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val hllc = new HLLCounter(14)
hllc.readRegisters(ByteBuffer.wrap(buffer1.getAs[Array[Byte]](0)))
val hllc2 = new HLLCounter(14)
hllc2.readRegisters(ByteBuffer.wrap(buffer2.getAs[Array[Byte]](0)))
hllc.merge(hllc2)
val bytes1 = ByteBuffer.allocate(hllc.maxLength())
hllc.writeRegisters(bytes1)
buffer1(0) = bytes1.array
}
override def evaluate(buffer: Row): Any = buffer.getAs[Array[Byte]](0)
}
这里序列化 ,反序列化要很多资源。 如果自定义 类型 ?
spark.udf.register("HLLCUDAFInt", new HLLCUDAFInt() )
class HllcdistinctByte extends Aggregator[Row, HLLCounter, Array[Byte]] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: HLLCounter = new HLLCounter(14)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: HLLCounter, employee: Row): HLLCounter = {
buffer.add(employee.getString(2))
buffer
}
// Merge two intermediate values
def merge(b1: HLLCounter, b2: HLLCounter): HLLCounter = {
b1.merge(b2)
b1
}
// Transform the output of the reduction
def finish(reduction: HLLCounter): Array[Byte] = {
val out1 = ByteBuffer.allocate(reduction.maxLength())
reduction.writeRegisters(out1)
out1.array()
}
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[HLLCounter] = Encoders.javaSerialization
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY
}
使用:
val uvbytes = new HllcdistinctByte().toColumn
val uvb = wordsDataFrame.where("event_id = '2001'").groupByKey(_.getString(0)).agg(uvbytes)
但是 UDaf 的 Byte 的类型是固定的
class HLLCUDAFByte extends UserDefinedAggregateFunction{ //ctrl+I实现复写方法
override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))
override def bufferSchema: StructType = StructType(Array(StructField("hllcbyte",BinaryType , true)))
override def dataType: DataType = BinaryType
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0)= {
val hllc = new HLLCounter(14)
val bytes1 = ByteBuffer.allocate(hllc.maxLength())
hllc.writeRegisters(bytes1)
bytes1.array
}}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val hllc = new HLLCounter(14)
hllc.readRegisters(ByteBuffer.wrap(buffer.getAs[Array[Byte]](0)))
hllc.add(input.getAs[String](0))
val bytes1 = ByteBuffer.allocate(hllc.maxLength())
hllc.writeRegisters(bytes1)
buffer(0) = bytes1.array
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val hllc = new HLLCounter(14)
hllc.readRegisters(ByteBuffer.wrap(buffer1.getAs[Array[Byte]](0)))
val hllc2 = new HLLCounter(14)
hllc2.readRegisters(ByteBuffer.wrap(buffer2.getAs[Array[Byte]](0)))
hllc.merge(hllc2)
val bytes1 = ByteBuffer.allocate(hllc.maxLength())
hllc.writeRegisters(bytes1)
buffer1(0) = bytes1.array
}
override def evaluate(buffer: Row): Any = buffer.getAs[Array[Byte]](0)
}
这里序列化 ,反序列化要很多资源。 如果自定义 类型 ?
spark.udf.register("HLLCUDAFInt", new HLLCUDAFInt() )
发表评论
-
Spark SQL运行 过程 抄的别人的,记录 学习
2018-05-13 23:07 1035抄的别人的,觉得写的特别好 val FILESOURCE ... -
thriftserver log4j.properties 生效
2018-04-09 11:46 453/home/isuhadoop/spark2/sbin/sta ... -
udaf 返回的 子属性
2018-03-20 13:22 445udaf 返回的 子属性 spark.sql(" ... -
spark datasource
2018-03-16 16:36 669DataFrameWriter format val c ... -
如何 map 端 Join。
2018-03-04 19:31 627Hive 中 修改表的 rawDataSize = 1 1 ... -
spark thrift server 修改
2018-03-04 12:58 587org.apache.spark.sql.hive.thrif ... -
hive hbase thriftserver run
2018-03-03 15:13 415正确方法 : 0\ 拷贝对应目录到 spark2 jars ... -
scala package
2018-01-25 09:48 534#scala 打包 mvn clean scala:com ... -
SPARK SERVER
2018-01-23 22:15 554sbin/start-thriftserver.sh --dr ... -
driver class
2018-01-21 22:11 525sbin/start-thriftserver.sh -- ... -
spark thrift server 调试
2017-10-20 15:50 868spark-hive-thriftserver 本地调试 ... -
spark SQL conf
2017-10-18 14:36 625org.apache.spark.sql.internal.S ... -
java 死锁 ,内存问题 分析
2017-10-17 10:50 351jstack -l pid /opt/soft/jdk/ ... -
thriftServer proxy
2017-10-16 14:21 945sudo yum install haproxy 257 ... -
hive spark conf
2017-09-26 17:44 1299CREATE TABLE org_userbehavior_a ... -
get day
2017-09-19 08:41 572def timeDayNow() = { var ... -
thriftserver
2017-09-14 19:47 473export SPARK_CONF_DIR=/home/yun ... -
thriftserver dynamicallocation
2017-09-08 14:41 591./sbin/start-thriftserver.sh -- ... -
test code2
2017-09-03 13:45 492package org.test.udf import co ... -
test code
2017-08-24 17:52 291def taskcal(data:Array[(String, ...
相关推荐
在本篇文章中,我们将深入探讨如何在 SparkSQL 中创建和使用自定义用户定义函数(UDF)和用户定义聚合函数(UDAF)。首先,我们需要理解 SparkSQL 的基本工作原理,它允许我们将 DataFrame 处理与 SQL 查询结合起来...
SQL2Struct是一款对golang开发者友好的chrome插件,根据在mysql中创建数据表的sql语句,自动生成golang中的struct,在golang开发者使用诸如gorm之类的框架时,可以很好的把mysql中的数据表与orm的结构体关联起来。
Struts+SQL开发的源代码是一套非常适合初学者学习的编程资源,主要涵盖了Structs、JSP、Servlet和SQL这四个关键的技术领域。这个压缩包中的"StrutsDB"很可能包含了一个基于Struts框架和数据库操作的应用实例,帮助...
SQL2Struct SQL2Struct是一款对golang开发者友好的chrome插件,根据在mysql中创建数据表的sql语句,自动生成golang中的struct,在golang开发者使用诸如gorm之类的框架时,可以很好的把mysql中的数据表与orm的结构体...
【标题】"struct2+mysql"的描述指出,这是一个针对初学者的学习资源,结合了Eclipse、MySQL和Structs2这三个关键元素。Eclipse是一种广泛使用的集成开发环境(IDE),适用于多种编程语言,包括Java。MySQL是世界上最...
在"sqlserver+struct宾馆信息管理系统"中,SQLServer 被用作核心数据存储和管理平台,负责处理宾馆的各种业务数据,如客房信息、客户预订、入住退房记录、账单结算等。 1. **关系型数据库模型**:SQLServer 采用...
这些函数可能会使用Linux内核提供的API来与UDF文件系统交互,例如调用`blkid`库或者直接操作`struct udf_fs_info`这样的结构体。 在Linux内核中,UDF文件系统的实现通常包括解析文件系统的元数据区,这包括Volume ...
Meddler是一个小型工具包,可以消除在SQL查询和结构之间来回移动数据时的一些乏味。 它不是完整的ORM。 Meddler旨在以一种轻量级的方式来增加ORM的一些便利,同时将更多的控制权交给程序员。 软件包文档可在以下...
在Go语言中,数据库操作是开发过程中的常见任务,`sql`包是Go标准库提供的一个强大工具,用于处理SQL数据库。然而,直接使用`sql`包进行数据映射可能会显得繁琐,因为需要手动编写SQL语句并将结果映射到Go的结构体中...
"struct和typedef struct的区别" 在编程语言中,struct和typedef struct都是用来定义结构体类型的,但它们之间存在一些关键的区别。 首先,在C语言中,定义一个结构体类型需要使用typedef关键字,例如: ```c ...
https://github.com/whr-helen/go-struct-auto 自动构建工具使用 安装包命令:go get github.com/whr-helen/go-struct-auto 注释:参数信息 -host host改为自己数据库的地址(默认127.0.0.1) -port port改为...
一个J2EE购物网站的实现 运用struts1.2+hibernate+spring 框架,数据库连接池,事务管理;Struts 应用国际化,Struts 标签库与Tiles框架, JSTL标签库,Spring IOC。 采用优化性能技术,采用oscache缓存,freemarker静态...
在C/C++编程语言中,`typedef` 和 `struct` 是两种不同的声明结构体类型的方式,它们各有特点,但也有一定的关联。理解这两者之间的差异以及如何初始化结构体是编程中非常重要的概念。 首先,`struct` 关键字用于...
在编程语言中,`struct`关键字用于定义一种复合数据类型,它允许我们将多个不同类型的变量组合成一个单一的实体。在C和C++中,`struct`的使用方式略有不同,特别是在与`typedef`关键字结合使用时。 1. `struct`定义...
本项目以“struct+hibernate+sql”为核心,结合Java、Ajax和JSP,为初学者提供了一个深入学习框架的实践平台。下面将详细介绍这些技术在OA项目中的应用及其重要性。 1. Struts(Structs)框架: Struts 是一个基于...
"personal sqlserver database struct"这个标题暗示了我们即将探讨的是一个个人SQL Server数据库的架构和设计。下面将详细阐述SQL Server数据库的基础知识,以及如何构建和优化个人数据库结构。 一、SQL Server...
XML(eXtensible Markup Language)和C语言中的struct是两种不同的数据表示方式。XML是一种用于标记数据的标准化格式,而struct是C语言中用来结构化数据的类型。在编程中,有时我们需要在两者之间进行转换,以实现...
赠送jar包:mapstruct-1.2.0.Final.jar; 赠送原API文档:mapstruct-1.2.0.Final-javadoc.jar; 赠送源代码:mapstruct-1.2.0.Final-sources.jar; 赠送Maven依赖信息文件:mapstruct-1.2.0.Final.pom; 包含翻译后...
综上所述,通过对 `struct ip_hdr`、`struct ip` 和 `struct eth_hdr` 的分析,我们不仅了解了 IP 数据报首部的关键组成部分,还进一步掌握了以太网头部的结构和意义。这些知识点对于理解互联网工作原理以及进行网络...
c++笔记struct和typedef struct彻底明白了 在 C++ 编程语言中,struct 和 typedef struct 是两个非常重要的概念。今天,我们将深入探讨这两个概念的区别和使用方法。 首先,让我们从基本概念开始。struct 是一种...