- 浏览: 121735 次
- 性别:
- 来自: 杭州
-
文章分类
最新评论
package org.test.udf
import com.google.gson.{Gson, GsonBuilder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
/**
* Created by yunzhi.lyz on 17-8-23.
**/
object FunnelUdf {
private[udf] var gson: Gson = new GsonBuilder().create
/** Aggregates per-user funnel event arrays into per-step conversion counts.
  *
  * Each input row carries one user's full (item_id, timestamp) arrays plus the
  * funnel description and time window; update() runs funnelProcess once per
  * user and increments the counter of every step that user completed.
  */
class FunnelCalMerge extends UserDefinedAggregateFunction {
  // (item_id array, timestamp array, comma-separated funnel route, window length)
  override def inputSchema = StructType(
    StructField("item_id", ArrayType(LongType, false), true) ::
      StructField("item_timestamp", ArrayType(LongType, false), true) ::
      StructField("funnelDesc", StringType, true) ::
      StructField("windowLongernal", LongType, true) :: Nil)

  // Fixed-size counter array: one slot per funnel step, up to 10 steps.
  def bufferSchema: StructType =
    StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)

  override def dataType: DataType =
    StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.fill[Long](10)(0)
  }

  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    val ids      = inputrow.getAs[mutable.WrappedArray[Long]](0)
    val times    = inputrow.getAs[mutable.WrappedArray[Long]](1)
    val route    = inputrow.getString(2).split(",").map(_.toLong)
    val window   = inputrow.getLong(3)
    val counters = buffer.getAs[mutable.WrappedArray[Long]](0)
    val reached  = funnelProcess(ids.toArray[Long], times.toArray[Long], route, window)
    var step = 0
    while (step < route.length) {
      if (reached(step)) counters(step) = counters(step) + 1
      step += 1
    }
    buffer(0) = counters
  }

  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    val left  = buffer.getAs[mutable.WrappedArray[Long]](0)
    val right = buffer2.getAs[mutable.WrappedArray[Long]](0)
    var i = 0
    while (i < 10) {
      left(i) = left(i) + right(i)
      i += 1
    }
    buffer(0) = left
  }

  // Returns the whole buffer row (its single countArray column).
  override def evaluate(buffer: Row): Any = buffer
}
/** Collects a user's (item_id, item_timestamp) pairs into two parallel arrays.
  *
  * Pure collection step; the funnel evaluation happens downstream.
  * Element order is not guaranteed — consumers must sort by timestamp.
  */
class FunnelCal extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(
    StructField("item_id", LongType, true) ::
      StructField("item_timestamp", LongType, true) :: Nil)

  def bufferSchema: StructType = StructType(
    StructField("item_id", ArrayType(LongType), true) ::
      StructField("item_timestamp", ArrayType(LongType), true) :: Nil)

  override def dataType: DataType = StructType(
    StructField("item_id", ArrayType(LongType), true) ::
      StructField("item_timestamp", ArrayType(LongType), true) :: Nil)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // BUG FIX: was Array.empty[LongType] — an array of the Spark DataType
    // object, not of Long values; the buffer schema declares ArrayType(LongType).
    buffer(0) = Array.empty[Long]
    buffer(1) = Array.empty[Long]
  }

  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    // Prepend the incoming pair to both parallel arrays (same as the old `.+:`).
    buffer(0) = inputrow.getAs[Long](0) +: buffer.getAs[mutable.WrappedArray[Long]](0).toArray[Long]
    buffer(1) = inputrow.getAs[Long](1) +: buffer.getAs[mutable.WrappedArray[Long]](1).toArray[Long]
  }

  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    // `a.++:(b)` means b-before-a, so buffer2's elements come first.
    buffer(0) = buffer2.getAs[mutable.WrappedArray[Long]](0).toArray[Long] ++
      buffer.getAs[mutable.WrappedArray[Long]](0).toArray[Long]
    buffer(1) = buffer2.getAs[mutable.WrappedArray[Long]](1).toArray[Long] ++
      buffer.getAs[mutable.WrappedArray[Long]](1).toArray[Long]
  }

  override def evaluate(buffer: Row): Any = buffer
}
/** Aggregates pre-computed per-user funnel Boolean arrays into step counts. */
class FunnelCalMerge2 extends UserDefinedAggregateFunction {
  override def inputSchema =
    StructType(StructField("result", ArrayType(BooleanType), true) :: Nil)

