为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪, 这里面涉及到ack/fail的处理, 如果一个tuple处理成功, 会调用spout的ack方法, 如果失败, 会调用fail方法. 而在处理tuple的每一个bolt都会通过OutputCollector来告知storm, 当前bolt处理是否成功. 为了了解OutputCollector的ack/fail与Spout的ack/fail之间的关系, 我调试跟踪了一下storm代码.
IBasicBolt 实现类不关心ack/fail, spout的ack/fail完全由后面的bolt的ack/fail来决定. 其execute方法的BasicOutputCollector参数也没有提供ack/fail方法给你调用. 相当于忽略了该bolt的ack/fail行为. 所以IBasicBolt用来做filter或者简单的计算比较合适.
可以参考BasicBoltExecutor代码里面的实现就可以明白了:
public void execute(Tuple input) {
_collector.setContext(input);
try {
_bolt.execute(input, _collector);
_collector.getOutputter().ack(input);
} catch(FailedException e) {
LOG.warn("Failed to process tuple", e);
_collector.getOutputter().fail(input);
}
}
在IRichBolt实现类中, 如果OutputCollector.emit(oldTuple, newTuple)这样调用来发射tuple(在storm中称之为anchoring), 那么后面的bolt的ack/fail会影响spout的ack/fail, 如果collector.emit(newTuple)这样来发射tuple(在storm称之为unanchoring), 则相当于断开了后面bolt的ack/fail对spout的影响.spout将立即根据当前bolt前面的ack/fail的情况来决定调用spout的ack/fail. 所以某个bolt后面的bolt的成功失败对你来说不关心, 你可以直接通过这种方式来忽略
中间的某个bolt fail了, 不会影响后面的bolt执行, 但是会立即触发spout的fail. 相当于短路了, 后面bolt虽然也执行了, 但是ack/fail对spout已经无意义了. 也就是说, 只要bolt集合中的任何一个fail了, 会立即触发spout的fail方法. 而ack方法需要所有的bolt调用为ack才能触发.
另外一点, storm只是通过ack/fail机制来告诉应用方bolt中间的处理情况, 对于成功/失败该如何处理, 必须由应用自己来决定, 因为storm内部也没有保存失败的具体数据, 但是也有办法知道失败记录, 因为spout的ack/fail方法会附带一个msgId对象, 我们可以在最初发射tuple的时候将将msgId设置为tuple, 然后在ack/fail中对该tuple进行处理.
这里有个问题, 就是每个bolt执行完之后要显式的调用ack/fail, 否则会出现tuple不释放导致oom. 不知道storm在最初设计的时候, 为什么不将bolt的ack设置为默认调用
参考文档:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing
分享到:
相关推荐
Acker bolt是Storm的一部分,它负责处理ack和fail消息。acker收到ack或fail消息后,会更新状态并决定是否需要重试未完成的tuple。 6. **优化与注意事项** - 虽然Ack机制确保了数据的可靠性,但过多的重试可能会...
深入学习Storm,我们需要理解其核心概念,包括数据模型(topology)、容错机制(ack和fail)、批处理、TOPN算法、流程聚合、DRPC(Direct Remote Procedure Call)以及executor、worker和task的关系及优化,以及如何...
Bolt还支持ack和fail机制,以确保数据处理的可靠性。 **5. Topology提交** 使用SubmitTopology方法将拓扑提交到Storm集群。提交时需要指定拓扑名、配置和构建好的拓扑对象。配置可以包含各种运行时参数,例如nimbus...
Nimbus和Supervisor节点都是无状态(state-less)的,并且设计为快速失败(fail-fast)的,这意味着任何一个节点的故障不会影响整个集群的功能。 在Storm中,Slot是对物理机器资源的划分,通常用一个端口来表示,一...
在这个例子中,Spout可能实现了`IRichSpout`接口,该接口包含`nextTuple()`、`ack()`和`fail()`等方法,用于生成新的元组、确认元组处理成功以及处理元组处理失败的情况。 1. `nextTuple()`:这是Spout的核心方法,...
在标准的Storm拓扑中,处理数据主要依赖于`ack`和`fail`机制。当一个元组被成功处理后,会发送一个`ack`信号,如果处理失败则发送`fail`。然而,当元组需要重发时,可能会导致重复处理,这就引入了不准确性和一致性...
**2.1 ack和fail方法** - **ack方法**:当一个Tuple被成功处理后,会调用此方法告知Spout,以便从缓存中移除该Tuple。 - **fail方法**:如果Tuple处理超时,则会调用此方法,通常会触发重试机制。 **2.2 消息发射...
### 大数据开发高薪就业指导课程:实时计算框架Storm基础理论与案例 #### Storm简单介绍 - **起源与发展**:...同时,对Storm架构体系和编程模型的深入理解,有助于在实际工作中更好地设计和优化实时数据处理流程。
4、ack/fail消息确认机制(确保一个tuple被完全处理)在spout中发射tuple的时候需要同时发送messageid,这样才相当于开启了消息确认机制如果你的topology里面的tuple比较多的话,那么把acker的数量设置多一点,效率会高...
这通常涉及实现IRichSpout接口并处理nextTuple、ack和fail方法。 3. **Bolt组件**:熟悉Bolt的实现,它负责数据的处理、过滤、聚合等操作。IRichBolt接口的execute方法是关键。 4. **拓扑定义**:了解如何使用Java...
在JStorm 2.1.1中,`ISpout`接口定义了Spout的基本行为,包括`nextTuple`(发布新数据)、`ack`(确认消息已被处理)和`fail`(处理失败的消息)。开发者可以自定义实现`ISpout`,创建满足特定数据源需求的Spout。 ...
2. **失败数据重试**:利用Storm提供的ACK/FAIL机制,对失败的数据进行重试处理。 3. **数据积压消解**:一旦出现数据积压情况,可以通过增加计算资源或优化算法来快速消解积压。 4. **BUG快速修复**:一旦发现BUG,...
JStorm是一个分布式实时计算系统,类似于Apache Storm,它允许开发者构建容错、低延迟的实时数据处理应用。在JStorm中,Bolt是处理数据的核心组件,它们负责对Tuples(数据单元)进行各种操作,如计算、过滤、聚合等...
1. "Storm学习":可能是一个教程文档或课程,包含Strom的基础概念和实践案例,适合初学者。 2. "Stormѧϰ":可能是一份进阶资料,如源码分析或高级应用,帮助理解Strom内部机制和优化技巧。 通过深入理解和掌握...
在Storm中,如果一条消息没有被ack,那么Spout将不会继续获取后面的数据。因此,如果一条数据需要等待超时才能失败,那么在该超时周期内,同分区下的所有数据都无法被处理,这会严重影响系统的吞吐量。 从另一个...
当一个tuple被spout生成后,会经过bolt的处理,最终被ack(确认)或fail(失败)。JStorm的可靠性机制依赖于tuple的生命周期管理。 2.2 容错机制 JStorm通过acker组件实现了 tuples 的可靠传输。每个提交的拓扑都会...