- 浏览: 126083 次
- 性别:
- 来自: 杭州
文章分类
最新评论
def taskcal(data:Array[(String,Long)],rt:Array[String],wd:Int):Array[Boolean]={
val result = Array.fill[Boolean](rt.length)(false)
val sortData = data.sortBy(_._2)
val indexArrayLength = rt.length - 1
var startTimeArray = Array.fill[Long](rt.length)(0l)
val indexMap = rt.map(item => item -> rt.indexOf(item)).toMap
var notFull = true
for(itemWithTimeKv <- sortData if notFull ){
val itemIndex = indexMap(itemWithTimeKv._1)
if(itemIndex == 0) { startTimeArray(0) = itemWithTimeKv._2 ; result(0) = true} // first item
else if(startTimeArray(itemIndex-1) !=0) { // pre item exists?
if( (itemWithTimeKv._2 - startTimeArray(itemIndex-1))< wd) { // in range
startTimeArray(itemIndex) = startTimeArray(itemIndex-1)
result(itemIndex) = true
}else // out range
startTimeArray = Array.fill[Long](rt.length)(0l)
}
if(result(indexArrayLength) == true) notFull = false
}
result
}
def main(args:Array[String]): Unit = {
val data =Array(("A",1450000000000l),
("B",1450000000001l),
("C",1430000000002l),
("A",1460000000001l)
)
val rt = Array("A","B","C")
val wd = 3600000
println(taskcal(data,rt,wd).mkString(","))
// bench("r",100000,taskcal(data,rt,wd))
}
def bench(name:String,count:Int,f: => Unit): Unit ={
val begin = System.currentTimeMillis()
for(i <-0 to count) f
val end = System.currentTimeMillis()
println(s"name : ${name} count: $count count:${end - begin} ")
}
val storageDir = "UserBehaviorDStream"
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
val getPartitionDate = udf(FunnelUtil.getDatePartiton _)
lines.foreachRDD(rdd => {
val userBehiviorData = rdd.map(x => {println("**:"+x);x.split(",")}).filter(_.length == 6)
.map(r => RowFactory.create(r(0), r(1), r(2), r(3), r(4), r(5)))
val userBehiviorDataDF = sqlContext.createDataFrame(userBehiviorData, getStructType)
val userBehiviorDataPartition = userBehiviorDataDF.withColumn("yyyyMMddHH", getPartitionDate(userBehiviorDataDF("eventTime"))).coalesce(1)
userBehiviorDataPartition.write.format("parquet").mode("append").partitionBy("yyyyMMddHH").save(storageDir)
})
ssc.start()
ssc.awaitTermination()
}
val getStructType = {
val structFields = mutable.ArrayBuffer[StructField]()
structFields += DataTypes.createStructField("userId", DataTypes.StringType, true)
structFields += DataTypes.createStructField("eventTime", DataTypes.StringType, true)
structFields += DataTypes.createStructField("itemId", DataTypes.StringType, true)
structFields += DataTypes.createStructField("itemName", DataTypes.StringType, true)
structFields += DataTypes.createStructField("eventAttribute", DataTypes.StringType, true)
structFields += DataTypes.createStructField("eventDate", DataTypes.StringType, true)
val structType = DataTypes.createStructType(structFields.toArray)
structType
}
val result = Array.fill[Boolean](rt.length)(false)
val sortData = data.sortBy(_._2)
val indexArrayLength = rt.length - 1
var startTimeArray = Array.fill[Long](rt.length)(0l)
val indexMap = rt.map(item => item -> rt.indexOf(item)).toMap
var notFull = true
for(itemWithTimeKv <- sortData if notFull ){
val itemIndex = indexMap(itemWithTimeKv._1)
if(itemIndex == 0) { startTimeArray(0) = itemWithTimeKv._2 ; result(0) = true} // first item
else if(startTimeArray(itemIndex-1) !=0) { // pre item exists?
if( (itemWithTimeKv._2 - startTimeArray(itemIndex-1))< wd) { // in range
startTimeArray(itemIndex) = startTimeArray(itemIndex-1)
result(itemIndex) = true
}else // out range
startTimeArray = Array.fill[Long](rt.length)(0l)
}
if(result(indexArrayLength) == true) notFull = false
}
result
}
def main(args:Array[String]): Unit = {
val data =Array(("A",1450000000000l),
("B",1450000000001l),
("C",1430000000002l),
("A",1460000000001l)
)
val rt = Array("A","B","C")
val wd = 3600000
println(taskcal(data,rt,wd).mkString(","))
// bench("r",100000,taskcal(data,rt,wd))
}
def bench(name:String,count:Int,f: => Unit): Unit ={
val begin = System.currentTimeMillis()
for(i <-0 to count) f
val end = System.currentTimeMillis()
println(s"name : ${name} count: $count count:${end - begin} ")
}
val storageDir = "UserBehaviorDStream"
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
val getPartitionDate = udf(FunnelUtil.getDatePartiton _)
lines.foreachRDD(rdd => {
val userBehiviorData = rdd.map(x => {println("**:"+x);x.split(",")}).filter(_.length == 6)
.map(r => RowFactory.create(r(0), r(1), r(2), r(3), r(4), r(5)))
val userBehiviorDataDF = sqlContext.createDataFrame(userBehiviorData, getStructType)
val userBehiviorDataPartition = userBehiviorDataDF.withColumn("yyyyMMddHH", getPartitionDate(userBehiviorDataDF("eventTime"))).coalesce(1)
userBehiviorDataPartition.write.format("parquet").mode("append").partitionBy("yyyyMMddHH").save(storageDir)
})
ssc.start()
ssc.awaitTermination()
}
val getStructType = {
val structFields = mutable.ArrayBuffer[StructField]()
structFields += DataTypes.createStructField("userId", DataTypes.StringType, true)
structFields += DataTypes.createStructField("eventTime", DataTypes.StringType, true)
structFields += DataTypes.createStructField("itemId", DataTypes.StringType, true)
structFields += DataTypes.createStructField("itemName", DataTypes.StringType, true)
structFields += DataTypes.createStructField("eventAttribute", DataTypes.StringType, true)
structFields += DataTypes.createStructField("eventDate", DataTypes.StringType, true)
val structType = DataTypes.createStructType(structFields.toArray)
structType
}
发表评论
-
Spark SQL运行 过程 抄的别人的,记录 学习
2018-05-13 23:07 1038抄的别人的,觉得写的特别好 val FILESOURCE ... -
thriftserver log4j.properties 生效
2018-04-09 11:46 455/home/isuhadoop/spark2/sbin/sta ... -
udaf 返回的 子属性
2018-03-20 13:22 448udaf 返回的 子属性 spark.sql(" ... -
spark datasource
2018-03-16 16:36 673DataFrameWriter format val c ... -
如何 map 端 Join。
2018-03-04 19:31 634Hive 中 修改表的 rawDataSize = 1 1 ... -
spark thrift server 修改
2018-03-04 12:58 591org.apache.spark.sql.hive.thrif ... -
hive hbase thriftserver run
2018-03-03 15:13 418正确方法 : 0\ 拷贝对应目录到 spark2 jars ... -
scala package
2018-01-25 09:48 538#scala 打包 mvn clean scala:com ... -
SPARK SERVER
2018-01-23 22:15 557sbin/start-thriftserver.sh --dr ... -
driver class
2018-01-21 22:11 528sbin/start-thriftserver.sh -- ... -
spark thrift server 调试
2017-10-20 15:50 869spark-hive-thriftserver 本地调试 ... -
spark SQL conf
2017-10-18 14:36 648org.apache.spark.sql.internal.S ... -
java 死锁 ,内存问题 分析
2017-10-17 10:50 356jstack -l pid /opt/soft/jdk/ ... -
thriftServer proxy
2017-10-16 14:21 949sudo yum install haproxy 257 ... -
hive spark conf
2017-09-26 17:44 1302CREATE TABLE org_userbehavior_a ... -
get day
2017-09-19 08:41 577def timeDayNow() = { var ... -
thriftserver
2017-09-14 19:47 481export SPARK_CONF_DIR=/home/yun ... -
thriftserver dynamicallocation
2017-09-08 14:41 594./sbin/start-thriftserver.sh -- ... -
test code2
2017-09-03 13:45 494package org.test.udf import co ... -
struct streaming SQL udf udaf
2017-08-22 11:50 683spark aggregator class H ...
相关推荐
"test-code-9.rar_testcode 实验"作为一个专为达盛ARM处理器编写的实验代码集合,为初学者提供了一个学习平台,用以加深对ARM架构的理解,锻炼编程技能。 达盛ARM,作为ARM处理器的一种变体,因其实用性和广泛的...
《testcode图书管理系统样例》是一个专为演示和学习设计的软件系统,它展示了如何有效管理图书信息,包括书籍的录入、查询、借阅和归还等操作。在这个系统中,testcode扮演着核心角色,它是实现图书管理功能的关键...
本项目“ISPorCOM test code”旨在演示如何利用AVRUSB进行ISP(In-System Programming)和COM(串行通信)功能。下面我们将深入探讨这些关键知识点。 首先,ISP是嵌入式系统领域中的一种编程方式,它允许开发者在...
"test code project for UI" 指的是一种专门针对UI进行的自动化测试项目,目的是为了验证应用程序的外观、响应速度、易用性以及各种交互元素是否符合设计规范和用户需求。通过编写测试代码,可以有效地减少手动测试...
在这个场景中,"test code for video" 提到了针对视频处理的测试代码,这意味着我们可能涉及多媒体处理、流媒体传输或者视频编码解码等相关技术。下面我们将详细探讨这些领域的知识点。 1. **多媒体处理**:视频是...
"current_test_code.rar_current te_current test code_current test手"这个标题暗示我们这里包含的是一份关于手机待机电流测试的代码或者程序,可能是用于自动化测试的脚本或应用。 描述中的“测试手机待机电流, ...
pyopengl testcode demopyopengl testcode demopyopengl testcode demopyopengl testcode demopyopengl testcode demo
本压缩包"testcode.zip"包含了与MyBatis批量更新相关的示例代码,我们可以从中学习到如何在实际项目中实现这一功能。 首先,MyBatis批量更新的基本思想是通过一次SQL执行来更新多条记录,从而减少数据库的交互次数...
【标题】"TestCode.rar" 是一个包含了作者在学习Java编程过程中积累的案例代码集合。这个压缩包很可能是为了方便分享、备份或者系统性地整理Java编程的学习成果。通过这样的案例,我们可以深入理解Java语言的核心...
标题 "CodedUI test code for excel" 描述的是使用Microsoft的Coded UI测试工具来编写自动化测试代码,针对Excel应用程序进行测试的一种技术。Coded UI测试是Visual Studio中的一种功能,它允许开发者录制并自动生成...
在这个场景中,"MoneyAccount_shorter2bc_testcode_"很可能是一个测试项目的名字,它专注于管理金钱账户的方面。"vc++"和"c++11"标签揭示了这个项目使用的是Microsoft的Visual C++编译器,并且遵循C++11标准进行编程...
在描述中提到的“test code”可能包括了以下几种类型的测试: 1. **单元测试**:测试代码中的单个函数或方法,确保它们按预期工作。这通常涉及到模拟依赖项以隔离被测试的代码。 2. **集成测试**:测试多个组件...
国际标准原文,国标依据此文件的第三版制订了《GBT 17421.2-2016 机床检验通则》。 这是第四版,高清英文版。 This part of ISO 230 specifies methods for testing and evaluating the accuracy and repeatability ...
11-2 testcode
标题中的“进制转换TestCode”表明这是一个关于编程中不同进制之间转换的代码测试案例。在计算机科学和编程中,进制转换是一项基础但至关重要的技能,因为计算机内部使用二进制(0和1)表示数据,而我们通常与十进制...
本项目"django + jquery First Test Code"旨在演示如何结合这两种技术进行开发,以实现高效且用户友好的Web应用程序。 Django是一个用Python编写的高级Web框架,它遵循模型-视图-控制器(MVC)架构模式。Django强调...
本文将深入探讨“反射学习PPT + TEST CODE”中的核心知识点。 一、反射基础 1. 类型信息:反射允许程序在运行时获取关于类、接口、结构等类型的信息,如类型名称、属性、方法、构造函数等。这在设计动态代码、元...
标题中的“DSP 28335 TestCode4PI.rar”指的是一个针对TI公司的TMS320F28335 DSP(数字信号处理器)的测试代码压缩包,主要涉及的是PI(比例积分)调节算法的应用。这个压缩包包含了用于PI控制器设计的示例代码,...