- 浏览: 125268 次
- 性别:
- 来自: 杭州
文章分类
最新评论
udaf 返回的 子属性
spark.sql("select createCrowdHllc(uuid,tmp_id,'crowdid_appid').uuiduv from h5 ").show(10)
package cn.analysys.udf.crowd
import cn.analysys.batch.userprocess.HbaseInit
import cn.analysys.meta.MetaMapInfo
import cn.analysys.udf.utils.CommonUtils
import cn.analysys.udf.utils.CommonUtils.HbasePutArrayData
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
/*
* input: uuid iterater
* output: Bytes[]
* */
class CreateCrowdHllc extends UserDefinedAggregateFunction {
override def inputSchema: StructType = StructType(
StructField("uuid", StringType, true) ::
StructField("imeisi", StringType, true) ::
StructField("crowdid", StringType, true) :: Nil)
def bufferSchema: StructType = StructType(
StructField("uuidByes", ArrayType(ByteType), true) ::
StructField("imeiByes", ArrayType(ByteType), true) ::
StructField("crowdid", StringType, true) :: Nil)
override def dataType: DataType = StructType(
StructField("uuiduv", LongType, true)
::StructField("imeiuv", LongType, true)
:: Nil) //ArrayType(LongType)
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Array.empty[ByteType]
buffer(1) = Array.empty[ByteType]
buffer(2) = ""
}
override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
//如果有特别大的 app,有这个函数,避免数据倾斜,大内存占用的问题。
val hllcUuid = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
val hllcImei = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
hllcUuid.add(inputrow.getAs[String](0))
hllcImei.add(inputrow.getAs[String](1))
buffer(0) = CommonUtils.getByteFromHllc(hllcUuid)
buffer(1) = CommonUtils.getByteFromHllc(hllcImei)
buffer(2) = inputrow.getAs[String](2)
}
override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
val hllcUuid = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
val hllcImei = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
val hllcUuid2 = CommonUtils.getHllcFromByte(buffer2.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
val hllcImei2 = CommonUtils.getHllcFromByte(buffer2.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
hllcUuid.merge(hllcUuid2)
hllcImei.merge(hllcImei2)
buffer(0) = CommonUtils.getByteFromHllc(hllcUuid)
buffer(1) = CommonUtils.getByteFromHllc(hllcImei)
buffer(2) = buffer2.getAs[String](2)
}
override def evaluate(buffer: Row): Any = {
val hllcUuid = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
val hllcImei = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
val crowd_id = buffer.getAs[String](2)
println(s"uuid uv:${hllcUuid.getCountEstimate} ; ")
println(s"imei uv:${hllcImei.getCountEstimate} ; ")
// put byte[] to hbase
CommonUtils.putData(new HbasePutArrayData(HbaseInit.CROWD_INFO, crowd_id,
Map(MetaMapInfo.QUALIFIER_CROWD_UUID -> CommonUtils.getByteFromHllc(hllcUuid))))
CommonUtils.putData(new HbasePutArrayData(HbaseInit.CROWD_INFO, crowd_id,
Map(MetaMapInfo.QUALIFIER_CROWD_IMEI -> CommonUtils.getByteFromHllc(hllcImei))))
(hllcUuid.getCountEstimate,hllcImei.getCountEstimate)
}
}
发表评论
-
Spark SQL运行 过程 抄的别人的,记录 学习
2018-05-13 23:07 1035抄的别人的,觉得写的特别好 val FILESOURCE ... -
thriftserver log4j.properties 生效
2018-04-09 11:46 453/home/isuhadoop/spark2/sbin/sta ... -
spark datasource
2018-03-16 16:36 669DataFrameWriter format val c ... -
如何 map 端 Join。
2018-03-04 19:31 627Hive 中 修改表的 rawDataSize = 1 1 ... -
spark thrift server 修改
2018-03-04 12:58 587org.apache.spark.sql.hive.thrif ... -
hive hbase thriftserver run
2018-03-03 15:13 415正确方法 : 0\ 拷贝对应目录到 spark2 jars ... -
scala package
2018-01-25 09:48 533#scala 打包 mvn clean scala:com ... -
SPARK SERVER
2018-01-23 22:15 554sbin/start-thriftserver.sh --dr ... -
driver class
2018-01-21 22:11 525sbin/start-thriftserver.sh -- ... -
spark thrift server 调试
2017-10-20 15:50 867spark-hive-thriftserver 本地调试 ... -
spark SQL conf
2017-10-18 14:36 624org.apache.spark.sql.internal.S ... -
java 死锁 ,内存问题 分析
2017-10-17 10:50 349jstack -l pid /opt/soft/jdk/ ... -
thriftServer proxy
2017-10-16 14:21 943sudo yum install haproxy 257 ... -
hive spark conf
2017-09-26 17:44 1299CREATE TABLE org_userbehavior_a ... -
get day
2017-09-19 08:41 570def timeDayNow() = { var ... -
thriftserver
2017-09-14 19:47 473export SPARK_CONF_DIR=/home/yun ... -
thriftserver dynamicallocation
2017-09-08 14:41 590./sbin/start-thriftserver.sh -- ... -
test code2
2017-09-03 13:45 492package org.test.udf import co ... -
test code
2017-08-24 17:52 290def taskcal(data:Array[(String, ... -
struct streaming SQL udf udaf
2017-08-22 11:50 680spark aggregator class H ...
相关推荐
UDAF是一种特殊的用户自定义函数,它负责处理一组输入值并返回一个单一的输出值。与UDFs(用户自定义函数)不同,UDAF通常涉及多步操作,包括初始化、累积和最终化等阶段,常用于统计分析和复杂计算。 二、按位逻辑...
数据架构师第015节UDAF实战:实现udaf第16节数据说明和重要操作演示.mp4
A custom UDAF to group oncatenates all arguments from different rows into a single string.
【标题】"Doris-UDAF 源码解析" 在大数据处理领域,Apache Doris 是一款高效、易用的在线分析处理(OLAP)系统,适用于实时数据分析和报表查询。UDAF(User-Defined Aggregation Function)是 Doris 提供的一种用户...
除了 UDF,SparkSQL 还支持用户定义聚合函数,用于对一组值进行计算,返回单个值。例如,我们可能需要计算 `price` 列的平均值,而内置的 `avg()` 函数无法满足特定需求时,可以自定义聚合函数。 1. **创建自定义...
个人 Hive UDAF 有一堆 Hive UDAF(用户定义的聚合函数)不在标准 Hive 分布中,因为它们可能会导致大型数据集的 OOM。 要使用它们,您需要加载 jar 文件,然后为每个要使用的函数创建一个临时函数: ADD JAR target...
title: "使用 [通用] UDAF 计算两个数的线性回归系数"例如select s,regression(x,y) group by s;参考Hive U
7. **存储过程(UDF/UDAF/UDTF)**:Hive支持用户自定义函数,包括UDF(单行函数)、UDAF(聚合函数)和UDTF(多行函数),以扩展其功能。 8. **Hive与MapReduce的关系**:Hive将HQL转换为一系列的MapReduce任务...
接着,UDAF(用户定义的多行函数)是处理多行数据并返回单个值的函数。例如,“根据ip获取区域码”可能需要一个UDAF,它接收一组IP地址,然后聚合这些IP到它们相应的地理区域。UDAF的实现会涉及到状态维护和聚合操作...
这是一些有用的 Hive UDF 和 UDAF 的集合。 提供的功能 UDAF Mode ( de.frosner.hive.udaf.Mode ) - 计算组列的统计模式 从源头构建 git clone https://github.com/FRosner/mustached-hive-udfs.git cd mustached...
用户定义聚合函数(UDAF)聚合多行中的列值,并返回一个单一值,例如sum(c1)。用户定义表生成函数(UDTF)接受零个或多个输入,并产生多列或多行的输出,例如explode()。此外,宏则是使用其他Hive函数的函数。 在...
例如:`'foobar' LIKE 'foo%'` 返回`TRUE`,其中`%`表示零个或多个任意字符,`_`表示单个任意字符。 #### 1.2 算术运算符 算术运算符用于执行基本的数学计算。常见的算术运算符有: - **`+`**:加法 - **`-`**:...
在本文中,我们将深入探讨Hive的常用函数,包括时间函数、类型转换函数、用户自定义函数(UDF)以及用户自定义聚合函数(UDAF)等。 关系运算符是Hive中的基础函数,用于比较两个值是否相等。例如,A=B用于判断A...
如果链接失效,请与我联系!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
UDAF接收一组输入,处理后返回单个聚合结果。 - **UDTF (User-Defined Table-Generating Functions)**:用户自定义的一进多出的函数,可以将单行输入转换为多行输出,比如用于数据拆解或生成新的数据集。 2. **...
UDAF可以处理多行数据并返回单个结果,如求和、平均值等;UDTF则能产生多行输出,例如通过拆分字符串生成多行。 二、创建UDF 在"blink_udx_3x-master"中,我们能看到一个简单的UDF示例。创建UDF通常包括以下步骤: ...
UDTF可以返回多行或多列的结果,而UDAF则用于处理一组值并返回单个结果,比如求平均值、求和等。这两种函数在数据处理中非常常见,特别是在数据清洗、转换和统计分析中。 “高效安全兼容”这部分可能强调了...
10. **子查询改进**:子查询可以嵌套在更复杂的查询中,提高了查询的复杂性和灵活性。 在SQL Server中,学习和应用这些SQL3特性将有助于提高数据库设计、性能优化和数据处理的效率。通过本教学练习,你将有机会实践...
hive所有函数 包括UDTs、UDAF、UDTF函数和运算符等,中文汉化,翻译并测试
UDAF处理一组输入并返回单个值,比如SUM或AVG。UDTF则可以产生多行输出,例如explode函数用于拆分数组。 在"dataiku-hive-udf-master"这个压缩包中,我们可以期待找到各种不同类型的Hive UDF示例源代码。这可能包括...