使用spark streaming 需要搭建Kafka、zookeeper,搭建的方法网上有很多,再此不再多讲:
文章中的代码参考:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/
代码如下:
import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Duration, Seconds, StreamingContext} object WebPagePopularityValueCalculator { private val checkpointDir = "popularity-data-checkpoint" //kafka message consumer group ID private val msgConsumerGroup = "user-behavior-topic-message-consumer-group" def main(args:Array[String]){ //定义zkserver和端口号及多少秒收集一次数据 val Array(zkServers,processingInterval) = Array("172.4.23.99:2181,172.4.23.99:2182,172.4.23.99:2183","2") val conf = new SparkConf().setAppName("WebPagePopularityValueCalculator") val ssc = new StreamingContext(conf,Seconds(processingInterval.toInt)) //using updateStateByKey asks for enabling checkpoint ssc.checkpoint(checkpointDir) //msgConsumerGroup:kafka的客户端可以是一个组,一条信息只能由组里的一台机器消费 val kafkaStream:ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkServers,msgConsumerGroup,Map("user-behavior-topic" -> 3)) //返回的数据集是k,v形型,k:topic,v:写入kafka的数据集 val msgDataRDD = kafkaStream.map(_._2) //for debug use only //println("Coming data in this interval...") //msgDataRDD.print() //计算权重 val popularityData = msgDataRDD.map{msgLine => { val dataArr:Array[String] = msgLine.split("[|]") val pageID = dataArr(0) val popValue:Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1 (pageID,popValue) }} //sum the previous popularity value and current value //iterator有三个参数,第一个参数为key,第二个参数:上一次的数据集,第三个参数:本次的数据 //处理逻辑是:将上一次的数据集求和,再加上本次的数据,返回结果之和 //再返回(key,sumedValue) val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => { iterator.flatMap(t => { val newValue:Double = t._2.sum val stateValue:Double = t._3.getOrElse(0) Some(newValue + stateValue) }.map(sumedValue => (t._1, sumedValue))) } val initialRDD = ssc.sparkContext.parallelize(List(("page1",0.00))) val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,new HashPartitioner(ssc.sparkContext.defaultParallelism),true,initialRDD) //set the checkpoint interval to avoid too frequently data checkpoint which may //may significantly reduce operation throughput stateDstream.checkpoint(Duration(8 * processingInterval.toInt * 1000)) //after calculation, we need to sort the result and only show the top 10 hot pages stateDstream.foreachRDD(rdd => { val sortedData = rdd.map{case (k,v) => (v,k)}.sortByKey(false) val topKData = sortedData.take(10).map{case (v,k) => (k,v)} topKData.foreach(x => { println(x) }) }) ssc.start() ssc.awaitTermination() } }
上述代码的核心是:updateStateByKey。看一下源码:
/** * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * org.apache.spark.Partitioner is used to control the partitioning of each RDD. * @param updateFunc State update function. Note, that this function may generate a different * tuple with a different key than the input key. Therefore keys may be removed * or added in this way. It is up to the developer to decide whether to * remember the partitioner despite the key being changed. * @param partitioner Partitioner for controlling the partitioning of each RDD in the new * DStream * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. * @tparam S State type */ def updateStateByKey[S: ClassTag]( updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean ): DStream[(K, S)] = ssc.withScope { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None) }
- initialRDD是(K,S)类型的RDD,它表示一组Key的初始状态,每个(K,S)表示一个Key以及它对应的State状态。 K表示updateStateByKey的Key的类型,比如String,而S表示Key对应的状态(State)类型,在上例中,是Int
- rememberPartitioner: 表示是否在接下来的Spark Streaming执行过程中产生的RDD使用相同的分区算法
- partitioner: 分区算法,上例中使用的Hash分区算法,分区数为ssc.sparkContext.defaultParallelism
- updateFunc是函数常量,类型为(Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],表示状态更新函数
(Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)]如何解读?
入参: 三元组迭代器,三元组中K表示Key,Seq[V]表示一个时间间隔中产生的Key对应的Value集合(Seq类型,需要对这个集合定义累加函数逻辑进行累加),Option[S]表示上个时间间隔的累加值(表示这个Key上个时间点的状态)
出参:二元组迭代器,二元组中K表示Key,S表示当前时间点执行结束后,得到的累加值(即最新状态)
总结:(1)环境的搭建;(2)理解Kafka的基本源理;(3)明白updateStateByKey的使用;
参考:
http://bit1129.iteye.com/blog/2198682
https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/
相关推荐
- **使用 Accumulators**:在需要全局计数器的场景下,使用累加器可以减少通信开销。 - **合理使用 Checkpointing**:频繁的检查点可能会引入额外的 I/O 开销,应根据实际需求调整检查点的频率。 综上所述,Spark ...
还将学习RDD的关键动作运算,如collectAsMap、countByKey,以及共享变量(累加器和广播变量)的概念。 **第四章 DataFrame&&SparkSQL** 本章引入DataFrame,这是Spark处理结构化数据的主要工具。学生将学习如何进行...
描述:Gowin(高云)是国产FPGA里做的比较好的几家之一(安路、紫光、高云、复旦微等)。由于开发板和实际项目较少,此处按开发板对工程项目进行分类。 其他:大部分工程均有从Cyclone IV仓库移植的影子,如有其他项目需求移步Cyclone IV的仓库。 主要目录: Pocket_lab_F0: 基于高云GW1N-LV9的一款FPGA开发板,易思达和高云大学计划出品 Pocket_lab_F2: 基于高云GW2A-LV18的一款FPGA开发板,易思达和高云大学计划出品 Tang Mega 138K:基于 GW5AST-LV138 的一款FPGA开发板,Sipeed出品,曾用于 2023 年全国大学生 FPGA 大赛高云赛区 Tang Primer 20K:基于 GW2A-LV18 的一款FPGA开发板,Sipeed出品,曾用于 2022-2023 年全国大学生 FPGA 大赛高云赛区 Tang Nano 20K: 基于GW2A-LV18的一款FPGA开发板,Sipeed出品 Tang Nano 9K:基于 GW1NR-LV9 的一款 FPGA 开发板,Sipeed出
TensorFlow Python版环境安装指南:从底层环境到anaconda配置的详细步骤,使用Mask R-CNN源码实现多张连续输出,兼容项目迁移至TensorFlow.js的技巧。,TensorFlow.Python版底层环境安装指南及Anaconda环境快速配置说明:涵盖Mask R-CNN源码实现及多图输出功能的训练项目搭建教程(附以往程序回溯及新版tensorflowjs的应用介绍),tensotflow.python版本底层环境安装命令。 程序都写好复制就可以安装anacoda环境。 maskrcnn源码可以连续输出多张。 以及可以跑通项目,现在用tensorflowjs了。 这个是之前跑通的主程序很多忘记了。 源程了 ,tensorflow_python_安装命令; anaconda环境安装; maskrcnn_源码; 连续输出多张; tensorflowjs; 主程序跑通。,TensorFlow环境安装指南:Python版anaconda环境配置命令及MaskRCNN源码使用教程
如何基于大模型(DeepSeek)实现一个多智能体的对话系统的,python脚本
AIAG(汽车工业行动小组)与VDA(德国汽车工业联合会)联合发布的FMEA(潜在失效模式及后果分析)标准,已成为全球汽车制造商和供应商的权威指南。本课程专注于过程FMEA(PFMEA),旨在帮助您深入理解其核心理念、方法与实践应用,掌握如何通过PFMEA识别设计缺陷、预防潜在问题,从而提升产品设计质量,降低开发成本,增强市场竞争力。 讲解新版FMEA七步法:1.策划与准备;2.结构分析;3.功能分析;4.失效分析;5.风险分析;6.优化;7.结果文件化。
实验报告
matlab下载
21考试真题最近的t319.txt
基于Java Swing 写的学生成绩管理系统 有数据库文件,用了仿苹果化的皮肤界面.zip项目工程资源经过严格测试运行并且功能上ok,可实现复现复刻,拿到资料包后可实现复现出一样的项目,本人系统开发经验充足(全栈全领域),有任何使用问题欢迎随时与我联系,我会抽时间努力为您解惑,提供帮助 【资源内容】:包含源码+工程文件+说明等。答辩评审平均分达到96分,放心下载使用!可实现复现;设计报告也可借鉴此项目;该资源内项目代码都经过测试运行,功能ok 【项目价值】:可用在相关项目设计中,皆可应用在项目、毕业设计、课程设计、期末/期中/大作业、工程实训、大创等学科竞赛比赛、初期项目立项、学习/练手等方面,可借鉴此优质项目实现复刻,设计报告也可借鉴此项目,也可基于此项目来扩展开发出更多功能 【提供帮助】:有任何使用上的问题欢迎随时与我联系,抽时间努力解答解惑,提供帮助 【附带帮助】:若还需要相关开发工具、学习资料等,我会提供帮助,提供资料,鼓励学习进步 下载后请首先打开说明文件(如有);整理时不同项目所包含资源内容不同;项目工程可实现复现复刻,如果基础还行,也可在此程序基础上进行修改,以实现其它功能。供开源学习/技术交流/学习参考,勿用于商业用途。质量优质,放心下载使用,资源为网络商品(电子资料类)基于网络商品和电子资料商品的性质和特征不支持退款,质量优质,放心下载使用
21考试真题最近的t284.txt
项目工程资源经过严格测试运行并且功能上ok,可实现复现复刻,拿到资料包后可实现复现出一样的项目,本人系统开发经验充足(全栈全领域),有任何使用问题欢迎随时与我联系,我会抽时间努力为您解惑,提供帮助 【资源内容】:包含源码+工程文件+说明等。答辩评审平均分达到96分,放心下载使用!可实现复现;设计报告也可借鉴此项目;该资源内项目代码都经过测试运行,功能ok 【项目价值】:可用在相关项目设计中,皆可应用在项目、毕业设计、课程设计、期末/期中/大作业、工程实训、大创等学科竞赛比赛、初期项目立项、学习/练手等方面,可借鉴此优质项目实现复刻,设计报告也可借鉴此项目,也可基于此项目来扩展开发出更多功能 【提供帮助】:有任何使用上的问题欢迎随时与我联系,抽时间努力解答解惑,提供帮助 【附带帮助】:若还需要相关开发工具、学习资料等,我会提供帮助,提供资料,鼓励学习进步 下载后请首先打开说明文件(如有);整理时不同项目所包含资源内容不同;项目工程可实现复现复刻,如果基础还行,也可在此程序基础上进行修改,以实现其它功能。供开源学习/技术交流/学习参考,勿用于商业用途。质量优质,放心下载使用,资源为网络商品(电子资料类)基于网络商品和电子资料商品的性质和特征不支持退款,质量优质,放心下载使用
项目工程资源经过严格测试运行并且功能上ok,可实现复现复刻,拿到资料包后可实现复现出一样的项目,本人系统开发经验充足(全栈全领域),有任何使用问题欢迎随时与我联系,我会抽时间努力为您解惑,提供帮助 【资源内容】:包含源码+工程文件+说明等。答辩评审平均分达到96分,放心下载使用!可实现复现;设计报告也可借鉴此项目;该资源内项目代码都经过测试运行,功能ok 【项目价值】:可用在相关项目设计中,皆可应用在项目、毕业设计、课程设计、期末/期中/大作业、工程实训、大创等学科竞赛比赛、初期项目立项、学习/练手等方面,可借鉴此优质项目实现复刻,设计报告也可借鉴此项目,也可基于此项目来扩展开发出更多功能 【提供帮助】:有任何使用上的问题欢迎随时与我联系,抽时间努力解答解惑,提供帮助 【附带帮助】:若还需要相关开发工具、学习资料等,我会提供帮助,提供资料,鼓励学习进步 下载后请首先打开说明文件(如有);整理时不同项目所包含资源内容不同;项目工程可实现复现复刻,如果基础还行,也可在此程序基础上进行修改,以实现其它功能。供开源学习/技术交流/学习参考,勿用于商业用途。质量优质,放心下载使用,资源为网络商品(电子资料类)基于网络商品和电子资料商品的性质和特征不支持退款,质量优质,放心下载使用
多维度基因表达与关联性分析:WGCNA、共表达网络、表型关联与聚类模块解析,基于生物信息学技术的基因网络与模块综合分析,WGCNA分析 基因共表达网络分析 基因表型关联分析 基因聚类分析 基因模块分析 ,WGCNA分析; 基因共表达网络分析; 基因表型关联分析; 基因聚类分析; 基因模块分析,基因网络与模块分析综合研究
1a89e6f5b1485b055ed7f0aeccd7b9f9.docx
实验报告
Matlab Simulink模型:三机九节点系统中双馈风机虚拟惯性与下垂控制在频率二次跌落中的应用与对比,Matlab Simulink模型:三机九节点系统下的风机虚拟惯性与下垂控制参与一次调频及频率二次跌落对比研究,Matlab simulink 频率二次跌落,双馈风机惯性控制+下垂控制参与系统一次调频的Matlab Simulink模型,调频结束后转速回复,造成频率二次跌落 系统为三机九节点模型,所有参数已调好且可调,可直接运行,风电渗透率20% 风机采用惯性+下垂控制。 有文档,可讲解。 对比不同恢复时间下二次跌落。 ,核心关键词: Matlab Simulink; 频率二次跌落; 双馈风机; 虚拟惯性控制; 下垂控制; 系统一次调频; 转速恢复; 三机九节点模型; 风电渗透率; 恢复时间; 对比。,Matlab Simulink模型中双馈风机虚拟惯性控制与下垂控制对频率二次跌落的影响研究
国家农业龙头企业数量主要指的是经过国家相关部门认定,并在农业产业化方面发挥重要引领作用的企业的总数。这些企业通常以农产品加工或流通为主业,通过各种利益联结机制与农户相联系,带动农户进入市场,实现农产品生产、加工、销售的有机结合和相互促进。 数据名称:国家农业龙头企业数量 数据年份:2010-2022年 ## 02、相关数据 省份、年份、龙头企业数量。
基于5G指纹的智能室内定位技术研究_王志坤.pdf
项目工程资源经过严格测试运行并且功能上ok,可实现复现复刻,拿到资料包后可实现复现出一样的项目,本人系统开发经验充足(全栈全领域),有任何使用问题欢迎随时与我联系,我会抽时间努力为您解惑,提供帮助 【资源内容】:包含源码+工程文件+说明等。答辩评审平均分达到96分,放心下载使用!可实现复现;设计报告也可借鉴此项目;该资源内项目代码都经过测试运行,功能ok 【项目价值】:可用在相关项目设计中,皆可应用在项目、毕业设计、课程设计、期末/期中/大作业、工程实训、大创等学科竞赛比赛、初期项目立项、学习/练手等方面,可借鉴此优质项目实现复刻,设计报告也可借鉴此项目,也可基于此项目来扩展开发出更多功能 【提供帮助】:有任何使用上的问题欢迎随时与我联系,抽时间努力解答解惑,提供帮助 【附带帮助】:若还需要相关开发工具、学习资料等,我会提供帮助,提供资料,鼓励学习进步 下载后请首先打开说明文件(如有);整理时不同项目所包含资源内容不同;项目工程可实现复现复刻,如果基础还行,也可在此程序基础上进行修改,以实现其它功能。供开源学习/技术交流/学习参考,勿用于商业用途。质量优质,放心下载使用,资源为网络商品(电子资料类)基于网络商品和电子资料商品的性质和特征不支持退款,质量优质,放心下载使用