一个消息(tuple)从spout发送出来,可能导致成百上千消息基于此消息创建,这些消息构成一个树状结构,称之为“tuple tree”,如下图:
同时满足了下面两个条件,storm会认为消息被完整的处理了,如果在指定的时间内,一个消息衍生出来的tuple tree未被完全处理成功,则认为消息未被完整处理。这个超时时间可以通过任务参数Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS进行配置,默认是30秒。
- tuple tree 不再生长
- tuple tree 中的任何消息被标记为"已处理"
消息的生命周期
ISpout接口
public interface ISpout extends Serializable { void open(Map conf, TopologyContext context, SpoutOutputCollector collector); void close(); void activate(); void deactivate(); void nextTuple(); void ack(Object msgId); void fail(Object msgId); }
首先,使用spout的nextTuple()方法从spout请求一个tuple。收到请求后,spout使用open方法中提供的SpoutOutputCollector向它的输出流发送一个或多个消息。每发送一个消息,spout会给这个消息提供一个id,它将被用来标识这个消息。
假设我们从kestrel队列中读取消息,spout会将kestrel队列为这个消息设置的id作为此消息的id,向SpoutOutputCollector中发送的格式如下:
_collector.emit(new Values("field1","field2",3),msgId);
接下来,这个消息会被发送到后续业务处理的bolts,并且strom会跟踪由此消息产生出来的新消息。当检测到一个消息衍生出来的tuple tree被完整处理后,storm会调用spout中的ack方法,并将此消息的id作为参数传入。同理,如果某消息处理超时,则此消息对应的spout的fail方法会被调用,调用时此消息的id会被作为参数传入。一个消息只会由发送它的哪个spout任务调用ack或fail,如果系统中的某个spout由多个任务运行,消息也只会由穿件它的spout任务来应答(ack或fail)。
当KestrelSpout从kestrel队列中读取一个消息,表示它“打开”了队列中某个消息。这意味着,此消息并为从队列中真正删除,而是被设置为“pending”状态,它等待来自客户端的应答,被应答以后,此消息才会被真正的从队列中删除。处于pending状态的消息不会被其他客户端看到。另外,如果一个客户端意外断开连接,则由此客户端“打开”的所有消息都会被重新加入到队列中。当消息被“打开”的时候,kestrel队列同时会为这个消息提供一个唯一表示。
KestrelSpout使用这个唯一的标识作为这个tuple的id,当ack或fail被调用时,KestrelSpout会把ack或者fail连同id一起发送给kestrel队列,kestrel会将消息从队列中真正删除或者将它重新放回队列中。
可靠相关的API
为了使用Storm提供的可靠处理特性,我们需要做两件事情。
- 只要在tuple tree中创建了一个新节点,就要明确地通知storm
- 当处理完一个单独的消息时,告知storm这棵tuple tree的变化状态
为tuple tree中指定的节点增加一个新的节点,我们称之为锚定(anchoring)。锚定是在我们发送消息的同时进行的。bolt将包含整句话的消息分解为一系列的消息,每个消息包含一个单词。
public class WordBolt extends BaseRichBolt { OutputCollector _collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _collector = collector; } @Override public void execute(Tuple tuple) { String value = tuple.getString(0); for(String word : value.split(" ")){ //锚定方式发送消息 _collector.emit(tuple,new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
每个消息都通过这种方式被锚定,把输入消息作为emit方法的第一个参数。因为word消息被锚定在了输入消息上,这个输入消息时发送过来的tuple tree的跟节点,如果任意一个word消息处理失败,派生这个tuple tree的哪个消息将会重新发送。
与此相反,如果以这种方式发送消息,将会导致这个消息不会被锚定。如果tuple tree中的消息处理失败,派生此tuple tree的消息不会被重新发送,根据任务的容错级别,有时候适合发送一个非锚定的消息。_collector.emit(new Values(word))
很多Bolt遵循特定的处理流程,读取一个消息,发送它派生出来的子消息,在execute结尾处应答此消息,storm有一个BasicBolt接口封装了上述流程,使用这种方式比之前稍微简单一些,实现功能是一样的。发送到BasicOutputCollector的消息会被自动锚定到输入消息中,并且,当execute执行完毕时,会自动应答输入消息。
//BaseRichBolt与BaseBasicBolt public class WordBolt extends BaseBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String value = tuple.getString(0); for(String word : value.split(" ")){ //默认会锚定到tuple collector.emit(new Values(word)); } } }
高效地实现tuple tree
Tuple是如何被跟踪的呢?系统中有成千上万的消息,如果为每个spout发送的消息都构建一棵树的话,很快内存就会耗尽。所以必须采用不同的策略跟踪每个消息。由于使用了新的跟踪算法,storm只需要固定的内存(大约20字节)就可以跟踪一棵树。这个算法是storm正确运行的核心,也是storm最大的突破。
Acker任务记录了spout消息id到一对值的映射。第一个值就是spout的任务id,通过这个id,acker就知道消息处理完成是该通知哪个spout的任务。第二个值是一个64bit的数字,称之为ack val,他是树中所有消息的随机id的异或结果。ack val表示了整棵树的状态,无论这棵树多大,只要固定大小的数字就可以跟踪整棵树。当消息被创建和被应答是都会有相同的消息id发送过来做异或。
每当acker发现一棵树的ack val值为0是,它就知道这棵树已经被完全处理了。
选择合适的可靠性级别
Acker任务是轻量级的,所以在拓扑中并不需要太多的acker存在。可以通过Storm UI观察acker任务的吞吐量,如果吞吐量不够的话,说明需要增加额外的acker。如果你并不要求每个消息必须被处理,那么可以关闭消息的可靠处理机制,从而获取较好的性能。关闭消息的可靠处理机制意味着系统的消息会减半。
有三种方法可以调整消息的可靠性处理机制。
- 将参数Config.TOPOLOGY_ACKERS设置为0,通过此方法,当spout发送一个消息时,它的ack方法将立刻被调用。
- Spout发送一个消息时,不指定此消息的id,当需要关闭特定消息可靠性是,可以使用此方法。
- 如果你不在意派生出来的自消息的可靠性,则子消息不进行锚定,子消息失败不会引起任何spout重发消息。
相关推荐
在Storm中,"ack机制"是实现数据可靠传输的关键特性,它确保了每个消息至少被处理一次(At-Least-Once Delivery)。下面我们将详细探讨Storm的ack机制以及它如何保证数据的可靠性。 1. **什么是Storm的Ack机制?** ...
此外,《大数据技术丛书:Storm实时数据处理》旨在围绕Storm技术促进DevOps实践,使读者能够开发Storm解决方案,同时可靠地交付有价值的产品。 《大数据技术丛书:Storm实时数据处理》适合想学习实时处理技术或者...
Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,确保每个事件都能得到精确一次(exactly-once)的处理,从而实现高度可靠的消息传递。 首先,我们需要理解Storm的核心概念。Storm由多个...
《storm实时数据处理》通过丰富的实例,系统讲解Storm的基础知识和实时数据处理的最佳实践...此外,Storm实时数据处理旨在围绕Storm技术促进DevOps实践,使读者能够开发Storm解决方案,同时可靠地交付有价值的产品。
Storm可以保证从Spout发出的每个消息都能被完全处理。Storm的可靠性机制是完全分布式的(distributed),可伸缩的(scalable),容错的(fault-tolerant)。本文介绍了Storm如何保证可靠性以及作为Storm使用者,我们需要...
Storm技术提供了一种基于消息处理的计算模型,支持多个作业同时进行,每个作业可以视为一个拓扑,其中包含了数据处理的组件和流数据之间的关系。这使得Storm非常适合于对实时性要求极高的数据处理场景,例如实时分析...
本书中的每一个步骤都应用了成熟的开发和操作实践,确保你能够可靠地交付产品。 通过阅读本书,你将能够: 搭建你的开发环境并测试Strom集群。 处理数据流,包括基于规则的处理流程。 构建分布式远程过程调用。 交付...
Storm的事务性拓扑主要用于确保数据处理的精确一次性交付(exactly-once processing semantics),这在某些需要极高可靠性的实时计算场景中非常关键。 Storm集群的运行模式包括本地模式和分布式模式。本地模式适用...
- **消息处理保证**:Storm提供了一种可靠的机制来确保每个消息至少被处理一次。 - **编程语言的灵活性**:Storm支持多种编程语言,默认支持Clojure、Java、Ruby和Python,也可以通过实现特定通信协议支持其他语言。...
Storm作为Apache软件基金会的顶级项目,是一个分布式、容错的实时计算系统,能够处理无界数据流,确保每个消息至少被处理一次,从而提供高可靠性的保障。 **Zookeeper在Storm中的角色** Zookeeper是一个分布式的,...
以上这些知识点对于理解Storm的工作原理至关重要,它们涵盖了从Storm的基本架构、进程启动和初始化、Topology的创建和提交,到消息的接收、传递以及可靠性的保证。通过深入分析这些知识点,可以加深对Storm源码的...
Storm是一个开源的分布式实时计算系统,由Twitter开发并开源,旨在实现高可靠性、可伸缩性、快速处理无界数据流。Storm可以与Hadoop进行类比,但相较于Hadoop处理批量数据的批处理方式,Storm更专注于处理实时数据流...
Twitter于2011年开源Storm,它提供了对大规模、无界数据流进行连续计算的能力,保证了数据处理的低延迟和高可靠性。 1.2 我司实时服务现状 公司当前的实时服务面临着数据量大、处理延迟和业务连续性挑战。传统的...
Storm作为一个分布式的、容错的实时计算系统,特别适合于需要高实时性的数据处理任务,如实时监控、日志分析、消息服务、在线机器学习等场景。 在系统设计方面,本文提出了一种基于Storm的实时数据处理平台架构,该...
KafkaSpout能保证消息的可靠处理,即使在故障恢复后也能确保不丢失任何数据。开发者需要配置KafkaSpout以指向正确的Zookeeper集群(用于Kafka的协调),并指定要消费的Kafka主题。 接下来,我们关注Bolts。在这个...
2. **事务处理**:通过 Trident API 提供了可靠的消息处理能力,确保消息的处理一致性。 3. **并发与通信机制**:每个Spout和Bolt可以有多个实例并发运行,通过TCP/IP进行进程间的通信。 **六、Storm应用场景** 1....