`

Twitter Storm如何保证消息不丢失

 
阅读更多
作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址: http://xumingming.sinaapp.com/127/twitter-storm如何保证消息不丢失/

 
本文翻译自: https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

storm保证从spout发出的每个tuple都会被完全处理。这篇文章介绍storm是怎么做到这个保证的,以及我们使用者怎么做才能充分利用storm的可靠性特点。

一个tuple被”完全处理”是什么意思?

就如同蝴蝶效应一样,从spout发射的一个tuple可以引起其它成千上万个tuple因它而产生, 想想那个计算一篇文章中每个单词出现次数的topology.

1
2
3
4
5
6
7
8
9
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",
                                     22133,
                                     "sentence_queue",
                                     new StringScheme()));
builder.setBolt(2, new SplitSentence(), 10)
        .shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 20)
        .fieldsGrouping(2, new Fields("word"));

这个topology从一个Kestrel队列读取句子, 把每个句子分割成一个个单词, 然后发射这一个个单词: 一个源tuple(一个句子)引起后面很多tuple的产生(一个个单词), 这个消息流大概是这样的:

统计单词出现次数的tuple树

统计单词出现次数的tuple树

在storm里面一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所导致的所有的tuple都被成功处理。而一个tuple会被认为处理失败了如果这个消息在timeout所指定的时间内没有成功处理。 而这个timetout可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来指定。

 

如果一个消息处理成功了或者失败了会发生什么?

FYI。 下面这个是spout要实现的接口:

1
2
3
4
5
6
7
8
public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context,
              SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

首先storm通过调用spout的nextTuple方法来获取下一个tuple, Spout通过open方法参数里面提供的SpoutOutputCollector来发射新tuple到它的其中一个输出消息流, 发射tuple的时候spout会提供一个message-id, 后面我们通过这个message-id来追踪这个tuple。举例来说, KestrelSpout从kestrel队列里面读取一个消息,并且把kestrel提供的消息id作为message-id, 看例子:

1
_collector.emit(new Values("field1","field2", 3),msgId);

接下来, 这个发射的tuple被传送到消息处理者bolt那里, storm会跟踪由此所产生的这课tuple树。如果storm检测到一个tuple被完全处理了, 那么storm会以最开始的那个message-id作为参数去调用消息源的ack方法;反之storm会调用spout的fail方法。值得注意的一点是, storm调用ack或者fail的task始终是产生这个tuple的那个task。所以如果一个spout被分成很多个task来执行, 消息执行的成功失败与否始终会通知最开始发出tuple的那个task。

我们再以KestrelSpout为例来看看spout需要做些什么才能保证“一个消息始终被完全处理”, 当KestrelSpout从Kestrel里面读出一条消息, 首先它“打开”这条消息, 这意味着这条消息还在kestrel队列里面, 不过这条消息会被标示成“处理中”直到ack或者fail被调用。处于“处理中“状态的消息不会被发给其他消息处理者了;并且如果这个spout“断线”了, 那么所有处于“处理中”状态的消息会被重新标示成“等待处理”.

Storm的可靠性API

作为storm的使用者,有两件事情要做以更好的利用storm的可靠性特征。 首先,在你生成一个新的tuple的时候要通知storm; 其次,完成处理一个tuple之后要通知storm。 这样storm就可以检测整个tuple树有没有完成处理,并且通知源spout处理结果。storm提供了一些简洁的api来做这些事情。

由一个tuple产生一个新的tuple称为: anchoring。你发射一个新tuple的同时也就完成了一次anchoring。看下面这个例子: 这个bolt把一个包含一个句子的tuple分割成每个单词一个tuple。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SplitSentence implements IRichBolt {
        OutputCollector _collector;
 
        public void prepare(Map conf,
                            TopologyContext context,
                            OutputCollector collector) {
            _collector = collector;
        }
 
        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        }
 
        public void cleanup() {
        }
 
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

看一下这个execute方法, emit的第一个参数是输入tuple, 第二个参数则是输出tuple, 这其实就是通过输入tuple anchoring了一个新的输出tuple。因为这个“单词tuple”被anchoring在“句子tuple”一起, 如果其中一个单词处理出错,那么这整个句子会被重新处理。作为对比, 我们看看如果通过下面这行代码来发射一个新的tuple的话会有什么结果。

1
_collector.emit(new Values(word));

用这种方法发射会导致新发射的这个tuple脱离原来的tuple树(unanchoring), 如果这个tuple处理失败了, 整个句子不会被重新处理。到底要anchoring还是要 unanchoring则完全取决于你的业务需求。

一个输出tuple可以被anchoring到多个输入tuple。这种方式在stream合并或者stream聚合的时候很有用。一个多入口tuple处理失败的话,那么它对应的所有输入tuple都要重新执行。看看下面演示怎么指定多个输入tuple:

1
2
3
4
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

多入口tuple把这个新tuple加到了多个tuple树里面去了。

我们通过anchoring来构造这个tuple树,最后一件要做的事情是在你处理完当个tuple的时候告诉storm,  通过OutputCollector类的ack和fail方法来做,如果你回过头来看看SplitSentence的例子, 你可以看到“句子tuple”在所有“单词tuple”被发出之后调用了ack。

你可以调用OutputCollector 的fail方法去立即将从消息源头发出的那个tuple标记为fail, 比如你查询了数据库,发现一个错误,你可以马上fail那个输入tuple, 这样可以让这个tuple被快速的重新处理, 因为你不需要等那个timeout时间来让它自动fail。

每个你处理的tuple, 必须被ack或者fail。因为storm追踪每个tuple要占用内存。所以如果你不ack/fail每一个tuple, 那么最终你会看到OutOfMemory错误。

大多数Bolt遵循这样的规律:读取一个tuple;发射一些新的tuple;在execute的结束的时候ack这个tuple。这些Bolt往往是一些过滤器或者简单函数。Storm为这类规律封装了一个BasicBolt类。如果用BasicBolt来做, 上面那个SplitSentence可以改写成这样:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
public class SplitSentence implements IBasicBolt {
        public void prepare(Map conf,
                            TopologyContext context) {
        }
 
        public void execute(Tuple tuple,
                            BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
 
        public void cleanup() {
        }
 
        public void declareOutputFields(
                        OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

这个实现比之前的实现简单多了, 但是功能上是一样的。发送到BasicOutputCollector的tuple会自动和输入tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack的。
作为对比,处理聚合和合并的bolt往往要处理一大堆的tuple之后才能被ack, 而这类tuple通常都是多输入的tuple, 所以这个已经不是IBasicBolt可以罩得住的了。

storm是怎么实现高效率的可靠性的?

storm里面有一类特殊的task称为:acker, 他们负责跟踪spout发出的每一个tuple的tuple树。当acker发现一个tuple树已经处理完成了。它会发送一个消息给产生这个tuple的那个task。你可以通过Config.TOPOLOGY_ACKERS来设置一个topology里面的acker的数量, 默认值是一。 如果你的topology里面的tuple比较多的话, 那么把acker的数量设置多一点,效率会高一点。

理解storm的可靠性的最好的方法是来看看tuple和tuple树的生命周期, 当一个tuple被创建, 不管是spout还是bolt创建的, 它会被赋予一个64位的id,而acker就是利用这个id去跟踪所有的tuple的。每个tuple知道它的祖宗的id(从spout发出来的那个tuple的id), 每当你新发射一个tuple, 它的祖宗id都会传给这个新的tuple。所以当一个tuple被ack的时候,它会发一个消息给acker,告诉它这个tuple树发生了怎么样的变化。具体来说就是:它告诉acker: 我呢已经完成了, 我有这些儿子tuple, 你跟踪一下他们吧。下面这个图演示了C被ack了之后,这个tuple树所发生的变化。

tuple ack示例

tuple ack示例

关于storm怎么跟踪tuple还有一些细节, 前面已经提到过了, 你可以自己设定你的topology里面有多少个acker。而这又给我们带来一个问题, 当一个tuple需要ack的时候,它到底选择哪个acker来发送这个信息呢?

storm使用一致性哈希来把一个spout-tuple-id对应到acker, 因为每一个tuple知道它所有的祖宗的tuple-id, 所以它自然可以算出要通知哪个acker来ack。(这里所有的祖宗是指这个tuple所对应的所有的根tuple。这里注意因为一个tuple可能存在于多个tuple树,所以才有所有一说)。

storm的另一个细节是acker是怎么知道每一个spout tuple应该交给哪个task来处理。当一个spout发射一个新的tuple, 它会简单的发一个消息给一个合适的acker,并且告诉acker它自己的id(taskid), 这样storm就有了taskid-tupleid的对应关系。 当acker发现一个树完成处理了, 它知道给哪个task发送成功的消息。

acker task并不显式的跟踪tuple树。对于那些有成千上万个节点的tuple树,把这么多的tuple信息都跟踪起来会耗费太多的内存。相反, acker用了一种不同的方式, 使得对于每个spout tuple所需要的内存量是恒定的(20 bytes) .  这个跟踪算法是storm如何工作的关键,并且也是它的主要突破。

一个acker task存储了一个spout-tuple-id到一对值的一个mapping。这个对子的第一个值是创建这个tuple的taskid, 这个是用来在完成处理tuple的时候发送消息用的。 第二个值是一个64位的数字称作:”ack val”, ack val是整个tuple树的状态的一个表示,不管这棵树多大。它只是简单地把这棵树上的所有创建的tupleid/ack的tupleid一起异或(XOR)。

当一个acker task 发现一个 ack val变成0了, 它知道这棵树已经处理完成了。 因为tupleid是随机的64位数字, 所以, ack val碰巧变成0(而不是因为所有创建的tuple都完成了)的几率极小。算一下就知道了, 就算每秒发生10000个ack, 那么需要50000000万年才可能碰到一个错误。而且就算碰到了一个错误, 也只有在这个tuple失败的时候才会造成数据丢失。 关于Acker的详细工作流程的分析可以看看这篇文章: Twitter Storm源代码分析之acker工作流程

既然你已经理解了storm的可靠性算法, 让我们一起过一遍所有可能的失败场景,并看看storm在每种情况下是怎么避免数据丢失的。

1. 由于对应的task挂掉了,一个tuple没有被ack: storm的超时机制在超时之后会把这个tuple标记为失败,从而可以重新处理。

2. Acker挂掉了: 这种情况下由这个acker所跟踪的所有spout tuple都会超时,也就会被重新处理。

3. Spout挂掉了: 在这种情况下给spout发送消息的消息源负责重新发送这些消息。比如Kestrel和RabbitMQ在一个客户端断开之后会把所有”处理中“的消息放回队列。

就像你看到的那样, storm的可靠性机制是完全分布式的, 可伸缩的并且是高度容错的。

 

调整可靠性 (Tuning Reliability)

acker task是非常轻量级的, 所以一个topology里面不需要很多acker。你可以通过Strom UI(id: -1)来跟踪它的性能。 如果它的吞吐量看起来不正常,那么你就需要多加点acker了。

如果可靠性对你来说不是那么重要 — 你不太在意在一些失败的情况下损失一些数据, 那么你可以通过不跟踪这些tuple树来获取更好的性能。不去跟踪消息的话会使得系统里面的消息数量减少一半, 因为对于每一个tuple都要发送一个ack消息。并且它需要更少的id来保存下游的tuple, 减少带宽占用。

有三种方法可以去掉可靠性。第一是把Config.TOPOLOGY_ACKERS 设置成 0. 在这种情况下, storm会在spout发射一个tuple之后马上调用spout的ack方法。也就是说这个tuple树不会被跟踪。

第二个方法是在tuple层面去掉可靠性。 你可以在发射tuple的时候不指定messageid来达到不跟粽某个特定的spout tuple的目的。

最后一个方法是如果你对于一个tuple树里面的某一部分到底成不成功不是很关心,那么可以在发射这些tuple的时候unanchor它们。 这样这些tuple就不在tuple树里面, 也就不会被跟踪了

分享到:
评论

相关推荐

    Twitter storm

    - **消息无丢失**:为了确保消息的可靠传输,Storm提供了消息确认机制。当一个Bolt处理完一条消息后,必须显式地向Spout发送确认信息,这样Spout才能知道该消息已经被成功处理。 - **消息按序处理**:在某些场景下,...

    基于java的开放实时数据处理平台 Twitter Storm.zip

    3. **容错机制**:Storm利用ZooKeeper进行协调,确保在节点故障时,任务可以自动重新分配,保证数据不丢失且处理继续进行。 4. **拓扑结构**:在Storm中,数据处理逻辑被组织成“拓扑”,由“spout”(数据源)和...

    Storm @Twitter-Slides.pdf

    在容错性和可扩展性方面,Storm提供了保证消息处理(Guaranteed Message Processing)的能力,即使在故障发生时也能确保数据不丢失。同时,它支持水平扩展,只需增加更多的节点即可处理更大规模的数据流。此外,简洁...

    Storm @Twitter

    Storm通过记录每个消息的处理状态,并在节点发生故障时重新分配消息,保证了计算任务可以继续执行而不会丢失数据。 三、Storm的查询执行 Storm通过执行查询(拓扑)来完成数据流的处理。拓扑定义了数据流的处理逻辑...

    从零开始学Storm.pdf

    3. 保证数据不丢失:Storm的设计确保了数据的完整性,能够保证每条消息都被妥善处理,避免丢失。 4. 鲁棒性:Storm设计目标之一是提供健壮、容易管理的集群,即使在高并发的情况下也能保证系统的稳定运行。 5. 容错...

    从零开始学storm.pdf

    3. 保证无数据丢失:Storm保证每条消息都能被妥善处理,而不会发生丢失。 4. 强健的鲁棒性:Storm集群能够有效运行,并且鲁棒性高,易于管理。 5. 容错性好:在执行计算任务出现错误时,Storm能够重新分配任务,保证...

    storm 学习资源总结

    Storm 的架构特点包括编程简单、高性能、低延迟、分布式、可扩展、容错、消息不丢失等。 Storm 的应用场景包括实时计算分析、ETL、在线机器学习、Hadoop 处理静态数据,而 Storm 处理的是动态的、连续的数据。...

    storm组件应用说明书

    - **检查点(ckpt)**:用于实现状态持久化,保证故障恢复时数据不丢失。 - **动态调整拓扑**:根据负载情况,实时调整拓扑的执行策略。 - **监控指标**:如消息处理延迟、吞吐量等,监控系统的健康状况。 - **日志...

    lamp安装配置及flume+Kafka+Storm+HDFS实时系统搭分享

    在Flume、Kafka和Storm处理完数据后,HDFS可以提供可靠的存储,确保数据不丢失,便于后续的数据挖掘和分析。 具体搭建流程如下: 1. 安装Linux系统并配置网络环境。 2. 安装Nginx并配置服务器,部署PHP应用。 3. ...

    实时计算Storm核心技术及其在报文系统中的应用.pdf

    Storm通过Ack机制确保每个数据元被正确处理,即使在网络故障或节点失效的情况下,也能保证数据不丢失。 3. Storm中的高层机制 3.1 事务处理 Storm支持事务处理,保证数据处理的原子性和一致性,尤其适用于金融等...

    Storm实战构建大数据实时计算( 带书签目录 高清完整版)

    当某个节点失败时,系统能自动重新分配任务,确保数据不丢失且处理继续进行。 6. **实时数据处理** Storm适用于实时数据分析,例如实时日志分析、实时推荐系统、实时计费系统等。它能够快速响应新产生的数据,满足...

    论Storm分布式实时计算工具.pdf

    1. 高可靠性:Storm 能保证消息不会丢失,避免数据丢失导致的问题。 2. 高扩展性:Storm 集群可以根据需要轻松扩展,以处理更大规模的数据流。 3. 管理简易性:设计时考虑了易于管理的目标,使得集群的日常运维更加...

    Storm的文档详解

    它专为处理无界数据流而设计,能够实现实时数据处理任务,具备低延迟、高可用、分布式的特性,并支持线性扩展以及确保数据处理过程中数据的零丢失。 **实时计算与离线计算对比:** - **离线计算**: - 批量获取...

    基于storm的实时计算架构

    然而,Storm也有一些问题,如编程门槛较高,框架缺乏持久化存储,没有提供消息接入模块,Storm UI功能简单,跨topology的bolt复用问题,以及Nimbus单点故障和topology不支持动态部署等问题。 阿里巴巴数据平台部的...

    Storm:distributed and fault-tolerant realtime computation

    - **容错性**:自动处理节点故障,保证数据不丢失且系统持续运行。 - **无中间消息代理**:减少额外的中间件依赖,提高性能和降低复杂性。 - **高级抽象**:提供比简单的消息传递更高级的抽象接口,方便开发者快速...

    starter_联合开发_storm_

    它们必须是可容错的,因为Storm会尝试重新启动失败的spout任务以确保数据不丢失。 "Blot"可能是对Bolt的误拼。Bolt是拓扑中的处理组件,它们执行实际的业务逻辑。Bolts可以是简单的过滤器,也可以是复杂的计算单元...

    storm深入学习.pdf

    这种机制确保了数据的可靠传输,即使在节点故障时也能重试处理,防止数据丢失。 3. **Storm 批处理** 尽管Storm主要用于实时流处理,但可以通过批处理策略来处理数据流的一部分,比如使用窗口来聚合数据。 4. **...

    02、Storm入门到精通storm3-1.pptx

    5. **可靠性保证**: Storm通过`Ack机制`确保数据不丢失。当一个Bolt处理完一个Tuple后,它会发送一个确认(Ack),如果某个Bolt在处理过程中失败,Storm会重新调度处理,直到成功。此外,如果任务失败,Storm会自动...

    Storm事务性拓扑详解教程.docx

    事务性拓扑在Storm 0.7.0版本中引入,解决了消息可能被重复处理的问题,这对于那些需要准确无误计算的场景至关重要,比如金融交易、数据分析或者任何不能容忍数据丢失或重复的应用。 在标准的Storm拓扑中,处理数据...

Global site tag (gtag.js) - Google Analytics