`

test code2

 
阅读更多
package org.test.udf

import com.google.gson.{Gson, GsonBuilder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable

/**
  * Created by yunzhi.lyz on 17-8-23.
  **/

object FunnelUdf {

  /**
   * Gson instance shared by [[JsonInfoGet]].
   * NOTE(review): kept as a package-visible `var` so any code in `org.test.udf` that reassigns it
   * keeps compiling; a `val` would be preferable if nothing does.
   */
  private[udf] var gson: Gson = new GsonBuilder().create

  /** Minimum length of the count arrays produced by [[FunnelCalMerge]] and [[FunnelCalMerge2]]. */
  private val MinCountSlots = 10

  /**
   * Aggregates funnel results into per-step counts.
   *
   * Each input row holds parallel arrays of item ids and timestamps, a comma-separated funnel route
   * (`funnelDesc`) and a window length (`windowLongernal`, in the same unit as the timestamps).
   * The row is evaluated with [[funnelProcess]] and the counter of every reached step is incremented.
   * Rows with a null in any column cannot be evaluated and are skipped.
   *
   * Output: a struct with a single field `countArray`; index `i` is the number of rows that reached
   * step `i`. The array has at least 10 slots (unused ones stay 0) and grows for longer routes.
   */
  class FunnelCalMerge extends UserDefinedAggregateFunction {
    override def inputSchema: StructType = StructType(StructField("item_id", ArrayType(LongType, false), true) ::
      StructField("item_timestamp", ArrayType(LongType, false), true) ::
      StructField("funnelDesc", StringType, true) ::
      StructField("windowLongernal", LongType, true) :: Nil)
    override def bufferSchema: StructType = StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)
    override def dataType: DataType = StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)
    override def deterministic: Boolean = true

    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = Array.fill[Long](MinCountSlots)(0L)
    }

    override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
      // A row with any null column cannot be evaluated; skip it instead of failing the whole query.
      if (!Seq(0, 1, 2, 3).exists(inputrow.isNullAt)) {
        val reached = funnelProcess(
          inputrow.getSeq[Long](0).toArray,
          inputrow.getSeq[Long](1).toArray,
          parseRoute(inputrow.getString(2)),
          inputrow.getLong(3))
        buffer(0) = incrementHits(seqOrEmpty[Long](buffer, 0), reached)
      }
    }

    override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer(0) = sumCounts(seqOrEmpty[Long](buffer, 0), seqOrEmpty[Long](buffer2, 0))
    }

    // Return a detached Row instead of the aggregation buffer itself.
    override def evaluate(buffer: Row): Any = Row(seqOrEmpty[Long](buffer, 0))
  }

  /**
   * Collects each group's `item_id` and `item_timestamp` values into two parallel arrays.
   *
   * Rows where either value is null are skipped so the two arrays stay aligned. Values are prepended
   * and partial buffers concatenated, so array order carries no meaning ([[funnelProcess]] sorts
   * events by timestamp itself).
   */
  class FunnelCal extends UserDefinedAggregateFunction {
    override def inputSchema: StructType = StructType(
      StructField("item_id", LongType, true) ::
        StructField("item_timestamp", LongType, true) :: Nil)
    override def bufferSchema: StructType = StructType(StructField("item_id", ArrayType(LongType), true) :: StructField("item_timestamp", ArrayType(LongType), true) :: Nil)
    override def dataType: DataType = StructType(StructField("item_id", ArrayType(LongType), true) :: StructField("item_timestamp", ArrayType(LongType), true) :: Nil)
    override def deterministic: Boolean = true

    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = Array.empty[Long]
      buffer(1) = Array.empty[Long]
    }

    override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
      if (!inputrow.isNullAt(0) && !inputrow.isNullAt(1)) {
        buffer(0) = inputrow.getLong(0) +: seqOrEmpty[Long](buffer, 0)
        buffer(1) = inputrow.getLong(1) +: seqOrEmpty[Long](buffer, 1)
      }
    }

    override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer(0) = seqOrEmpty[Long](buffer2, 0) ++ seqOrEmpty[Long](buffer, 0)
      buffer(1) = seqOrEmpty[Long](buffer2, 1) ++ seqOrEmpty[Long](buffer, 1)
    }

    override def evaluate(buffer: Row): Any = Row(seqOrEmpty[Long](buffer, 0), seqOrEmpty[Long](buffer, 1))
  }

  /**
   * Sums per-step boolean flags (one array per input row) into per-step counts.
   *
   * Index `i` of the result is the number of rows whose flag `i` was true. Null rows are skipped and
   * null elements count as false. The result has at least 10 slots and grows for longer flag arrays.
   */
  class FunnelCalMerge2 extends UserDefinedAggregateFunction {
    override def inputSchema: StructType = StructType(StructField("result", ArrayType(BooleanType), true) :: Nil)
    override def bufferSchema: StructType = StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)
    override def dataType: DataType = ArrayType(LongType, false)
    override def deterministic: Boolean = true

    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = Array.fill[Long](MinCountSlots)(0L)
    }

    override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
      if (!inputrow.isNullAt(0)) {
        buffer(0) = incrementHits(seqOrEmpty[Long](buffer, 0), inputrow.getSeq[Boolean](0).toArray)
      }
    }

    override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer(0) = sumCounts(seqOrEmpty[Long](buffer, 0), seqOrEmpty[Long](buffer2, 0))
    }

    override def evaluate(buffer: Row): Any = seqOrEmpty[Long](buffer, 0)
  }

  /**
   * Collects a group's item ids and timestamps plus its funnel route and window, then evaluates
   * [[funnelProcess]] over the whole group.
   *
   * Returns one flag per route step (true = reached), or an empty array when no route was seen.
   * For route and window the last non-null value seen wins. They are presumably constant within a
   * group (TODO confirm with callers). An empty partial buffer never overwrites a collected route.
   */
  class FunnelCal2 extends UserDefinedAggregateFunction {
    override def inputSchema: StructType = StructType(
      StructField("item_id", LongType, true) ::
        StructField("item_timestamp", LongType, true) ::
        StructField("funnelDesc", StringType, true) ::
        StructField("windowLongernal", LongType, true) :: Nil)
    override def bufferSchema: StructType = StructType(
      StructField("item_id", ArrayType(LongType), true) ::
        StructField("item_timestamp", ArrayType(LongType), true) ::
        StructField("funnelDesc", StringType, true) ::
        StructField("windowLongernal", LongType, true) :: Nil)
    override def dataType: DataType = ArrayType(BooleanType, false)
    override def deterministic: Boolean = true

    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = Array.empty[Long]
      buffer(1) = Array.empty[Long]
      buffer(2) = ""
      buffer(3) = 0L
    }

    override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
      // Only keep events with both id and timestamp so the two arrays stay aligned.
      if (!inputrow.isNullAt(0) && !inputrow.isNullAt(1)) {
        buffer(0) = inputrow.getLong(0) +: seqOrEmpty[Long](buffer, 0)
        buffer(1) = inputrow.getLong(1) +: seqOrEmpty[Long](buffer, 1)
      }
      if (!inputrow.isNullAt(2)) buffer(2) = inputrow.getString(2)
      if (!inputrow.isNullAt(3)) buffer(3) = inputrow.getLong(3)
    }

    override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer(0) = seqOrEmpty[Long](buffer2, 0) ++ seqOrEmpty[Long](buffer, 0)
      buffer(1) = seqOrEmpty[Long](buffer2, 1) ++ seqOrEmpty[Long](buffer, 1)
      // A partial buffer still holding its initial "" route must not clobber a real one.
      if (!buffer2.isNullAt(2) && buffer2.getString(2).nonEmpty) {
        buffer(2) = buffer2.getString(2)
        if (!buffer2.isNullAt(3)) buffer(3) = buffer2.getLong(3)
      }
    }

    override def evaluate(buffer: Row): Any = {
      val window = if (buffer.isNullAt(3)) 0L else buffer.getLong(3)
      funnelProcess(
        seqOrEmpty[Long](buffer, 0).toArray,
        seqOrEmpty[Long](buffer, 1).toArray,
        parseRoute(buffer.getString(2)),
        window)
    }
  }

  /**
   * Spark Java UDF: looks up `keyName` in the JSON text `jsonInfo` and returns the value's string
   * form, or "" when the key is absent or its value is null.
   * Parsing is delegated to `CalJson.jsonToMap` (defined elsewhere; presumably returns a
   * `java.util.Map` — TODO confirm).
   */
  class JsonInfoGet extends UDF2[String, String, String] {
    override def call(jsonInfo: String, keyName: String): String = {
      val map = CalJson.jsonToMap(jsonInfo, gson)
      if (map != null && map.containsKey(keyName)) Option(map.get(keyName)).map(_.toString).getOrElse("")
      else ""
    }
  }

  /**
   * Evaluates how far a user's events progress through an ordered funnel.
   *
   * Events are processed in ascending timestamp order (stable for equal timestamps):
   *  - an event for step 0 (re)starts the funnel: its timestamp becomes the chain's start time;
   *  - an event for step `i > 0` counts only if step `i - 1` is active and the event happened
   *    strictly less than `wd` after the chain's start time. Step `i` then becomes active with that
   *    same start time. An event outside the window deactivates step `i - 1` instead;
   *  - events whose id is not part of the route are ignored;
   *  - processing stops once the last step has been reached.
   *
   * @param dataItem      event item ids (paired positionally with `dataEventTime`; extra elements of
   *                      the longer array are ignored)
   * @param dataEventTime event timestamps
   * @param rt            funnel route: the step ids in order; a duplicated id maps to its first position
   * @param wd            window length, in the same unit as the timestamps
   * @return one flag per route step, true when the step was reached; empty for an empty route
   */
  def funnelProcess(dataItem: Array[Long], dataEventTime: Array[Long], rt: Array[Long], wd: Long): Array[Boolean] = {
    val result = Array.fill[Boolean](rt.length)(false)
    if (rt.nonEmpty) {
      val lastStep = rt.length - 1
      // Reversing before toMap makes the FIRST occurrence of a duplicated id win (like indexOf).
      val stepIndex: Map[Long, Int] = rt.zipWithIndex.reverse.toMap
      // startTime(i) is only meaningful while active(i); a separate flag avoids treating 0 as "unset".
      val startTime = Array.fill[Long](rt.length)(0L)
      val active = Array.fill[Boolean](rt.length)(false)
      val events = dataItem.zip(dataEventTime).sortBy(_._2)

      events.iterator.takeWhile(_ => !result(lastStep)).foreach { case (item, ts) =>
        stepIndex.get(item).foreach { idx =>
          if (idx == 0) {
            startTime(0) = ts
            active(0) = true
            result(0) = true
          } else if (active(idx - 1)) {
            if (ts - startTime(idx - 1) < wd) {
              startTime(idx) = startTime(idx - 1)
              active(idx) = true
              result(idx) = true
            } else {
              // Out of window: the chain that reached the previous step has expired.
              active(idx - 1) = false
            }
          }
        }
      }
    }
    result
  }

  /** Parses a comma-separated route of step ids, ignoring surrounding blanks and empty entries. */
  private def parseRoute(desc: String): Array[Long] =
    if (desc == null) Array.empty[Long]
    else desc.split(",").map(_.trim).filter(_.nonEmpty).map(_.toLong)

  /** Reads array column `i` of `row`, treating null as an empty sequence. */
  private def seqOrEmpty[T](row: Row, i: Int): Seq[T] =
    if (row.isNullAt(i)) Seq.empty[T] else row.getSeq[T](i)

  /** Returns a copy of `counts` with +1 at every index where `hits` is true, grown to fit `hits`. */
  private def incrementHits(counts: Seq[Long], hits: Array[Boolean]): Array[Long] = {
    val out = new Array[Long](math.max(counts.length, hits.length))
    counts.copyToArray(out)
    for (i <- hits.indices if hits(i)) out(i) += 1
    out
  }

  /** Element-wise sum of two count arrays; the shorter one is treated as zero-padded. */
  private def sumCounts(a: Seq[Long], b: Seq[Long]): Array[Long] =
    Array.tabulate(math.max(a.length, b.length))(i => a.lift(i).getOrElse(0L) + b.lift(i).getOrElse(0L))

}
分享到:
评论

相关推荐

    CodeTest自动生成POC刷漏洞工具

    CodeTest是一款自动化安全测试工具,主要用于自动生成Proof of Concept(POC)来检测软件系统中的安全漏洞。在IT行业中,POC是证明一个安全漏洞确实存在的一种技术验证,它通常是一段代码或脚本,当运行时能复现漏洞...

    CodeTEST_native的使用

    CodeTEST_native是一款用于软件在电路(Software-In-Circuit, SWIC)测试的工具,尤其适用于嵌入式系统中的代码测试。本指南将详细介绍如何在主机平台上利用CodeTEST_native进行SWIC版测试,主要涵盖以下四个步骤: ...

    code test by about

    code test public partial class About : System.Web.UI.Page

    Test PHP Code

    Test PHP Code

    pyopengl testcode demo

    pyopengl testcode demo

    11-2 testcode

    11-2 testcode

    testcode图书管理系统样例

    《testcode图书管理系统样例》是一个专为演示和学习设计的软件系统,它展示了如何有效管理图书信息,包括书籍的录入、查询、借阅和归还等操作。在这个系统中,testcode扮演着核心角色,它是实现图书管理功能的关键...

    test-code-9.rar_testcode 实验

    "test-code-9.rar_testcode 实验"作为一个专为达盛ARM处理器编写的实验代码集合,为初学者提供了一个学习平台,用以加深对ARM架构的理解,锻炼编程技能。 达盛ARM,作为ARM处理器的一种变体,因其实用性和广泛的...

    GPSTEST android gpstest source code

    2. 工作流程: - GPS初始化:GPSTEST首先请求Android系统的Location Services权限,然后开启GPS硬件,等待卫星信号。 - 卫星搜索:应用通过监听卫星状态,获取可视卫星的数量,以及每个卫星的伪距和信号质量。 - ...

    interview-code test

    通过对“interview-code test”文档的部分内容分析,我们了解到这是一份与原材料库存管理和容量规划相关的文档。它不仅反映了企业在供应链管理方面的需求,同时也展示了IT技术在这一领域的广泛应用。对于IT行业的...

    test code project for UI

    "test code project for UI" 指的是一种专门针对UI进行的自动化测试项目,目的是为了验证应用程序的外观、响应速度、易用性以及各种交互元素是否符合设计规范和用户需求。通过编写测试代码,可以有效地减少手动测试...

    ISPorCOM test code

    本项目“ISPorCOM test code”旨在演示如何利用AVRUSB进行ISP(In-System Programming)和COM(串行通信)功能。下面我们将深入探讨这些关键知识点。 首先,ISP是嵌入式系统领域中的一种编程方式,它允许开发者在...

    codetest简介

    2. **CodeTEST Software-In-Circuit (SIC)**:用于在目标硬件上进行软件测试,但不直接连接到硬件。 3. **CodeTEST Hardware-In-Circuit (HIC)**:直接与目标硬件相连进行测试,适用于更复杂的情况。 #### 四、Code...

    嵌入式软件测试讲解及CodeTest测试工具介绍

    从给定的文件信息来看,主要内容集中在嵌入式软件测试及其相关的CodeTest测试工具的介绍上。嵌入式软件测试是确保嵌入式系统稳定性和可靠性的关键环节,而CodeTest作为一款专业的测试工具,其在嵌入式软件开发中的...

    嵌入式软件测试工具codetest

    **嵌入式软件测试工具 Codetest** Codetest 是一款专门针对嵌入式系统软件测试的高效工具套件,全球首款为解决嵌入式应用程序的测试挑战而设计。它提供了实时在线的解决方案,涵盖追踪、性能分析、代码覆盖率测试...

    current_test_code.rar_current te_current test code_current test手

    "current_test_code.rar_current te_current test code_current test手"这个标题暗示我们这里包含的是一份关于手机待机电流测试的代码或者程序,可能是用于自动化测试的脚本或应用。 描述中的“测试手机待机电流, ...

    ISO 230-2-2014 , Test Code for Machine Tools-4th Ed.pdf

    国际标准原文,国标依据此文件的第三版制订了《GBT 17421.2-2016 机床检验通则》。 这是第四版,高清英文版。 This part of ISO 230 specifies methods for testing and evaluating the accuracy and repeatability ...

Global site tag (gtag.js) - Google Analytics