`
wbj0110
  • 浏览: 1587711 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

本地模式运行storm的demo

阅读更多

本例实现的是本地模式运行storm的wordcount demo!

开发过程中,可以用本地模式来运行Storm,这样就能在本地开发,在进程中测试Topology。一切就绪后,以远程模式运行 Storm,提交用于在集群中运行的Topology。

创建工程:demo-storm
目录结构如下:
demo-storm
——src/main/java
————com.youku.demo
————————bolts
————————spouts
——src/test/java
——src/main/resource
————words.txt

storm-demo工程目录

storm-demo工程目录

WordCounter.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.youku.demo.bolts;
 
import java.util.HashMap;
import java.util.Map;
 
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
 
public class WordCounter extends BaseBasicBolt {
 
    Integer id;
    String name;
    Map<String, Integer> counters;
 
    /**
     * At the end of the spout (when the cluster is shutdown
     * We will show the word counters
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter ["+name+"-"+id+"] --");
        for(Map.Entry<String, Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }
 
    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
 
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /**
         * If the word dosn't exist in the map we will create
         * this, if not We will add 1
         */
        if(!counters.containsKey(str)){
            counters.put(str, 1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}

WordNormalizer.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.youku.demo.bolts;
 
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
 
public class WordNormalizer extends BaseBasicBolt {
 
    public void cleanup() {}
 
    /**
     * The bolt will receive the line from the
     * words file and process it to Normalize this line
     *
     * The normalize will be put the words in lower case
     * and split the line to get all words in this
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }
 
    /**
     * The bolt will only emit the field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

WordReader.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.youku.demo.spouts;
 
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
 
public class WordReader extends BaseRichSpout {
 
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    public void ack(Object msgId) {
        System.out.println("OK:"+msgId);
    }
    public void close() {}
    public void fail(Object msgId) {
        System.out.println("FAIL:"+msgId);
    }
 
    /**
     * The only thing that the methods will do It is emit each
     * file line
     */
    public void nextTuple() {
        /**
         * The nextuple it is called forever, so if we have been readed the file
         * we will wait and then return
         */
        if(completed){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //Do nothing
            }
            return;
        }
        String str;
        //Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            //Read all lines
            while((str = reader.readLine()) != null){
                /**
                 * By each line emmit a new value with the line as a their
                 */
                this.collector.emit(new Values(str),str);
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }
 
    /**
     * We will create the file and get the collector object
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
        }
        this.collector = collector;
    }
 
    /**
     * Declare the output field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

TopologyMain.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.youku.demo;
 
import com.youku.demo.bolts.WordCounter;
import com.youku.demo.bolts.WordNormalizer;
import com.youku.demo.spouts.WordReader;
 
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
 
public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
 
        //Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(),1)
            .fieldsGrouping("word-normalizer", new Fields("word"));
 
        //Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(true);
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(2000);
        cluster.shutdown();
    }
}

pom.xml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.youku.demo</groupId>
    <artifactId>demo-storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
 
    <name>demo-storm</name>
    <url>http://maven.apache.org</url>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <compilerVersion>1.6</compilerVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <!-- Repository where we can found the storm dependencies -->
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
 
    <dependencies>
        <!-- Storm Dependency -->
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

words.txt:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
storm
test
are
great
is
an
storm
simple
application
but
very
powerfull
really
StOrm
is
great

运行的时候需要配置参数:src/main/resources/words.txt 指定输入文件

运行命令

运行命令

日志输出:

运行日志

运行日志

会报好多zookeeper异常,还有最后的日志文件无法删除的异常,目前忽略了,O(∩_∩)O呵呵~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
271  [main-SendThread(localhost:2000)] WARN  org.apache.zookeeper.ClientCnxn  - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.SocketException: Address family not supported by protocol family: connect
    at sun.nio.ch.Net.connect(Native Method)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:507)
    at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1050)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1077)
 
java.io.IOException: Unable to delete file: C:\Users\ThinkPad\AppData\Local\Temp\3fbb080f-e585-42e6-8b1b-d6ae024503ac\version-2\log.1
    at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1390)
    at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1044)
    at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:977)
    at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1381)
    at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1044)
    at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:977)
    at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1381)
    at backtype.storm.util$rmr.invoke(util.clj:413)
    at backtype.storm.testing$kill_local_storm_cluster.invoke(testing.clj:163)
    at backtype.storm.LocalCluster$_shutdown.invoke(LocalCluster.clj:21)
    at backtype.storm.LocalCluster.shutdown(Unknown Source)
    at com.youku.demo.TopologyMain.main(TopologyMain.java:33)
分享到:
评论

相关推荐

    Storm 上手 demo 例子 演示

    5. **提交拓扑**:一旦在本地模式下测试成功,你可以将拓扑提交到运行Storm的集群,进行实时处理。 6. **Zookeeper的使用**:Storm依赖Zookeeper进行集群管理和协调,理解Zookeeper的角色和配置也很重要。 7. **...

    storm demo

    3. **本地模式**:在开发过程中,可以使用本地模式在单机上模拟全分布式环境,方便快速测试和调试。 4. ** storm.yaml配置**:配置文件storm.yaml用于设置Storm集群的参数,如nimbus服务器地址、supervisor节点配置...

    StormDemo.tar.gz

    4. **本地模式**:在本地环境中运行拓扑,进行测试和调试。 5. **提交到集群**:了解如何配置和提交拓扑到真实的Storm集群,以处理大规模的数据流。 6. **容错性**:学习Storm如何确保数据流的可靠性和无丢失处理。 ...

    storm demo 单机版 maven

    4. **本地模式运行**:在开发和测试阶段,Storm提供了一个本地模式,可以在单机上模拟整个集群的运行情况,无需真正部署到集群。 5. **Java编程**:编写Storm拓扑通常使用Java或JVM语言(如Scala),利用Storm的...

    jstorm storm入门demo

    描述提到"jstorm storm 入门demo,包含本地模式 和 集群模式",这表明示例涵盖了两种运行模式。本地模式允许开发者在单机环境下快速测试和调试拓扑,而集群模式则是在分布式环境中运行大规模实时处理任务。四个类的...

    storm集成kafka插demo.zip

    "storm"和"kafka"是两个关键的技术组件,"插件"可能指的是在Storm中用于连接到Kafka的特定库或组件,如storm-kafka或者storm-kafka-client,而"demo"意味着这是一段可运行的代码,用于演示如何操作。 【压缩包子...

    storm统计单词数的demo

    5. **本地运行与集群部署**:Storm提供了本地模式,可以在单机上测试Topology,验证其正确性。一旦调试完毕,可以将其部署到Storm集群上,实现分布式运行。 6. **容错机制**:Storm通过检查点和状态持久化来确保...

    storm练习代码

    通过"StormDemo"这个练习,你可以了解如何构建和运行一个简单的Storm应用,包括定义Spout和Bolt,配置拓扑,设置分组策略,以及如何在本地和集群环境中部署。实践中,你可以逐步增加复杂性,如实现更复杂的业务逻辑...

    Storm:使用 Apache Storm 的示例

    要以分布式模式运行拓扑,您需要一个 Storm 集群,或者只是一个 VM。 以下部分将解释如何在 Linux VM 上安装 Storm。 安装风暴服务器 这些说明是通过阅读大量文档和网络搜索而得到的。 最重要的资源是一篇博文。 ...

    storm之drpc操作demo示例.zip

    `storm drpc`命令会在本地启动一个DRPC服务器。 2. **创建DRPC拓扑**:编写一个Storm拓扑,其中包含DRPC bolt。DRPC bolt是执行DRPC请求的特殊类型,它会从DRPC服务器接收请求,执行指定的操作,然后返回结果。 3....

    storm-word-count-demo4.zip

    通过学习和实践这个项目,开发者可以深入理解Storm的基本原理,包括Spout和Bolt的工作方式、拓扑结构的构建,以及如何在实际环境中部署和运行Storm应用。同时,这个项目也提供了扩展和优化实时数据处理逻辑的基础,...

    第一个Storm应用

    1.本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解) 运行在本地机器的单一JVM上,这个模式主要用来开发、调试。 2.远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个...

    alfresco-apache-storm-demo:Apache Storm 与 A​​lfresco 的演示集成以处理内容

    要开始使用 alfresco-apache-storm-demo,建议您在本地模式下运行 CrawlTopology。 注意:这些说明假定您已安装 Maven。 首先,从github克隆项目: git clone ...

    超级简单入门的strom的java代码demo

    通过运行这个项目,学习者可以了解如何在Java中编写Storm的Spout和Bolt,如何构建拓扑结构,以及如何在本地模式或分布式模式下提交和运行Storm应用。同时,通过参考博客文章,可以获得更深入的理论知识和实践技巧。...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    05-mr程序的本地运行模式.avi 06-job提交的逻辑及YARN框架的技术机制.avi 07-MR程序的几种提交运行模式.avi 08-YARN的通用性意义.avi 09-yarn的job提交流程.avi 第四天 常见mr算法实现和shuffle的机制 01-...

    cep-football-demo:此 repo 包含所需的文件,例如配置,用于使用下载的新 CEP 包设置足球演示

    在IT行业中,CEP(Complex Event Processing)是一种高级的数据处理技术,主要用于实时分析和识别具有复杂关系或模式的事件流。在这个"cep-football-demo"项目中,我们可以推测它是一个使用JavaScript实现的CEP演示...

    mycat权威指南

    日志分析部分讲述了如何通过wrapper日志、mycat日志以及debug模式下的sql执行分析来检查和优化Mycat的运行情况,同时也包括了异常日志的处理。 在防火墙配置部分,介绍了如何配置防火墙规则以确保Mycat的安全运行。...

    mycat-权威指南.pdf

    这部分应该会介绍不同方式的全局序列号生成机制,包括本地文件方式、数据库方式、本地时间戳方式、分布式ZKID生成器和zk递增方式等。 ### 第10章 MYCAT分片规则 #### 10.1 分片规则概述 解释了分片规则在Mycat中...

Global site tag (gtag.js) - Google Analytics