  // Fixed-size counter array: one slot per funnel step, up to 10 steps.
  def bufferSchema: StructType =
    StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)

  override def dataType: DataType = ArrayType(LongType, false)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.fill[Long](10)(0)
  }

  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    val counters = buffer.getAs[mutable.WrappedArray[Long]](0)
    val reached  = inputrow.getAs[mutable.WrappedArray[Boolean]](0).toArray[Boolean]
    var step = 0
    while (step < reached.length) {
      if (reached(step)) counters(step) = counters(step) + 1
      step += 1
    }
    buffer(0) = counters
  }

  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    val left  = buffer.getAs[mutable.WrappedArray[Long]](0)
    val right = buffer2.getAs[mutable.WrappedArray[Long]](0)
    var i = 0
    while (i < 10) {
      left(i) = left(i) + right(i)
      i += 1
    }
    buffer(0) = left
  }

  // Returns the bare count array rather than a wrapping struct.
  override def evaluate(buffer: Row): Any = buffer(0)
}
/** Collects one user's events plus the funnel parameters, then evaluates the
  * funnel in evaluate(), returning one Boolean per funnel step.
  */
class FunnelCal2 extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(
    StructField("item_id", LongType, true) ::
      StructField("item_timestamp", LongType, true) ::
      StructField("funnelDesc", StringType, true) ::
      StructField("windowLongernal", LongType, true) :: Nil)

  def bufferSchema: StructType = StructType(
    StructField("item_id", ArrayType(LongType), true) ::
      StructField("item_timestamp", ArrayType(LongType), true) ::
      StructField("funnelDesc", StringType, true) ::
      StructField("windowLongernal", LongType, true) :: Nil)

  override def dataType: DataType = ArrayType(BooleanType, false)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // BUG FIX: was Array.empty[LongType] — an array of the Spark DataType
    // object, not of Long values; the buffer schema declares ArrayType(LongType).
    buffer(0) = Array.empty[Long]
    buffer(1) = Array.empty[Long]
    buffer(2) = ""
    buffer(3) = 0L
  }

  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    buffer(0) = inputrow.getAs[Long](0) +: buffer.getAs[mutable.WrappedArray[Long]](0).toArray[Long]
    buffer(1) = inputrow.getAs[Long](1) +: buffer.getAs[mutable.WrappedArray[Long]](1).toArray[Long]
    // funnelDesc / window are expected constant across the query; latest row wins.
    buffer(2) = inputrow.getString(2)
    buffer(3) = inputrow.getLong(3)
  }

  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    // `a.++:(b)` means b-before-a, so buffer2's elements come first.
    buffer(0) = buffer2.getAs[mutable.WrappedArray[Long]](0).toArray[Long] ++
      buffer.getAs[mutable.WrappedArray[Long]](0).toArray[Long]
    buffer(1) = buffer2.getAs[mutable.WrappedArray[Long]](1).toArray[Long] ++
      buffer.getAs[mutable.WrappedArray[Long]](1).toArray[Long]
    // ROBUSTNESS FIX: only adopt buffer2's parameters when that partition
    // actually saw a row. Previously an empty partition's "" funnelDesc could
    // overwrite a valid one and make split(",").map(_.toLong) fail in evaluate().
    if (buffer2.getString(2).nonEmpty) {
      buffer(2) = buffer2.getString(2)
      buffer(3) = buffer2.getLong(3)
    }
  }

  override def evaluate(buffer: Row): Any = {
    val funnelRoute = buffer.getString(2).split(",").map(_.toLong)
    val window = buffer.getLong(3)
    funnelProcess(
      buffer.getAs[mutable.WrappedArray[Long]](0).toArray[Long],
      buffer.getAs[mutable.WrappedArray[Long]](1).toArray[Long],
      funnelRoute, window)
  }
}
/** UDF: extract a top-level key's value from a JSON string; "" when absent. */
class JsonInfoGet extends UDF2[String, String, String] {
  def call(jsonInfo: String, keyName: String): String = {
    val parsed = CalJson.jsonToMap(jsonInfo, gson)
    if (parsed.containsKey(keyName)) parsed.get(keyName).toString else ""
  }
}
/** Evaluates which steps of a funnel a single user completed.
  *
  * Events are walked in timestamp order; step 0 (re-)enters the funnel, and
  * each later step counts only while within `wd` of the current entry time.
  *
  * @param dataItem      item ids of the user's events
  * @param dataEventTime event timestamps, parallel to dataItem
  * @param rt            funnel route: ordered item ids, one per step
  * @param wd            time window measured from funnel entry
  * @return one Boolean per funnel step; result(i) is true when step i was
  *         reached within the window
  */
