- 浏览: 56666 次
- 性别:
- 来自: 北京
文章分类
最新评论
统计高并发UV可行的方案(类似WordCount的计算去重word总数):
bolt1通过fieldGrouping 进行多线程局部汇总,下一级blot2进行单线程保存session_id和count数到Map且进行遍历,可以得到:
Pv、UV、访问深度(按每个session_id 的浏览数)
2014-05-01 UV数(按日期统计)
既然去重,必须持久化。两种持久化数据:
1、内存(适用中小型数据)
数据结构Map
2、no-sql 分布式数据库,如Hbase(适用大型数据)
1、数据源
2、日期格式化处理类
3、多线程局部汇总深度数据
4、单线程汇总数据
5、topoly类
6、pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
7、日期处理类
8、测试结果
bolt1通过fieldGrouping 进行多线程局部汇总,下一级blot2进行单线程保存session_id和count数到Map且进行遍历,可以得到:
Pv、UV、访问深度(按每个session_id 的浏览数)
2014-05-01 UV数(按日期统计)
既然去重,必须持久化。两种持久化数据:
1、内存(适用中小型数据)
数据结构Map
2、no-sql 分布式数据库,如Hbase(适用大型数据)
1、数据源
public class SourceSpout implements IRichSpout{ /** * 数据源Spout */ private static final long serialVersionUID = 1L; Queue<String> queue = new ConcurrentLinkedQueue<String>(); SpoutOutputCollector collector = null; String str = null; public void nextTuple() { if (queue.size() >= 0) { collector.emit(new Values(queue.poll())); } try { Thread.sleep(500) ; } catch (InterruptedException e) { e.printStackTrace(); } } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; Random random = new Random(); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; for (int i = 0; i < 20; i++) { queue.add(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]); } } catch (Exception e) { e.printStackTrace(); } } public void close() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("log")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void ack(Object msgId) { // TODO Auto-generated method stub System.out.println("spout ack:"+msgId.toString()); } public void activate() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub } public void fail(Object msgId) { // TODO Auto-generated method stub System.out.println("spout fail:"+msgId.toString()); } }
2、日期格式化处理类
public class FmtLogBolt implements IBasicBolt{ /** * 格式化日期 */ private static final long serialVersionUID = 1L; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date","session_id")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } String eachLog = null; public void execute(Tuple input, BasicOutputCollector collector) { eachLog=input.getStringByField("log"); if (eachLog != null && eachLog.length() > 0 ) { collector.emit(new Values(DateFmt.getCountDate(eachLog.split("\t")[2],DateFmt.date_short),eachLog.split("\t")[1])) ;// 日期, session_id } } public void cleanup() { // TODO Auto-generated method stub } }
3、多线程局部汇总深度数据
public class DeepVisitBolt implements IBasicBolt{ /** * 多线程局部汇总深度数据 */ private static final long serialVersionUID = 1L; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date_session_id","count")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple input, BasicOutputCollector collector) { String dateString =input.getStringByField("date"); String session_id = input.getStringByField("session_id"); Integer count = counts.get(dateString+"_"+session_id); if (count == null) { count = 0; } count ++ ; counts.put(dateString+"_"+session_id,count) ; collector.emit(new Values(dateString+"_"+session_id,count)) ; } public void cleanup() { // TODO Auto-generated method stub }; }
4、单线程汇总数据
public class UVSumBolt implements IBasicBolt{ /** * 单线程汇总数据 */ private static final long serialVersionUID = 1L; Map<String, Integer> counts = new HashMap<String, Integer>(); public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { cur_date = DateFmt.getCountDate("2014-01-07", DateFmt.date_short); } long beginTime = System.currentTimeMillis() ; long endTime = 0; String cur_date = null; public void execute(Tuple input, BasicOutputCollector collector) { try { endTime = System.currentTimeMillis() ; long PV = 0;// 总数 long UV = 0; // 个数,去重后 String dateSession_id = input.getString(0); Integer count = input.getInteger(1); //清空不是当天的数据 if (!dateSession_id.startsWith(cur_date) && DateFmt.parseDate(dateSession_id.split("_")[0]).after( DateFmt.parseDate(cur_date))) { cur_date = dateSession_id.split("_")[0]; counts.clear(); } counts.put(dateSession_id, count); if (endTime - beginTime >= 2000) {//两秒输出一次 // 获取word去重个数,遍历counts 的keySet,取count Iterator<String> i2 = counts.keySet().iterator(); while (i2.hasNext()) { String key = i2.next(); if (key != null) { if (key.startsWith(cur_date)) { UV++; PV += counts.get(key); } } } System.err.println("PV=" + PV + "; UV="+ UV); } } catch (Exception e) { throw new FailedException("SumBolt fail!"); } } public void cleanup() { // TODO Auto-generated method stub } }
5、topoly类
public class UVTopo { /** * topoly类 */ public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new SourceSpout(), 1); builder.setBolt("FmtLogBolt", new FmtLogBolt(), 4).shuffleGrouping("spout"); // Fields Grouping:按Field分组,比如按word来分组, 具有同样word的tuple会被分到相同的Bolts, 而不同的word则会被分配到不同的Bolts。 builder.setBolt("sumBolt", new DeepVisitBolt(),4).fieldsGrouping("FmtLogBolt", new Fields("date","session_id")); builder.setBolt("UvSum", new UVSumBolt(), 1).shuffleGrouping("sumBolt") ; Config conf = new Config() ; conf.setDebug(true); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } catch (AuthorizationException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
6、pom.xml文件
引用
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
7、日期处理类
public class DateFmt { /* * 日期处理类 */ public static final String date_long = "yyyy-MM-dd HH:mm:ss" ; public static final String date_short = "yyyy-MM-dd" ; public static SimpleDateFormat sdf = new SimpleDateFormat(date_short); public static String getCountDate(String date,String patton) { SimpleDateFormat sdf = new SimpleDateFormat(patton); Calendar cal = Calendar.getInstance(); if (date != null) { try { cal.setTime(sdf.parse(date)) ; } catch (ParseException e) { e.printStackTrace(); } } return sdf.format(cal.getTime()); } public static Date parseDate(String dateStr) throws Exception { return sdf.parse(dateStr); } public static void main(String[] args) throws Exception{ // System.out.println(DateFmt.getCountDate("2014-03-01 12:13:14", DateFmt.date_short)); System.out.println(parseDate("2014-05-02").after(parseDate("2014-05-01"))); } }
8、测试结果
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1032一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6481、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 750一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 516英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 416一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6791、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5821.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4881、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 8201、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 614Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2102事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4481、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 896汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 687一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10701、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 703一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 591并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5341、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 395本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 672一、安装Storm wget ...
相关推荐
在大数据处理领域,Apache Storm是一个实时计算系统,它能够持续处理数据流,实现低延迟、高吞吐量的数据分析。在这个“Storm API实现词频统计”的案例中,我们将深入探讨如何利用Java编程语言和Storm API来构建一个...
Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...
【storm统计单词数的demo】是一个基于Apache Storm的入门级示例,旨在帮助初学者理解这个分布式实时计算系统的运作机制。Apache Storm是一个开源的流处理框架,它允许开发者处理无界数据流,实现高吞吐量、低延迟的...
实时数据统计 logstash + redis + storm + mysql 实时统计日志(浏览+交易等)_log_topology
Highcharts是一个JavaScript图表库,用于在Web上生成高质量的、交互式的图表。在“地区销售实时统计”项目中,Highcharts用于将后台处理后的销售数据以图表形式展示出来,例如折线图、柱状图等,使得用户可以直观地...
标题中的“791792259ARMA_Analysis_storm1uv_armamatlab_频率预测_”暗示了这是一个关于使用ARMA(自回归移动平均)模型进行频率预测的项目,其中“storm1uv”可能指的是某种特定的数据集或场景,比如模拟风暴期间的...
标题中的“Storm 本地运行 统计字母出现次数”指的是使用Apache Storm分布式流处理系统,在本地环境中进行测试,实现一个简单的应用,该应用的任务是统计输入数据中各个字母出现的频率。Apache Storm是一个实时计算...
主分支: ##包裹包战 mvn clean package -DskipTests=true -Dwarcp ./target/storm-ui.war $TOMCAT_HOME/webapps/包装罐 mvn clean package -DskipTests=truecp ./target/storm-ui-*.jar $STORM_HOME/external/...
Storm的事务性拓扑主要用于确保数据处理的精确一次性交付(exactly-once processing semantics),这在某些需要极高可靠性的实时计算场景中非常关键。 Storm集群的运行模式包括本地模式和分布式模式。本地模式适用...
Storm的设计目标是确保数据处理的高可靠性和低延迟。在Storm中,"ack机制"是实现数据可靠传输的关键特性,它确保了每个消息至少被处理一次(At-Least-Once Delivery)。下面我们将详细探讨Storm的ack机制以及它如何...
1. **Storm/workerbeats/<topology-id>/node-port**:存储 Worker 的运行状态和统计信息,包括 topology-id、Worker 上所有 Executor 的统计信息(例如发送和接收的消息数)、Worker 的启动时间和最后更新时间等。...
* fault-tolerant:Storm 可以自动恢复故障节点,保证系统的高可用性。 * scalable:Storm 可以根据需要水平扩展,提高系统的处理能力。 * flexible:Storm 支持多种数据源和处理方式,可以满足不同的业务需求。 ...
这样的话,storm才能去消费kafka中的实时的访问日志,然后去进行缓存热数据的统计 用得技术方案非常简单,从lua脚本直接创建一个kafka producer,发送数据到kafka ``` wget ...
在Storm中,你可以模拟多个并发用户,进行负载和压力测试,评估Web服务在高并发情况下的稳定性和性能。通过分析响应时间、错误率等指标,可以找出系统的瓶颈和优化方向。 6. 自动化测试: Storm支持脚本化的测试...
4. 鲁棒性:Storm设计目标之一是提供健壮、容易管理的集群,即使在高并发的情况下也能保证系统的稳定运行。 5. 容错性:Storm具备容错能力,当计算出现错误时,系统能够重新分配任务,保证计算的持续进行。 6. 编程...
项目3-非跳出UV-Storm topology开发二】 在本项目中,我们将探讨如何利用Apache Storm开发一个实时数据分析系统,特别是关注非跳出用户视图(UV)的计算。非跳出UV是指在网站上至少访问了两个不同页面的用户数,它...
本教程将详细介绍如何基于Storm搭建本地集群,并实现一个可运行的实时统计CallLog的示例。这个过程涉及到的知识点包括Storm的基本概念、Maven的使用以及Java编程。 首先,让我们了解一下Apache Storm的核心概念。...
而Apache Kafka则是一个高吞吐量的分布式发布订阅消息系统,常用于构建实时数据管道和流处理应用。将两者结合,可以构建出强大的实时数据处理平台。 **二、写入数据到Kafka** 在Storm-Kafka集成中,首先需要将数据...
- 除了基础的Apache Storm,课程中还提到了JStorm,它是阿里巴巴开源的一个高性能、高稳定性的Storm版本,针对大规模集群和高并发场景进行了优化。 6. **HBase存储与State运用**: - 在项目1中,我们使用HBase...