`

storm高并发UV统计

 
阅读更多
统计高并发UV可行的方案(类似WordCount的计算去重word总数):
bolt1通过fieldGrouping 进行多线程局部汇总,下一级blot2进行单线程保存session_id和count数到Map且进行遍历,可以得到:
Pv、UV、访问深度(按每个session_id 的浏览数)
2014-05-01     UV数(按日期统计)


既然去重,必须持久化。两种持久化数据:

1、内存(适用中小型数据)
数据结构Map

2、no-sql 分布式数据库,如Hbase(适用大型数据)





1、数据源
public class SourceSpout implements IRichSpout{

	/**
	 * 数据源Spout
	 */
	private static final long serialVersionUID = 1L;
	
	Queue<String> queue = new ConcurrentLinkedQueue<String>();
	
	SpoutOutputCollector collector = null;
	
	String str = null;

	public void nextTuple() {
		if (queue.size() >= 0) {
			collector.emit(new Values(queue.poll()));
		}
		try {
			Thread.sleep(500) ;
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	
	}
	
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		try {
			this.collector = collector;
			
			Random random = new Random();
			String[] hosts = { "www.taobao.com" };
			String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
					"CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
			String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", 
					"2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
			
			for (int i = 0; i < 20; i++) {
				queue.add(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]);
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public void close() {
		// TODO Auto-generated method stub
	}
	
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub
		declarer.declare(new Fields("log"));
	}

	
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}
	
	public void ack(Object msgId) {
		// TODO Auto-generated method stub
		System.out.println("spout ack:"+msgId.toString());
	}

	
	public void activate() {
		// TODO Auto-generated method stub
		
	}



	
	public void deactivate() {
		// TODO Auto-generated method stub
		
	}

	
	public void fail(Object msgId) {
		// TODO Auto-generated method stub
		System.out.println("spout fail:"+msgId.toString());
	}

}


2、日期格式化处理类

public class FmtLogBolt implements IBasicBolt{

	/**
	 * 格式化日期
	 */
	private static final long serialVersionUID = 1L;

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("date","session_id"));
		
	}

	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void prepare(Map stormConf, TopologyContext context) {
		// TODO Auto-generated method stub
		
	}

	String eachLog = null;
	public void execute(Tuple input, BasicOutputCollector collector) {
		eachLog=input.getStringByField("log");
		if (eachLog != null && eachLog.length() > 0 ) {
			collector.emit(new Values(DateFmt.getCountDate(eachLog.split("\t")[2],DateFmt.date_short),eachLog.split("\t")[1])) ;// 日期, session_id
		}
	}

	public void cleanup() {
		// TODO Auto-generated method stub
		
	}

}


3、多线程局部汇总深度数据


public class DeepVisitBolt implements IBasicBolt{

	/**
	 * 多线程局部汇总深度数据
	 */
	private static final long serialVersionUID = 1L;

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("date_session_id","count"));
		
	}

	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void prepare(Map stormConf, TopologyContext context) {
		// TODO Auto-generated method stub
		
	}

	Map<String, Integer> counts = new HashMap<String, Integer>();
	public void execute(Tuple input, BasicOutputCollector collector) {
		String dateString =input.getStringByField("date");
		String session_id = input.getStringByField("session_id");
		Integer count = counts.get(dateString+"_"+session_id);
		if (count == null) {
			count = 0;
		}
		count ++ ;
		
		counts.put(dateString+"_"+session_id,count) ;
		collector.emit(new Values(dateString+"_"+session_id,count)) ;
	}

	public void cleanup() {
		// TODO Auto-generated method stub
		
	};

}


4、单线程汇总数据
public class UVSumBolt implements IBasicBolt{

	/**
	 * 单线程汇总数据
	 */
	private static final long serialVersionUID = 1L;
	Map<String, Integer> counts = new HashMap<String, Integer>();

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub
	}

	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void prepare(Map stormConf, TopologyContext context) {
		cur_date = DateFmt.getCountDate("2014-01-07", DateFmt.date_short);
		
	}
	
	long beginTime = System.currentTimeMillis() ;
	long endTime = 0;
	String cur_date = null;
	public void execute(Tuple input, BasicOutputCollector collector) {
		try {
			endTime = System.currentTimeMillis() ;
			long PV = 0;// 总数
			long UV = 0; // 个数,去重后

			String dateSession_id = input.getString(0);
			Integer count = input.getInteger(1);

			//清空不是当天的数据
			if (!dateSession_id.startsWith(cur_date)
					&& DateFmt.parseDate(dateSession_id.split("_")[0]).after(
							DateFmt.parseDate(cur_date))) {
				cur_date = dateSession_id.split("_")[0];
				counts.clear();
			}

			counts.put(dateSession_id, count);

			if (endTime - beginTime >= 2000) {//两秒输出一次
				// 获取word去重个数,遍历counts 的keySet,取count
				Iterator<String> i2 = counts.keySet().iterator();
				while (i2.hasNext()) {
					String key = i2.next();
					if (key != null) {
						if (key.startsWith(cur_date)) {
							UV++;
							PV += counts.get(key);
						}
					}
				}
				System.err.println("PV=" + PV + ";  UV="+ UV);
			}

		} catch (Exception e) {
			throw new FailedException("SumBolt fail!");
		}
		
	}

	public void cleanup() {
		// TODO Auto-generated method stub
		
	}

}


5、topoly类

public class UVTopo {

	/**
	 * topoly类
	 */
	public static void main(String[] args) {

		TopologyBuilder builder = new TopologyBuilder();

		builder.setSpout("spout", new SourceSpout(), 1);
		builder.setBolt("FmtLogBolt", new FmtLogBolt(), 4).shuffleGrouping("spout");
		// Fields Grouping:按Field分组,比如按word来分组, 具有同样word的tuple会被分到相同的Bolts, 而不同的word则会被分配到不同的Bolts。 
		builder.setBolt("sumBolt", new DeepVisitBolt(),4).fieldsGrouping("FmtLogBolt", new Fields("date","session_id"));
		builder.setBolt("UvSum", new UVSumBolt(), 1).shuffleGrouping("sumBolt") ;
		
		Config conf = new Config() ;
		conf.setDebug(true);

		if (args.length > 0) {
			try {
				StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
			} catch (AlreadyAliveException e) {
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				e.printStackTrace();
			} catch (AuthorizationException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}else {
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("mytopology", conf, builder.createTopology());
		}

	}

}


6、pom.xml文件
引用

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.test</groupId>
  <artifactId>StormMavenProject</artifactId>
  <packaging>jar</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>StormMavenProject</name>
  <url>http://maven.apache.org</url>
  <dependencies>
   
   <dependency>
    <groupId>org.ow2.asm</groupId>
    <artifactId>asm</artifactId>
    <version>5.0.3</version>
   </dependency>
<dependency>
    <groupId>org.clojure</groupId>
    <artifactId>clojure</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>3.0.3</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.6.6</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>minlog</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.objenesis</groupId>
    <artifactId>objenesis</artifactId>
    <version>2.1</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>reflectasm</artifactId>
    <version>1.10.1</version>
</dependency>

<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
    <version>2.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.21</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-rename-hack</artifactId>
    <version>1.1.0</version>
</dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

<dependency>
    <groupId>ring-cors</groupId>
    <artifactId>ring-cors</artifactId>
    <version>0.1.5</version>
</dependency>


<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
</dependency>



  </dependencies>
  <build>
    <finalName>StormMavenProject</finalName>
  </build>
</project>


7、日期处理类
public class DateFmt {
	/*
	 * 日期处理类
	 */
	public static final String date_long = "yyyy-MM-dd HH:mm:ss" ;
	public static final String date_short = "yyyy-MM-dd" ;
	
	public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);
	
	public static String getCountDate(String date,String patton)
	{
		SimpleDateFormat sdf = new SimpleDateFormat(patton);
		Calendar cal = Calendar.getInstance(); 
		if (date != null) {
			try {
				cal.setTime(sdf.parse(date)) ;
			} catch (ParseException e) {
				e.printStackTrace();
			}
		}
		return sdf.format(cal.getTime());
	}
	
	public static Date parseDate(String dateStr) throws Exception
	{
		return sdf.parse(dateStr);
	}
	
	public static void main(String[] args) throws Exception{

//		System.out.println(DateFmt.getCountDate("2014-03-01 12:13:14", DateFmt.date_short));
		System.out.println(parseDate("2014-05-02").after(parseDate("2014-05-01")));
	}

}


8、测试结果




  • 大小: 191.2 KB
分享到:
评论

相关推荐

    Storm API实现词频统计

    在大数据处理领域,Apache Storm是一个实时计算系统,它能够持续处理数据流,实现低延迟、高吞吐量的数据分析。在这个“Storm API实现词频统计”的案例中,我们将深入探讨如何利用Java编程语言和Storm API来构建一个...

    Storm入门教程 之Storm原理和概念详解

    Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...

    storm统计单词数的demo

    【storm统计单词数的demo】是一个基于Apache Storm的入门级示例,旨在帮助初学者理解这个分布式实时计算系统的运作机制。Apache Storm是一个开源的流处理框架,它允许开发者处理无界数据流,实现高吞吐量、低延迟的...

    实时数据统计 logstash + redis + storm + mysql 实时统计日志(浏览+交易

    实时数据统计 logstash + redis + storm + mysql 实时统计日志(浏览+交易等)_log_topology

    地区销售实时统计 kakfa+storm+hbase+servlet+highcharts

    Highcharts是一个JavaScript图表库,用于在Web上生成高质量的、交互式的图表。在“地区销售实时统计”项目中,Highcharts用于将后台处理后的销售数据以图表形式展示出来,例如折线图、柱状图等,使得用户可以直观地...

    791792259ARMA_Analysis_storm1uv_armamatlab_频率预测_

    标题中的“791792259ARMA_Analysis_storm1uv_armamatlab_频率预测_”暗示了这是一个关于使用ARMA(自回归移动平均)模型进行频率预测的项目,其中“storm1uv”可能指的是某种特定的数据集或场景,比如模拟风暴期间的...

    Storm 本地运行 统计字母出现次数

    标题中的“Storm 本地运行 统计字母出现次数”指的是使用Apache Storm分布式流处理系统,在本地环境中进行测试,实现一个简单的应用,该应用的任务是统计输入数据中各个字母出现的频率。Apache Storm是一个实时计算...

    storm-ui:Apache Storm 的用户界面

    主分支: ##包裹包战 mvn clean package -DskipTests=true -Dwarcp ./target/storm-ui.war $TOMCAT_HOME/webapps/包装罐 mvn clean package -DskipTests=truecp ./target/storm-ui-*.jar $STORM_HOME/external/...

    storm入门.pdf

    Storm的事务性拓扑主要用于确保数据处理的精确一次性交付(exactly-once processing semantics),这在某些需要极高可靠性的实时计算场景中非常关键。 Storm集群的运行模式包括本地模式和分布式模式。本地模式适用...

    storm利用ack保证数据的可靠性源码

    Storm的设计目标是确保数据处理的高可靠性和低延迟。在Storm中,"ack机制"是实现数据可靠传输的关键特性,它确保了每个消息至少被处理一次(At-Least-Once Delivery)。下面我们将详细探讨Storm的ack机制以及它如何...

    storm原理分析

    1. **Storm/workerbeats/&lt;topology-id&gt;/node-port**:存储 Worker 的运行状态和统计信息,包括 topology-id、Worker 上所有 Executor 的统计信息(例如发送和接收的消息数)、Worker 的启动时间和最后更新时间等。...

    从零开始学Storm.pdf

    4. 鲁棒性:Storm设计目标之一是提供健壮、容易管理的集群,即使在高并发的情况下也能保证系统的稳定运行。 5. 容错性:Storm具备容错能力,当计算出现错误时,系统能够重新分配任务,保证计算的持续进行。 6. 编程...

    Storm入门到精通

    * fault-tolerant:Storm 可以自动恢复故障节点,保证系统的高可用性。 * scalable:Storm 可以根据需要水平扩展,提高系统的处理能力。 * flexible:Storm 支持多种数据源和处理方式,可以满足不同的业务需求。 ...

    基于storm实时热点统计的分布式并行缓存预热

    这样的话,storm才能去消费kafka中的实时的访问日志,然后去进行缓存热数据的统计 用得技术方案非常简单,从lua脚本直接创建一个kafka producer,发送数据到kafka ``` wget ...

    webservice测试工具storm

    在Storm中,你可以模拟多个并发用户,进行负载和压力测试,评估Web服务在高并发情况下的稳定性和性能。通过分析响应时间、错误率等指标,可以找出系统的瓶颈和优化方向。 6. 自动化测试: Storm支持脚本化的测试...

    Storm流计算项目:1号店电商实时数据分析系统-33.项目3-非跳出UV-Storm topology开发二.pptx

    项目3-非跳出UV-Storm topology开发二】 在本项目中,我们将探讨如何利用Apache Storm开发一个实时数据分析系统,特别是关注非跳出用户视图(UV)的计算。非跳出UV是指在网站上至少访问了两个不同页面的用户数,它...

    基于Storm本地集群搭建实时统计CallLog实现可运行

    本教程将详细介绍如何基于Storm搭建本地集群,并实现一个可运行的实时统计CallLog的示例。这个过程涉及到的知识点包括Storm的基本概念、Maven的使用以及Java编程。 首先,让我们了解一下Apache Storm的核心概念。...

    StormStorm集成Kafka 从Kafka中读取数据

    而Apache Kafka则是一个高吞吐量的分布式发布订阅消息系统,常用于构建实时数据管道和流处理应用。将两者结合,可以构建出强大的实时数据处理平台。 **二、写入数据到Kafka** 在Storm-Kafka集成中,首先需要将数据...

    Storm流计算项目:1号店电商实时数据分析系统-39.项目3-非跳出UV-升级图表增加柱图二.pptx

    - 除了基础的Apache Storm,课程中还提到了JStorm,它是阿里巴巴开源的一个高性能、高稳定性的Storm版本,针对大规模集群和高并发场景进行了优化。 6. **HBase存储与State运用**: - 在项目1中,我们使用HBase...

Global site tag (gtag.js) - Google Analytics