def funnelProcess(dataItem: Array[Long], dataEventTime: Array[Long], rt: Array[Long], wd: Long): Array[Boolean] = {
  val result = Array.fill[Boolean](rt.length)(false)
  if (rt.nonEmpty) { // guard: empty route previously indexed result(-1)
    val sortedEvents = dataItem.zip(dataEventTime).sortBy(_._2)
    // Step index per item id. PERF FIX: was rt.indexOf per element (O(n^2));
    // built in reverse so the FIRST occurrence of a duplicated id wins,
    // matching the original indexOf semantics.
    val indexMap: Map[Long, Int] = rt.zipWithIndex.reverse.toMap
    val lastStep = rt.length - 1
    // Funnel-entry time carried along the chain; 0 means "not reached".
    // NOTE(review): a genuine timestamp of 0 is indistinguishable from
    // "not reached" — assumes real event times are positive; confirm upstream.
    val startTimeArray = Array.fill[Long](rt.length)(0L)
    var done = false
    for ((item, ts) <- sortedEvents if !done) {
      indexMap.get(item) match {
        case Some(0) =>
          // Funnel (re-)entered: remember the entry time.
          startTimeArray(0) = ts
          result(0) = true
        case Some(step) if startTimeArray(step - 1) != 0 =>
          if (ts - startTimeArray(step - 1) < wd) {
            // In window: propagate the entry time and mark the step reached.
            startTimeArray(step) = startTimeArray(step - 1)
            result(step) = true
          } else {
            // Window exceeded: invalidate the previous step's entry time.
            startTimeArray(step - 1) = 0
          }
        case _ =>
          // Item not on the route (previously threw NoSuchElementException —
          // now skipped), or its predecessor step was never reached.
      }
      if (result(lastStep)) done = true // last step hit: stop scanning
    }
  }
  result
}
}
import com.google.gson.{Gson, GsonBuilder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
/**
* Created by yunzhi.lyz on 17-8-23.
**/
object FunnelUdf {
private[udf] var gson: Gson = new GsonBuilder().create
/** Aggregates per-user funnel event arrays into per-step conversion counts.
  *
  * Each input row carries one user's full (item_id, timestamp) arrays plus the
  * funnel description and time window; update() runs funnelProcess once per
  * user and increments the counter of every step that user completed.
  */
class FunnelCalMerge extends UserDefinedAggregateFunction {
  // (item_id array, timestamp array, comma-separated funnel route, window length)
  override def inputSchema = StructType(
    StructField("item_id", ArrayType(LongType, false), true) ::
      StructField("item_timestamp", ArrayType(LongType, false), true) ::
      StructField("funnelDesc", StringType, true) ::
      StructField("windowLongernal", LongType, true) :: Nil)

  // Fixed-size counter array: one slot per funnel step, up to 10 steps.
  def bufferSchema: StructType =
    StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)

  override def dataType: DataType =
    StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.fill[Long](10)(0)
  }

  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    val ids      = inputrow.getAs[mutable.WrappedArray[Long]](0)
    val times    = inputrow.getAs[mutable.WrappedArray[Long]](1)
    val route    = inputrow.getString(2).split(",").map(_.toLong)
    val window   = inputrow.getLong(3)
    val counters = buffer.getAs[mutable.WrappedArray[Long]](0)
    val reached  = funnelProcess(ids.toArray[Long], times.toArray[Long], route, window)
    var step = 0
    while (step < route.length) {
      if (reached(step)) counters(step) = counters(step) + 1
      step += 1
    }
    buffer(0) = counters
  }

  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    val left  = buffer.getAs[mutable.WrappedArray[Long]](0)
    val right = buffer2.getAs[mutable.WrappedArray[Long]](0)
    var i = 0
    while (i < 10) {
      left(i) = left(i) + right(i)
      i += 1
    }
    buffer(0) = left
  }

  // Returns the whole buffer row (its single countArray column).
  override def evaluate(buffer: Row): Any = buffer
}
/** Collects a user's (item_id, item_timestamp) pairs into two parallel arrays.
  *
  * Pure collection step; the funnel evaluation happens downstream.
  * Element order is not guaranteed — consumers must sort by timestamp.
  */
