- 浏览: 58255 次
- 性别:
- 来自: 北京
-
文章分类
最新评论
一、PV统计思考
方案需要考虑分析多线程下,注意线程安全问题。
线程安全:多线程处理的结果和单线程一致
如下是否可行?
不可行方案:
定义static long pv, Synchronized 控制累计操作。Synchronized 和 Lock在单个JVM下有效,但在多JVM下无效。
可行方案两个方案:
1、shuffleGrouping下,pv * Executer并发数
2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总
二、实现
注意:多线程下每一个bolt中的execute方法都会执行多次,类似一个while循环。
1、bolt1进行多并发(局部)汇总处理类
2、bolt2单线程进行全局汇总处理类
3、topology运行main类
-------------------------------其它辅助类---------------------------
4、数据读取spout处理类
5、pom文件引用前几篇文章
6、处理结果
方案需要考虑分析多线程下,注意线程安全问题。
线程安全:多线程处理的结果和单线程一致
如下是否可行?
不可行方案:
定义static long pv, Synchronized 控制累计操作。Synchronized 和 Lock在单个JVM下有效,但在多JVM下无效。
可行方案两个方案:
1、shuffleGrouping下,pv * Executer并发数
2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总
二、实现
注意:多线程下每一个bolt中的execute方法都会执行多次,类似一个while循环。
1、bolt1进行多并发(局部)汇总处理类
public class PVBolt1 implements IRichBolt{ /** * bolt1进行多并发(局部)汇总 */ OutputCollector collector = null; private static final long serialVersionUID = 1L; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } String logString; String session_id; long pv = 0; public void execute(Tuple input) { logString = input.getString(0); session_id = logString.split("\t")[1]; if(session_id !=null){ pv ++; } collector.emit(new Values(Thread.currentThread().getId(),pv)); System.err.println("threadId = "+ Thread.currentThread().getId()+"; pv="+pv); } public void cleanup() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("threadId", "count")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
2、bolt2单线程进行全局汇总处理类
public class PVBolt2 implements IRichBolt{ /** * bolt2单线程进行全局汇总 */ private static final long serialVersionUID = 1L; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub } Map<Long,Long>counts = new HashMap<Long,Long>(); public void execute(Tuple input) { Long thread_id = input.getLong(0); Long pv = input.getLong(1); counts.put(thread_id,pv); System.err.println(" threadId="+thread_id+"-------------pv="+pv); long word_sum = 0; //获取总数,遍历counts 的values,进行sum Iterator<Long> i = counts.values().iterator() ; while(i.hasNext()) { word_sum += i.next(); } System.err.println("PVBolt2-------------pv="+word_sum+"\r"); } public void cleanup() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
3、topology运行main类
public class Main { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new MySpout(), 1); builder.setBolt("bolt1", new PVBolt1(),4).shuffleGrouping("spout"); builder.setBolt("bolt2", new PVBolt2(),1).shuffleGrouping("bolt1"); Map conf = new HashMap(); conf.put(Config.TOPOLOGY_WORKERS, 4); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); }catch (AuthorizationException e) { e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
-------------------------------其它辅助类---------------------------
4、数据读取spout处理类
public class MySpout implements IRichSpout{ /** * 数据读取spout处理类 */ private static final long serialVersionUID = 1L; FileInputStream fis; InputStreamReader isr; BufferedReader br; SpoutOutputCollector collector = null; String str = null; public void nextTuple() { try { while ((str = this.br.readLine()) != null) { // 过滤动作 collector.emit(new Values(str)); // Thread.sleep(3000); //to do } } catch (Exception e) { // TODO: handle exception } } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; this.fis = new FileInputStream("track.log"); this.isr = new InputStreamReader(fis, "UTF-8"); this.br = new BufferedReader(isr); } catch (Exception e) { e.printStackTrace(); } // 打开文件 } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 发射数据格式,与bolt接收数据一致 declarer.declare(new Fields("log")); } public Map<String, Object> getComponentConfiguration() { // 与ope方法中的map对应 return null; } public void ack(Object msgId) { // TODO Auto-generated method stub } public void activate() { // TODO Auto-generated method stub } public void close() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub } public void fail(Object msgId) { // TODO Auto-generated method stub } }
5、pom文件引用前几篇文章
6、处理结果
引用
threadId=156-------------pv=44
PVBolt2-------------pv=44
threadId=156-------------pv=45
PVBolt2-------------pv=45
threadId=156-------------pv=46
PVBolt2-------------pv=46
threadId=156-------------pv=47
PVBolt2-------------pv=47
threadId=152-------------pv=1
PVBolt2-------------pv=48
threadId=215-------------pv=1
PVBolt2-------------pv=49
9234 [Thread-62-bolt1-executor[5 5]]
threadId = 227; pv=1
threadId=227-------------pv=1
PVBolt2-------------pv=50
PVBolt2-------------pv=44
threadId=156-------------pv=45
PVBolt2-------------pv=45
threadId=156-------------pv=46
PVBolt2-------------pv=46
threadId=156-------------pv=47
PVBolt2-------------pv=47
threadId=152-------------pv=1
PVBolt2-------------pv=48
threadId=215-------------pv=1
PVBolt2-------------pv=49
9234 [Thread-62-bolt1-executor[5 5]]
threadId = 227; pv=1
threadId=227-------------pv=1
PVBolt2-------------pv=50
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1052一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6611、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 773一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 534英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 430一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6901、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5961.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 5001、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 8351、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 625Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2137事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4711、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1158统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 917汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10851、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 712一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 620并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5451、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 407本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 686一、安装Storm wget ...
相关推荐
项目的实施过程中,需要注意的问题包括如何高效地进行数据聚合,如何确保在高并发下的数据一致性,以及如何优化HTTP长连接以保证实时数据推送的稳定性和效率。通过这种方式,1号店电商实时数据分析系统能够提供对非...
数据批处理平台包含不同类型的Job,例如基础类Url处理、统计分布类PV/UV计算、安全分析类漏洞分析等。流数据处理平台需要具备横向扩展、负载均衡、错误恢复、任务迁移、集中控制和引擎异构等特点。 4. 安全服务协作...
轴类零件加工工艺设计.zip
资源内项目源码是来自个人的毕业设计,代码都测试ok,包含源码、数据集、可视化页面和部署说明,可产生核心指标曲线图、混淆矩阵、F1分数曲线、精确率-召回率曲线、验证集预测结果、标签分布图。都是运行成功后才上传资源,毕设答辩评审绝对信服的保底85分以上,放心下载使用,拿来就能用。包含源码、数据集、可视化页面和部署说明一站式服务,拿来就能用的绝对好资源!!! 项目备注 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、大作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.txt文件,仅供学习参考, 切勿用于商业用途。
seaborn基本绘图人力资源数据集
移动机器人(sw三维)
自制html网页源代码查看器
3吨叉车的液压系统设计().zip
1_实验三 扰码、卷积编码及交织.ppt
北京交通大学软件学院自命题科目考试大纲.pdf
雅鲁藏布江流域 shp矢量数据 (范围+DEM).zip
基于RUST的数据结构代码示例,栈、队列、图等
NIFD:2024Q1房地产金融报告
详细介绍及样例数据:https://blog.csdn.net/li514006030/article/details/146916652
【工业机器视觉定位软件Vision-Detect】基于C#的WPF与Halcon开发的工业机器视觉定位软件(整套源码),开箱即用 有用户登录,图片加载,模板创建,通讯工具,抓边抓圆,良率统计,LOG日志,异常管理,九点标定和流程加载保存等模块,功能不是很完善,适合初学者参考学习。 资源介绍请查阅:https://blog.csdn.net/m0_37302966/article/details/146912206 更多视觉框架资源:https://blog.csdn.net/m0_37302966/article/details/146583453
内容概要:本文档详细介绍了Java虚拟机(JVM)的相关知识点,涵盖Java内存模型、垃圾回收机制及算法、垃圾收集器、内存分配策略、虚拟机类加载机制和JVM调优等内容。首先阐述了Java代码的编译和运行过程,以及JVM的基本组成部分及其运行流程。接着深入探讨了JVM的各个运行时数据区,如程序计数器、Java虚拟机栈、本地方法栈、Java堆、方法区等的作用和特点。随后,文档详细解析了垃圾回收机制,包括GC的概念、工作原理、优点和缺点,并介绍了几种常见的垃圾回收算法。此外,文档还讲解了JVM的分代收集策略,新生代和老年代的区别,以及不同垃圾收集器的工作方式。最后,文档介绍了类加载机制、JVM调优的方法和工具,以及常用的JVM调优参数。 适合人群:具备一定Java编程基础的研发人员,尤其是希望深入了解JVM内部机制、优化程序性能的技术人员。 使用场景及目标:①帮助开发人员理解Java代码的编译和执行过程;②掌握JVM内存管理机制,包括内存分配、垃圾回收等;③熟悉类加载机制,了解类加载器的工作原理;④学会使用JVM调优工具,掌握常用调优参数,提升应用程序性能。 其他说明:本文档内容详尽,适合用作面试准备材料和技术学习资料,有助于提高开发人员对JVM的理解和应用能力。
Android项目原生java语言课程设计,包含LW+ppt
戴德梁行&中国房地产协会:2021亚洲房地产投资信托基金研究报告
Android项目原生java语言课程设计,包含LW+ppt
Thinkphp6.0+vue个人虚拟物品发卡网站源码 支持码支付对接 扫码自动发货 源码一共包含两个部分thinkphp6.0后端文件,以及vue前端文件.zip