- 浏览: 2188862 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
有关strom的具体介绍,本文不再过多叙述,不了解的朋友可参考之前的文章
http://qindongliang.iteye.com/category/361820
本文主要以一个简单的wordcount例子,来了解下storm应用程序的开发,虽然只是一个简单的例子
但麻雀虽小,五脏俱全,主要涉及的内容:
(1)wordcount的拓扑定义
(2)spout的使用
(3)bolt的使用
(4)tick定时器的使用
(5) bolt之间数据传输的坑
简单的数据流程图如下:
提交到storm集群上的拓扑图:
maven项目的pom依赖:
(1)Topology主拓扑类:
(2)Spout数据源类
(3)Split的bolt类
(4)Sum的bolt类
(5)Show的bolt类
(6)Final的bolt类
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
http://qindongliang.iteye.com/category/361820
本文主要以一个简单的wordcount例子,来了解下storm应用程序的开发,虽然只是一个简单的例子
但麻雀虽小,五脏俱全,主要涉及的内容:
(1)wordcount的拓扑定义
(2)spout的使用
(3)bolt的使用
(4)tick定时器的使用
(5) bolt之间数据传输的坑
简单的数据流程图如下:
提交到storm集群上的拓扑图:
maven项目的pom依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.jstrom.demo</groupId> <artifactId>jstrom-test</artifactId> <version>1.0-SNAPSHOT</version> <properties> <jstorm.version>2.1.1</jstorm.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <slf4j.version>1.7.12</slf4j.version> <joad-time.version>2.9.4</joad-time.version> <storm-kafka.version>0.9.4</storm-kafka.version> <kafka.version>0.9.0.0</kafka.version> <esper.version>5.4.0</esper.version> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/com.espertech/esper --> <!-- https://mvnrepository.com/artifact/joda-time/joda-time --> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>${joad-time.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba.jstorm</groupId> <artifactId>jstorm-core</artifactId> <version>${jstorm.version}</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>${storm-kafka.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-jdk14</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> <version>${slf4j.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <mainClass>换成自己的主类</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-my-jar-with-dependencies</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
(1)Topology主拓扑类:
package com.jstorm.wd; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; /** * Created by QinDongLiang on 2016/9/12. */ public class TopologyWordCount { public static void main(String[] args) throws Exception { TopologyBuilder builder=new TopologyBuilder(); //设置数据源 builder.setSpout("spout",new CreateSentenceSpout(),1); //读取spout数据源的数据,进行split业务逻辑 builder.setBolt("split",new SplitWordBolt(),1).shuffleGrouping("spout"); //读取split后的数据,进行count (tick周期10秒) builder.setBolt("count",new SumWordBolt(),1).fieldsGrouping("split",new Fields("word")); //读取count后的数据,进行缓冲打印 (tick周期3秒,仅仅为测试tick使用,所以多加了这个bolt) builder.setBolt("show",new ShowBolt(),1).shuffleGrouping("count"); //读取show后缓冲后的数据,进行最终的打印 (实际应用中,最后一个阶段应该为持久层) builder.setBolt("final",new FinalBolt(),1).allGrouping("show"); Config config=new Config(); config.setDebug(false); //集群模式 if(args!=null&&args.length>0){ config.setNumWorkers(2); StormSubmitter.submitTopology(args[0],config,builder.createTopology()); //单机模式 }else{ config.setMaxTaskParallelism(1);; LocalCluster cluster=new LocalCluster(); cluster.submitTopology("word-count",config,builder.createTopology()); Thread.sleep(3000000); cluster.shutdown(); } } }
(2)Spout数据源类
package com.jstorm.wd; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import org.joda.time.DateTime; import java.util.Map; import java.util.Random; /** * Created by QinDongLiang on 2016/8/31. * 创建数据源 */ public class CreateSentenceSpout extends BaseRichSpout { // SpoutOutputCollector collector; Random random; String [] sentences=null; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.collector=spoutOutputCollector;//spout_collector random=new Random();// sentences=new String[]{"hadoop hadoop hadoop java java "}; } @Override public void nextTuple() { Utils.sleep(10000); //获取数据 String sentence=sentences[random.nextInt(sentences.length)]; System.out.println("线程名:"+Thread.currentThread().getName()+" "+new DateTime().toString("yyyy-MM-dd HH:mm:ss ")+"10s发射一次数据:"+sentence); //向下游发射数据 this.collector.emit(new Values(sentence)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("sentence")); } }
(3)Split的bolt类
package com.jstorm.wd; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.util.HashMap; import java.util.Map; /** * 简单的按照空格进行切分后,发射到下一阶段bolt * Created by QinDongLiang on 2016/8/31. */ public class SplitWordBolt extends BaseRichBolt { Map<String,Integer> counts=new HashMap<>(); private OutputCollector outputCollector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector=outputCollector; } @Override public void execute(Tuple tuple) { String sentence=tuple.getString(0); // System.out.println("线程"+Thread.currentThread().getName()); // 简单的按照空格进行切分后,发射到下一阶段bolt for(String word:sentence.split(" ") ){ outputCollector.emit(new Values(word));//发送split } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //声明输出的filed outputFieldsDeclarer.declare(new Fields("word")); } }
(4)Sum的bolt类
package com.jstorm.wd; import backtype.storm.Config; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.TupleHelpers; import backtype.storm.utils.Utils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.util.HashMap; import java.util.Map; /** * Created by QinDongLiang on 2016/8/31. */ public class SumWordBolt extends BaseRichBolt { Map<String,Integer> counts=new HashMap<>(); private OutputCollector outputCollector; final static Logger logger= LoggerFactory.getLogger(SumWordBolt.class); @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector=outputCollector; } @Override public Map<String, Object> getComponentConfiguration() { Map<String, Object> conf = new HashMap<String, Object>(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);//加入Tick时间窗口,进行统计 return conf; } public static Object deepCopy(Object srcObj) { Object cloneObj = null; try { ByteArrayOutputStream out = new ByteArrayOutputStream(); ObjectOutputStream oo = new ObjectOutputStream(out); oo.writeObject(srcObj); ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); ObjectInputStream oi = new ObjectInputStream(in); cloneObj = oi.readObject(); } catch(IOException e) { e.printStackTrace(); } catch(ClassNotFoundException e) { e.printStackTrace(); } return cloneObj; } @Override public void execute(Tuple tuple) { //时间窗口定义为10s内的统计数据,统计完毕后,发射到下一阶段的bolt进行处理 //发射完成后retun结束,开始新一轮的时间窗口计数操作 if(TupleHelpers.isTickTuple(tuple)){ System.out.println(new DateTime().toString("yyyy-MM-dd HH:mm:ss")+" 每隔10s发射一次map 大小:"+counts.size()); // Map<String,Integer> copyMap= (Map<String, Integer>) deepCopy(counts); outputCollector.emit(new Values(counts));//10S发射一次 // counts.clear(); counts=new HashMap<>();//这个地方,不能执行clear方法,可以再new一个对象,否则下游接受的数据,有可能为空 或者深度copy也行,推荐new return; } //如果没到发射时间,就继续统计wordcount System.out.println("线程"+Thread.currentThread().getName()+" map 缓冲统计中...... map size:"+counts.size()); //String word=tuple.getString(0);//如果有多tick,就不用使用这种方式获取tuple里面的数据 String word=tuple.getStringByField("word"); Integer count=counts.get(word); if(count==null){ count=0; } count++; counts.put(word,count); // System.out.println(word+" =====> "+count); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word_map")); } }
(5)Show的bolt类
/** * Created by QinDongLiang on 2016/9/12. */ public class ShowBolt extends BaseRichBolt { private OutputCollector outputCollector; @Override public Map<String, Object> getComponentConfiguration() { Map<String, Object> conf = new HashMap<String, Object>(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3);//tick时间窗口3秒后,发射到下一阶段的bolt,仅为测试用 return conf; } @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector=outputCollector; } Map<String,Integer> counts=new HashMap<>(); @Override public void execute(Tuple tuple) { //tick时间窗口3秒后,发射到下一阶段的bolt,仅为测试用,故多加了这个bolt逻辑 if(TupleHelpers.isTickTuple(tuple)){ System.out.println(new DateTime().toString("yyyy-MM-dd HH:mm:ss")+" showbolt间隔 应该是 3 秒后 "); // System.out.println("what: "+tuple.getValue(0)+" "+tuple.getFields().toList()); outputCollector.emit(new Values(counts)); return; } counts= (Map<String, Integer>) tuple.getValueByField("word_map"); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("final_result")); } }
(6)Final的bolt类
package com.jstorm.wd; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import org.joda.time.DateTime; import java.util.Map; /** * Created by QinDongLiang on 2016/9/12. * 最终的结果打印bolt */ public class FinalBolt extends BaseRichBolt { @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { } @Override public void execute(Tuple tuple) { // 最终的结果打印bolt System.out.println(new DateTime().toString("yyyy-MM-dd HH:mm:ss")+" final bolt "); Map<String,Integer> counts= (Map<String, Integer>) tuple.getValue(0); for(Map.Entry<String,Integer> kv:counts.entrySet()){ System.out.println(kv.getKey()+" "+kv.getValue()); } //实际应用中,最后一个阶段,大部分应该是持久化到mysql,redis,es,solr或mongodb中 } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
相关推荐
本压缩包提供的"storm之WordCount示例Java代码"是针对Storm的一个经典入门教程,展示了如何使用Java语言实现一个简单的WordCount程序。这个程序的主要目标是统计文本数据流中的单词出现次数。 首先,我们需要理解...
在这个“Storm的WordCount实例”中,我们将深入探讨如何利用Storm来实现经典的WordCount程序,这是一个在大数据处理中常见的示例,用于统计文本中的单词出现频率。 首先,理解Storm的基本架构是至关重要的。Storm由...
"storm-wordcount"是Storm中的一个经典示例,用于演示如何处理实时数据流并进行简单的统计计算,类似于Hadoop的WordCount程序。这个例子的核心目标是统计输入文本中每个单词出现的次数。 在Storm中,数据流被抽象为...
文档详细的描述了Hadoop在Linux上的安装过程,并且附带了Wordcount程序示例
在"Spark local下 WordCount运行示例"中,我们将探讨如何在本地模式(local mode)下使用Spark执行一个简单的WordCount程序。WordCount是大数据处理领域的一个经典例子,用于统计文本中各个单词出现的次数。 首先,...
### Hadoop集群中WordCount示例详解 #### Hadoop简介 Hadoop是一个开源软件框架,用于分布式存储和处理大规模数据集。它通过提供一个高效、可靠且可扩展的环境来支持大数据处理应用,使得开发者可以在商品硬件上...
本项目主要实现的功能是:统计单词的个数 jdk1.8 jstorm2.2.1 执行步骤: 1. 本地正确安装maven 2. 本地正确安装zookeeper,并启动 3. Idea导入项目源码,以...4. 可分别运行random或wordcount下topology下的main类
【标题】"storm_wordcount.zip" 是一个基于Java开发的Apache Storm项目,主要实现的功能是对英语单词进行实时统计。Storm是一个分布式实时计算系统,能够处理海量数据流,并保证每个事件只被处理一次(Exactly-once...
**Storm本地模式WordCount亲测可用** 在大数据处理领域,Apache Storm是一个实时计算框架,它被广泛用于处理无界数据流。"Storm本地模式"是Storm提供的一种在单机环境中进行开发和测试的机制,无需分布式环境即可...
Hadoop示例程序WordCount运行及详解 Hadoop平台上进行WordCount是非常重要的,掌握了WordCount可以更好地理解Hadoop的map-reduce编程模型。本文将详细讲解Hadoop平台上WordCount的运行和实现。 基于Hadoop的map-...
标题中的"storm开发jar包以及storm例子源码"表明了我们即将探讨的是关于Apache Storm的开发环境设置和示例代码。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无限的数据流,常用于大数据实时处理...
WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的次数。在这个案例中,我们将深入探讨如何在 Hadoop 环境中使用 MapReduce 实现 WordCount。 【描述】在 Hadoop 环境中,WordCount 的...
### Linux环境下运行Eclipse以测试WordCount示例 #### Eclipse简介 Eclipse 是一款非常流行的开源集成开发环境(IDE)。其强大的功能不仅限于Java应用的开发,还通过丰富的插件支持多种编程语言如C/C++、Python、...
在Storm中,WordCount示例会演示如何实时地统计文本中的单词出现次数,这将帮助初学者快速上手Storm的编程模型。 通过深入学习这些资料,你不仅能掌握Storm的集群搭建,还能了解到其核心概念、编程模型以及实际应用...
而"WordCount"是Hadoop入门的经典示例,用于演示如何处理大规模数据。这个例子简单直观,帮助初学者理解Hadoop MapReduce的工作原理。下面我们将详细讨论这个"wordcount"示例及其相关知识点。 一、Hadoop简介 ...
在“mapreduce:用于罗马尼亚大数据研讨会的 Wordcount MapReduce 示例”中,我们将探讨如何使用Java实现这一经典案例,以及它在大数据处理中的应用。 MapReduce的工作流程如下: 1. **映射(Map)阶段**: 在这个...
生成的JAR文件(如`storm_demo.jar`)可以用Storm的命令行工具提交到本地或远程的Storm集群上运行,例如`storm jar storm_demo.jar WordCountTopology wordcount`。 5. **监控与调试** Storm提供了Web UI,可以...
wordcount 项目说明 WordCount, 一个Storm入门实例。 实现了如下的流程: 抓取ChinaDaily的网页内容作为数据源;对数据进行分词处理,按词频排序并打印排序结果。 相关信息 作者:robin 博客地址:
【标题】"最简单MR WordCount" 涉及到的是MapReduce编程模型中的一个经典示例,WordCount。在Hadoop生态系统中,WordCount是一个基础但非常重要的应用,用于统计文本文件中每个单词出现的次数。这个程序展示了...