class FunnelCal extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(
    StructField("item_id", LongType, true) ::
      StructField("item_timestamp", LongType, true) :: Nil)

  def bufferSchema: StructType = StructType(
    StructField("item_id", ArrayType(LongType), true) ::
      StructField("item_timestamp", ArrayType(LongType), true) :: Nil)

  override def dataType: DataType = StructType(
    StructField("item_id", ArrayType(LongType), true) ::
      StructField("item_timestamp", ArrayType(LongType), true) :: Nil)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // BUG FIX: was Array.empty[LongType] — an array of the Spark DataType
    // object, not of Long values; the buffer schema declares ArrayType(LongType).
    buffer(0) = Array.empty[Long]
    buffer(1) = Array.empty[Long]
  }

  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    // Prepend the incoming pair to both parallel arrays (same as the old `.+:`).
    buffer(0) = inputrow.getAs[Long](0) +: buffer.getAs[mutable.WrappedArray[Long]](0).toArray[Long]
    buffer(1) = inputrow.getAs[Long](1) +: buffer.getAs[mutable.WrappedArray[Long]](1).toArray[Long]
  }

  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    // `a.++:(b)` means b-before-a, so buffer2's elements come first.
    buffer(0) = buffer2.getAs[mutable.WrappedArray[Long]](0).toArray[Long] ++
      buffer.getAs[mutable.WrappedArray[Long]](0).toArray[Long]
    buffer(1) = buffer2.getAs[mutable.WrappedArray[Long]](1).toArray[Long] ++
      buffer.getAs[mutable.WrappedArray[Long]](1).toArray[Long]
  }

  override def evaluate(buffer: Row): Any = buffer
}
/** Aggregates pre-computed per-user funnel Boolean arrays into step counts. */
class FunnelCalMerge2 extends UserDefinedAggregateFunction {
  override def inputSchema =
    StructType(StructField("result", ArrayType(BooleanType), true) :: Nil)

  // Fixed-size counter array: one slot per funnel step, up to 10 steps.
  def bufferSchema: StructType =
    StructType(StructField("countArray", ArrayType(LongType, false), true) :: Nil)

  override def dataType: DataType = ArrayType(LongType, false)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.fill[Long](10)(0)
  }

  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    val counters = buffer.getAs[mutable.WrappedArray[Long]](0)
    val reached  = inputrow.getAs[mutable.WrappedArray[Boolean]](0).toArray[Boolean]
    var step = 0
    while (step < reached.length) {
      if (reached(step)) counters(step) = counters(step) + 1
      step += 1
    }
    buffer(0) = counters
  }

  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    val left  = buffer.getAs[mutable.WrappedArray[Long]](0)
    val right = buffer2.getAs[mutable.WrappedArray[Long]](0)
    var i = 0
    while (i < 10) {
      left(i) = left(i) + right(i)
      i += 1
    }
    buffer(0) = left
  }

  // Returns the bare count array rather than a wrapping struct.
  override def evaluate(buffer: Row): Any = buffer(0)
}
/** Collects one user's events plus the funnel parameters, then evaluates the
  * funnel in evaluate(), returning one Boolean per funnel step.
  */
