`

udaf 返回的 子属性

 
阅读更多


udaf 返回的 子属性

spark.sql("select createCrowdHllc(uuid,tmp_id,'crowdid_appid').uuiduv   from h5     ").show(10)

package cn.analysys.udf.crowd

import cn.analysys.batch.userprocess.HbaseInit
import cn.analysys.meta.MetaMapInfo
import cn.analysys.udf.utils.CommonUtils
import cn.analysys.udf.utils.CommonUtils.HbasePutArrayData
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable

/*
* input:  uuid iterater
* output: Bytes[]
* */

class CreateCrowdHllc extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(
    StructField("uuid", StringType, true) ::
    StructField("imeisi", StringType, true) ::
    StructField("crowdid", StringType, true) :: Nil)

  def bufferSchema: StructType = StructType(
    StructField("uuidByes", ArrayType(ByteType), true) ::
    StructField("imeiByes", ArrayType(ByteType), true) ::
    StructField("crowdid", StringType, true) :: Nil)

  override def dataType: DataType = StructType(
    StructField("uuiduv", LongType, true)
    ::StructField("imeiuv", LongType, true)
    :: Nil) //ArrayType(LongType)


  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.empty[ByteType]
    buffer(1) = Array.empty[ByteType]
    buffer(2) = ""
  }

  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    //如果有特别大的 app,有这个函数,避免数据倾斜,大内存占用的问题。
    val hllcUuid = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
    val hllcImei = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
    hllcUuid.add(inputrow.getAs[String](0))
    hllcImei.add(inputrow.getAs[String](1))
    buffer(0) = CommonUtils.getByteFromHllc(hllcUuid)
    buffer(1) = CommonUtils.getByteFromHllc(hllcImei)
    buffer(2) = inputrow.getAs[String](2)
  }

  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    val hllcUuid = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
    val hllcImei = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
    val hllcUuid2 = CommonUtils.getHllcFromByte(buffer2.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
    val hllcImei2 = CommonUtils.getHllcFromByte(buffer2.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
    hllcUuid.merge(hllcUuid2)
    hllcImei.merge(hllcImei2)
    buffer(0) = CommonUtils.getByteFromHllc(hllcUuid)
    buffer(1) = CommonUtils.getByteFromHllc(hllcImei)
    buffer(2) = buffer2.getAs[String](2)
  }

  override def evaluate(buffer: Row): Any = {
    val hllcUuid = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
    val hllcImei = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
    val crowd_id = buffer.getAs[String](2)
    println(s"uuid uv:${hllcUuid.getCountEstimate} ; ")
    println(s"imei uv:${hllcImei.getCountEstimate} ; ")
    // put byte[] to hbase
    CommonUtils.putData(new HbasePutArrayData(HbaseInit.CROWD_INFO, crowd_id,
      Map(MetaMapInfo.QUALIFIER_CROWD_UUID -> CommonUtils.getByteFromHllc(hllcUuid))))
    CommonUtils.putData(new HbasePutArrayData(HbaseInit.CROWD_INFO, crowd_id,
      Map(MetaMapInfo.QUALIFIER_CROWD_IMEI -> CommonUtils.getByteFromHllc(hllcImei))))
    (hllcUuid.getCountEstimate,hllcImei.getCountEstimate)
  }






}
分享到:
评论

相关推荐

    hive udaf 实现按位取与或

    UDAF是一种特殊的用户自定义函数,它负责处理一组输入值并返回一个单一的输出值。与UDFs(用户自定义函数)不同,UDAF通常涉及多步操作,包括初始化、累积和最终化等阶段,常用于统计分析和复杂计算。 二、按位逻辑...

    数据架构师第015节UDAF实战:实现udaf第16节数据说明和重要操作演示.mp4

    数据架构师第015节UDAF实战:实现udaf第16节数据说明和重要操作演示.mp4

    Hive UDAF示例

    A custom UDAF to group oncatenates all arguments from different rows into a single string.

    doris-udaf 源码包

    【标题】"Doris-UDAF 源码解析" 在大数据处理领域,Apache Doris 是一款高效、易用的在线分析处理(OLAP)系统,适用于实时数据分析和报表查询。UDAF(User-Defined Aggregation Function)是 Doris 提供的一种用户...

    【SparkSql篇02】SparkSql之自定义UDF和UDAF函数1

    除了 UDF,SparkSQL 还支持用户定义聚合函数,用于对一组值进行计算,返回单个值。例如,我们可能需要计算 `price` 列的平均值,而内置的 `avg()` 函数无法满足特定需求时,可以自定义聚合函数。 1. **创建自定义...

    hive:个人配置单元 UDAF

    个人 Hive UDAF 有一堆 Hive UDAF(用户定义的聚合函数)不在标准 Hive 分布中,因为它们可能会导致大型数据集的 OOM。 要使用它们,您需要加载 jar 文件,然后为每个要使用的函数创建一个临时函数: ADD JAR target...

    young1lin#notes#使用 UDAF 计算两个数的线性回归系数1

    title: "使用 [通用] UDAF 计算两个数的线性回归系数"例如select s,regression(x,y) group by s;参考Hive U

    Hive收集的电子文档

    7. **存储过程(UDF/UDAF/UDTF)**:Hive支持用户自定义函数,包括UDF(单行函数)、UDAF(聚合函数)和UDTF(多行函数),以扩展其功能。 8. **Hive与MapReduce的关系**:Hive将HQL转换为一系列的MapReduce任务...

    自定义hive函数

    接着,UDAF(用户定义的多行函数)是处理多行数据并返回单个值的函数。例如,“根据ip获取区域码”可能需要一个UDAF,它接收一组IP地址,然后聚合这些IP到它们相应的地理区域。UDAF的实现会涉及到状态维护和聚合操作...

    mustached-hive-udfs:一些有用的 Hive UDF 和 UDAF

    这是一些有用的 Hive UDF 和 UDAF 的集合。 提供的功能 UDAF Mode ( de.frosner.hive.udaf.Mode ) - 计算组列的统计模式 从源头构建 git clone https://github.com/FRosner/mustached-hive-udfs.git cd mustached...

    Apache Hive Functions Cheat Sheet

    用户定义聚合函数(UDAF)聚合多行中的列值,并返回一个单一值,例如sum(c1)。用户定义表生成函数(UDTF)接受零个或多个输入,并产生多列或多行的输出,例如explode()。此外,宏则是使用其他Hive函数的函数。 在...

    hive函数.docx

    例如:`'foobar' LIKE 'foo%'` 返回`TRUE`,其中`%`表示零个或多个任意字符,`_`表示单个任意字符。 #### 1.2 算术运算符 算术运算符用于执行基本的数学计算。常见的算术运算符有: - **`+`**:加法 - **`-`**:...

    hive常用函数

    在本文中,我们将深入探讨Hive的常用函数,包括时间函数、类型转换函数、用户自定义函数(UDF)以及用户自定义聚合函数(UDAF)等。 关系运算符是Hive中的基础函数,用于比较两个值是否相等。例如,A=B用于判断A...

    Spark 1.X 大数据平台

    如果链接失效,请与我联系!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

    Clickhouse之自定义函数

    UDAF接收一组输入,处理后返回单个聚合结果。 - **UDTF (User-Defined Table-Generating Functions)**:用户自定义的一进多出的函数,可以将单行输入转换为多行输出,比如用于数据拆解或生成新的数据集。 2. **...

    blink_udx_3x-master.zip

    UDAF可以处理多行数据并返回单个结果,如求和、平均值等;UDTF则能产生多行输出,例如通过拆分字符串生成多行。 二、创建UDF 在"blink_udx_3x-master"中,我们能看到一个简单的UDF示例。创建UDF通常包括以下步骤: ...

    藏经阁-MaxCompute重磅发布.pdf

    UDTF可以返回多行或多列的结果,而UDAF则用于处理一组值并返回单个结果,比如求平均值、求和等。这两种函数在数据处理中非常常见,特别是在数据清洗、转换和统计分析中。 “高效安全兼容”这部分可能强调了...

    SQL3复习(新系统).rar_softulf_sql_sql3

    10. **子查询改进**:子查询可以嵌套在更复杂的查询中,提高了查询的复杂性和灵活性。 在SQL Server中,学习和应用这些SQL3特性将有助于提高数据库设计、性能优化和数据处理的效率。通过本教学练习,你将有机会实践...

    【官网汉化中文】Hive函数运算符使用方法大全

    hive所有函数 包括UDTs、UDAF、UDTF函数和运算符等,中文汉化,翻译并测试

    dataiku hive udf

    UDAF处理一组输入并返回单个值,比如SUM或AVG。UDTF则可以产生多行输出,例如explode函数用于拆分数组。 在"dataiku-hive-udf-master"这个压缩包中,我们可以期待找到各种不同类型的Hive UDF示例源代码。这可能包括...

Global site tag (gtag.js) - Google Analytics