- 浏览: 56441 次
- 性别:
- 来自: 北京
文章分类
最新评论
1、数据源读取,字符发射spout类
2、第一次对字符串加工处下,切割处理bolt类
3、提交topology的main函数及字符统计处理类
4、处理结果
Thread-22-count-executor[3 3]---word:c count:1
Thread-18-count-executor[2 2]---word:b count:1
Thread-32-count-executor[4 4]---word:a count:1
Thread-32-count-executor[4 4]---word:d count:1
Thread-18-count-executor[2 2]---word:b count:2
Thread-32-count-executor[4 4]---word:d count:2
Thread-32-count-executor[4 4]---word:a count:2
Thread-32-count-executor[4 4]---word:d count:3
5、相关总结
1、每一个线程bolt获取处理数据与上一个bolt或spout输出的数据方式一致。
declarer.declare(new Fields("firstSpout"));
2、每一个线程bolt在topology运行中,一直处理运行状态。而声明的全局变量是针对每个线程的全局变量,每一个线程输出统计数据是当前线程的变量数据。
3、每个spout或bolt处理数据时,都可以设置对应的线程数。但spout读取数据时,会重复读取数据。
4、bolt与bolt数据传递,bolt数据输出格式与下一个bolt数据接收格式扭转,都是通过对应的”相同字符”扭转。
6、相关pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
/** * 字符发射spout类 */ public class RandomSentenceSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; SpoutOutputCollector _collector; Random _rand; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); } public void nextTuple() { String[] sentences = new String[] { sentence("a b c d "), sentence("b d"), sentence("a d") }; for (String sentence : sentences) {// 发射三行数据致bolt处理 _collector.emit(new Values(sentence)); } Utils.sleep(1000 * 1000); } protected String sentence(String input) { return input; } @Override public void ack(Object id) { } @Override public void fail(Object id) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("firstSpout")); } // Add unique identifier to each tuple, which is helpful for debugging public static class TimeStamped extends RandomSentenceSpout { private final String prefix; public TimeStamped() { this(""); } public TimeStamped(String prefix) { this.prefix = prefix; } protected String sentence(String input) { return prefix + currentDate() + " " + input; } private String currentDate() { return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new Date()); } } }
2、第一次对字符串加工处下,切割处理bolt类
/** * 字符切割处理bolt类 */ public class MysplitBolt implements IBasicBolt { private static final long serialVersionUID = 1L; String patton; public MysplitBolt(String patton) { this.patton = patton; } /** * 接收处理每一行数据 */ public void execute(Tuple input, BasicOutputCollector collector) { try { String sen = input.getStringByField("firstSpout"); if (sen != null) { for (String word : sen.split(patton)) {// 发射多个字符数据,让下一级bolt处理 collector.emit(new Values(word)); } } } catch (FailedException e) { e.printStackTrace();// TODO: handle exception } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("singleWord")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } public void cleanup() { // TODO Auto-generated method stub } }
3、提交topology的main函数及字符统计处理类
public class WordCountTopology { /** * 提交topology的main函数及字符统计处理类 */ public static class SplitSentence extends ShellBolt implements IRichBolt { private static final long serialVersionUID = 1L; /** * 字符统计处理bolt类 */ public static class WordCount extends BaseBasicBolt { private static final long serialVersionUID = 1L; // 声明当前线程全局变量,统计字母个数,线程一直处于运行状态 Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); System.err.println(Thread.currentThread().getName() + "---word:" + word + " count:" + count); collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } //提交topology的main函数 public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //读取数据用1个线程,防止数据重复读取 builder.setSpout("spout", new RandomSentenceSpout(), 1); //从spout源读取数据,设置2个线程处理字符分割 builder.setBolt("split", new MysplitBolt(" "), 2).shuffleGrouping("spout"); /** * 上个bolt接收数据,设置3个线程处理数据统计。 * Fields Grouping:按Field分组,相同的tuple会分发给同一个线程(Executer或task)处理。 * 比如按singleWord来分组, 具有同样singleWord的tuple会被分到相同的Bolts, 而不同的word则会被分配到不同的Bolts。 */ builder.setBolt("count", new WordCount(), 3).fieldsGrouping("split", new Fields("singleWord")); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } } }
4、处理结果
引用
Thread-22-count-executor[3 3]---word:c count:1
Thread-18-count-executor[2 2]---word:b count:1
Thread-32-count-executor[4 4]---word:a count:1
Thread-32-count-executor[4 4]---word:d count:1
Thread-18-count-executor[2 2]---word:b count:2
Thread-32-count-executor[4 4]---word:d count:2
Thread-32-count-executor[4 4]---word:a count:2
Thread-32-count-executor[4 4]---word:d count:3
5、相关总结
引用
1、每一个线程bolt获取处理数据与上一个bolt或spout输出的数据方式一致。
declarer.declare(new Fields("firstSpout"));
2、每一个线程bolt在topology运行中,一直处理运行状态。而声明的全局变量是针对每个线程的全局变量,每一个线程输出统计数据是当前线程的变量数据。
3、每个spout或bolt处理数据时,都可以设置对应的线程数。但spout读取数据时,会重复读取数据。
4、bolt与bolt数据传递,bolt数据输出格式与下一个bolt数据接收格式扭转,都是通过对应的”相同字符”扭转。
6、相关pom.xml文件
引用
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1029一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6461、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 748一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 513英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 415一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6771、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5791.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4841、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 8191、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 611Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2095事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4461、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1127统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 892汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 682一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10671、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 700一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 586并发度: worker:指的是component (spo ... -
Storm 本地模式
2017-04-09 22:25 392本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 667一、安装Storm wget ...
相关推荐
【标题】:“Storm 上手 demo 例子演示” 【描述】:“Storm demo例子案例” 这篇文章将深入探讨Apache Storm,一个开源的分布式实时计算系统,通过实际的demo案例来帮助你快速上手。Apache Storm的设计目标是处理...
【标题】"storm集成kafka插demo.zip"指的是一个演示如何将Apache Storm与Apache Kafka集成的实例项目。这个压缩包包含了一个示例,用于展示如何在Storm拓扑中消费和处理Kafka的消息。 【描述】"storm集成kafka插件...
标题“storm_Kafka_demo”指的是一个使用Apache Storm处理Kafka数据流的示例项目。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,而Kafka是高吞吐量的分布式发布订阅消息系统。在这个...
【storm统计单词数的demo】是一个基于Apache Storm的入门级示例,旨在帮助初学者理解这个分布式实时计算系统的运作机制。Apache Storm是一个开源的流处理框架,它允许开发者处理无界数据流,实现高吞吐量、低延迟的...
生成的JAR文件(如`storm_demo.jar`)可以用Storm的命令行工具提交到本地或远程的Storm集群上运行,例如`storm jar storm_demo.jar WordCountTopology wordcount`。 5. **监控与调试** Storm提供了Web UI,可以...
【标题】"StormDemo.tar.gz" 是一个与Apache Storm相关的压缩包文件,它提供了一个入门级别的示例,帮助用户理解并开始使用这个分布式实时计算系统。Apache Storm是一个开源的流处理框架,它允许开发者处理和分析...
【storm demo】是一个基于Apache Storm的实践项目,旨在教授如何运用Storm的核心组件——Spout、Bolt和Topology来构建实时数据处理系统。Apache Storm是一个开源的分布式实时计算系统,能够处理大量的流数据,并保证...
simple storm demosimple storm demosimple storm demosimple storm demosimple storm demosimple storm demo
"jstorm storm入门demo" 这个标题表明了这是一个关于JStorm和Storm框架的基础教学示例。JStorm是阿里巴巴开源的一个分布式实时计算系统,它基于Apache Storm,但提供了更稳定、高性能以及易用的特性。这个demo可能是...
描述提到"stormdemo实现不安装storm系列软件的基础上,运行调试storm拓扑",这暗示了该项目包含了一种方法,可以在没有预先安装Apache Storm的情况下运行和调试Storm拓扑。通常,这可以通过使用Maven的`storm-...
本资料“stormdemo.zip”提供了一个关于Storm的实战示例,名为“stormdemo”,旨在帮助用户深入理解并掌握Storm的核心概念和操作流程。 Apache Storm是一个开源的分布式实时计算系统,它允许开发者连续处理数据流,...
总结来说,"storm之drpc操作demo示例"是一个很好的学习资源,它涵盖了Storm DRPC的核心概念和实践操作,对于想要在实时计算项目中运用Storm DRPC功能的开发者来说,极具参考价值。通过实际操作这个示例,你将能够...
简单的storm入门示例,从0到1让你清楚的理解storm.下面是代码示例: import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology...
标题中的“地区销售实时统计”指的是一个实时数据分析系统,它能够快速收集、处理并展示不同地区的销售数据。这个系统利用了Kafka、Storm、HBase、Servlet和Highcharts等技术来实现这一目标。 Kafka是一个分布式流...
Storm JAVA版上手demo下载地址 整理学习Storm过程中的代码和文档 更新中......
"storm-demo" 项目是针对 Storm 的一个入门级示例,旨在帮助初学者快速理解 Storm 的工作原理和编程模型。 在 Java 开发环境中,Storm 可以通过编写 Bolt 和 Spout 组件来构建实时处理管道。Bolt 负责执行业务逻辑...
我们知道storm的作用主要是进行流式计算,对于源源不断的均匀数据流流入处理是非常有效的,而现实生活中大部分场景并不是均匀的数据流,而是时而多时而少的数据流入,这种情况下显然用批量处理是不合适的,如果使用...
在分布式计算领域,Apache Storm是一个实时处理系统,它允许开发者处理和分析连续的数据流。Storm的设计目标是确保数据处理的高可靠性和低延迟。在Storm中,"ack机制"是实现数据可靠传输的关键特性,它确保了每个...
在"storm实时单词计数"这个场景中,我们主要探讨的是如何使用Storm来实现一个实时分析应用,该应用可以统计输入文本中的单词数量。 在Storm中,数据流是通过拓扑结构(Topology)进行组织的,由多个 bolts(处理...
本篇将详细探讨"storm-word-count-demo4.zip"这个项目,这是一个基于Storm的Java实现的Word Count示例,旨在帮助初学者理解如何在Storm框架下进行实时数据处理。 一、Spout组件 在Storm中,Spout是数据流的源头,...