This example implements Storm's WordCount demo running in local mode.
During development, you can run Storm in local mode, which lets you develop on your own machine and test the topology in-process. Once everything is ready, you run Storm in remote mode and submit the topology to run on a cluster (a sketch of remote-mode submission appears after TopologyMain below).
Create the project: demo-storm
The directory structure is as follows:
demo-storm
——src/main/java
————com.youku.demo
————————bolts
————————spouts
——src/test/java
——src/main/resources
————words.txt
WordCounter.java:
package com.youku.demo.bolts;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

    Integer id;
    String name;
    Map<String, Integer> counters;

    /**
     * At the end of the run (when the cluster is shut down)
     * we will print the word counters.
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /**
         * If the word doesn't exist in the map we will create
         * an entry for it; otherwise we will add 1 to its count.
         */
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}
WordNormalizer.java:
package com.youku.demo.bolts;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {

    public void cleanup() {}

    /**
     * The bolt receives a line from the words file and normalizes it:
     * the line is split into words, and each word is trimmed and
     * lower-cased before being emitted.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * The bolt only emits the field "word".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
WordReader.java:
package com.youku.demo.spouts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {}

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * The only thing this method does is emit each line of the file.
     */
    public void nextTuple() {
        /**
         * nextTuple is called in a loop forever, so once we have read
         * the whole file we just sleep for a while and return.
         */
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                /**
                 * For each line, emit a new tuple with the line as its
                 * value, using the line itself as the message id.
                 */
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    /**
     * We open the file and keep a reference to the collector object.
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }

    /**
     * Declare the output field "line".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
TopologyMain.java:
package com.youku.demo;

import com.youku.demo.bolts.WordCounter;
import com.youku.demo.bolts.WordNormalizer;
import com.youku.demo.spouts.WordReader;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {

        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 1)
            .fieldsGrouping("word-normalizer", new Fields("word"));

        // Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(true);

        // Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(2000);
        cluster.shutdown();
    }
}
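TopologyMain drives the demo in local mode: an in-process LocalCluster is created, the topology is submitted to it, and after two seconds the cluster is shut down. For the remote mode mentioned at the top, only the submission step changes. Below is a minimal sketch, assuming the same Storm 0.8.x API; the class name RemoteTopologyMain, topology name "word-count-topology", and worker count are illustrative, not part of this demo:

package com.youku.demo;

import com.youku.demo.bolts.WordCounter;
import com.youku.demo.bolts.WordNormalizer;
import com.youku.demo.spouts.WordReader;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class RemoteTopologyMain {
    public static void main(String[] args) throws Exception {
        // Same wiring as the local-mode TopologyMain above
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 1)
            .fieldsGrouping("word-normalizer", new Fields("word"));

        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setNumWorkers(2); // illustrative: request two worker processes

        // Instead of running in-process, StormSubmitter hands the topology to
        // the cluster's Nimbus; launch via `storm jar` so the jar is attached.
        StormSubmitter.submitTopology("word-count-topology", conf, builder.createTopology());
    }
}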
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
    http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.youku.demo</groupId>
    <artifactId>demo-storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>demo-storm</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <compilerVersion>1.6</compilerVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <!-- Repository where we can find the Storm dependencies -->
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- Storm dependency -->
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
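A packaging note: this pom keeps the storm artifact at the default compile scope, which is what local mode needs. When you later build a jar to submit to a real cluster, the storm dependency is conventionally marked provided so it is not bundled into the jar, since the cluster supplies those classes at runtime. A sketch of that variant (an assumption about the cluster setup, not something this local-mode demo requires):

<!-- Cluster-packaging variant: "provided" keeps storm out of the topology
     jar; the cluster's own Storm installation supplies it at runtime. -->
<dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.8.0</version>
    <scope>provided</scope>
</dependency>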
words.txt:
storm
test
are
great
is
an
storm
simple
application
but
very
powerfull
really
StOrm
is
great
When running, you need to pass a program argument, src/main/resources/words.txt, to specify the input file.
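Assuming Maven is installed, one convenient way to compile and run with that argument from the project root is the exec plugin (resolved by prefix, so it does not need to be declared in the pom):

  mvn compile exec:java -Dexec.mainClass=com.youku.demo.TopologyMain -Dexec.args="src/main/resources/words.txt"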
Log output:
It reports quite a few ZooKeeper exceptions, plus a final exception about log files that cannot be deleted; these are ignored for now.
271 [main-SendThread(localhost:2000)] WARN org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.SocketException: Address family not supported by protocol family: connect
    at sun.nio.ch.Net.connect(Native Method)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:507)
    at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1050)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1077)

java.io.IOException: Unable to delete file: C:\Users\ThinkPad\AppData\Local\Temp\3fbb080f-e585-42e6-8b1b-d6ae024503ac\version-2\log.1
    at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1390)
    at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1044)
    at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:977)
    at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1381)
    at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1044)
    at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:977)
    at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1381)
    at backtype.storm.util$rmr.invoke(util.clj:413)
    at backtype.storm.testing$kill_local_storm_cluster.invoke(testing.clj:163)
    at backtype.storm.LocalCluster$_shutdown.invoke(LocalCluster.clj:21)
    at backtype.storm.LocalCluster.shutdown(Unknown Source)
    at com.youku.demo.TopologyMain.main(TopologyMain.java:33)
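As an aside, the "Address family not supported by protocol family" error above is a known symptom of the JVM preferring IPv6 on Windows. A commonly suggested workaround, untested against this exact setup, is to force IPv4 with the standard JVM flag; since exec:java runs inside the Maven JVM, the flag goes into MAVEN_OPTS:

  set MAVEN_OPTS=-Djava.net.preferIPv4Stack=true
  mvn compile exec:java -Dexec.mainClass=com.youku.demo.TopologyMain -Dexec.args="src/main/resources/words.txt"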