class FunnelCal2 extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(
    StructField("item_id", LongType, true) ::
      StructField("item_timestamp", LongType, true) ::
      StructField("funnelDesc", StringType, true) ::
      StructField("windowLongernal", LongType, true) :: Nil)

  def bufferSchema: StructType = StructType(
    StructField("item_id", ArrayType(LongType), true) ::
      StructField("item_timestamp", ArrayType(LongType), true) ::
      StructField("funnelDesc", StringType, true) ::
      StructField("windowLongernal", LongType, true) :: Nil)

  override def dataType: DataType = ArrayType(BooleanType, false)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // BUG FIX: was Array.empty[LongType] — an array of the Spark DataType
    // object, not of Long values; the buffer schema declares ArrayType(LongType).
    buffer(0) = Array.empty[Long]
    buffer(1) = Array.empty[Long]
    buffer(2) = ""
    buffer(3) = 0L
  }

  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    buffer(0) = inputrow.getAs[Long](0) +: buffer.getAs[mutable.WrappedArray[Long]](0).toArray[Long]
    buffer(1) = inputrow.getAs[Long](1) +: buffer.getAs[mutable.WrappedArray[Long]](1).toArray[Long]
    // funnelDesc / window are expected constant across the query; latest row wins.
    buffer(2) = inputrow.getString(2)
    buffer(3) = inputrow.getLong(3)
  }

  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    // `a.++:(b)` means b-before-a, so buffer2's elements come first.
    buffer(0) = buffer2.getAs[mutable.WrappedArray[Long]](0).toArray[Long] ++
      buffer.getAs[mutable.WrappedArray[Long]](0).toArray[Long]
    buffer(1) = buffer2.getAs[mutable.WrappedArray[Long]](1).toArray[Long] ++
      buffer.getAs[mutable.WrappedArray[Long]](1).toArray[Long]
    // ROBUSTNESS FIX: only adopt buffer2's parameters when that partition
    // actually saw a row. Previously an empty partition's "" funnelDesc could
    // overwrite a valid one and make split(",").map(_.toLong) fail in evaluate().
    if (buffer2.getString(2).nonEmpty) {
      buffer(2) = buffer2.getString(2)
      buffer(3) = buffer2.getLong(3)
    }
  }

  override def evaluate(buffer: Row): Any = {
    val funnelRoute = buffer.getString(2).split(",").map(_.toLong)
    val window = buffer.getLong(3)
    funnelProcess(
      buffer.getAs[mutable.WrappedArray[Long]](0).toArray[Long],
      buffer.getAs[mutable.WrappedArray[Long]](1).toArray[Long],
      funnelRoute, window)
  }
}
/** UDF: extract a top-level key's value from a JSON string; "" when absent. */
class JsonInfoGet extends UDF2[String, String, String] {
  def call(jsonInfo: String, keyName: String): String = {
    val parsed = CalJson.jsonToMap(jsonInfo, gson)
    if (parsed.containsKey(keyName)) parsed.get(keyName).toString else ""
  }
}
/** Evaluates which steps of a funnel a single user completed.
  *
  * Events are walked in timestamp order; step 0 (re-)enters the funnel, and
  * each later step counts only while within `wd` of the current entry time.
  *
  * @param dataItem      item ids of the user's events
  * @param dataEventTime event timestamps, parallel to dataItem
  * @param rt            funnel route: ordered item ids, one per step
  * @param wd            time window measured from funnel entry
  * @return one Boolean per funnel step; result(i) is true when step i was
  *         reached within the window
  */
