`

storm事务

阅读更多

转:http://blog.csdn.net/yangbutao/article/details/17844799

 

1、storm事务性topology的提出

对于容错机制,Storm通过一个系统级别的组件acker,结合xor校验机制判断一个msg是否发送成功,进而spout可以重发该msg,保证一个msg在出错的情况下至少被重发一次。但是在一些事务性要求比较高的场景中,需要保障一次只有一次的语义,比如需要精确统计tuple的数量等等。Storm 0.7.0引入了Transactional Topology, 它可以保证每个tuple”被且仅被处理一次”, 这样你就可以实现一种非常准确,非常可扩展,并且高度容错方式来实现计数类应用。

2、API介绍

 

IBatchBolt有三个方法

execute(Tuple tuple)

finishBatch()

prepare (java.util.Map conf, TopologyContext context, BatchOutputCollector collector,T id)

 

ITransactionalSpout有以下几个主要方法:

ITransactionalSpout.Coordinator<T> getCoordinator(java.util.Map conf,

                                                 TopologyContext context)

ITransactionalSpout.Emitter<T> getEmitter(java.util.Map conf,

                                          TopologyContext context)

 

3、事务机制原理分析

1) 对于一次只有一次的语义,从原理上来讲,需要在发送tuple的时候带上xid,在需要事务处理的时候,根据该xid是否以前已经处理成功来决定是否进行处理,当然需要把xid和处理结果一起做保存。并且需要保障顺序性,在当前请求xid提交前,所有比自己低xid请求都已经提交。

在事务处理时单个处理tuple效率比较低,因此storm中引入batch处理,一批tuple赋予一个xid,为了提高batch之间处理的并行度,storm采用了pipeline 处理的模型。参见下图pipeline模型,多个事务可以并行执行,但是commit的是严格按照顺序的。

 

对应到storm中的具体实现中,把一个batch的计算分成了两个阶段processing和commit阶段:

 

Processing阶段:多个batch可以并行计算,上面例子中bolt2是普通的batchbolt(实现BaseBatchBolt),那么多个batch在bolt2的task之间可以并行执行,比如对batch3和batch4并行执行execute或finishbatch(什么时候调用该操作,后面会介绍)方法。

Commiting阶段:batch之间强制按照顺序进行提交,上图中Bolt3实现BaseBatchBolt并且标记需要事务处理的(实现了ICommitter接口或者通过TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology里面),那么在Storm认为可以提交(至于什么时候可以提交,后面会介绍)batch的时候调用finishbatch,在finishBatch做xid的比较以及状态保存工作。例子中batch2必须等待batch1提交后,才可以进行提交。

Storm事务性的拓扑看起来比较复杂,需要对batch的commit进行管理,错误的发现,batch的发射以及处理等等,其内部实现完全基于storm的相关底层操作进行抽象。

当使用Transactional Topologies的时候, storm为你做下面这些事情:

 

  • 管理状态: Storm把所有实现Transactional Topologies所必须的状态保存在zookeeper里面。 这包括当前transaction id以及定义每个batch的一些元数据。
  • 协调事务: Storm帮你管理所有事情, 以帮你决定在任何一个时间点是该proccessing还是该committing。
  • 错误检测: Storm利用acking框架来高效地检测什么时候一个batch被成功处理了,被成功提交了,或者失败了。Storm然后会相应地replay对应的batch。你不需要自己手动做任何acking或者anchoring — storm帮你搞定所有事情。
  • 内置的批处理API: Storm在普通bolt之上包装了一层API来提供对tuple的批处理支持。Storm管理所有的协调工作,包括决定什么时候一个bolt接收到一个特定transaction的所有tuple。Storm同时也会自动清理每个transaction所产生的中间数据。
  • 最后,需要注意的一点是Transactional Topologies需要一个可以完全重发(replay)一个特定batch的消息的队列系统(Message Queue)。storm-contrib里面的storm-kafka实现了这个。

 

事务性topology从实现上来讲,包括事务性的spout,以及事务性的bolt。

2) 事务性的spout需要实现ITransactionalSpout,这个接口包含两个内部类Coordinator和Emitter。在topology运行的时候,事务性的spout内部包含一个子的topology,类似下面这个结构:

 

其中coordinator是spout,emitter是bolt。

这里面有两种类型的tuple,一种是事务性的tuple,一种是真实batch中的tuple;

coordinator为事务性batch发射tuple,Emitter负责为每个batch实际发射tuple。

具体如下:

  • coordinator只有一个,emitter根据并行度可以有多个实例
  • emitter以all grouping(广播)的方式订阅coordinator的”batch emit”流
  • coordinator (其实是是一个内部的spout)开启一个事务准备发射一个batch时候,进入一个事务的processing阶段,会发射一个事务性tuple(transactionAttempt & metadata)到”batch emit”流

        *****说明******

       TransactionalTopology里发送的tuple都必须以TransactionAttempt作为第一个field,storm根据这个field来判断tuple属于哪一个batch。

       TransactionAttempt包含两个值:一个transaction id,一个attempt id。transaction id的作用就是我们上面介绍的对于每个batch中的tuple是唯一的

       ,而且不管这个batch    replay多少次都是一样的。attempt id是对于每个batch唯一的一个id, 但是对于同一个batch,它replay之后的attempt id跟replay之前就不一样了,

       我们可以把attempt id理解成replay-times, storm利用这个id来区别一个batch发射的tuple的不同版本

        metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放在zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。

 

       **************

  • Emiter接收到这个tuble后,会进行batch tuple的发送
  • Storm通过anchoring/acking机制来检测事务是否已经完成了processing 阶段;
  • Processing阶段完成后,并且之前的transactions都已经提交了,coordinator发射一个tuble到” commit”流,进入commit阶段。
  • commiting bolts通过all grouping方式订阅该”commit”流,事务提交后,coordinator同样通过anchoring/acking机制确认已经完成了commit阶段,接收到ack后,在zookeeper上把该transaction标记为完成。

3) 事务性的Bolt继承BaseTransactionalBolt,处理batch在一起的tuples,对于每一个tuple调用调用execute方法,而在整个batch处理(processing)完成的时候调用finishBatch方法。如果BatchBolt被标记成Committer,则只能在commit阶段调用finishBolt方法。一个batch的commit阶 段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。那么如何知道batch的processing完成了,也就是bolt是否接收处理了batch里面所有的tuple;在bolt内部,有一个CoordinatedBolt的模型。

 

CoordinateBolt具体原理如下:

 

CoordinateBolt具体原理如下:

 

  • 真正执行计算的bolt外面封装了一个CoordinateBolt。真正执行任务的bolt我们称为real bolt。
  • 每个CoordinateBolt记录两个值:有哪些task给我发送了tuple(根据topology的grouping信息);我要给哪些tuple发送信息(同样根据groping信息)
  •  Real bolt发出一个tuple后,其外层的CoordinateBolt会记录下这个tuple发送给哪个task了。
  • 等所有的tuple都发送完了之后,CoordinateBolt通过另外一个特殊的stream以emitDirect的方式告诉所有它发送过 tuple的task,它发送了多少tuple给这个task。下游task会将这个数字和自己已经接收到的tuple数量做对比,如果相等,则说明处理 完了所有的tuple。
  • 下游CoordinateBolt会重复上面的步骤,通知其下游。

事务性的拓扑在storm中的一个应用是Trident,它是在storm的原语和事务性的基础上做更高层次的抽象,做到一致性和恰好一次的语义,后续章节会对trident做分析。

官网:

http://storm.apache.org/documentation/Transactional-topologies.html

分享到:
评论

相关推荐

    storm事务详解(transactionTopology

    【标题】:Storm事务详解(Transaction Topology) 在分布式计算领域,Apache Storm是一个实时计算系统,它能够处理无界数据流并确保数据处理的正确性。"Storm事务详解"主要探讨的是Storm中的事务处理机制,即...

    storm事务1111

    "storm事务1111"可能指的是针对Storm的一种特定事务处理机制或实践案例。Storm的核心特性之一是其容错性,而事务处理是保证数据准确性和一致性的关键组成部分。 在Storm中,事务处理通常涉及到Trident框架,这是一...

    Storm事务性拓扑详解教程.docx

    在分布式流处理系统Apache Storm中,事务性拓扑是一个关键特性,它允许开发人员构建能够保证数据完整性和一致性的实时处理应用。事务性拓扑在Storm 0.7.0版本中引入,解决了消息可能被重复处理的问题,这对于那些...

    storm入门.pdf

    在事务性拓扑方面,Storm提供了对事务的支持,但这种事务并不等同于传统数据库中的事务。Storm的事务性拓扑主要用于确保数据处理的精确一次性交付(exactly-once processing semantics),这在某些需要极高可靠性的...

    Storm入门教程 之Storm原理和概念详解

    Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...

    大数据课程体系.docx

    - **Storm事务处理**:介绍Storm如何支持事务性的数据处理任务。 - **Storm消息可靠性与容错原理**:解释Storm如何保证消息的可靠传输和容错机制。 - **Storm结合消息队列Kafka**:说明如何使用Kafka作为Storm的数据...

    细细品味Storm_Storm简介及安装

    第7章演示集成Storm和非事务型系统 的复杂性,通过集成Storm和开源探索性分析架构 Druid实现一个可配置的实时系统来分析金融事件。 第8章探讨Lambda体系结构的实现方法,讲解如何 将批处理机制和实时处理引擎结合...

    Storm笔记-PPT

    2. **事务处理**:通过 Trident API 提供了可靠的消息处理能力,确保消息的处理一致性。 3. **并发与通信机制**:每个Spout和Bolt可以有多个实例并发运行,通过TCP/IP进行进程间的通信。 **六、Storm应用场景** 1....

    基于Storm流计算天猫双十一作战室项目实战

    - **批处理事务**:介绍Storm Trident提供的批处理事务机制,提高数据处理的一致性和可靠性。 - **DRPC**:讨论Distributed Remote Procedure Call(分布式远程过程调用)的实现方法和应用场景,为复杂系统的构建...

    storm企业应用 实战 运维和调优

    - 了解Storm的事务拓扑和状态管理机制,以及它们对运维调优的影响。 7. 实时数据分析: - 利用Storm的实时分析能力进行数据聚合、窗口计算和去重等操作。 - 结合Hadoop生态系统中的其他工具(如HBase、Hive)...

    Storm源码走读笔记

    TridentTopology允许开发者以更简单的API进行实时计算任务的构建,同时支持了状态管理和事务处理。TridentTopology提交过程包括AckingFramework的创建和消息的ack确认机制,确保了消息的可靠传输。TridentTopology的...

    Getting Started with Storm

    无论是从简单的 Hello World 示例,还是到涉及多种非 JVM 语言和高级事务处理的大型应用,Storm 都能提供支持。通过本篇指南的学习,你应该能够掌握 Storm 的基础知识,并能够着手构建自己的数据流处理应用程序。

    storm0.9-源码包

    Trident 将数据流分割为一系列小的、可管理的事务。 - **检查点(Checkpointing)**: 通过定期保存状态,Trident 可以在出现故障时恢复到已知的正确状态。 5. **配置与部署** - **storm.yaml**: Storm 的配置文件...

    Storm实时数据处理.pdf

    根据提供的文件信息,“Storm实时数据处理.pdf”,我们可以深入探讨与Apache Storm相关的实时数据处理技术。 ### Apache Storm简介 Apache Storm是一种分布式实时计算系统,能够处理无界数据流,即连续不断的数据...

    Storm中涉及到的类

    在Apache Storm中,事务处理是实现可靠数据处理的关键特性,特别是在大数据流处理场景下。以下是一些关于Storm中涉及的类及其在不同事务模式中的作用的详细解释: 1. **BaseTransactionalSpout**: 这是所有事务性_...

    Go-Storm-BoltDB的简单而强大ORM框架

    它的主要特点是易于使用、性能良好,并且提供了丰富的功能,如数据模型映射、事务处理、查询构建等。Storm ORM 支持多种数据库,但在这个特定的情况下,它与BoltDB进行了集成。 BoltDB是一个由Bolt项目提供的单文件...

    apache-storm-1.1.0.tar.gz

    它通过细粒度的事务模型确保数据处理的准确性和可靠性。 7. ** Zookeeper**:Apache Storm依赖Zookeeper进行集群协调,存储元数据并维护集群状态。Zookeeper确保Nimbus和Supervisors之间的通信是可靠的。 在...

    Storm实战:构建大数据实时计算

    第6章~第8章详细而系统地讲解了几个高级特性:事务、DRPC和Trident;第9章以实例的方式讲解了Storm在实际业务场景中的应用;第10章总结了几个在大数据场景应用过程中遇到的经典问题,以及详细的排查过程。

Global site tag (gtag.js) - Google Analytics