`

storm 可靠性机制保证

 
阅读更多

本文属原创系列,转载请注明。

转自:http://blog.csdn.net/xeseo/article/details/17754825

 

对于Storm,它有一个很重要的特性:“Guarantee no data loss” ——可靠性

很显然,要做到这个特性,必须要track每个data的去向和结果。Storm是如何做到的呢——acker机制。

先概括下acker所参与的工作流程:

1. Spout创建一个新的Tuple时,会发一个消息通知acker去跟踪;

2. Bolt在处理Tuple成功或失败后,也会发一个消息通知acker;

3. acker会找到发射该Tuple的Spout,回调其ack或fail方法。

 

我们说RichBolt和BasicBolt的区别是后者会自动ack。那么是不是我们只要实现了Spout的ack或fail方法就能看到反馈了呢?

试试在RandomSpout中加入如下代码:

 

  1. @Override  
  2.     public void ack(Object msgId) {  
  3.         System.err.println("ack " + msgId);  
  4.     }  
  5.   
  6.     @Override  
  7.     public void fail(Object msgId) {  
  8.         System.err.println("fail " + msgId);  
  9.     }  


重新运行ExclaimBasicTopo,看下结果。并没有任何的ack 和 fail 出现?

 

原因是,Storm要求如果要track一个Tuple,必须要指定其messageId,也就是回调回ack和fail方法的参数。如果我们不指定,Storm是不会去track该tuple的,即不保证消息丢失!

我们改下Spout代码,为每个消息加入一个唯一Id。同时,为了方便看结果,加入更多的打印,并且靠sleep减慢发送速度。(只是为了演示!)

 

  1. public class RandomSpout extends BaseRichSpout {  
  2.   
  3.     private SpoutOutputCollector collector;  
  4.   
  5.     private Random rand;  
  6.       
  7.     private AtomicInteger counter;  
  8.       
  9.     private static String[] sentences = new String[] {"edi:I'm happy""marry:I'm angry""john:I'm sad""ted:I'm excited""laden:I'm dangerous"};  
  10.       
  11.     @Override  
  12.     public void open(Map conf, TopologyContext context,  
  13.             SpoutOutputCollector collector) {  
  14.         this.collector = collector;  
  15.         this.rand = new Random();  
  16.         counter = new AtomicInteger();  
  17.     }  
  18.   
  19.     @Override  
  20.     public void nextTuple() {  
  21.         Utils.sleep(5000);  
  22.         String toSay = sentences[rand.nextInt(sentences.length)];  
  23.         int msgId = this.counter.getAndIncrement();  
  24.         toSay = "["+ msgId + "]"+ toSay;  
  25.         PrintHelper.print("Send " + toSay );  
  26.           
  27.         this.collector.emit(new Values(toSay), msgId);  
  28.     }  
  29.   
  30.     @Override  
  31.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  32.         declarer.declare(new Fields("sentence"));  
  33.     }  
  34.       
  35.     @Override  
  36.     public void ack(Object msgId) {  
  37.         PrintHelper.print("ack " + msgId);  
  38.     }  
  39.   
  40.     @Override  
  41.     public void fail(Object msgId) {  
  42.         PrintHelper.print("fail " + msgId);  
  43.     }  
  44.   
  45. }  

 

 

 

PrintHelper类:

 

  1. public class PrintHelper {  
  2.   
  3.     private static SimpleDateFormat sf = new SimpleDateFormat("mm:ss:SSS");  
  4.       
  5.     public static void print(String out){  
  6.         System.err.println(sf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + out);  
  7.     }  
  8.       
  9. }  

 

同时把PrintBolt里面打印也换成PrintHelper.print打印


看下打印结果:

 

[plain] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. 53:33:891 [Thread-26-spout] Send [0]ted:I'm excited  
  2. 53:33:896 [Thread-20-print] Bolt[0] String recieved: [0]ted:I'm excited!  
  3. 53:38:895 [Thread-26-spout] Send [1]edi:I'm happy  
  4. 53:38:895 [Thread-22-print] Bolt[1] String recieved: [1]edi:I'm happy!  
  5. 53:38:895 [Thread-26-spout] ack 0  
  6. 53:43:896 [Thread-26-spout] Send [2]edi:I'm happy  
  7. 53:43:896 [Thread-22-print] Bolt[1] String recieved: [2]edi:I'm happy!  
  8. 53:43:896 [Thread-26-spout] ack 1  
  9. 53:48:896 [Thread-26-spout] Send [3]edi:I'm happy  
  10. 53:48:896 [Thread-26-spout] ack 2  
  11. 53:48:896 [Thread-24-print] Bolt[2] String recieved: [3]edi:I'm happy!  
  12. 53:53:896 [Thread-26-spout] Send [4]ted:I'm excited  
  13. 53:53:896 [Thread-26-spout] ack 3  
  14. 53:53:896 [Thread-20-print] Bolt[0] String recieved: [4]ted:I'm excited!  
  15. 53:58:897 [Thread-26-spout] Send [5]laden:I'm dangerous  
  16. 53:58:897 [Thread-26-spout] ack 4  
  17. 53:58:898 [Thread-24-print] Bolt[2] String recieved: [5]laden:I'm dangerous!  


很明显看到:

 

a. 并发度为1的Spout确实是一个线程,并发度为3的Bolt确实是三个线程;

b. 消息完全处理完成后,确实回调了ack(Object msgId)方法,而且msgId的值,即为我们emit的msgId;

c. 虽然我们在topology中定义了两个bolt,但实际上ack对于每个tuple只调用了一次;

d. spout发出tuple后,Bolt很快就完成了,但是ack直到5秒后spout醒来才打印。

 

Tuple树

对于Spout创建的Tuple,在topology定义的流水线中经过Bolt处理时,可能会产生一个或多个新的Tuple。源Tuple+新产生的Tuple构成了一个Tuple树。当整棵树被处理完成,才算一个Tuple被完全处理,其中任何一个节点的Tuple处理失败或超时,则整棵树失败。

超时的值,可以通过定义topology时,conf.setMessageTimeoutSecs方法指定。

 

Anchor

在我们例子中ExclaimRichBolt

collector.emit(inputTule, new Values(newTupleValue));

发射一个新的tuple。

第一个参数是传入Bolttuple,第二个参数是新产生的tuplevalue,这种emit的方式,在Storm中称为: "anchor"。

 

Tuple的ack

前面我们一直提到acker,看到这里,你应该能猜出acker其实就是Storm里面track一个Tuple保证其一定被处理的功能。acker也是一个component

我们来看看acker的工作流程:

1. Spout在初始化时会产生一个tasksId;

2. Spout中创建新的Tuple,其id是一个64位的随机数;

3. Spout将新建的Tuple发送出去(给出了messageId来开启Tuple的追踪), 同时会发送一个消息到某个acker,要求acker进行追踪。该消息包含两部分:

 

  • Spout的taskId:用户acker在整个tuple树被完全处理后找到原始的Spout进行回调ack或fail
  • 一个64位的ack val值: 标志该tuple是否被完全处理。初始值为0。

 

3. 一个Bolt在处理完Tuple后,如果发射了一个新的anchor tuple,Storm会维护anchor tuple的列表;

4. 该Bolt调用OutputCollector.ack()时,Storm会做如下操作:

 

  • anchor tuple列表中每个已经ack过的和新创建的Tuple的id做异或(XOR)。假定Spout发出的TupleID是tuple-id-0,该Bolt新生成的TupleID为tuple-id-1,那么,tuple-id-0XORtuple-id-0XOR tuple-id-1
  • Storm根据该原始TupleID进行一致性hash算法,找到最开始Spout发送的那个acker,然后把上面异或后得出的ack val值发送给acker

 

5. acker收到新的ack val值后,与保存的原始的Tuple的id进行异或,如果为0,表示该Tuple已被完全处理,则根据其taskId找到原始的Spout,回调其ack()方法。

 

fail的机制类似,在发现fail后直接回调Spout的fail方法。

Storm就是通过这个acker的机制来保证数据不丢失。

 

回头再看看上面的打印结果,b、c两条得到很好的解释了。那d是为什么呢?

在最开始时,我曾经提到过,Storm的设计模型中,Spout是源源不断的产生数据的,所以其nextTuple()方法在任何时候不应该被打断。ack,fail 和 nextTuple是在同一个线程中完成的。

所以,虽然acker发现一个Tuple已经完全处理完成,但是由于Spout线程在Sleep,无法回调。

 

在设计中,我们应尽量避免在Spout、Bolt中去Sleep。如果确实需要控制,最好用异步线程来做,例如用异步线程读取数据到队列,再由Spout去取队列中数据。异步线程可以随意控制速度等。

 

另外,

Storm是否会自动重发失败的Tuple?

这里答案已经很明显了。fail方法如何实现取决于你自己。只有在fail中做了重发机制,才有重发。

注:Trident除外。这是Storm提供的特殊的事务性API,它确实会帮你自动重发的。

Unanchor

如果我们在Bolt中用OutputCollector.emit()发射一个新的Tuple时,并没有指定输入的Tuple(IBasicBolt的实现类用的是BasicOutPutCollector,其emit方法实际上还是调用OutputCollector.emit(),只不过内部会帮你填上输入的Tuple),那么行为称之为“Unanchor”。

是否用Unanchor方式取决于你的实现。

 

调整可靠性

在某些特定的情况下,你或许想调整Storm的可靠性。例如,你并不关心数据是否丢失,或者你想看看后面是否有某个Bolt拖慢了Spout的速度?
那么,有三种方法可以实现:
1. 在build topology时,设置acker数目为0,即conf.setNumAckers(0);
2. 在Spout中,不指定messageId,使得Storm无法追踪;
3. 在Bolt中,使用Unanchor方式发射新的Tuple。
分享到:
评论

相关推荐

    storm利用ack保证数据的可靠性源码

    下面我们将详细探讨Storm的ack机制以及它如何保证数据的可靠性。 1. **什么是Storm的Ack机制?** Ack(确认)机制是Storm中的一个核心组件,它的主要任务是跟踪和确保每个数据包(tuple)在拓扑中正确处理。当一个...

    Storm如何保证可靠的消息处理

    Storm的可靠性机制是完全分布式的(distributed),可伸缩的(scalable),容错的(fault-tolerant)。本文介绍了Storm如何保证可靠性以及作为Storm使用者,我们需要怎么做,才能充分利用Storm的可靠性。理解一些实现细节...

    Storm源码走读笔记

    文档最后提及了Storm的可靠性机制,这是指Storm如何保证每个Tuple在Topology中至少被处理一次。Storm通过ack消息来确保Tuple被正确处理。如果某个Tuple在指定的时间内没有被ack,系统会自动重新发送该Tuple。 以上...

    storm1.2.1-helloword可靠消息

    在本主题中,我们将深入探讨Apache Storm 1.2.1版本中的“Hello World”示例,这是一个关于实时数据...这个简单的应用展示了Storm如何处理数据流并保证消息的可靠性,使得它成为大数据领域中实时分析和处理的有力工具。

    细细品味Storm_Storm简介及安装

    未来,Storm将进一步优化其性能和可靠性,增强对更多编程语言的支持,并与其他大数据生态系统更好地集成。 #### 二、Storm安装 **2.1 版本选择** 在安装之前,应先了解所需的Storm版本及其兼容性。通常建议使用...

    storm 从零到精通 非常实用的文件

    - Storm 的可靠性机制确保每个消息至少被处理一次。 - **2.5.2 如果一个消息被完全处理或完全处理失败会发生什么** - 如果消息成功处理,则不会被重复处理;如果处理失败,Storm 会重新发送消息直到成功为止。 - *...

    storm on yarn概念架构消息机制概述

    Storm的高可靠性、高容错性的特性意味着即使有节点失败,整个系统也不会停止运行,因为Nimbus和Supervisor可以重启失败的worker。Supervisor负责管理属于自己的worker进程,运行具体的逻辑,而Nimbus负责资源分配和...

    storm的jar包

    此外,Storm支持容错机制,即使部分节点失败,系统也能自动恢复,保证服务的高可用性。 总结来说,"storm的jar包"是Storm实时处理框架的基础,包含了运行和开发Storm应用所需的所有组件和库。通过深入理解和使用这...

    Storm 源码分析

    5. **容错机制**:为了保证数据处理的可靠性,Storm设计了一套完整的容错机制,包括消息确认机制、任务重启机制等。 #### 六、Storm与Hadoop的集成 虽然Storm和Hadoop分别针对实时计算和批处理两个不同的场景,但...

    storm-hbase集成

    2. 可靠性:Storm 的处理保证和 HBase 的分布式存储机制确保了数据的完整性和一致性。 3. 扩展性:两者都是高度可扩展的系统,可以随着数据量的增长无缝扩展。 四、Storm-HBase 集成的实现方式 通常,集成 Storm 与...

    storm开发jar包以及storm例子源码

    7. **容错机制**:Storm通过检查点和故障恢复来保证容错性,即使节点失败,也能从最近的检查点恢复。 8. **Zookeeper**:作为协调服务,存储元数据和状态信息,确保集群的一致性和高可用性。 9. **nimbus**和**...

    storm-wordcount例子

    Spout需要保证数据的可靠性和无重复性。 2. **Split Bolt**:接收到Spout发送的Tuples后,Split Bolt负责对每个Tuple中的文本进行分词操作,将单词分离出来。这个Bolt会创建新的Tuples,每个Tuple包含一个单词,...

    storm剖析(pdf)

    ACK机制用于确保消息的可靠性处理,调度器则负责在Worker之间分发任务,多语言支持允许用户使用除Java以外的语言编写Spout和Bolt组件。 在运行Storm集群时,还需要理解关于资源隔离的概念,比如使用CGroup来限制...

    storm企业应用 实战 运维和调优

    Storm框架在企业级应用中扮演着重要角色,运维和调优确保了实时数据处理的性能和可靠性。通过掌握以上知识点,可以让Storm集群在各种企业环境中高效稳定地运行,同时快速响应各种业务需求的变化。

    Twitter storm

    Storm 的设计目标是提供一个简单易用、可扩展且容错性高的平台,用于处理无界数据流(即持续不断地产生数据的数据流),并且能够保证消息的可靠传输。 Storm 与 Hadoop 类似,但主要针对的是实时数据处理,而Hadoop...

    storm开发设计规范

    - Spout应确保数据源的可靠性,如使用可靠的消息队列作为后端存储。 - 实现正确的失败恢复机制,确保在异常情况下能正确重放未处理的数据。 2. **Bolt开发规范**: - Bolt的设计应尽可能高效,避免不必要的计算...

    实时计算平台STORM流式数据核心技术与报文系统.pdf

    通过对Storm的深入理解和应用,开发者可以构建出高性能、高可靠性的实时数据处理系统,满足现代企业对实时数据洞察的需求。同时,结合报文系统的特性和需求,可以定制化地设计和实现满足业务场景的解决方案。

    storm1.0 搭建

    2. **容错机制**:Storm 1.0 在容错机制上做了改进,支持更灵活的任务重试策略,提高了系统的稳定性和可靠性。 3. **监控与日志**:通过集成第三方监控工具如 Nagios 或 Prometheus 可以对集群进行实时监控,同时...

Global site tag (gtag.js) - Google Analytics