`

用实例理解Storm的Stream概念

 
阅读更多

事情源于在看基于Storm的CEP引擎:flowmix
FlowmixBuilder代码,
每个Bolt设置了这么多的Group
而且declareStream也声明了这么多的stream-id,
对于只写过WordCountTopology的小白而言,
直接懵逼了,没见过这么用的啊,我承认一开始是拒绝的,每个Bolt都设置了这么多Group,这TMD拓扑图是什么样的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public TopologyBuilder create() {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(EVENT, (IRichSpout) eventsComponent, eventLoaderParallelism == -1 ? parallelismHint : eventLoaderParallelism);
  builder.setSpout(FLOW_LOADER_STREAM, (IRichSpout) flowLoaderSpout, 1);
  builder.setSpout("tick", new TickSpout(1000), 1);
  builder.setBolt(INITIALIZER, new FlowInitializerBolt(), parallelismHint)  // kicks off a flow determining where to start
            .localOrShuffleGrouping(EVENT)
            .allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM);

  declarebolt(builder, FILTER, new FilterBolt(), parallelismHint, true);
  declarebolt(builder, SELECT, new SelectorBolt(), parallelismHint, true);
  declarebolt(builder, PARTITION, new PartitionBolt(), parallelismHint, true);
  declarebolt(builder, SWITCH, new SwitchBolt(), parallelismHint, true);
  declarebolt(builder, AGGREGATE, new AggregatorBolt(), parallelismHint, true);
  declarebolt(builder, JOIN, new JoinBolt(), parallelismHint, true);
  declarebolt(builder, EACH, new EachBolt(), parallelismHint, true);
  declarebolt(builder, SORT, new SortBolt(), parallelismHint, true);
  declarebolt(builder, SPLIT, new SplitBolt(), parallelismHint, true);
  declarebolt(builder, OUTPUT, outputBolt, parallelismHint, false);

  return builder;
}
private static void declarebolt(TopologyBuilder builder, String boltName, IRichBolt bolt, int parallelism, boolean control) {
    BoltDeclarer declarer = builder.setBolt(boltName, bolt, parallelism)
        .allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM)
        .allGrouping("tick", "tick")
        .localOrShuffleGrouping(INITIALIZER, boltName)
        .localOrShuffleGrouping(FILTER, boltName)
        .fieldsGrouping(PARTITION, boltName, new Fields(FLOW_ID, PARTITION))    // guaranteed partitions will always group the same flow for flows that have joins with default partitions.
        .localOrShuffleGrouping(AGGREGATE, boltName)
        .localOrShuffleGrouping(SELECT, boltName)
        .localOrShuffleGrouping(EACH, boltName)
        .localOrShuffleGrouping(SORT, boltName)
        .localOrShuffleGrouping(SWITCH, boltName)
        .localOrShuffleGrouping(SPLIT, boltName)
        .localOrShuffleGrouping(JOIN, boltName);
  }
public static void declareOutputStreams(OutputFieldsDeclarer declarer, Fields fields) {
    declarer.declareStream(PARTITION, fields);
    declarer.declareStream(FILTER, fields);
    declarer.declareStream(SELECT, fields);
    declarer.declareStream(AGGREGATE, fields);
    declarer.declareStream(SWITCH, fields);
    declarer.declareStream(SORT, fields);
    declarer.declareStream(JOIN, fields);
    declarer.declareStream(SPLIT, fields);
    declarer.declareStream(EACH, fields);
    declarer.declareStream(OUTPUT, fields);
}

先来复习下经典的WordCountTopology

WordCountTopology Default Stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public class WordCountTopologySimple {

    public static class RandomSentenceSpout extends BaseRichSpout {
        SpoutOutputCollector collector;
        Random rand;
        String[] sentences = null;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            String sentence = sentences[rand.nextInt(sentences.length)];
            System.out.println("\n" + sentence);
            this.collector.emit(new Values(sentence));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
        public void ack(Object id) {}
        public void fail(Object id) {}
    }

    public static class SplitSentenceBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
            this.collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static class WordCountBolt extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        private OutputCollector collector;

        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static class PrinterBolt extends BaseBasicBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String first = tuple.getString(0);
            int second = tuple.getInteger(1);
            System.out.println(first + "," + second);
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {}
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout");
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count");

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}

SingleStream

默认情况下:Spout发送到下游Bolt的stream-id,以及Bolt发送到下游Bolt或者接收上游Spout/Bolt的stream-id都是default

可以对Spout/Bolt在发送消息时自定义stream-id,同时必须在声明输出字段时,指定对应的stream-id。

代码说明:发射时指定一个stream-id,声明流时指定一个stream-id,topology设置Bolt时除了通过Group的component-id,还会指定上游组件的stream-id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class RandomSentenceSpout {
    public void nextTuple() {
        Utils.sleep(1000);
        String sentence = sentences[rand.nextInt(sentences.length)];
        System.out.println("\n" + sentence);
        this.collector.emit("split-stream", new Values(sentence));
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("sentence"));
    }
}
class SplitSentenceBolt {
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit("count-stream", new Values(word));
        }
        this.collector.ack(tuple);
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("count-stream", new Fields("word"));
    }
}
class WordCountBolt {
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) count = 0;
        count++;
        counts.put(word, count);
        collector.emit("print-stream", new Values(word, count));
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("print-stream", new Fields("word", "count"));
    }
}
class Topology {
    main(){
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream");
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream");        
    }
}

使用自定义stream-id,主要分成两个步骤:

下图示例细说明了拓扑图中各个组件是怎么协调工作的:

MultiStream

Spout/Bolt发射时可以指定多个stream-id,同样要在声明输出字段时指定所有在发射过程指定的stream-id。
虽然每条消息的输出消息流并不一定会用到所有的stream,比如下面示例中一条消息发射到stream1和stream3,
另外一条消息发射到stream2和stream3,stream1和stream2是互斥的,不可能同时发送到这两个stream。
但是可以看到在declareStream中,要同时指定所有的stream-id。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void execute(Tuple input) {
    String word = input.getString(0);
    //小于j的word发送给stream1; 大于j的word发送给stream2;
    if(word.compareTo("j") < 0){
        collector.emit("stream1", new Values(word));
    }else if(word.compareTo("j") > 0){
        collector.emit("stream2", new Values(word));
    }
    //不管什么都发送给stream3
    collector.emit("stream3", new Values(word));
}
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declareStream("stream1", new Fields("word"));
    outputFieldsDeclarer.declareStream("stream2", new Fields("word"));
    outputFieldsDeclarer.declareStream("stream3", new Fields("word"));
}

程序员都喜欢流程图,喏,下图左上角第一个就是了,右上角是对应到Storm中的Topology,下面两图示例了两条消息在Storm的消息流的走向。

仿照上面的示例,对WordCountTopology的Spout/Bolt的发射方法都指定一个输出的stream-id,
同时在declareOutputFields声明多个输出的stream-id。

现在虽然Spout/Bolt声明了多个输出stream-id,但是emit时还是只发射到一个stream-id中。
所以本质上和前面的SingleStream是一样的,所以Topology不需要做任何改动也还是可以运行的。

代码说明:发射时指定一个stream-id,声明流时指定多个stream-id,topology设置Bolt时除了通过Group的component-id,还会指定上游组件的stream-id
emit不变,topology不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class RandomSentenceSpout {
    public void nextTuple() {
        this.collector.emit("split-stream", new Values(sentence));              //⬅            
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("sentence"));         //⬅
        declarer.declareStream("count-stream", new Fields("sentence"));
        declarer.declareStream("print-stream", new Fields("sentence"));
    }
}
class SplitSentenceBolt {
    public void execute(Tuple tuple) {
        for (String word : words) {
            this.collector.emit("count-stream", new Values(word));              //⬅
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("word"));
        declarer.declareStream("count-stream", new Fields("word"));             //⬅  
        declarer.declareStream("print-stream", new Fields("word"));
    }
}
class WordCountBolt {
    public void execute(Tuple tuple) {
        collector.emit("print-stream", new Values(word, count));                //⬅
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("word", "count"));
        declarer.declareStream("count-stream", new Fields("word", "count")); 
        declarer.declareStream("print-stream", new Fields("word", "count"));    //⬅
    }
}
class Topology {
    main(){
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream");
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream");        
    }
}

那么我们为什么还要在Spout/Bolt中定义多个输出流呢?观察这部分代码,stream-id都是一样的,不同的是Fields部分,
如果将每个Spout/Bolt的多个declarer.declareStream抽取出来:

1
2
3
4
5
6
public static void declareStream(OutputFieldsDeclarer declarer, 
        Fields fields){
    declarer.declareStream("split-stream", fields);
    declarer.declareStream("count-stream", fields);
    declarer.declareStream("print-stream", fields);
}

然后在Spout/Bolt的declareOutputFields调用declareStream方法一次声明所有的stream-id,只需要传递不同的Fields即可。

代码说明:声明多个stream时,每个组件的所有stream-id都一样,传入不同的Fields
emit不变,topology不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class RandomSentenceSpout {
    public void nextTuple() {
        this.collector.emit("split-stream", new Values(sentence));              //⬅            
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declareStream(declarer, new Fields("sentence"));
    }
}
class SplitSentenceBolt {
    public void execute(Tuple tuple) {
        for (String word : words) {
            this.collector.emit("count-stream", new Values(word));              //⬅
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declareStream(declarer, new Fields("word"));
    }
}
class WordCountBolt {
    public void execute(Tuple tuple) {
        collector.emit("print-stream", new Values(word, count));                //⬅
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declareStream(declarer, new Fields("word", "count"));
    }
}
class Topology {
    main(){
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2)
            .shuffleGrouping("spout", "split-stream");
        builder.setBolt("count", new WordCountBolt(), 2)
            .fieldsGrouping("split", "count-stream", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1)
            .shuffleGrouping("count", "print-stream");        
    }
}

这样的好处是,如果事先知道所有的stream-id,只需要定义好declareStream,每个bolt都调用这个全局的方法即可。
实际上这种方式对于构建动态拓扑图是很有用的。

MultiGroup

通过把所有stream-id封装到一个方法中,而emit时只指定一个stream-id。
现在每个组件emit时只指定了一个stream-id,声明输出流时都指定了相同的stream-id集合。
也就是说Spout/Bolt中虽然声明了多个stream-id,但是一条消息只会选择一个stream-id。

那么可不可以对Group方式运用同样的方式呢,我们的目的是想要把setBolt这种逻辑也抽取出一个共同的方法。
下面这种方式肯定是不对的,首先无法抽取,因为每个Bolt的Group分组策略不同。

虽然是错误的,但是我们并没有对首尾组件用多个Group,这是为什么呢?
1.Spout没有所谓的分组,因为Spout就是源头,分组时指定component指的是当前component的数据源自这个指定的component
2.最后一个Bolt我们先不设置,这里有坑…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
main(){
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 1);

    builder.setBolt("split", new SplitSentenceBolt(), 2)
            .shuffleGrouping("spout", "split-stream")                      //⬅
            .shuffleGrouping("split", "split-stream")
            .shuffleGrouping("count", "split-stream")
    ;
    builder.setBolt("count", new WordCountBolt(), 2)
            .fieldsGrouping("spout", "count-stream", new Fields("word"))
            .fieldsGrouping("split", "count-stream", new Fields("word"))   //⬅
            .fieldsGrouping("count", "count-stream", new Fields("word"))
    ;
    builder.setBolt("print", new PrinterBolt(), 1)
            .shuffleGrouping("count", "print-stream");
}

而且也无法构建拓扑图,比如WordCountBolt的输入component=”spout”时,
在拓扑图中这个组件是RandomSentenceSpout,它的输出字段名称为”sentence”,根本就没有word这个字段。
下面的错误也证实了这一点:Component: [count] subscribes from stream: [count-stream] of component [spout] with non-existent fields: #{"word"})
count这个组件(即WordCountBolt)订阅了spout组件(即RandomSentenceSpout)的count-stream输出流,但是spout组件并不存在word字段。

1
2
3
4
5
6972 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='word-count') #
<InvalidTopologyException InvalidTopologyException(msg:
    Component: [count] subscribes from stream: [count-stream] of component [spout] with non-existent fields: #{"word"})>
7002 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null

正确使用多个stream-id的姿势:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
main(){
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 1);

    builder.setBolt("split", new SplitSentenceBolt(), 2)
            .shuffleGrouping("spout", "split-stream")                      //⬅
            .fieldsGrouping("split", "split-stream", new Fields("word"))
            .shuffleGrouping("count", "split-stream")
    ;
    builder.setBolt("count", new WordCountBolt(), 2)
            .shuffleGrouping("spout", "count-stream")
            .fieldsGrouping("split", "count-stream", new Fields("word"))   //⬅
            .shuffleGrouping("count", "count-stream")
    ;
    builder.setBolt("print", new PrinterBolt(), 1)
            .shuffleGrouping("count", "print-stream");
}

现在每个Bolt的Group方式都是一样的了,并且component-id也是一样的,只有最后的stream-id不同。
很好,可以像抽取declareStream那样抽取setBolt了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
main(){
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout",new RandomSentenceSpout(),1);

    setBolt(builder, new SplitSentenceBolt(), "split");
    setBolt(builder, new WordCountBolt(), "count");
    builder.setBolt("print", new PrinterBolt(), 1)
        .shuffleGrouping("count", "print-stream");
}
public static void setBolt(TopologyBuilder builder,IRichBolt bolt,String name){
    builder.setBolt(name, bolt, 2)
            .shuffleGrouping("spout", name + "-stream")
            .fieldsGrouping("split", name + "-stream", new Fields("word"))
            .shuffleGrouping("count", name + "-stream")
    ;
}

每个Bolt都设置了多种分组策略,而分组的第一个参数component表示数据源自哪里,
现在SplitSentenceBolt和WordCountBolt都定义了三种分组策略,
那么是不是说[split]的数据源有:[spout],[split],[count],
同样[count]的数据源也有:[spout],[split],[count],这跟实际的Topology结构就完全不一样了。
可以看到下图的拓扑结构比原先的WordCountTopology多了几条线(而且还能自己指向自己我也是醉了)。

不过虽然每个Bolt都有多个输入源,但是输入源组件不一定有指定的stream-id。
比如split的数据源虽然有三个[spout],[split],[count],但是这三个组件中stream-id=”split-stream”的组件
只有[spout],因此即使设置了三个数据源,另外两个数据源是无效的。

同样[count]的数据源虽然也有三个[spout],[split],[count],但是这三个组件中stream-id=”count-stream”的组件也只有[split]才有。

所以最后实际上拓扑图还是最原始的[spout]->[split]->[count]->[print],并不会出现之前出现的多条线以及自己指向自己的情况。

最后一个Bolt

可以把最后一个PrintBolt也都加到每个Bolt的分组策略里吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
builder.setBolt("split", new SplitSentenceBolt(), 2)
        .shuffleGrouping("spout", "split-stream")                      //⬅
        .fieldsGrouping("split", "split-stream", new Fields("word"))
        .shuffleGrouping("count", "split-stream")
        .shuffleGrouping("print", "split-stream")
;
builder.setBolt("count", new WordCountBolt(), 2)
        .shuffleGrouping("spout", "count-stream")
        .fieldsGrouping("split", "count-stream", new Fields("word"))   //⬅
        .shuffleGrouping("count", "count-stream")
        .shuffleGrouping("print", "count-stream")
;
builder.setBolt("print", new PrinterBolt(), 1)
        .shuffleGrouping("spout", "print-stream")
        .fieldsGrouping("split", "print-stream", new Fields("word"))
        .shuffleGrouping("count", "print-stream")                      //⬅
        .shuffleGrouping("print", "print-stream")
;

拓扑图是这样的,虚线表示实际上是不存在的(因为输入源本身没有发射到这些stream)。

Opps….报错显示:[count]组件订阅了[print]组件中一个不存在的[count-stream]

1
2
3
4
5
9510 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='word-count') #
<InvalidTopologyException InvalidTopologyException(msg:Component: 
    [count] subscribes from non-existent stream: [count-stream] of component [print])>
9552 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null

下面修改不同Bolt中和Print相关的分组方式,只有把Print全部注释掉才可以

  1. 不注释: [count] subscribes from non-existent stream: [count-stream] of component [print]
  2. 注释①: [split] subscribes from non-existent stream: [split-stream] of component [print]
  3. 注释①②: [print] subscribes from non-existent stream: [print-stream] of component [print]
  4. 注释①②③: SUCCESS!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
builder.setBolt("split", new SplitSentenceBolt(), 2)
        .shuffleGrouping("spout", "split-stream")                      //⬅
        .fieldsGrouping("split", "split-stream", new Fields("word"))
        .shuffleGrouping("count", "split-stream")
        //.shuffleGrouping("print", "split-stream")  //②
;
builder.setBolt("count", new WordCountBolt(), 2)
        .shuffleGrouping("spout", "count-stream")
        .fieldsGrouping("split", "count-stream", new Fields("word"))   //⬅
        .shuffleGrouping("count", "count-stream")
        //.shuffleGrouping("print", "count-stream")  //①
;
builder.setBolt("print", new PrinterBolt(), 1)
        .shuffleGrouping("spout", "print-stream")
        .fieldsGrouping("split", "print-stream", new Fields("word"))
        .shuffleGrouping("count", "print-stream")                      //⬅
        //.shuffleGrouping("print", "print-stream")  //③
;

发生了什么事?不存在stream为什么就不行?可是前面以SplitSentenceBolt为例,split和count也不存在split-stream啊,为什么就不会报错呢?
原因在于我们的PrintBolt只是打印数据,然后什么都不做,它没有emit出任何消息,也就没有emit消息到任何消息流,所以下图中从PrintBolt出来的线根本就不存在!

怎么办呢,很简单,给PrintBolt添加一个带有stream-id的emit,同时也要在declareOutputFields中声明这个输出流。
只要PrintBolt有输出流,就不会报错了。也就是确保每个Bolt都会往下发送消息

最终完整的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
public class WordCountTopologyStream3 {

    public static class RandomSentenceSpout extends BaseRichSpout {
        SpoutOutputCollector collector;
        Random rand;
        String[] sentences = null;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            String sentence = sentences[rand.nextInt(sentences.length)];
            System.out.println("\n" + sentence);
            this.collector.emit("split-stream", new Values(sentence));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declareStream(declarer, new Fields("sentence"));
        }
        public void ack(Object id) {}
        public void fail(Object id) {}
    }

    public static class SplitSentenceBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for (String word : words) {
                this.collector.emit("count-stream", new Values(word));
            }
            this.collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declareStream(declarer, new Fields("word"));
        }
    }

    public static class WordCountBolt extends BaseRichBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        private OutputCollector collector;
        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit("print-stream", new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declareStream(declarer, new Fields("word", "count"));
        }
    }

    public static class PrinterBolt extends BaseRichBolt {
        private OutputCollector collector;
        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        @Override
        public void execute(Tuple tuple) {
            String first = tuple.getString(0);
            int second = tuple.getInteger(1);
            System.out.println(first + "," + second);
            collector.emit("whatever-stream", new Values(first + ":" + second));  //⬅
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declareStream(declarer, new Fields("word:count"));  //⬅
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        setBolt(builder, new SplitSentenceBolt(), "split");
        setBolt(builder, new WordCountBolt(), "count");
        setBolt(builder, new PrinterBolt(), "print");

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }

    public static void declareStream(OutputFieldsDeclarer declarer, Fields fields){
        declarer.declareStream("split-stream", fields);
        declarer.declareStream("count-stream", fields);
        declarer.declareStream("print-stream", fields);
        declarer.declareStream("whatever-stream", fields);      //⬅
    }

    public static void setBolt(TopologyBuilder builder, IRichBolt bolt, String name){
        builder.setBolt(name, bolt, 2)
                .shuffleGrouping("spout", name + "-stream")
                .fieldsGrouping("split", name + "-stream", new Fields("word"))
                .shuffleGrouping("count", name + "-stream")
                .shuffleGrouping("print", name + "-stream")     //⬅
        ;
    }
}

你以为这样就完了吗,如果把PrintBolt的输出stream-id去掉,即采用默认的default的话:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static class PrinterBolt extends BaseRichBolt {
    @Override
    public void execute(Tuple tuple) {
        String first = tuple.getString(0);
        int second = tuple.getInteger(1);
        System.out.println(first + "," + second);
        //collector.emit("whatever-stream", new Values(first + ":" + second));
        collector.emit(new Values(first + ":" + second));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //declareStream(declarer, new Fields("word:count"));
        declarer.declare(new Fields("word:count"));
    }
}

public static void declareStream(OutputFieldsDeclarer declarer, Fields fields){
    declarer.declareStream("split-stream", fields);
    declarer.declareStream("count-stream", fields);
    declarer.declareStream("print-stream", fields);
    //declarer.declareStream("whatever-stream", fields);      //⬅
}

public static void setBolt(TopologyBuilder builder, IRichBolt bolt, String name){
    builder.setBolt(name, bolt, 2)
            .shuffleGrouping("spout", name + "-stream")
            .fieldsGrouping("split", name + "-stream", new Fields("word"))
            .shuffleGrouping("count", name + "-stream")
            .shuffleGrouping("print", name + "-stream")
    ;
}

还是报错:[count]组件订阅了[print]组件中不存在的[count-stream]

1
Component: [count] subscribes from non-existent stream: [count-stream] of component [print]

好吧,看来前面的组件都使用自定义的stream-id,最后一个组件也必须使用自定义的stream-id,即使这个stream-id看起来没什么意义!

EOF.

分享到:
评论

相关推荐

    Storm的WordCount实例

    总结,通过这个“Storm的WordCount实例”,我们可以了解到如何使用Apache Storm来处理实时数据流,并实现一个简单但功能强大的计数系统。这不仅展示了Storm的核心概念,也为我们提供了实践分布式实时计算的一个起点...

    Storm编程实例

    在这个“Storm编程实例”中,我们将深入理解如何利用Maven构建Storm项目,以及Storm的核心概念和运行流程。 **1. Maven集成** Maven是Java开发中的一个项目管理工具,用于构建、管理和部署项目。在Storm编程中,...

    apache-storm-2.4.0.tar.gz

    在使用 Storm 进行实时数据处理时,开发者需要理解以下几个核心概念: - **拓扑(Topology)**:定义了数据流的处理逻辑,由 Bolt 和 Spout 组成。 - **Spout**:数据源,负责读取和分发数据。 - **Bolt**:数据处理...

    Storm 源码分析

    Storm的基本概念包括Topology、Spout、Bolt和Stream。其中,Topology是一个计算任务的定义,由一组Spout和Bolt组成;Spout作为数据源,负责发送数据;Bolt作为数据处理组件,可以接收多个Spout或其他Bolt的数据并...

    Storm笔记-PPT

    **Storm笔记概述** Storm是一个分布式实时计算系统,由Twitter开源并广泛应用于...通过对Storm的学习,我们可以理解实时数据处理的核心原理,掌握如何构建和优化实时数据处理系统,为企业提供更敏捷、高效的决策支持。

    storm开发设计规范

    Storm被广泛应用于实时数据处理,如中行在监控APP使用频率和投资理财活动中的应用。 1. **计算拓扑(Topologies)**: 计算拓扑是Storm的核心概念,它定义了数据流的处理逻辑和组件间的连接关系。一个Topology由多...

    stormdemo.zip

    本资料“stormdemo.zip”提供了一个关于Storm的实战示例,名为“stormdemo”,旨在帮助用户深入理解并掌握Storm的核心概念和操作流程。 Apache Storm是一个开源的分布式实时计算系统,它允许开发者连续处理数据流,...

    Storm Real-time Processing Cookbook实例代码

    The input stream of a Storm cluster is handled by a component called a spout. The spout passes the data to a component called a bolt, which transforms it in some way. A bolt either persists the data ...

    Apache Storm-0.8.1 api文档 (html)

    在 Apache Storm 0.8.1 版本中,API 文档提供了对这些关键概念的深入理解。以下是一些主要知识点: 1. **拓扑(Topology)**:拓扑是 Storm 的核心,定义了数据流如何在系统中传输和处理。它由一系列 bolts 和 ...

    storm程序代码示例

    Apache Storm的核心概念包括:拓扑(Topology)、工作者(Worker)、节点(Spout)和 bolt(Bolt)。拓扑是 Storm 应用的基本结构,由多个节点和bolt组成,它们之间通过流(Stream)进行连接。节点负责产生数据流(Spout),而...

    storm统计单词数的demo

    【storm统计单词数的demo】是一个基于Apache Storm的入门级示例,旨在帮助初学者理解这个分布式实时计算系统的运作机制。...对于进一步深入学习Storm,以及理解分布式流处理的概念,这是一个非常有价值的起点。

    Test_Storm_0_java_begun6u4_zookeeper_storm_apachestorm_

    标题中的“Test_Storm_0_java_begun6u4_zookeeper_storm_apachestorm_”暗示了我们讨论的主题是关于Apache Storm的入门教程,其中可能涵盖了Java编程、Zookeeper协调服务以及Apache Storm的核心概念。描述中提到的流...

    Storm的文档详解

    ### Storm的文档详解 #### 一、Storm基础概念 **1.1 什么是Storm?** Apache Storm 是一款免费且开源的分布式实时计算系统。...通过对以上核心概念的理解,可以更好地利用 Storm 构建高效稳定的实时数据处理系统。

    apache-storm-1.0.2.tar.gz

    Apache Storm 是一个开源的分布式实时计算系统,它允许开发者处理...在 Linux 环境下,用户需要了解如何解压和部署这个软件包,理解 Storm 的核心组件和工作原理,以及如何构建和管理 Topologies,才能充分利用其潜力。

    ( Storm实时数据处理.zip )PDF 高清版

    通过阅读《Storm实时数据处理》高清PDF,你将能够深入理解Storm的架构,掌握如何设计和部署实时数据处理拓扑,以及如何调试和优化性能。此外,你还将了解到如何与其他大数据技术,如Hadoop、Cassandra等集成,实现...

    Storm中spout和bolt之间发送和接收数据的java源代码实例

    本篇文章将深入探讨Spout和Bolt之间的数据交互,并提供相关的Java源代码实例,帮助你理解这一过程。 首先,让我们了解Spout和Bolt的基本概念: 1. Spout:Spout是Storm中的数据源,它负责生成数据流。Spout可以是...

    从零开始学习storm最新版

    理解这些基础概念是深入学习Storm的前提。 接下来,读者将了解到如何在本地环境搭建Storm开发环境,包括安装Java、JDK和Storm,以及配置相关环境变量。书中还会详细讲解如何使用Storm的本地模式进行开发调试,这对...

    Storm Applied Strategies for real-time event processing.pdf

    - **理解Storm中的并行性**:Storm通过并行执行多个任务实例来提高处理能力。 - **调整拓扑以解决瓶颈**: - 在设计中解决固有的瓶颈。 - 针对数据流本身的瓶颈进行优化。 #### 五、总结 本书《Storm Applied》...

Global site tag (gtag.js) - Google Analytics