def funnelProcess(dataItem: Array[Long], dataEventTime: Array[Long], rt: Array[Long], wd: Long): Array[Boolean] = {
  val result = Array.fill[Boolean](rt.length)(false)
  if (rt.nonEmpty) { // guard: empty route previously indexed result(-1)
    val sortedEvents = dataItem.zip(dataEventTime).sortBy(_._2)
    // Step index per item id. PERF FIX: was rt.indexOf per element (O(n^2));
    // built in reverse so the FIRST occurrence of a duplicated id wins,
    // matching the original indexOf semantics.
    val indexMap: Map[Long, Int] = rt.zipWithIndex.reverse.toMap
    val lastStep = rt.length - 1
    // Funnel-entry time carried along the chain; 0 means "not reached".
    // NOTE(review): a genuine timestamp of 0 is indistinguishable from
    // "not reached" — assumes real event times are positive; confirm upstream.
    val startTimeArray = Array.fill[Long](rt.length)(0L)
    var done = false
    for ((item, ts) <- sortedEvents if !done) {
      indexMap.get(item) match {
        case Some(0) =>
          // Funnel (re-)entered: remember the entry time.
          startTimeArray(0) = ts
          result(0) = true
        case Some(step) if startTimeArray(step - 1) != 0 =>
          if (ts - startTimeArray(step - 1) < wd) {
            // In window: propagate the entry time and mark the step reached.
            startTimeArray(step) = startTimeArray(step - 1)
            result(step) = true
          } else {
            // Window exceeded: invalidate the previous step's entry time.
            startTimeArray(step - 1) = 0
          }
        case _ =>
          // Item not on the route (previously threw NoSuchElementException —
          // now skipped), or its predecessor step was never reached.
      }
      if (result(lastStep)) done = true // last step hit: stop scanning
    }
  }
  result
}
}
发表评论
-
Spark SQL运行 过程 抄的别人的,记录 学习
2018-05-13 23:07 1016抄的别人的,觉得写的特别好 val FILESOURCE ... -
thriftserver log4j.properties 生效
2018-04-09 11:46 432/home/isuhadoop/spark2/sbin/sta ... -
udaf 返回的 子属性
2018-03-20 13:22 424udaf 返回的 子属性 spark.sql(" ... -
spark datasource
2018-03-16 16:36 652DataFrameWriter format val c ... -
如何 map 端 Join。
2018-03-04 19:31 606Hive 中 修改表的 rawDataSize = 1 1 ... -
spark thrift server 修改
2018-03-04 12:58 563org.apache.spark.sql.hive.thrif ... -
hive hbase thriftserver run
2018-03-03 15:13 385正确方法 : 0\ 拷贝对应目录到 spark2 jars ... -
scala package
2018-01-25 09:48 504#scala 打包 mvn clean scala:com ... -
SPARK SERVER
2018-01-23 22:15 512sbin/start-thriftserver.sh --dr ... -
driver class
2018-01-21 22:11 507sbin/start-thriftserver.sh -- ... -
spark thrift server 调试
2017-10-20 15:50 845spark-hive-thriftserver 本地调试 ... -
spark SQL conf
2017-10-18 14:36 598org.apache.spark.sql.internal.S ... -
java 死锁 ,内存问题 分析
2017-10-17 10:50 327jstack -l pid /opt/soft/jdk/ ... -
thriftServer proxy
2017-10-16 14:21 909sudo yum install haproxy 257 ... -
hive spark conf
2017-09-26 17:44 1283CREATE TABLE org_userbehavior_a ... -
get day
2017-09-19 08:41 328def timeDayNow() = { var ... -
thriftserver
2017-09-14 19:47 443export SPARK_CONF_DIR=/home/yun ... -
thriftserver dynamicallocation
2017-09-08 14:41 561./sbin/start-thriftserver.sh -- ... -
test code
2017-08-24 17:52 268def taskcal(data:Array[(String, ... -
struct streaming SQL udf udaf
2017-08-22 11:50 657spark aggregator class H ...
相关推荐
test code project for UI
GPSTEST android gpstest source code
CodeTest 是一款用于软件测试和硬件验证的工具,尤其在PCI(Peripheral Component Interconnect)设备的测试中发挥了重要作用。PCI是一种广泛使用的计算机总线标准,允许外部设备高速连接到主板上,例如网络适配器、...
《CodeTEST PCI测试使用指南》 CodeTEST是一款强大的测试工具,其PCI部分主要用于对基于PCI总线的设备进行功能和性能测试。本篇主要介绍如何有效地使用CodeTEST 4.2.1版本中的PCI Probe功能进行系统级的测试工作,...
CodeTEST_native是一款用于软件在电路(Software-In-Circuit, SWIC)测试的工具,尤其适用于嵌入式系统中的代码测试。本指南将详细介绍如何在主机平台上利用CodeTEST_native进行SWIC版测试,主要涵盖以下四个步骤: ...
interview-code test.pdf
code test public partial class About : System.Web.UI.Page
Test PHP Code
CodeTest自动生成POC刷漏洞工具
pyopengl testcode demopyopengl testcode demopyopengl testcode demopyopengl testcode demopyopengl testcode demo
11-2 testcode
Codetest简介Codetest简介Codetest简介
test code for video
testcode图书管理系统样例 testcode图书管理系统样例 testcode图书管理系统样例
php symphony test code
嵌入式软件测试讲解及CodeTest测试工具介绍
CodedUI test code for excel
self test code for demo using
stock test code with zf
主要展示了avrusb通信的主要几个函数的使用方法,这只是下位机程序即固件程序,代码中,有两种类的通信,一种是cdc类通信,另外是其他设备类