本文属原创系列,转载请注明。
转自:http://blog.csdn.net/xeseo/article/details/17754825
对于Storm,它有一个很重要的特性:“Guarantee no data loss” ——可靠性
很显然,要做到这个特性,必须要track每个data的去向和结果。Storm是如何做到的呢——acker机制。
先概括下acker所参与的工作流程:
1. Spout创建一个新的Tuple时,会发一个消息通知acker去跟踪;
2. Bolt在处理Tuple成功或失败后,也会发一个消息通知acker;
3. acker会找到发射该Tuple的Spout,回调其ack或fail方法。
我们说RichBolt和BasicBolt的区别是后者会自动ack。那么是不是我们只要实现了Spout的ack或fail方法就能看到反馈了呢?
试试在RandomSpout中加入如下代码:
- @Override
- public void ack(Object msgId) {
- System.err.println("ack " + msgId);
- }
- @Override
- public void fail(Object msgId) {
- System.err.println("fail " + msgId);
- }
重新运行ExclaimBasicTopo,看下结果。并没有任何的ack 和 fail 出现?
原因是,Storm要求如果要track一个Tuple,必须要指定其messageId,也就是回调回ack和fail方法的参数。如果我们不指定,Storm是不会去track该tuple的,即不保证消息丢失!
我们改下Spout代码,为每个消息加入一个唯一Id。同时,为了方便看结果,加入更多的打印,并且靠sleep减慢发送速度。(只是为了演示!)
- public class RandomSpout extends BaseRichSpout {
- private SpoutOutputCollector collector;
- private Random rand;
- private AtomicInteger counter;
- private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};
- @Override
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- this.collector = collector;
- this.rand = new Random();
- counter = new AtomicInteger();
- }
- @Override
- public void nextTuple() {
- Utils.sleep(5000);
- String toSay = sentences[rand.nextInt(sentences.length)];
- int msgId = this.counter.getAndIncrement();
- toSay = "["+ msgId + "]"+ toSay;
- PrintHelper.print("Send " + toSay );
- this.collector.emit(new Values(toSay), msgId);
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("sentence"));
- }
- @Override
- public void ack(Object msgId) {
- PrintHelper.print("ack " + msgId);
- }
- @Override
- public void fail(Object msgId) {
- PrintHelper.print("fail " + msgId);
- }
- }
PrintHelper类:
- public class PrintHelper {
- private static SimpleDateFormat sf = new SimpleDateFormat("mm:ss:SSS");
- public static void print(String out){
- System.err.println(sf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + out);
- }
- }
同时把PrintBolt里面打印也换成PrintHelper.print打印
看下打印结果:
- 53:33:891 [Thread-26-spout] Send [0]ted:I'm excited
- 53:33:896 [Thread-20-print] Bolt[0] String recieved: [0]ted:I'm excited!
- 53:38:895 [Thread-26-spout] Send [1]edi:I'm happy
- 53:38:895 [Thread-22-print] Bolt[1] String recieved: [1]edi:I'm happy!
- 53:38:895 [Thread-26-spout] ack 0
- 53:43:896 [Thread-26-spout] Send [2]edi:I'm happy
- 53:43:896 [Thread-22-print] Bolt[1] String recieved: [2]edi:I'm happy!
- 53:43:896 [Thread-26-spout] ack 1
- 53:48:896 [Thread-26-spout] Send [3]edi:I'm happy
- 53:48:896 [Thread-26-spout] ack 2
- 53:48:896 [Thread-24-print] Bolt[2] String recieved: [3]edi:I'm happy!
- 53:53:896 [Thread-26-spout] Send [4]ted:I'm excited
- 53:53:896 [Thread-26-spout] ack 3
- 53:53:896 [Thread-20-print] Bolt[0] String recieved: [4]ted:I'm excited!
- 53:58:897 [Thread-26-spout] Send [5]laden:I'm dangerous
- 53:58:897 [Thread-26-spout] ack 4
- 53:58:898 [Thread-24-print] Bolt[2] String recieved: [5]laden:I'm dangerous!
很明显看到:
a. 并发度为1的Spout确实是一个线程,并发度为3的Bolt确实是三个线程;
b. 消息完全处理完成后,确实回调了ack(Object msgId)方法,而且msgId的值,即为我们emit的msgId;
c. 虽然我们在topology中定义了两个bolt,但实际上ack对于每个tuple只调用了一次;
d. spout发出tuple后,Bolt很快就完成了,但是ack直到5秒后spout醒来才打印。
Tuple树
对于Spout创建的Tuple,在topology定义的流水线中经过Bolt处理时,可能会产生一个或多个新的Tuple。源Tuple+新产生的Tuple构成了一个Tuple树。当整棵树被处理完成,才算一个Tuple被完全处理,其中任何一个节点的Tuple处理失败或超时,则整棵树失败。
超时的值,可以通过定义topology时,conf.setMessageTimeoutSecs方法指定。
Anchor
在我们例子中ExclaimRichBolt用
collector.emit(inputTule, new Values(newTupleValue));
发射一个新的tuple。
第一个参数是传入Bolt的tuple,第二个参数是新产生的tuple的value,这种emit的方式,在Storm中称为: "anchor"。
Tuple的ack
前面我们一直提到acker,看到这里,你应该能猜出acker其实就是Storm里面track一个Tuple保证其一定被处理的功能。acker也是一个component。
我们来看看acker的工作流程:
1. Spout在初始化时会产生一个tasksId;
2. Spout中创建新的Tuple,其id是一个64位的随机数;
3. Spout将新建的Tuple发送出去(给出了messageId来开启Tuple的追踪), 同时会发送一个消息到某个acker,要求acker进行追踪。该消息包含两部分:
- Spout的taskId:用户acker在整个tuple树被完全处理后找到原始的Spout进行回调ack或fail
- 一个64位的ack val值: 标志该tuple是否被完全处理。初始值为0。
3. 一个Bolt在处理完Tuple后,如果发射了一个新的anchor tuple,Storm会维护anchor tuple的列表;
4. 该Bolt调用OutputCollector.ack()时,Storm会做如下操作:
- 将anchor tuple列表中每个已经ack过的和新创建的Tuple的id做异或(XOR)。假定Spout发出的TupleID是tuple-id-0,该Bolt新生成的TupleID为tuple-id-1,那么,tuple-id-0XORtuple-id-0XOR tuple-id-1
- Storm根据该原始TupleID进行一致性hash算法,找到最开始Spout发送的那个acker,然后把上面异或后得出的ack val值发送给acker
5. acker收到新的ack val值后,与保存的原始的Tuple的id进行异或,如果为0,表示该Tuple已被完全处理,则根据其taskId找到原始的Spout,回调其ack()方法。
fail的机制类似,在发现fail后直接回调Spout的fail方法。
Storm就是通过这个acker的机制来保证数据不丢失。
回头再看看上面的打印结果,b、c两条得到很好的解释了。那d是为什么呢?
在最开始时,我曾经提到过,Storm的设计模型中,Spout是源源不断的产生数据的,所以其nextTuple()方法在任何时候不应该被打断。ack,fail 和 nextTuple是在同一个线程中完成的。
所以,虽然acker发现一个Tuple已经完全处理完成,但是由于Spout线程在Sleep,无法回调。
在设计中,我们应尽量避免在Spout、Bolt中去Sleep。如果确实需要控制,最好用异步线程来做,例如用异步线程读取数据到队列,再由Spout去取队列中数据。异步线程可以随意控制速度等。
另外,
Storm是否会自动重发失败的Tuple?
这里答案已经很明显了。fail方法如何实现取决于你自己。只有在fail中做了重发机制,才有重发。
注:Trident除外。这是Storm提供的特殊的事务性API,它确实会帮你自动重发的。
Unanchor
如果我们在Bolt中用OutputCollector.emit()发射一个新的Tuple时,并没有指定输入的Tuple(IBasicBolt的实现类用的是BasicOutPutCollector,其emit方法实际上还是调用OutputCollector.emit(),只不过内部会帮你填上输入的Tuple),那么行为称之为“Unanchor”。
是否用Unanchor方式取决于你的实现。
调整可靠性
http://blog.csdn.net/xeseo/article/details/17754825
相关推荐
自诞生之日起,Storm就以独特的设计理念、优良的产品质量表现及年轻、积极、性感的品牌形象迅速吸引了一大批品牌簇拥者。Storm仅仅用了几年时间,就跻身伦敦时尚的前沿,迅速成为时尚腕表的领航者。
下面我们将详细探讨Storm的ack机制以及它如何保证数据的可靠性。 1. **什么是Storm的Ack机制?** Ack(确认)机制是Storm中的一个核心组件,它的主要任务是跟踪和确保每个数据包(tuple)在拓扑中正确处理。当一个...
Storm框架在企业级应用中扮演着重要角色,运维和调优确保了实时数据处理的性能和可靠性。通过掌握以上知识点,可以让Storm集群在各种企业环境中高效稳定地运行,同时快速响应各种业务需求的变化。
在本主题中,我们将深入探讨Apache Storm 1.2.1版本中的“Hello World”示例,这是一个关于实时数据...这个简单的应用展示了Storm如何处理数据流并保证消息的可靠性,使得它成为大数据领域中实时分析和处理的有力工具。
- **acker**:负责确认消息处理的完整性,确保数据处理的可靠性。 3. **实时处理的优势与挑战** - 实时分析可以迅速响应业务变化,支持快速决策。 - 相比于批处理,实时处理能降低延迟,提高时效性。 - 挑战...
ZooKeeper是一个分布式应用程序协调服务,可以为分布式环境中的配置信息提供集中化的管理,保证配置信息的一致性和可靠性。在搭建Storm时,确保ZooKeeper集群的稳定性和数据一致性是非常重要的。 在安装配置好...
5. **容错机制**:讲解Storm的容错特性,如acker节点、故障恢复策略,确保数据的完全处理。 6. **监控与调试**:介绍如何监控Storm拓扑的性能,以及如何定位和解决常见问题。 7. **最佳实践**:提供实际项目中的...
【标题】"storm提交topology的过程"涉及到的是Apache Storm这一分布式实时计算系统中的核心操作——部署和运行流处理任务,即topology。Apache Storm被广泛应用于实时数据处理、在线机器学习、持续计算以及大规模...
了解这些核心概念和配置细节后,开发者能够更好地理解和优化 Storm 应用,确保数据流处理的高效性和可靠性。对于异常处理,Storm 提供了多种机制,如自动重试、故障切换和监控,以确保系统的健壮性。在实际开发过程...
**Storm记录级容错机制详解** Storm作为实时流处理框架,提供了一种高效且精确的一次处理(Exactly-once ...因此,Storm的记录级容错机制能够有效保证流处理的正确性和可靠性,为实时数据处理提供了坚实的基础。
5. **acker**:Storm提供了可靠的消息处理机制,acker组件负责确认每个tuple(数据单元)是否被完全处理。这确保了数据处理的容错性。 6. **容错性**:Apache Storm通过持久化未完成的tuples并在节点故障时重新分配...
Storm的可靠性是指Storm会告知用户每一个消息单元是否在一个指定的时间(timeout)内被完全处理。完全处理的意思是该MessageId绑定的源Tuple以及由该源Tuple衍生的所有Tuple都经过了Topology中每一个应该到达的Bolt的...
8. **容错机制**:Storm 通过检查点(checkpoints)和acker 组件实现容错。acker 负责确认 tuple 是否被完全处理,如果消息处理失败,系统会重新发送未确认的 tuple。 9. **Zookeeper 集群协调**:Storm 使用 ...
**第3章 Storm** Storm是实时大数据处理的重要框架,它为分布式、容错且可扩展的数据流处理提供了强大的支持...同时,Storm的容错机制保证了服务的稳定性和数据的可靠性,使得在大规模的WiFi数据流面前也能游刃有余。
Storm是一个分布式实时计算系统,它允许开发者处理无界数据流,具有高容错性和高性能的特点。...在实际应用中,根据业务需求调整这些配置和优化executor、worker、task的分配,可以显著提高Storm拓扑的性能和稳定性。
- **准确性**:采用Acker机制确保数据不丢失,并使用事务机制保证数据准确性。 #### 三、JStorm vs Storm JStorm相较于原始的Apache Storm,进行了多方面的优化和改进,具体包括但不限于: ##### 更稳定 - **...
4. **reliability**:可靠性是Storm的一个关键特性,它通过心跳检测、消息确认机制(ACKer)来确保数据流的处理不丢失。在这个部分,你可能能学习到如何设置ACKers,以及如何处理错误和重试策略。 5. **storm资料**...
本压缩包提供的Java代码示例主要讲解了如何在Storm中对Tuple进行分词操作,这在构建自然语言处理、文本分析等实时流处理应用时非常常见。以下是对这一主题的详细说明: 1. **Storm基础**: Storm 是一个开源的实时...