`

storm高并发PV统计

 
阅读更多
一、PV统计思考
方案需要考虑分析多线程下,注意线程安全问题。
线程安全:多线程处理的结果和单线程一致


如下是否可行?
不可行方案:
  定义static long pv, Synchronized 控制累计操作。Synchronized 和 Lock在单个JVM下有效,但在多JVM下无效。


可行方案两个方案:
1、shuffleGrouping下,pv * Executer并发数
2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总

二、实现
注意:多线程下每一个bolt中的execute方法都会执行多次,类似一个while循环。


1、bolt1进行多并发(局部)汇总处理类
public class PVBolt1 implements IRichBolt{

	/**
	 * bolt1进行多并发(局部)汇总
	 */
	OutputCollector collector = null;
	private static final long serialVersionUID = 1L;
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}
	
	
	String logString;
	String session_id;
	long pv = 0;
	public void execute(Tuple input) {
		logString = input.getString(0);
		session_id = logString.split("\t")[1];
		if(session_id !=null){
			pv ++;
		}
		
		 collector.emit(new Values(Thread.currentThread().getId(),pv));
		 System.err.println("threadId = "+ Thread.currentThread().getId()+"; pv="+pv);
	}

	public void cleanup() {
		// TODO Auto-generated method stub
		
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("threadId", "count"));
		
	}

	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

}


2、bolt2单线程进行全局汇总处理类
public class PVBolt2 implements IRichBolt{

	/**
	 * bolt2单线程进行全局汇总
	 */
	private static final long serialVersionUID = 1L;
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		// TODO Auto-generated method stub
		
	}
	
	Map<Long,Long>counts = new HashMap<Long,Long>();
	
	public void execute(Tuple input) {
		Long thread_id = input.getLong(0);
		Long pv = input.getLong(1);
		counts.put(thread_id,pv);
		System.err.println("  threadId="+thread_id+"-------------pv="+pv);
		long word_sum = 0;
		//获取总数,遍历counts 的values,进行sum
		Iterator<Long> i = counts.values().iterator() ;
		while(i.hasNext())
		{
			word_sum += i.next();
		}
		System.err.println("PVBolt2-------------pv="+word_sum+"\r");
	}

	public void cleanup() {
		// TODO Auto-generated method stub
		
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub
		
	}

	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

}


3、topology运行main类
public class Main {

	public static void main(String[] args) {

		TopologyBuilder builder = new TopologyBuilder();

		builder.setSpout("spout", new MySpout(), 1);
		
		builder.setBolt("bolt1", new PVBolt1(),4).shuffleGrouping("spout");
		builder.setBolt("bolt2", new PVBolt2(),1).shuffleGrouping("bolt1");
		
		Map conf = new HashMap();
		conf.put(Config.TOPOLOGY_WORKERS, 4);

		if (args.length > 0) {
			try {
					StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
			} catch (AlreadyAliveException e) {
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				e.printStackTrace();
			}catch (AuthorizationException e) {
				e.printStackTrace();
			}
		}else {
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("mytopology", conf, builder.createTopology());
		}
		
	}

}


-------------------------------其它辅助类---------------------------

4、数据读取spout处理类

public class MySpout implements IRichSpout{

	/**
	 * 数据读取spout处理类
	 */
	private static final long serialVersionUID = 1L;

	FileInputStream fis;
	InputStreamReader isr;
	BufferedReader br;			

	SpoutOutputCollector collector = null;
	
	
	String str = null;

	
	public void nextTuple() {
		try {
			while ((str = this.br.readLine()) != null) {
				// 过滤动作
				
				collector.emit(new Values(str));
				
//				Thread.sleep(3000);
				//to do 
			}
		} catch (Exception e) {
			// TODO: handle exception
		}
		
		
	}

	
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		try {
			this.collector = collector;
			this.fis = new FileInputStream("track.log");
			this.isr = new InputStreamReader(fis, "UTF-8");
			this.br = new BufferedReader(isr);
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		// 打开文件
		
	}

	
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// 发射数据格式,与bolt接收数据一致
		declarer.declare(new Fields("log"));
	}

	
	public Map<String, Object> getComponentConfiguration() {
		// 与ope方法中的map对应
		return null;
	}
	

	public void ack(Object msgId) {
		// TODO Auto-generated method stub
		
	}

	
	public void activate() {
		// TODO Auto-generated method stub
		
	}

	
	public void close() {
		// TODO Auto-generated method stub
		
	}

	
	public void deactivate() {
		// TODO Auto-generated method stub
		
	}

	
	public void fail(Object msgId) {
		// TODO Auto-generated method stub
		
	}

}


5、pom文件引用前几篇文章

6、处理结果
引用
  threadId=156-------------pv=44
PVBolt2-------------pv=44

  threadId=156-------------pv=45
PVBolt2-------------pv=45

  threadId=156-------------pv=46
PVBolt2-------------pv=46

  threadId=156-------------pv=47
PVBolt2-------------pv=47

  threadId=152-------------pv=1
PVBolt2-------------pv=48

  threadId=215-------------pv=1
PVBolt2-------------pv=49

9234 [Thread-62-bolt1-executor[5 5]]
threadId = 227; pv=1
  threadId=227-------------pv=1
PVBolt2-------------pv=50
分享到:
评论

相关推荐

    Storm API实现词频统计

    在大数据处理领域,Apache Storm是一个实时计算系统,它能够持续处理数据流,实现低延迟、高吞吐量的数据分析。在这个“Storm API实现词频统计”的案例中,我们将深入探讨如何利用Java编程语言和Storm API来构建一个...

    Storm Trident实战之计算网站PV.rar

    本实战案例将重点介绍如何使用Storm Trident来计算网站的页面浏览量(PV,Page View)。 页面浏览量是衡量一个网站受欢迎程度的重要指标,通常通过记录用户对各个页面的访问次数来计算。在传统的批处理场景下,这...

    Storm入门教程 之Storm原理和概念详解

    Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...

    storm统计单词数的demo

    【storm统计单词数的demo】是一个基于Apache Storm的入门级示例,旨在帮助初学者理解这个分布式实时计算系统的运作机制。Apache Storm是一个开源的流处理框架,它允许开发者处理无界数据流,实现高吞吐量、低延迟的...

    实时数据统计 logstash + redis + storm + mysql 实时统计日志(浏览+交易

    实时数据统计 logstash + redis + storm + mysql 实时统计日志(浏览+交易等)_log_topology

    地区销售实时统计 kakfa+storm+hbase+servlet+highcharts

    Highcharts是一个JavaScript图表库,用于在Web上生成高质量的、交互式的图表。在“地区销售实时统计”项目中,Highcharts用于将后台处理后的销售数据以图表形式展示出来,例如折线图、柱状图等,使得用户可以直观地...

    Storm 本地运行 统计字母出现次数

    标题中的“Storm 本地运行 统计字母出现次数”指的是使用Apache Storm分布式流处理系统,在本地环境中进行测试,实现一个简单的应用,该应用的任务是统计输入数据中各个字母出现的频率。Apache Storm是一个实时计算...

    storm-ui:Apache Storm 的用户界面

    主分支: ##包裹包战 mvn clean package -DskipTests=true -Dwarcp ./target/storm-ui.war $TOMCAT_HOME/webapps/包装罐 mvn clean package -DskipTests=truecp ./target/storm-ui-*.jar $STORM_HOME/external/...

    storm入门.pdf

    Storm的事务性拓扑主要用于确保数据处理的精确一次性交付(exactly-once processing semantics),这在某些需要极高可靠性的实时计算场景中非常关键。 Storm集群的运行模式包括本地模式和分布式模式。本地模式适用...

    storm利用ack保证数据的可靠性源码

    Storm的设计目标是确保数据处理的高可靠性和低延迟。在Storm中,"ack机制"是实现数据可靠传输的关键特性,它确保了每个消息至少被处理一次(At-Least-Once Delivery)。下面我们将详细探讨Storm的ack机制以及它如何...

    storm原理分析

    1. **Storm/workerbeats/&lt;topology-id&gt;/node-port**:存储 Worker 的运行状态和统计信息,包括 topology-id、Worker 上所有 Executor 的统计信息(例如发送和接收的消息数)、Worker 的启动时间和最后更新时间等。...

    Storm入门到精通

    * fault-tolerant:Storm 可以自动恢复故障节点,保证系统的高可用性。 * scalable:Storm 可以根据需要水平扩展,提高系统的处理能力。 * flexible:Storm 支持多种数据源和处理方式,可以满足不同的业务需求。 ...

    基于storm实时热点统计的分布式并行缓存预热

    这样的话,storm才能去消费kafka中的实时的访问日志,然后去进行缓存热数据的统计 用得技术方案非常简单,从lua脚本直接创建一个kafka producer,发送数据到kafka ``` wget ...

    webservice测试工具storm

    在Storm中,你可以模拟多个并发用户,进行负载和压力测试,评估Web服务在高并发情况下的稳定性和性能。通过分析响应时间、错误率等指标,可以找出系统的瓶颈和优化方向。 6. 自动化测试: Storm支持脚本化的测试...

    从零开始学Storm.pdf

    4. 鲁棒性:Storm设计目标之一是提供健壮、容易管理的集群,即使在高并发的情况下也能保证系统的稳定运行。 5. 容错性:Storm具备容错能力,当计算出现错误时,系统能够重新分配任务,保证计算的持续进行。 6. 编程...

    基于Storm本地集群搭建实时统计CallLog实现可运行

    本教程将详细介绍如何基于Storm搭建本地集群,并实现一个可运行的实时统计CallLog的示例。这个过程涉及到的知识点包括Storm的基本概念、Maven的使用以及Java编程。 首先,让我们了解一下Apache Storm的核心概念。...

    细细品味Storm_Storm简介及安装

    - **低延迟和高性能**:Storm设计用于处理高频率、低延迟的数据流,可以处理每秒数百万条消息。 - **分布式和可扩展**:Storm可以在多个服务器上分布运行,通过添加更多的服务器轻松扩展处理能力。 - **容错**:...

    Storm 源码分析

    Apache Storm是一款开源的分布式实时计算系统,它提供了简单而强大的API来定义数据流处理逻辑,同时保证了消息处理的容错性和高性能。Storm的设计目标是成为实时计算领域的Hadoop,支持多种编程语言,并且能够很好地...

    中国移动storm项目代码

    10. **性能调优**:考虑到基站数据的高并发性和实时性要求,项目可能涉及到 Storm 的性能调优,包括优化拓扑结构、调整并行度以及优化数据处理逻辑等。 综上所述,【中国移动storm项目代码】是一个综合性的实时大...

Global site tag (gtag.js) - Google Analytics