`
liyonghui160com
  • 浏览: 777188 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

IRichBolt和IBasicBolt对比

阅读更多

 


 作为storm的使用者,有两件事情要做以更好的利用storm的可靠性特征。 首先,在你生成一个新的tuple的时候要通知storm; 其次,完成处理一个tuple之后要通知storm。 这样storm就可以检测整个tuple树有没有完成处理,并且通知源spout处理结果。storm提供了一些简洁的api来做这些事情。

由一个tuple产生一个新的tuple称为: anchoring。你发射一个新tuple的同时也就完成了一次anchoring。看下面这个例子: 这个bolt把一个包含一个句子的tuple分割成每个单词一个tuple。

   

public class SplitSentence implements IRichBolt { 
            Output Collector _collector; 
      
            public void prepare(Map conf, 
                                TopologyContext context, 
                                OutputCollector collector) { 
                _collector = collector; 
            } 
      
            public void execute(Tuple tuple) { 
                String sentence = tuple.getString(0); 
                for(String word: sentence.split(" ")) { 
                    _collector.emit(tuple,newValues(word)); 
                } 
                _collector.ack(tuple); 
            } 
      
            public void cleanup() { 
            } 
      
            public void declareOutputFields(OutputFieldsDeclarer declarer) { 
                declarer.declare(newFields("word")); 
            } 
        }  

 

 我们通过anchoring来构造这个tuple树,最后一件要做的事情是在你处理完当个tuple的时候告诉storm,  通过OutputCollector类的ack和fail方法来做,如果你回过头来看看SplitSentence的例子, 你可以看到“句子tuple”在所有“单词tuple”被发出之后调用了ack。

你可以调用OutputCollector 的fail方法去立即将从消息源头发出的那个tuple标记为fail, 比如你查询了数据库,发现一个错误,你可以马上fail那个输入tuple, 这样可以让这个tuple被快速的重新处理, 因为你不需要等那个timeout时间来让它自动fail。

每个你处理的tuple, 必须被ack或者fail。因为storm追踪每个tuple要占用内存。所以如果你不ack/fail每一个tuple, 那么最终你会看到OutOfMemory错误。

大多数Bolt遵循这样的规律:读取一个tuple;发射一些新的tuple;在execute的结束的时候ack这个tuple。这些Bolt往往是一些过滤器或者简单函数。Storm为这类规律封装了一个BasicBolt类。如果用BasicBolt来做, 上面那个SplitSentence可以改写成这样:

   

public class SplitSentence implements IBasicBolt { 
            public void prepare(Map conf, 
                                TopologyContext context) { 
            } 
      
            public void execute(Tuple tuple, 
                                BasicOutputCollector collector) { 
                String sentence = tuple.getString(0); 
                for(String word: sentence.split(" ")) { 
                    collector.emit(newValues(word)); 
                } 
            } 
      
            public void cleanup() { 
            } 
      
            public void declareOutputFields( 
                            OutputFieldsDeclarer declarer) { 
                declarer.declare(newFields("word")); 
            } 
        }  

 
这个实现比之前的实现简单多了, 但是功能上是一样的。

发送到BasicOutputCollector的tuple会自动和输入tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack的。
我们编写的时候使用IBasicBolt最方便了。或者 extends BaseBasicBolt类

分享到:
评论

相关推荐

    Storm中涉及到的类

    15. **BaseBasicBolt** 和 **IRichBolt**: 这些类和接口定义了_bolt_的行为,`BaseBasicBolt` 提供了简单的一次处理模型,而 `IRichBolt` 支持更复杂的生命周期方法。 16. **CoordinatedBolt**: 这是可靠的_bolt_...

    Storm实战构建大数据实时计算

    - **Bolt**:实现`IBasicBolt`或`IRichBolt`接口,定义`execute()`方法来处理数据。 ##### 2. 定义Topology 通过创建Topology对象,并添加Spouts和Bolts,定义数据流向。例如: ```java TopologyBuilder builder ...

    简单的storm例子.rar

    在src目录中,我们可以找到定义这些组件的Java类,它们通过`IRichSpout`和`IRichBolt`接口实现。 为了运行这个storm例子,用户需要先解压文件,然后使用Maven或者类似的构建工具编译源代码,最后使用Storm CLI或...

    java-sdk-storm

    - **` topology-api`**: 提供了编写拓扑的高级API,如`IRichSpout`和`IRichBolt`接口,以及`StreamGroupings`用于定义数据流的分发策略。 - **` storm-core`**: 核心库,包含了 Storm 的基本运行时组件,如`...

    Storm中spout和bolt之间发送和接收数据的java源代码实例

    Bolt通过实现`IRichBolt`接口来创建。Bolts可以连接到多个Spouts,并且可以与其他Bolts形成拓扑结构。 3. Topology:Topology是Spout和Bolt的组织方式,定义了数据流如何在它们之间传输。在Storm中,你可以通过`...

    storm0.9-源码包

    - **IRichBolt** 和 **IRichSpout**: 这些接口定义了 Bolt 和 Spout 的行为,例如 open()、execute() 和 close() 方法,提供了与 Storm 运行时环境交互的入口。 - **Fields**: 用于定义数据字段的类,用于描述 ...

    Apache Storm-0.8.1 API 参考文档 ( Html版 )

    2. **IRichSpout** 和 **IRichBolt**:是用户自定义Spout和Bolt的接口,包含了打开、关闭、nextTuple、acknowledge和fail等方法。 3. **Fields**:定义了数据流的字段名,用于定义Tuple的结构。 4. **...

    apache-storm-0.9.5源码

    `backtype.storm.task`包包含了Bolt的接口和实现,如`IRichBolt`是Bolt的基本接口,而`FilterBolt`、`CountBolt`等则是具体的操作实现。 3. Topology:Topology是Storm处理数据的核心逻辑,由多个Spout和Bolt组成,...

    JStorm 2.1.1 API

    JStorm 2.1.1还支持`IRichBolt`,增加了初始化和关闭方法,使得Bolt可以进行更复杂的初始化设置和资源清理。 4. **Grouping策略** 数据在Bolt之间传输时,需要指定Grouping策略,如`FieldsGrouping`(按字段分组)...

    Apache Storm-0.9.1 API 参考文档

    Bolt 执行实际的处理逻辑,通过实现IRichBolt接口创建。Bolt有prepare()、execute()、cleanup()等方法。execute()方法接收来自Spout的tuple,进行处理并可能发射新的tuple到下游。Bolt还支持ack和fail机制,以确保...

    storm_simple_example

    2. **创建拓扑**:在Java中,你可以通过实现`IRichBolt`和`IRichSpout`接口来定义自定义的螺栓和喷口。然后,使用`TopologyBuilder`来构建拓扑,将螺栓和喷口连接起来,定义数据流的流向。 3. **配置与提交**:配置...

    real-time:udacity Apache Storm 课程的最终项目

    - Java 中的 `IRichSpout` 和 `IRichBolt` 接口是创建 spouts 和 bolts 的基础。你需要实现这些接口来定义数据处理逻辑。 4. **数据流与并行度**: - 在 Storm 中,数据流通过多个组件以拓扑的形式流动。并行度...

    apache-storm:学习阿帕奇风暴

    1. **创建Spout和Bolt**:在Java中,你可以继承IRichSpout或IRichBolt接口,并实现其方法来定义自己的Spout和Bolt。 2. **定义拓扑**:使用Java API创建TopologyBuilder对象,添加Spouts和Bolts,并指定它们之间的...

    storm-samples

    IRichBolt接口的execute方法是关键。 4. **拓扑定义**:了解如何使用Java API定义和配置拓扑,包括设置并行度、流分组等。 5. **Maven配置**:学习如何在Maven的pom.xml文件中配置Storm相关的依赖,如storm-core、...

    MatchAlgorithmUsingStorm:使用Storm在群集环境中实现子发布匹配算法

    Spout和Bolt可以通过继承`IRichSpout`和`IRichBolt`接口,并实现其方法来创建。Java的面向对象特性使得我们能清晰地组织匹配逻辑,并方便地与其他Java代码集成。 五、性能优化与扩展 为了提高匹配效率,可以考虑...

    udacity-storm:使用 Apache Storm 课程的 Udacity 实时分析代码

    - **JStorm API**: 学习如何使用 Java 类和接口来定义 Spout 和 Bolt,包括 `IRichSpout` 和 `IRichBolt` 接口。 - **配置 Topology**: 设置拓扑的配置选项,例如设置并行度(每种组件的实例数量)。 - **提交 ...

    storm-demo:风暴演示应用程序

    Bolt 需要实现 `IRichBolt` 接口,包括 `execute(Tuple input)` 方法,用于处理到来的数据。例如,`SentenceSplitterBolt` 可能将完整的句子拆分成单个单词,`WordCounterBolt` 则统计每个单词出现的次数。 4. **...

    Storm:内容改编自https

    Spout通常会实现`IRichSpout`接口,而Bolt则实现`IRichBolt`接口。开发过程中,可以通过`emit()`方法发送tuples,通过`execute(Tuple input)`方法处理传入的tuples。 **实时处理的优势** 实时处理能够快速响应数据...

Global site tag (gtag.js) - Google Analytics