`

struct streaming SQL udf udaf

 
阅读更多
object StructuredNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
          .config("spark.default.parallelism",3)
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()

    val words = lines.as[String]
      .map(x => {println("**:"+x);x.split(" ")})
      .filter(_.length == 3)
       .map(x => (x(0),new Timestamp(x(1).toLong),x(2)))
      .withColumnRenamed("_1","dim1").withColumnRenamed("_2","time").withColumnRenamed("_3","imeisi")
    words.printSchema()

    val wordCounts2 = words
      .withWatermark("time", "10 minutes")
    wordCounts2.registerTempTable("uv")
    spark.udf.register("doubleString",Utils.udfDoubleString _ )
    spark.udf.register("HLLCUDAFInt", new  HLLCUDAFInt()  )


    val wordCounts =  spark.sql(" select * from  (select time,doubleString(dim1) as aa, doubleString(dim1) as bb  ,HLLCUDAFInt(imeisi) as uv  from uv group by time,doubleString(dim1)) tampa ")
     // 如何拆新表, insert into ,其实不用, 用临时表 就ok , sql 负责一点.

    val query = wordCounts.writeStream
      .outputMode("update")
      .foreach(new ForeachWriter[Row] {
        override def process(value: Row): Unit = {
          println(s" ${value.getAs[String](0)}   ${value.getAs[String](1)}     ${value.getAs[String](2)}   ${value.getAs[Int](3)}   ")
        }
        override def close(errorOrNull: Throwable): Unit = {}
        override def open(partitionId: Long, version: Long): Boolean = true
      }).start()

    query.awaitTermination()
  }
}

class HLLCUDAFInt extends UserDefinedAggregateFunction{ //ctrl+I实现复写方法
    override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))
    override def bufferSchema: StructType = StructType(Array(StructField("hllcbyte",BinaryType , true)))
    override def dataType: DataType = LongType
    override def deterministic: Boolean = true
    override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0)=  {
      val hllc = new HLLCounter(14)
      val bytes1 = ByteBuffer.allocate(hllc.maxLength())
      hllc.writeRegisters(bytes1)
      bytes1.array
    }}
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val hllc = new HLLCounter(14)
      hllc.readRegisters(ByteBuffer.wrap(buffer.getAs[Array[Byte]](0)))
      hllc.add(input.getAs[String](0))
      val bytes1 = ByteBuffer.allocate(hllc.maxLength())
      hllc.writeRegisters(bytes1)
      buffer(0) =  bytes1.array
    }
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      val hllc = new HLLCounter(14)
      hllc.readRegisters(ByteBuffer.wrap(buffer1.getAs[Array[Byte]](0)))
      val hllc2 = new HLLCounter(14)
      hllc2.readRegisters(ByteBuffer.wrap(buffer2.getAs[Array[Byte]](0)))
      hllc.merge(hllc2)
      val bytes1 = ByteBuffer.allocate(hllc.maxLength())
      hllc.writeRegisters(bytes1)
      buffer1(0) =  bytes1.array
    }
    override def evaluate(buffer: Row): Any = {
      val hllc = new HLLCounter(14)
      hllc.readRegisters(ByteBuffer.wrap(buffer.getAs[Array[Byte]](0)))
      hllc.getCountEstimate
    }
  }






Aggregator 写法

class HllcdistinctValue extends Aggregator[Row, HLLCounter, Long] {
      // A zero value for this aggregation. Should satisfy the property that any b + zero = b
      def zero: HLLCounter = new HLLCounter()
      // Combine two values to produce a new value. For performance, the function may modify `buffer`
      // and return it instead of constructing a new object
      def reduce(buffer: HLLCounter, employee: Row): HLLCounter = {
        buffer.add(employee.getString(0))
    buffer
  }
  // Merge two intermediate values
  def merge(b1: HLLCounter, b2: HLLCounter): HLLCounter = {
    b1.merge(b2)
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: HLLCounter): Long = reduction.getCountEstimate
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[HLLCounter] = Encoders.javaSerialization
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

用法:

   val averageSalary =  new  HllcdistinctValue().toColumn
    // words 已经为 dateframe 结构了.
    // Generate running word count
    val windowedCounts = words
      //.groupBy("word", "timestamp").count()
      .withWatermark("timestamp", "10 minutes")
      // .groupBy(window($"timestamp", windowDuration, slideDuration), $"word").agg(averageSalary)
      .groupBy(window($"timestamp", windowDuration, slideDuration)).agg(averageSalary)
分享到:
评论

相关推荐

    【SparkSql篇02】SparkSql之自定义UDF和UDAF函数1

    在本篇文章中,我们将深入探讨如何在 SparkSQL 中创建和使用自定义用户定义函数(UDF)和用户定义聚合函数(UDAF)。首先,我们需要理解 SparkSQL 的基本工作原理,它允许我们将 DataFrame 处理与 SQL 查询结合起来...

    Go-SQL2Struct一款根据sql语句自动生成golang结构体的chrome插件

    SQL2Struct是一款对golang开发者友好的chrome插件,根据在mysql中创建数据表的sql语句,自动生成golang中的struct,在golang开发者使用诸如gorm之类的框架时,可以很好的把mysql中的数据表与orm的结构体关联起来。

    Struct+SQL开发的源代码

    Struts+SQL开发的源代码是一套非常适合初学者学习的编程资源,主要涵盖了Structs、JSP、Servlet和SQL这四个关键的技术领域。这个压缩包中的"StrutsDB"很可能包含了一个基于Struts框架和数据库操作的应用实例,帮助...

    sql2struct:一个开发者友好的使用sql生成golang结构体的工具

    SQL2Struct SQL2Struct是一款对golang开发者友好的chrome插件,根据在mysql中创建数据表的sql语句,自动生成golang中的struct,在golang开发者使用诸如gorm之类的框架时,可以很好的把mysql中的数据表与orm的结构体...

    struct2+mysql

    【标题】"struct2+mysql"的描述指出,这是一个针对初学者的学习资源,结合了Eclipse、MySQL和Structs2这三个关键元素。Eclipse是一种广泛使用的集成开发环境(IDE),适用于多种编程语言,包括Java。MySQL是世界上最...

    sqlserver+struct宾馆信息管理系统

    在"sqlserver+struct宾馆信息管理系统"中,SQLServer 被用作核心数据存储和管理平台,负责处理宾馆的各种业务数据,如客房信息、客户预订、入住退房记录、账单结算等。 1. **关系型数据库模型**:SQLServer 采用...

    udf.rar_linux udf_udf_uuid

    这些函数可能会使用Linux内核提供的API来与UDF文件系统交互,例如调用`blkid`库或者直接操作`struct udf_fs_info`这样的结构体。 在Linux内核中,UDF文件系统的实现通常包括解析文件系统的元数据区,这包括Volume ...

    meddler:sql和struct之间的转换

    Meddler是一个小型工具包,可以消除在SQL查询和结构之间来回移动数据时的一些乏味。 它不是完整的ORM。 Meddler旨在以一种轻量级的方式来增加ORM的一些便利,同时将更多的控制权交给程序员。 软件包文档可在以下...

    sqlstruct:sqlstruct提供了一些方便的功能,可将结构体与go的databasesql包一起使用

    在Go语言中,数据库操作是开发过程中的常见任务,`sql`包是Go标准库提供的一个强大工具,用于处理SQL数据库。然而,直接使用`sql`包进行数据映射可能会显得繁琐,因为需要手动编写SQL语句并将结果映射到Go的结构体中...

    struct和typedef struct区别

    "struct和typedef struct的区别" 在编程语言中,struct和typedef struct都是用来定义结构体类型的,但它们之间存在一些关键的区别。 首先,在C语言中,定义一个结构体类型需要使用typedef关键字,例如: ```c ...

    golang struct 自动生成工具

    https://github.com/whr-helen/go-struct-auto 自动构建工具使用 安装包命令:go get github.com/whr-helen/go-struct-auto 注释:参数信息 -host host改为自己数据库的地址(默认127.0.0.1) -port port改为...

    运用struts1.2+hibernate+spring 框架完整购物商城项目(内含sql文件)

    一个J2EE购物网站的实现 运用struts1.2+hibernate+spring 框架,数据库连接池,事务管理;Struts 应用国际化,Struts 标签库与Tiles框架, JSTL标签库,Spring IOC。 采用优化性能技术,采用oscache缓存,freemarker静态...

    typedef struct 与 struct 的区别及初始化

    在C/C++编程语言中,`typedef` 和 `struct` 是两种不同的声明结构体类型的方式,它们各有特点,但也有一定的关联。理解这两者之间的差异以及如何初始化结构体是编程中非常重要的概念。 首先,`struct` 关键字用于...

    structtype&def_struct

    在编程语言中,`struct`关键字用于定义一种复合数据类型,它允许我们将多个不同类型的变量组合成一个单一的实体。在C和C++中,`struct`的使用方式略有不同,特别是在与`typedef`关键字结合使用时。 1. `struct`定义...

    OA 项目(struct+hibernet+sql),java jsp java内部培训时做的,很适合学习框架的同志参考

    本项目以“struct+hibernate+sql”为核心,结合Java、Ajax和JSP,为初学者提供了一个深入学习框架的实践平台。下面将详细介绍这些技术在OA项目中的应用及其重要性。 1. Struts(Structs)框架: Struts 是一个基于...

    personal sqlserver database struct

    "personal sqlserver database struct"这个标题暗示了我们即将探讨的是一个个人SQL Server数据库的架构和设计。下面将详细阐述SQL Server数据库的基础知识,以及如何构建和优化个人数据库结构。 一、SQL Server...

    xml和struct之间的相互转换

    XML(eXtensible Markup Language)和C语言中的struct是两种不同的数据表示方式。XML是一种用于标记数据的标准化格式,而struct是C语言中用来结构化数据的类型。在编程中,有时我们需要在两者之间进行转换,以实现...

    mapstruct-1.2.0.Final-API文档-中文版.zip

    赠送jar包:mapstruct-1.2.0.Final.jar; 赠送原API文档:mapstruct-1.2.0.Final-javadoc.jar; 赠送源代码:mapstruct-1.2.0.Final-sources.jar; 赠送Maven依赖信息文件:mapstruct-1.2.0.Final.pom; 包含翻译后...

    MapStruct 1.2.0 参考指南

    MapStruct是一款非常流行的Java注释处理器,主要用于简化Java对象(也称为Java Bean)之间的映射操作。它通过在编译时自动生成源对象到目标对象的映射代码,提高了开发效率并减少了手动编写映射代码时可能出现的错误...

    ip struct ip struct

    综上所述,通过对 `struct ip_hdr`、`struct ip` 和 `struct eth_hdr` 的分析,我们不仅了解了 IP 数据报首部的关键组成部分,还进一步掌握了以太网头部的结构和意义。这些知识点对于理解互联网工作原理以及进行网络...

Global site tag (gtag.js) - Google Analytics