作为storm的使用者,有两件事情要做以更好的利用storm的可靠性特征。 首先,在你生成一个新的tuple的时候要通知storm; 其次,完成处理一个tuple之后要通知storm。 这样storm就可以检测整个tuple树有没有完成处理,并且通知源spout处理结果。storm提供了一些简洁的api来做这些事情。
由一个tuple产生一个新的tuple称为: anchoring。你发射一个新tuple的同时也就完成了一次anchoring。看下面这个例子: 这个bolt把一个包含一个句子的tuple分割成每个单词一个tuple。
public class SplitSentence implements IRichBolt { Output Collector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { _collector.emit(tuple,newValues(word)); } _collector.ack(tuple); } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(newFields("word")); } }
我们通过anchoring来构造这个tuple树,最后一件要做的事情是在你处理完当个tuple的时候告诉storm, 通过OutputCollector类的ack和fail方法来做,如果你回过头来看看SplitSentence的例子, 你可以看到“句子tuple”在所有“单词tuple”被发出之后调用了ack。
你可以调用OutputCollector 的fail方法去立即将从消息源头发出的那个tuple标记为fail, 比如你查询了数据库,发现一个错误,你可以马上fail那个输入tuple, 这样可以让这个tuple被快速的重新处理, 因为你不需要等那个timeout时间来让它自动fail。
每个你处理的tuple, 必须被ack或者fail。因为storm追踪每个tuple要占用内存。所以如果你不ack/fail每一个tuple, 那么最终你会看到OutOfMemory错误。
大多数Bolt遵循这样的规律:读取一个tuple;发射一些新的tuple;在execute的结束的时候ack这个tuple。这些Bolt往往是一些过滤器或者简单函数。Storm为这类规律封装了一个BasicBolt类。如果用BasicBolt来做, 上面那个SplitSentence可以改写成这样:
public class SplitSentence implements IBasicBolt { public void prepare(Map conf, TopologyContext context) { } public void execute(Tuple tuple, BasicOutputCollector collector) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { collector.emit(newValues(word)); } } public void cleanup() { } public void declareOutputFields( OutputFieldsDeclarer declarer) { declarer.declare(newFields("word")); } }
这个实现比之前的实现简单多了, 但是功能上是一样的。
发送到BasicOutputCollector的tuple会自动和输入tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack的。
我们编写的时候使用IBasicBolt最方便了。或者 extends BaseBasicBolt类
相关推荐
15. **BaseBasicBolt** 和 **IRichBolt**: 这些类和接口定义了_bolt_的行为,`BaseBasicBolt` 提供了简单的一次处理模型,而 `IRichBolt` 支持更复杂的生命周期方法。 16. **CoordinatedBolt**: 这是可靠的_bolt_...
- **Bolt**:实现`IBasicBolt`或`IRichBolt`接口,定义`execute()`方法来处理数据。 ##### 2. 定义Topology 通过创建Topology对象,并添加Spouts和Bolts,定义数据流向。例如: ```java TopologyBuilder builder ...
在src目录中,我们可以找到定义这些组件的Java类,它们通过`IRichSpout`和`IRichBolt`接口实现。 为了运行这个storm例子,用户需要先解压文件,然后使用Maven或者类似的构建工具编译源代码,最后使用Storm CLI或...
- **` topology-api`**: 提供了编写拓扑的高级API,如`IRichSpout`和`IRichBolt`接口,以及`StreamGroupings`用于定义数据流的分发策略。 - **` storm-core`**: 核心库,包含了 Storm 的基本运行时组件,如`...
Bolt通过实现`IRichBolt`接口来创建。Bolts可以连接到多个Spouts,并且可以与其他Bolts形成拓扑结构。 3. Topology:Topology是Spout和Bolt的组织方式,定义了数据流如何在它们之间传输。在Storm中,你可以通过`...
- **IRichBolt** 和 **IRichSpout**: 这些接口定义了 Bolt 和 Spout 的行为,例如 open()、execute() 和 close() 方法,提供了与 Storm 运行时环境交互的入口。 - **Fields**: 用于定义数据字段的类,用于描述 ...
2. **IRichSpout** 和 **IRichBolt**:是用户自定义Spout和Bolt的接口,包含了打开、关闭、nextTuple、acknowledge和fail等方法。 3. **Fields**:定义了数据流的字段名,用于定义Tuple的结构。 4. **...
`backtype.storm.task`包包含了Bolt的接口和实现,如`IRichBolt`是Bolt的基本接口,而`FilterBolt`、`CountBolt`等则是具体的操作实现。 3. Topology:Topology是Storm处理数据的核心逻辑,由多个Spout和Bolt组成,...
JStorm 2.1.1还支持`IRichBolt`,增加了初始化和关闭方法,使得Bolt可以进行更复杂的初始化设置和资源清理。 4. **Grouping策略** 数据在Bolt之间传输时,需要指定Grouping策略,如`FieldsGrouping`(按字段分组)...
Bolt 执行实际的处理逻辑,通过实现IRichBolt接口创建。Bolt有prepare()、execute()、cleanup()等方法。execute()方法接收来自Spout的tuple,进行处理并可能发射新的tuple到下游。Bolt还支持ack和fail机制,以确保...
2. **创建拓扑**:在Java中,你可以通过实现`IRichBolt`和`IRichSpout`接口来定义自定义的螺栓和喷口。然后,使用`TopologyBuilder`来构建拓扑,将螺栓和喷口连接起来,定义数据流的流向。 3. **配置与提交**:配置...
- Java 中的 `IRichSpout` 和 `IRichBolt` 接口是创建 spouts 和 bolts 的基础。你需要实现这些接口来定义数据处理逻辑。 4. **数据流与并行度**: - 在 Storm 中,数据流通过多个组件以拓扑的形式流动。并行度...
1. **创建Spout和Bolt**:在Java中,你可以继承IRichSpout或IRichBolt接口,并实现其方法来定义自己的Spout和Bolt。 2. **定义拓扑**:使用Java API创建TopologyBuilder对象,添加Spouts和Bolts,并指定它们之间的...
IRichBolt接口的execute方法是关键。 4. **拓扑定义**:了解如何使用Java API定义和配置拓扑,包括设置并行度、流分组等。 5. **Maven配置**:学习如何在Maven的pom.xml文件中配置Storm相关的依赖,如storm-core、...
Spout和Bolt可以通过继承`IRichSpout`和`IRichBolt`接口,并实现其方法来创建。Java的面向对象特性使得我们能清晰地组织匹配逻辑,并方便地与其他Java代码集成。 五、性能优化与扩展 为了提高匹配效率,可以考虑...
- **JStorm API**: 学习如何使用 Java 类和接口来定义 Spout 和 Bolt,包括 `IRichSpout` 和 `IRichBolt` 接口。 - **配置 Topology**: 设置拓扑的配置选项,例如设置并行度(每种组件的实例数量)。 - **提交 ...
Bolt 需要实现 `IRichBolt` 接口,包括 `execute(Tuple input)` 方法,用于处理到来的数据。例如,`SentenceSplitterBolt` 可能将完整的句子拆分成单个单词,`WordCounterBolt` 则统计每个单词出现的次数。 4. **...
Spout通常会实现`IRichSpout`接口,而Bolt则实现`IRichBolt`接口。开发过程中,可以通过`emit()`方法发送tuples,通过`execute(Tuple input)`方法处理传入的tuples。 **实时处理的优势** 实时处理能够快速响应数据...