- 浏览: 56653 次
- 性别:
- 来自: 北京
文章分类
最新评论
1、创建发射所有字符串统计总个数及去重个数处理类
2、topology增加字符统计
-----------------------其它类--------------------------------------
3.字符发射spout类
4.字符切割处理bolt类
5.pom文件
public class SumBolt implements IBasicBolt { /** * 对发射所有字符串统计总个数及去重个数 */ private static final long serialVersionUID = 1L; Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple input, BasicOutputCollector collector) { try { // 变量放在方法外面会进行累加,所有数据放在counts这个Map当中。 long word_sum = 0;// 总数 long word_count = 0;// 去重后个数 String word = input.getString(0); Integer count = input.getInteger(1); counts.put(word, count); // 获取总数,遍历counts的values,进行sum Iterator<Integer> i = counts.values().iterator(); while (i.hasNext()) { word_sum += i.next(); } Iterator<String> i2 = counts.keySet().iterator(); while (i2.hasNext()) { String oneWord = i2.next(); if (oneWord != null) { word_count ++; } } System.err.println("a="+counts.get("a")+" b="+counts.get("b")+" c="+counts.get("c")+" d="+counts.get("d")); System.err.println( Thread.currentThread().getName() + "word_sum=" + word_sum + ",-------word_count=" + word_count); } catch (Exception e) { throw new FailedException("split error!"); } } public void cleanup() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } }
2、topology增加字符统计
public class WordCountTopology { /** * 提交topology的main函数及字符统计处理类 */ public static class SplitSentence extends ShellBolt implements IRichBolt { private static final long serialVersionUID = 1L; /** * 字符统计处理bolt类 */ public static class WordCount extends BaseBasicBolt { private static final long serialVersionUID = 1L; // 多线程下不能统计Map的key个数,和value表示的字符总数。因为多线下表示只是一部分 Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); //System.err.println(Thread.currentThread().getName() + "---word:" + word + " count:" + count); collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } //提交topology的main函数 public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //读取数据用1个线程,防止数据重复读取 builder.setSpout("spout", new RandomSentenceSpout(), 1); //从spout源读取数据,设置2个线程处理字符分割 builder.setBolt("split", new MysplitBolt(" "), 2).shuffleGrouping("spout"); /** * 上个bolt接收数据,设置3个线程处理数据统计。 * Fields Grouping:按Field分组,相同的tuple会分发给同一个线程(Executer或task)处理。所以不担心多线程问题 * 比如按singleWord来分组, 具有同样singleWord的tuple会被分到相同的Bolts, 而不同的word则会被分配到不同的Bolts。 */ //builder.setBolt("count", new WordCount(), 3).fieldsGrouping("split", new Fields("singleWord")); //平均分配tuple数据至每一个线程处理,统计会有线程安全问题 builder.setBolt("count", new WordCount(), 3).shuffleGrouping("split"); builder.setBolt("sum", new SumBolt(),1).shuffleGrouping("count"); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } } }
-----------------------其它类--------------------------------------
3.字符发射spout类
/** * 字符发射spout类 */ public class RandomSentenceSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; SpoutOutputCollector _collector; Random _rand; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); } public void nextTuple() { //a:2 , b:2 , c:1 , d:3 String[] sentences = new String[] { sentence("a b c d "), sentence("b d"), sentence("a d") }; for (String sentence : sentences) {// 发射三行数据致bolt处理 _collector.emit(new Values(sentence)); } Utils.sleep(1000 * 1000); } protected String sentence(String input) { return input; } @Override public void ack(Object id) { } @Override public void fail(Object id) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("firstSpout")); } // Add unique identifier to each tuple, which is helpful for debugging public static class TimeStamped extends RandomSentenceSpout { private final String prefix; public TimeStamped() { this(""); } public TimeStamped(String prefix) { this.prefix = prefix; } protected String sentence(String input) { return prefix + currentDate() + " " + input; } private String currentDate() { return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new Date()); } } }
4.字符切割处理bolt类
/** * 字符切割处理bolt类 */ public class MysplitBolt implements IBasicBolt { private static final long serialVersionUID = 1L; String patton; public MysplitBolt(String patton) { this.patton = patton; } /** * 接收处理每一行数据 */ public void execute(Tuple input, BasicOutputCollector collector) { try { String sen = input.getStringByField("firstSpout"); if (sen != null) { for (String word : sen.split(patton)) {// 发射多个字符数据,让下一级bolt处理 collector.emit(new Values(word)); } } } catch (Exception e) { throw new FailedException("split error!"); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("singleWord")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } public void cleanup() { // TODO Auto-generated method stub } }
5.pom文件
引用
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1032一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6481、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 750一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 516英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 416一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6791、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5821.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4871、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 8201、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 613Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2102事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4481、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1131统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 896汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 686一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
storm分组策略介绍
2017-04-16 11:46 702一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 590并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5331、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 395本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 672一、安装Storm wget ...
相关推荐
Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...
在实践中,开发者应重视对Storm各种术语的准确理解和运用,例如spout、bolt、topology、nimbus和事务性拓扑等。 Storm作为一个开源的实时计算系统,在大数据和云计算领域具有广泛的应用前景。通过阅读和学习Storm...
主分支: ##包裹包战 mvn clean package -DskipTests=true -Dwarcp ./target/storm-ui.war $TOMCAT_HOME/webapps/包装罐 mvn clean package -DskipTests=truecp ./target/storm-ui-*.jar $STORM_HOME/external/...
写第一个Storm应用--数单词数量(一个spout读取文本,第一个bolt用来标准化单词,第二个bolt为单词计数) 一、Storm运行模式: 1.本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解) 运行在本地...
* fault-tolerant:Storm 可以自动恢复故障节点,保证系统的高可用性。 * scalable:Storm 可以根据需要水平扩展,提高系统的处理能力。 * flexible:Storm 支持多种数据源和处理方式,可以满足不同的业务需求。 ...
在Storm中,你可以模拟多个并发用户,进行负载和压力测试,评估Web服务在高并发情况下的稳定性和性能。通过分析响应时间、错误率等指标,可以找出系统的瓶颈和优化方向。 6. 自动化测试: Storm支持脚本化的测试...
4. 鲁棒性:Storm设计目标之一是提供健壮、容易管理的集群,即使在高并发的情况下也能保证系统的稳定运行。 5. 容错性:Storm具备容错能力,当计算出现错误时,系统能够重新分配任务,保证计算的持续进行。 6. 编程...
### Storm原理分析 #### 一、Storm基本结构 Apache Storm 是一个开源的分布式实时计算系统,主要用于处理流式数据。Storm 提供了一种简单而强大的模型来定义并行计算过程,使得用户能够轻松地处理无限的数据流。...
而Apache Kafka则是一个高吞吐量的分布式发布订阅消息系统,常用于构建实时数据管道和流处理应用。将两者结合,可以构建出强大的实时数据处理平台。 **二、写入数据到Kafka** 在Storm-Kafka集成中,首先需要将数据...
- **低延迟和高性能**:Storm设计用于处理高频率、低延迟的数据流,可以处理每秒数百万条消息。 - **分布式和可扩展**:Storm可以在多个服务器上分布运行,通过添加更多的服务器轻松扩展处理能力。 - **容错**:...
Storm Trident作为Storm的一个高级API,提供了更高级别的抽象和事务支持,使得复杂的数据处理变得更加简单和可靠。 **2. Storm的全面讲解** - **深度解析**:课程不仅覆盖了Storm的基础概念和架构,还深入探讨了其...
Apache Storm 是一个分布式实时计算系统,它被设计用于处理大规模数据流,提供高吞吐量、低延迟的数据处理能力。在大数据领域,Storm 被广泛应用于实时分析、在线机器学习、持续计算、数据集成以及任何需要实时处理...
使用Storm可以实现这些功能的自动化和实时化,对于大规模或高并发的Web Service环境尤其有价值。 然而,提供的压缩包文件名称“Storm_r1.1-Adarna”并没有给出足够的信息来详细解释具体如何使用Storm调试Web ...
它的设计目标是处理PB级别的数据,适用于需要低延迟、高并发读写的场景。 三、Storm 与 HBase 集成的原因 1. 实时性:Storm 的实时处理能力与 HBase 的高速写入和查询性能相结合,可以实现实时数据的快速处理和存储...
Apache Storm是一款开源的分布式实时计算系统,它提供了简单而强大的API来定义数据流处理逻辑,同时保证了消息处理的容错性和高性能。Storm的设计目标是成为实时计算领域的Hadoop,支持多种编程语言,并且能够很好地...
Apache Storm(apache-storm-2.3.0.tar.gz) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何编程语言...
标题中的"storm开发jar包以及storm例子源码"表明了我们即将探讨的是关于Apache Storm的开发环境设置和示例代码。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无限的数据流,常用于大数据实时处理...
在大数据处理领域,Apache Storm是一个实时计算系统,它能够持续处理数据流,实现低延迟、高吞吐量的数据分析。在这个“Storm API实现词频统计”的案例中,我们将深入探讨如何利用Java编程语言和Storm API来构建一个...
Apache Storm是一个开源的分布式实时计算系统,它能够处理无界数据流,确保每个事件都得到正确的处理,即使在高并发和大规模数据输入的情况下也能保持高效。 **一、Storm简介** Apache Storm的核心概念包括:拓扑...