- 浏览: 126100 次
- 性别:
- 来自: 杭州
文章分类
最新评论
package org.test.udf
import com.google.gson.{Gson, GsonBuilder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
/**
* Created by yunzhi.lyz on 17-8-23.
**/
object FunnelUdf {
private[udf] var gson: Gson = new GsonBuilder().create
class FunnelCalMerge extends UserDefinedAggregateFunction {
  // Aggregates pre-collected event arrays: each input row carries a user's
  // item ids + timestamps plus the funnel config; the row is matched once via
  // funnelProcess and per-step hit counts are accumulated in the buffer.
  override def inputSchema = StructType(StructField("item_id", ArrayType(LongType, false), true) ::
    StructField("item_timestamp", ArrayType(LongType, false), true) ::
    StructField("funnelDesc", StringType, true) ::
    StructField("windowLongernal", LongType, true) :: Nil)
  def bufferSchema: StructType = StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)
  override def dataType: DataType = StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)
  override def deterministic: Boolean = true
  // NOTE(review): the counter array is fixed at 10 slots, so funnels with more
  // than 10 steps are silently truncated -- confirm this limit with callers.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.fill[Long](10)(0L)
  }
  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    val itemIDArray = inputrow.getAs[mutable.WrappedArray[Long]](0)
    val itemTimestamp = inputrow.getAs[mutable.WrappedArray[Long]](1)
    // funnelDesc is a comma-separated list of item ids defining the step order.
    val funnelRoute = inputrow.getString(2).split(",").map(_.toLong)
    val windowLongernal = inputrow.getLong(3)
    val bufferValue = buffer.getAs[mutable.WrappedArray[Long]](0)
    val r = funnelProcess(itemIDArray.toArray[Long], itemTimestamp.toArray[Long], funnelRoute, windowLongernal)
    // Guard against routes longer than the fixed-size counter array
    // (the original hard-coded bound could index past slot 9).
    val steps = math.min(funnelRoute.length, bufferValue.length)
    for (i <- 0 until steps if r(i)) bufferValue(i) = bufferValue(i) + 1
    buffer(0) = bufferValue
  }
  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    val r1 = buffer.getAs[mutable.WrappedArray[Long]](0)
    val r2 = buffer2.getAs[mutable.WrappedArray[Long]](0)
    // Element-wise sum of the partial per-step counters.
    for (i <- 0 until math.min(r1.length, r2.length)) r1(i) = r1(i) + r2(i)
    buffer(0) = r1
  }
  override def evaluate(buffer: Row): Any = buffer
}
class FunnelCal extends UserDefinedAggregateFunction {
  // Collects a group's (item_id, item_timestamp) pairs into two parallel
  // long arrays, returned as a struct for downstream funnel evaluation.
  override def inputSchema: StructType = StructType(
    StructField("item_id", LongType, true) ::
      StructField("item_timestamp", LongType, true) :: Nil)
  def bufferSchema: StructType = StructType(StructField("item_id", ArrayType(LongType), true) :: StructField("item_timestamp", ArrayType(LongType), true) :: Nil)
  override def dataType: DataType = StructType(StructField("item_id", ArrayType(LongType), true) :: StructField("item_timestamp", ArrayType(LongType), true) :: Nil)
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // BUG FIX: was Array.empty[LongType] -- an array of Spark DataType
    // objects, not of longs as bufferSchema (ArrayType(LongType)) declares.
    buffer(0) = Array.empty[Long]
    buffer(1) = Array.empty[Long]
  }
  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    // Prepend each incoming pair (order is not preserved; funnelProcess sorts
    // by timestamp downstream, so this is safe for the funnel use case).
    buffer(0) = (buffer.getAs[mutable.WrappedArray[Long]](0)).toArray[Long].+:(inputrow.getAs[Long](0))
    buffer(1) = (buffer.getAs[mutable.WrappedArray[Long]](1)).toArray[Long].+:(inputrow.getAs[Long](1))
  }
  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    // Concatenate the partial arrays (buffer2's elements come first).
    buffer(0) = (buffer.getAs[mutable.WrappedArray[Long]](0)).toArray[Long].++:((buffer2.getAs[mutable.WrappedArray[Long]](0)).toArray[Long])
    buffer(1) = (buffer.getAs[mutable.WrappedArray[Long]](1)).toArray[Long].++:((buffer2.getAs[mutable.WrappedArray[Long]](1)).toArray[Long])
  }
  override def evaluate(buffer: Row): Any = buffer
}
class FunnelCalMerge2 extends UserDefinedAggregateFunction {
  // Sums per-step boolean funnel results (as produced by FunnelCal2) into a
  // per-step hit counter; evaluate returns the raw counter array.
  override def inputSchema = StructType(StructField("result", ArrayType(BooleanType), true) :: Nil)
  def bufferSchema: StructType = StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)
  override def dataType: DataType = ArrayType(LongType, false)
  override def deterministic: Boolean = true
  // NOTE(review): the counter is fixed at 10 slots, so at most 10 funnel
  // steps are counted -- confirm this limit with callers.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.fill[Long](10)(0L)
  }
  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    val bufferValue = buffer.getAs[mutable.WrappedArray[Long]](0)
    val r = (inputrow.getAs[mutable.WrappedArray[Boolean]](0)).toArray[Boolean]
    // Guard against result vectors longer than the fixed-size counter array
    // (the original would throw ArrayIndexOutOfBoundsException past slot 9).
    val steps = math.min(r.length, bufferValue.length)
    for (i <- 0 until steps if r(i)) bufferValue(i) = bufferValue(i) + 1
    buffer(0) = bufferValue
  }
  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    val r1 = buffer.getAs[mutable.WrappedArray[Long]](0)
    val r2 = buffer2.getAs[mutable.WrappedArray[Long]](0)
    // Element-wise sum of the partial per-step counters.
    for (i <- 0 until math.min(r1.length, r2.length)) r1(i) = r1(i) + r2(i)
    buffer(0) = r1
  }
  override def evaluate(buffer: Row): Any = buffer(0)
}
class FunnelCal2 extends UserDefinedAggregateFunction {
  // Collects a group's (item_id, item_timestamp) events plus the funnel
  // configuration, then evaluates the funnel once per group at evaluate().
  override def inputSchema: StructType = StructType(
    StructField("item_id", LongType, true) ::
      StructField("item_timestamp", LongType, true) ::
      StructField("funnelDesc", StringType, true) ::
      StructField("windowLongernal", LongType, true) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("item_id", ArrayType(LongType), true) ::
      StructField("item_timestamp", ArrayType(LongType), true) ::
      StructField("funnelDesc", StringType, true) ::
      StructField("windowLongernal", LongType, true) :: Nil)
  override def dataType: DataType = ArrayType(BooleanType, false)
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // BUG FIX: was Array.empty[LongType] -- an array of Spark DataType
    // objects, not of longs as the buffer schema declares.
    buffer(0) = Array.empty[Long]
    buffer(1) = Array.empty[Long]
    buffer(2) = ""
    buffer(3) = 0L
  }
  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    // Prepend the event pair; order is irrelevant, funnelProcess sorts by time.
    buffer(0) = (buffer.getAs[mutable.WrappedArray[Long]](0)).toArray[Long].+:(inputrow.getAs[Long](0))
    buffer(1) = (buffer.getAs[mutable.WrappedArray[Long]](1)).toArray[Long].+:(inputrow.getAs[Long](1))
    // Every row of a group is assumed to carry the same funnel config.
    buffer(2) = inputrow.getString(2)
    buffer(3) = inputrow.getLong(3)
  }
  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer(0) = (buffer.getAs[mutable.WrappedArray[Long]](0)).toArray[Long].++:((buffer2.getAs[mutable.WrappedArray[Long]](0)).toArray[Long])
    buffer(1) = (buffer.getAs[mutable.WrappedArray[Long]](1)).toArray[Long].++:((buffer2.getAs[mutable.WrappedArray[Long]](1)).toArray[Long])
    // BUG FIX: unconditionally copying buffer2's config could overwrite a
    // valid funnelDesc with the "" of a partition that saw no rows, making
    // evaluate() fail on "".split(",").map(_.toLong).
    val desc2 = buffer2.getString(2)
    if (desc2.nonEmpty) {
      buffer(2) = desc2
      buffer(3) = buffer2.getLong(3)
    }
  }
  override def evaluate(buffer: Row): Any = {
    val desc = buffer.getString(2)
    if (desc.isEmpty) {
      // Group received no rows at all: nothing to match.
      Array.empty[Boolean]
    } else {
      val funnelRoute = desc.split(",").map(_.toLong)
      val windowLongernal = buffer.getLong(3)
      funnelProcess((buffer.getAs[mutable.WrappedArray[Long]](0)).toArray[Long],
        (buffer.getAs[mutable.WrappedArray[Long]](1)).toArray[Long], funnelRoute, windowLongernal)
    }
  }
}
class JsonInfoGet extends UDF2[String, String, String] {
  /** Looks up `keyName` in the JSON object `jsonInfo` (parsed via CalJson)
    * and returns its string form, or "" when the key is absent. */
  def call(jsonInfo: String, keyName: String): String = {
    val parsed = CalJson.jsonToMap(jsonInfo, gson)
    if (parsed.containsKey(keyName)) parsed.get(keyName).toString else ""
  }
}
def funnelProcess(dataItem: Array[Long], dataEventTime: Array[Long], rt: Array[Long], wd: Long): Array[Boolean] = {
  // Funnel matcher: result(i) is true iff the event stream contains a
  // chronological chain rt(0), rt(1), ..., rt(i) where every step after the
  // first occurs less than `wd` time units after the chain's start time.
  //
  // dataItem / dataEventTime are parallel arrays: event id and its timestamp.
  val result = Array.fill[Boolean](rt.length)(false)
  // ROBUSTNESS FIX: an empty route used to crash on result(-1) below.
  if (rt.nonEmpty) {
    // Process events in chronological order.
    val sortData = dataItem.zip(dataEventTime).sortBy(_._2)
    val lastStep = rt.length - 1
    // startTimeArray(i): start time of a chain that has reached step i.
    // 0 doubles as the "not reached" sentinel, so an event at t == 0 cannot
    // anchor a chain -- pre-existing limitation, deliberately kept as-is.
    val startTimeArray = Array.fill[Long](rt.length)(0L)
    // item id -> step index; reversing before toMap keeps the FIRST index for
    // duplicate ids, matching the original rt.indexOf semantics while
    // avoiding its O(n^2) scan.
    val indexMap = rt.zipWithIndex.reverse.toMap
    var done = false
    for (itemWithTime <- sortData if !done) {
      // ROBUSTNESS FIX: ids absent from the route used to throw
      // NoSuchElementException from indexMap.apply; they are now skipped.
      indexMap.get(itemWithTime._1) match {
        case Some(0) =>
          // First funnel step: always (re)starts a chain.
          startTimeArray(0) = itemWithTime._2
          result(0) = true
        case Some(itemIndex) if startTimeArray(itemIndex - 1) != 0 =>
          if ((itemWithTime._2 - startTimeArray(itemIndex - 1)) < wd) {
            // Within the window: the chain advances, carrying its start time.
            startTimeArray(itemIndex) = startTimeArray(itemIndex - 1)
            result(itemIndex) = true
          } else {
            // Window exceeded: kill the chain that ended at the previous step.
            startTimeArray(itemIndex - 1) = 0
          }
        case _ => // previous step not reached yet, or id not in the route
      }
      // Stop early once the final step has been satisfied.
      if (result(lastStep)) done = true
    }
  }
  result
}
}
import com.google.gson.{Gson, GsonBuilder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
/**
* Created by yunzhi.lyz on 17-8-23.
**/
object FunnelUdf {
private[udf] var gson: Gson = new GsonBuilder().create
class FunnelCalMerge extends UserDefinedAggregateFunction {
  // Aggregates pre-collected event arrays: each input row carries a user's
  // item ids + timestamps plus the funnel config; the row is matched once via
  // funnelProcess and per-step hit counts are accumulated in the buffer.
  override def inputSchema = StructType(StructField("item_id", ArrayType(LongType, false), true) ::
    StructField("item_timestamp", ArrayType(LongType, false), true) ::
    StructField("funnelDesc", StringType, true) ::
    StructField("windowLongernal", LongType, true) :: Nil)
  def bufferSchema: StructType = StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)
  override def dataType: DataType = StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)
  override def deterministic: Boolean = true
  // NOTE(review): the counter array is fixed at 10 slots, so funnels with more
  // than 10 steps are silently truncated -- confirm this limit with callers.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.fill[Long](10)(0L)
  }
  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    val itemIDArray = inputrow.getAs[mutable.WrappedArray[Long]](0)
    val itemTimestamp = inputrow.getAs[mutable.WrappedArray[Long]](1)
    // funnelDesc is a comma-separated list of item ids defining the step order.
    val funnelRoute = inputrow.getString(2).split(",").map(_.toLong)
    val windowLongernal = inputrow.getLong(3)
    val bufferValue = buffer.getAs[mutable.WrappedArray[Long]](0)
    val r = funnelProcess(itemIDArray.toArray[Long], itemTimestamp.toArray[Long], funnelRoute, windowLongernal)
    // Guard against routes longer than the fixed-size counter array
    // (the original hard-coded bound could index past slot 9).
    val steps = math.min(funnelRoute.length, bufferValue.length)
    for (i <- 0 until steps if r(i)) bufferValue(i) = bufferValue(i) + 1
    buffer(0) = bufferValue
  }
  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    val r1 = buffer.getAs[mutable.WrappedArray[Long]](0)
    val r2 = buffer2.getAs[mutable.WrappedArray[Long]](0)
    // Element-wise sum of the partial per-step counters.
    for (i <- 0 until math.min(r1.length, r2.length)) r1(i) = r1(i) + r2(i)
    buffer(0) = r1
  }
  override def evaluate(buffer: Row): Any = buffer
}
class FunnelCal extends UserDefinedAggregateFunction {
  // Collects a group's (item_id, item_timestamp) pairs into two parallel
  // long arrays, returned as a struct for downstream funnel evaluation.
  override def inputSchema: StructType = StructType(
    StructField("item_id", LongType, true) ::
      StructField("item_timestamp", LongType, true) :: Nil)
  def bufferSchema: StructType = StructType(StructField("item_id", ArrayType(LongType), true) :: StructField("item_timestamp", ArrayType(LongType), true) :: Nil)
  override def dataType: DataType = StructType(StructField("item_id", ArrayType(LongType), true) :: StructField("item_timestamp", ArrayType(LongType), true) :: Nil)
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // BUG FIX: was Array.empty[LongType] -- an array of Spark DataType
    // objects, not of longs as bufferSchema (ArrayType(LongType)) declares.
    buffer(0) = Array.empty[Long]
    buffer(1) = Array.empty[Long]
  }
  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    // Prepend each incoming pair (order is not preserved; funnelProcess sorts
    // by timestamp downstream, so this is safe for the funnel use case).
    buffer(0) = (buffer.getAs[mutable.WrappedArray[Long]](0)).toArray[Long].+:(inputrow.getAs[Long](0))
    buffer(1) = (buffer.getAs[mutable.WrappedArray[Long]](1)).toArray[Long].+:(inputrow.getAs[Long](1))
  }
  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    // Concatenate the partial arrays (buffer2's elements come first).
    buffer(0) = (buffer.getAs[mutable.WrappedArray[Long]](0)).toArray[Long].++:((buffer2.getAs[mutable.WrappedArray[Long]](0)).toArray[Long])
    buffer(1) = (buffer.getAs[mutable.WrappedArray[Long]](1)).toArray[Long].++:((buffer2.getAs[mutable.WrappedArray[Long]](1)).toArray[Long])
  }
  override def evaluate(buffer: Row): Any = buffer
}
class FunnelCalMerge2 extends UserDefinedAggregateFunction {
  // Sums per-step boolean funnel results (as produced by FunnelCal2) into a
  // per-step hit counter; evaluate returns the raw counter array.
  override def inputSchema = StructType(StructField("result", ArrayType(BooleanType), true) :: Nil)
  def bufferSchema: StructType = StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)
  override def dataType: DataType = ArrayType(LongType, false)
  override def deterministic: Boolean = true
  // NOTE(review): the counter is fixed at 10 slots, so at most 10 funnel
  // steps are counted -- confirm this limit with callers.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.fill[Long](10)(0L)
  }
  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    val bufferValue = buffer.getAs[mutable.WrappedArray[Long]](0)
    val r = (inputrow.getAs[mutable.WrappedArray[Boolean]](0)).toArray[Boolean]
    // Guard against result vectors longer than the fixed-size counter array
    // (the original would throw ArrayIndexOutOfBoundsException past slot 9).
    val steps = math.min(r.length, bufferValue.length)
    for (i <- 0 until steps if r(i)) bufferValue(i) = bufferValue(i) + 1
    buffer(0) = bufferValue
  }
  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    val r1 = buffer.getAs[mutable.WrappedArray[Long]](0)
    val r2 = buffer2.getAs[mutable.WrappedArray[Long]](0)
    // Element-wise sum of the partial per-step counters.
    for (i <- 0 until math.min(r1.length, r2.length)) r1(i) = r1(i) + r2(i)
    buffer(0) = r1
  }
  override def evaluate(buffer: Row): Any = buffer(0)
}
class FunnelCal2 extends UserDefinedAggregateFunction {
  // Collects a group's (item_id, item_timestamp) events plus the funnel
  // configuration, then evaluates the funnel once per group at evaluate().
  override def inputSchema: StructType = StructType(
    StructField("item_id", LongType, true) ::
      StructField("item_timestamp", LongType, true) ::
      StructField("funnelDesc", StringType, true) ::
      StructField("windowLongernal", LongType, true) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("item_id", ArrayType(LongType), true) ::
      StructField("item_timestamp", ArrayType(LongType), true) ::
      StructField("funnelDesc", StringType, true) ::
      StructField("windowLongernal", LongType, true) :: Nil)
  override def dataType: DataType = ArrayType(BooleanType, false)
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // BUG FIX: was Array.empty[LongType] -- an array of Spark DataType
    // objects, not of longs as the buffer schema declares.
    buffer(0) = Array.empty[Long]
    buffer(1) = Array.empty[Long]
    buffer(2) = ""
    buffer(3) = 0L
  }
  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    // Prepend the event pair; order is irrelevant, funnelProcess sorts by time.
    buffer(0) = (buffer.getAs[mutable.WrappedArray[Long]](0)).toArray[Long].+:(inputrow.getAs[Long](0))
    buffer(1) = (buffer.getAs[mutable.WrappedArray[Long]](1)).toArray[Long].+:(inputrow.getAs[Long](1))
    // Every row of a group is assumed to carry the same funnel config.
    buffer(2) = inputrow.getString(2)
    buffer(3) = inputrow.getLong(3)
  }
  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer(0) = (buffer.getAs[mutable.WrappedArray[Long]](0)).toArray[Long].++:((buffer2.getAs[mutable.WrappedArray[Long]](0)).toArray[Long])
    buffer(1) = (buffer.getAs[mutable.WrappedArray[Long]](1)).toArray[Long].++:((buffer2.getAs[mutable.WrappedArray[Long]](1)).toArray[Long])
    // BUG FIX: unconditionally copying buffer2's config could overwrite a
    // valid funnelDesc with the "" of a partition that saw no rows, making
    // evaluate() fail on "".split(",").map(_.toLong).
    val desc2 = buffer2.getString(2)
    if (desc2.nonEmpty) {
      buffer(2) = desc2
      buffer(3) = buffer2.getLong(3)
    }
  }
  override def evaluate(buffer: Row): Any = {
    val desc = buffer.getString(2)
    if (desc.isEmpty) {
      // Group received no rows at all: nothing to match.
      Array.empty[Boolean]
    } else {
      val funnelRoute = desc.split(",").map(_.toLong)
      val windowLongernal = buffer.getLong(3)
      funnelProcess((buffer.getAs[mutable.WrappedArray[Long]](0)).toArray[Long],
        (buffer.getAs[mutable.WrappedArray[Long]](1)).toArray[Long], funnelRoute, windowLongernal)
    }
  }
}
class JsonInfoGet extends UDF2[String, String, String] {
  /** Looks up `keyName` in the JSON object `jsonInfo` (parsed via CalJson)
    * and returns its string form, or "" when the key is absent. */
  def call(jsonInfo: String, keyName: String): String = {
    val parsed = CalJson.jsonToMap(jsonInfo, gson)
    if (parsed.containsKey(keyName)) parsed.get(keyName).toString else ""
  }
}
def funnelProcess(dataItem: Array[Long], dataEventTime: Array[Long], rt: Array[Long], wd: Long): Array[Boolean] = {
  // Funnel matcher: result(i) is true iff the event stream contains a
  // chronological chain rt(0), rt(1), ..., rt(i) where every step after the
  // first occurs less than `wd` time units after the chain's start time.
  //
  // dataItem / dataEventTime are parallel arrays: event id and its timestamp.
  val result = Array.fill[Boolean](rt.length)(false)
  // ROBUSTNESS FIX: an empty route used to crash on result(-1) below.
  if (rt.nonEmpty) {
    // Process events in chronological order.
    val sortData = dataItem.zip(dataEventTime).sortBy(_._2)
    val lastStep = rt.length - 1
    // startTimeArray(i): start time of a chain that has reached step i.
    // 0 doubles as the "not reached" sentinel, so an event at t == 0 cannot
    // anchor a chain -- pre-existing limitation, deliberately kept as-is.
    val startTimeArray = Array.fill[Long](rt.length)(0L)
    // item id -> step index; reversing before toMap keeps the FIRST index for
    // duplicate ids, matching the original rt.indexOf semantics while
    // avoiding its O(n^2) scan.
    val indexMap = rt.zipWithIndex.reverse.toMap
    var done = false
    for (itemWithTime <- sortData if !done) {
      // ROBUSTNESS FIX: ids absent from the route used to throw
      // NoSuchElementException from indexMap.apply; they are now skipped.
      indexMap.get(itemWithTime._1) match {
        case Some(0) =>
          // First funnel step: always (re)starts a chain.
          startTimeArray(0) = itemWithTime._2
          result(0) = true
        case Some(itemIndex) if startTimeArray(itemIndex - 1) != 0 =>
          if ((itemWithTime._2 - startTimeArray(itemIndex - 1)) < wd) {
            // Within the window: the chain advances, carrying its start time.
            startTimeArray(itemIndex) = startTimeArray(itemIndex - 1)
            result(itemIndex) = true
          } else {
            // Window exceeded: kill the chain that ended at the previous step.
            startTimeArray(itemIndex - 1) = 0
          }
        case _ => // previous step not reached yet, or id not in the route
      }
      // Stop early once the final step has been satisfied.
      if (result(lastStep)) done = true
    }
  }
  result
}
}
发表评论
-
Spark SQL运行 过程 抄的别人的,记录 学习
2018-05-13 23:07 | 浏览 1038 次。抄的别人的,觉得写的特别好 val FILESOURCE ... -
thriftserver log4j.properties 生效
2018-04-09 11:46 455/home/isuhadoop/spark2/sbin/sta ... -
udaf 返回的 子属性
2018-03-20 13:22 448udaf 返回的 子属性 spark.sql(" ... -
spark datasource
2018-03-16 16:36 673DataFrameWriter format val c ... -
如何 map 端 Join。
2018-03-04 19:31 634Hive 中 修改表的 rawDataSize = 1 1 ... -
spark thrift server 修改
2018-03-04 12:58 591org.apache.spark.sql.hive.thrif ... -
hive hbase thriftserver run
2018-03-03 15:13 418正确方法 : 0\ 拷贝对应目录到 spark2 jars ... -
scala package
2018-01-25 09:48 538#scala 打包 mvn clean scala:com ... -
SPARK SERVER
2018-01-23 22:15 557sbin/start-thriftserver.sh --dr ... -
driver class
2018-01-21 22:11 528sbin/start-thriftserver.sh -- ... -
spark thrift server 调试
2017-10-20 15:50 869spark-hive-thriftserver 本地调试 ... -
spark SQL conf
2017-10-18 14:36 648org.apache.spark.sql.internal.S ... -
java 死锁 ,内存问题 分析
2017-10-17 10:50 356jstack -l pid /opt/soft/jdk/ ... -
thriftServer proxy
2017-10-16 14:21 949sudo yum install haproxy 257 ... -
hive spark conf
2017-09-26 17:44 1302CREATE TABLE org_userbehavior_a ... -
get day
2017-09-19 08:41 577def timeDayNow() = { var ... -
thriftserver
2017-09-14 19:47 481export SPARK_CONF_DIR=/home/yun ... -
thriftserver dynamicallocation
2017-09-08 14:41 594./sbin/start-thriftserver.sh -- ... -
test code
2017-08-24 17:52 293def taskcal(data:Array[(String, ... -
struct streaming SQL udf udaf
2017-08-22 11:50 683spark aggregator class H ...
相关推荐
CodeTest是一款自动化安全测试工具,主要用于自动生成Proof of Concept(POC)来检测软件系统中的安全漏洞。在IT行业中,POC是证明一个安全漏洞确实存在的一种技术验证,它通常是一段代码或脚本,当运行时能复现漏洞...
CodeTEST_native是一款用于软件在电路(Software-In-Circuit, SWIC)测试的工具,尤其适用于嵌入式系统中的代码测试。本指南将详细介绍如何在主机平台上利用CodeTEST_native进行SWIC版测试,主要涵盖以下四个步骤: ...
code test public partial class About : System.Web.UI.Page
Test PHP Code
pyopengl testcode demopyopengl testcode demopyopengl testcode demopyopengl testcode demopyopengl testcode demo
11-2 testcode
《testcode图书管理系统样例》是一个专为演示和学习设计的软件系统,它展示了如何有效管理图书信息,包括书籍的录入、查询、借阅和归还等操作。在这个系统中,testcode扮演着核心角色,它是实现图书管理功能的关键...
"test-code-9.rar_testcode 实验"作为一个专为达盛ARM处理器编写的实验代码集合,为初学者提供了一个学习平台,用以加深对ARM架构的理解,锻炼编程技能。 达盛ARM,作为ARM处理器的一种变体,因其实用性和广泛的...
2. 工作流程: - GPS初始化:GPSTEST首先请求Android系统的Location Services权限,然后开启GPS硬件,等待卫星信号。 - 卫星搜索:应用通过监听卫星状态,获取可视卫星的数量,以及每个卫星的伪距和信号质量。 - ...
通过对“interview-code test”文档的部分内容分析,我们了解到这是一份与原材料库存管理和容量规划相关的文档。它不仅反映了企业在供应链管理方面的需求,同时也展示了IT技术在这一领域的广泛应用。对于IT行业的...
"test code project for UI" 指的是一种专门针对UI进行的自动化测试项目,目的是为了验证应用程序的外观、响应速度、易用性以及各种交互元素是否符合设计规范和用户需求。通过编写测试代码,可以有效地减少手动测试...
本项目“ISPorCOM test code”旨在演示如何利用AVRUSB进行ISP(In-System Programming)和COM(串行通信)功能。下面我们将深入探讨这些关键知识点。 首先,ISP是嵌入式系统领域中的一种编程方式,它允许开发者在...
2. **CodeTEST Software-In-Circuit (SIC)**:用于在目标硬件上进行软件测试,但不直接连接到硬件。 3. **CodeTEST Hardware-In-Circuit (HIC)**:直接与目标硬件相连进行测试,适用于更复杂的情况。 #### 四、Code...
从给定的文件信息来看,主要内容集中在嵌入式软件测试及其相关的CodeTest测试工具的介绍上。嵌入式软件测试是确保嵌入式系统稳定性和可靠性的关键环节,而CodeTest作为一款专业的测试工具,其在嵌入式软件开发中的...
**嵌入式软件测试工具 Codetest** Codetest 是一款专门针对嵌入式系统软件测试的高效工具套件,全球首款为解决嵌入式应用程序的测试挑战而设计。它提供了实时在线的解决方案,涵盖追踪、性能分析、代码覆盖率测试...
"current_test_code.rar_current te_current test code_current test手"这个标题暗示我们这里包含的是一份关于手机待机电流测试的代码或者程序,可能是用于自动化测试的脚本或应用。 描述中的“测试手机待机电流, ...
国际标准原文,国标依据此文件的第三版制订了《GBT 17421.2-2016 机床检验通则》。 这是第四版,高清英文版。 This part of ISO 230 specifies methods for testing and evaluating the accuracy and repeatability ...