- 浏览: 56410 次
- 性别:
- 来自: 北京
文章分类
最新评论
一、PV统计思考
方案需要考虑分析多线程下,注意线程安全问题。
线程安全:多线程处理的结果和单线程一致
如下是否可行?
不可行方案:
定义static long pv, Synchronized 控制累计操作。Synchronized 和 Lock在单个JVM下有效,但在多JVM下无效。
可行方案两个方案:
1、shuffleGrouping下,pv * Executer并发数
2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总
二、实现
注意:多线程下每一个bolt中的execute方法都会执行多次,类似一个while循环。
1、bolt1进行多并发(局部)汇总处理类
2、bolt2单线程进行全局汇总处理类
3、topology运行main类
-------------------------------其它辅助类---------------------------
4、数据读取spout处理类
5、pom文件引用前几篇文章
6、处理结果
方案需要考虑分析多线程下,注意线程安全问题。
线程安全:多线程处理的结果和单线程一致
如下是否可行?
不可行方案:
定义static long pv, Synchronized 控制累计操作。Synchronized 和 Lock在单个JVM下有效,但在多JVM下无效。
可行方案两个方案:
1、shuffleGrouping下,pv * Executer并发数
2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总
二、实现
注意:多线程下每一个bolt中的execute方法都会执行多次,类似一个while循环。
1、bolt1进行多并发(局部)汇总处理类
public class PVBolt1 implements IRichBolt{ /** * bolt1进行多并发(局部)汇总 */ OutputCollector collector = null; private static final long serialVersionUID = 1L; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } String logString; String session_id; long pv = 0; public void execute(Tuple input) { logString = input.getString(0); session_id = logString.split("\t")[1]; if(session_id !=null){ pv ++; } collector.emit(new Values(Thread.currentThread().getId(),pv)); System.err.println("threadId = "+ Thread.currentThread().getId()+"; pv="+pv); } public void cleanup() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("threadId", "count")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
2、bolt2单线程进行全局汇总处理类
public class PVBolt2 implements IRichBolt{ /** * bolt2单线程进行全局汇总 */ private static final long serialVersionUID = 1L; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub } Map<Long,Long>counts = new HashMap<Long,Long>(); public void execute(Tuple input) { Long thread_id = input.getLong(0); Long pv = input.getLong(1); counts.put(thread_id,pv); System.err.println(" threadId="+thread_id+"-------------pv="+pv); long word_sum = 0; //获取总数,遍历counts 的values,进行sum Iterator<Long> i = counts.values().iterator() ; while(i.hasNext()) { word_sum += i.next(); } System.err.println("PVBolt2-------------pv="+word_sum+"\r"); } public void cleanup() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
3、topology运行main类
public class Main { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new MySpout(), 1); builder.setBolt("bolt1", new PVBolt1(),4).shuffleGrouping("spout"); builder.setBolt("bolt2", new PVBolt2(),1).shuffleGrouping("bolt1"); Map conf = new HashMap(); conf.put(Config.TOPOLOGY_WORKERS, 4); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); }catch (AuthorizationException e) { e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
-------------------------------其它辅助类---------------------------
4、数据读取spout处理类
public class MySpout implements IRichSpout{ /** * 数据读取spout处理类 */ private static final long serialVersionUID = 1L; FileInputStream fis; InputStreamReader isr; BufferedReader br; SpoutOutputCollector collector = null; String str = null; public void nextTuple() { try { while ((str = this.br.readLine()) != null) { // 过滤动作 collector.emit(new Values(str)); // Thread.sleep(3000); //to do } } catch (Exception e) { // TODO: handle exception } } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; this.fis = new FileInputStream("track.log"); this.isr = new InputStreamReader(fis, "UTF-8"); this.br = new BufferedReader(isr); } catch (Exception e) { e.printStackTrace(); } // 打开文件 } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 发射数据格式,与bolt接收数据一致 declarer.declare(new Fields("log")); } public Map<String, Object> getComponentConfiguration() { // 与ope方法中的map对应 return null; } public void ack(Object msgId) { // TODO Auto-generated method stub } public void activate() { // TODO Auto-generated method stub } public void close() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub } public void fail(Object msgId) { // TODO Auto-generated method stub } }
5、pom文件引用前几篇文章
6、处理结果
引用
threadId=156-------------pv=44
PVBolt2-------------pv=44
threadId=156-------------pv=45
PVBolt2-------------pv=45
threadId=156-------------pv=46
PVBolt2-------------pv=46
threadId=156-------------pv=47
PVBolt2-------------pv=47
threadId=152-------------pv=1
PVBolt2-------------pv=48
threadId=215-------------pv=1
PVBolt2-------------pv=49
9234 [Thread-62-bolt1-executor[5 5]]
threadId = 227; pv=1
threadId=227-------------pv=1
PVBolt2-------------pv=50
PVBolt2-------------pv=44
threadId=156-------------pv=45
PVBolt2-------------pv=45
threadId=156-------------pv=46
PVBolt2-------------pv=46
threadId=156-------------pv=47
PVBolt2-------------pv=47
threadId=152-------------pv=1
PVBolt2-------------pv=48
threadId=215-------------pv=1
PVBolt2-------------pv=49
9234 [Thread-62-bolt1-executor[5 5]]
threadId = 227; pv=1
threadId=227-------------pv=1
PVBolt2-------------pv=50
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1029一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6451、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 747一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 512英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 414一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6751、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5791.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4821、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 8171、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 610Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2094事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4461、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1125统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 891汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10661、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 699一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 586并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5311、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 392本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 666一、安装Storm wget ...
相关推荐
在大数据处理领域,Apache Storm是一个实时计算系统,它能够持续处理数据流,实现低延迟、高吞吐量的数据分析。在这个“Storm API实现词频统计”的案例中,我们将深入探讨如何利用Java编程语言和Storm API来构建一个...
本实战案例将重点介绍如何使用Storm Trident来计算网站的页面浏览量(PV,Page View)。 页面浏览量是衡量一个网站受欢迎程度的重要指标,通常通过记录用户对各个页面的访问次数来计算。在传统的批处理场景下,这...
Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...
【storm统计单词数的demo】是一个基于Apache Storm的入门级示例,旨在帮助初学者理解这个分布式实时计算系统的运作机制。Apache Storm是一个开源的流处理框架,它允许开发者处理无界数据流,实现高吞吐量、低延迟的...
实时数据统计 logstash + redis + storm + mysql 实时统计日志(浏览+交易等)_log_topology
Highcharts是一个JavaScript图表库,用于在Web上生成高质量的、交互式的图表。在“地区销售实时统计”项目中,Highcharts用于将后台处理后的销售数据以图表形式展示出来,例如折线图、柱状图等,使得用户可以直观地...
标题中的“Storm 本地运行 统计字母出现次数”指的是使用Apache Storm分布式流处理系统,在本地环境中进行测试,实现一个简单的应用,该应用的任务是统计输入数据中各个字母出现的频率。Apache Storm是一个实时计算...
主分支: ##包裹包战 mvn clean package -DskipTests=true -Dwarcp ./target/storm-ui.war $TOMCAT_HOME/webapps/包装罐 mvn clean package -DskipTests=truecp ./target/storm-ui-*.jar $STORM_HOME/external/...
Storm的事务性拓扑主要用于确保数据处理的精确一次性交付(exactly-once processing semantics),这在某些需要极高可靠性的实时计算场景中非常关键。 Storm集群的运行模式包括本地模式和分布式模式。本地模式适用...
Storm的设计目标是确保数据处理的高可靠性和低延迟。在Storm中,"ack机制"是实现数据可靠传输的关键特性,它确保了每个消息至少被处理一次(At-Least-Once Delivery)。下面我们将详细探讨Storm的ack机制以及它如何...
1. **Storm/workerbeats/<topology-id>/node-port**:存储 Worker 的运行状态和统计信息,包括 topology-id、Worker 上所有 Executor 的统计信息(例如发送和接收的消息数)、Worker 的启动时间和最后更新时间等。...
* fault-tolerant:Storm 可以自动恢复故障节点,保证系统的高可用性。 * scalable:Storm 可以根据需要水平扩展,提高系统的处理能力。 * flexible:Storm 支持多种数据源和处理方式,可以满足不同的业务需求。 ...
这样的话,storm才能去消费kafka中的实时的访问日志,然后去进行缓存热数据的统计 用得技术方案非常简单,从lua脚本直接创建一个kafka producer,发送数据到kafka ``` wget ...
在Storm中,你可以模拟多个并发用户,进行负载和压力测试,评估Web服务在高并发情况下的稳定性和性能。通过分析响应时间、错误率等指标,可以找出系统的瓶颈和优化方向。 6. 自动化测试: Storm支持脚本化的测试...
4. 鲁棒性:Storm设计目标之一是提供健壮、容易管理的集群,即使在高并发的情况下也能保证系统的稳定运行。 5. 容错性:Storm具备容错能力,当计算出现错误时,系统能够重新分配任务,保证计算的持续进行。 6. 编程...
本教程将详细介绍如何基于Storm搭建本地集群,并实现一个可运行的实时统计CallLog的示例。这个过程涉及到的知识点包括Storm的基本概念、Maven的使用以及Java编程。 首先,让我们了解一下Apache Storm的核心概念。...
- **低延迟和高性能**:Storm设计用于处理高频率、低延迟的数据流,可以处理每秒数百万条消息。 - **分布式和可扩展**:Storm可以在多个服务器上分布运行,通过添加更多的服务器轻松扩展处理能力。 - **容错**:...
Apache Storm是一款开源的分布式实时计算系统,它提供了简单而强大的API来定义数据流处理逻辑,同时保证了消息处理的容错性和高性能。Storm的设计目标是成为实时计算领域的Hadoop,支持多种编程语言,并且能够很好地...
10. **性能调优**:考虑到基站数据的高并发性和实时性要求,项目可能涉及到 Storm 的性能调优,包括优化拓扑结构、调整并行度以及优化数据处理逻辑等。 综上所述,【中国移动storm项目代码】是一个综合性的实时大...