`

Storm事务API

 
阅读更多
Spout
ITransactionalSpout<T>,同BaseTransactionalSpout<T>,普通事务Spout
IPartitionedTransactionalSpout<T>,同BasePartitionedTransactionalSpout<T>,分区事务Spout
IOpaquePartitionedTransactionalSpout<T>:同BaseOpaquePartitionedTransactionalSpout<T>,不透明分区事务Spout


Bolt
IBatchBolt<T>:同BaseBatchBolt<T>,普通批处理
BaseTransactionalBolt:事务Bolt

接口Icommitter:标识IBatchBolt 或BaseTransactionalBolt是否是一个committer CoordinatedBolt


ITransactionalSpout<T>普通事务Spout
ITransactionalSpout<T>:普通事务Spout
  -- ITransactionalSpout.Coordinator<X>
       --initializeTransaction(BigInteger txid, X prevMetadata) :创建一个新的metadata,当isReady() 为true时,发射该metadata(事务tuple)到“batch emit”流
       --isReady() :为true时启动新事务,需要时可以在此sleep

  -- ITransactionalSpout.Emitter<X>
      -- emitBatch(TransactionAttempt tx, X coordinatorMeta,        BatchOutputCollector collector) :逐个发射batch的tuple


IPartitionedTransactionalSpout<T>:分区事务Spout
IPartitionedTransactionalSpout<T>:分区事务Spout,主流事务Spout,原因是目前主流Message Queue都支持分区,分区的作用是增加MQ的吞吐量(每个分区作为一个数据源发送点),主流MQ如Kafka、RocketMQ
-- IPartitionedTransactionalSpout.Coordinator
    -- isReady() :同上    
    -- numPartitions() :返回分区个数。当增加了数据源新分区,同时一个事务被replayed ,此时则不发射新分区的tuples,因为它知道该事务中有多少个分区。

-- IPartitionedTransactionalSpout.Emitter<X>       
    --emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition, X lastPartitionMeta) :发射一个新的Batch,返回Metadata
    --emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, X partitionMeta) :如果这批消息Bolt消费失败了,emitPartitionBatch负责重发这批消息

IOpaquePartitionedTransactionalSpout:不透明分区事务Spout
IOpaquePartitionedTransactionalSpout<T>:不透明分区事务Spout
  --IOpaquePartitionedTransactionalSpout.Coordinator
       --isReady() :同上   --IOpaquePartitionedTransactionalSpout.Emitter<X>
       -- emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, X lastPartitionMeta)
       -- numPartitions()

它不区分发新消息还是重发旧消息,全部用emitPartitionBatch搞定。虽然emitPartitionBatch返回的X应该是下一批次供自己使用的(emitPartitionBatch的第4个参数),但是只有一个批次成功以后X才会更新到ZooKeeper中,如果失败重发,emitPartitionBatch读取的X还是旧的。所以这时候自定义的X不需要记录当前批次的开始位置和下一批次的开始位置两个值,只需要记录下一批次开始位置一个值即可,例如:
public class BatchMeta {      
   public long  nextOffset; //下一批次的偏移量    
}





分享到:
评论

相关推荐

    storm企业应用 实战 运维和调优

    - 掌握Storm Trident API使用,它为高级流处理提供了状态管理和事务处理机制。 - 了解Storm的事务拓扑和状态管理机制,以及它们对运维调优的影响。 7. 实时数据分析: - 利用Storm的实时分析能力进行数据聚合、...

    storm0.9-源码包

    在 Storm 0.9 源码包中,我们可以深入理解其内部工作原理,以及如何利用 Storm 的 API 进行实时流处理应用开发。源码分析对于开发者来说,是提升技能和优化应用的关键步骤。 1. **核心组件** - **Bolt**: Bolt 是 ...

    Storm笔记-PPT

    2. **事务处理**:通过 Trident API 提供了可靠的消息处理能力,确保消息的处理一致性。 3. **并发与通信机制**:每个Spout和Bolt可以有多个实例并发运行,通过TCP/IP进行进程间的通信。 **六、Storm应用场景** 1....

    基于Storm流计算天猫双十一作战室项目实战

    Storm Trident作为Storm的一个高级API,提供了更高级别的抽象和事务支持,使得复杂的数据处理变得更加简单和可靠。 **2. Storm的全面讲解** - **深度解析**:课程不仅覆盖了Storm的基础概念和架构,还深入探讨了其...

    Storm源码走读笔记

    TridentTopology允许开发者以更简单的API进行实时计算任务的构建,同时支持了状态管理和事务处理。TridentTopology提交过程包括AckingFramework的创建和消息的ack确认机制,确保了消息的可靠传输。TridentTopology的...

    apache-storm-1.1.0.tar.gz

    6. ** Trident**:Trident是Storm提供的高级API,用于构建强一致性的数据处理应用。它通过细粒度的事务模型确保数据处理的准确性和可靠性。 7. ** Zookeeper**:Apache Storm依赖Zookeeper进行集群协调,存储元数据...

    Go-Storm-BoltDB的简单而强大ORM框架

    它的主要特点是易于使用、性能良好,并且提供了丰富的功能,如数据模型映射、事务处理、查询构建等。Storm ORM 支持多种数据库,但在这个特定的情况下,它与BoltDB进行了集成。 BoltDB是一个由Bolt项目提供的单文件...

    apache-storm-2.1.0.tar.gz

    2. **Spout**:Spout 是 Storm 拓扑的输入源,它可以是任何类型的数据源,如数据库、消息队列或者API。Spout负责从这些源头读取数据并以流的形式分发到Bolt。 3. **Bolt**:Bolt 执行实际的数据处理任务,如清洗、...

    Apache+Storm+快速起步.pdf 亲测 好评哦

    Storm通过其丰富的API和高可扩展性,为开发者提供了构建复杂实时数据处理应用的强大工具。无论是在小规模的测试还是在大规模的生产环境中,Storm都显示出其强大的实时数据处理能力。随着数据时代的到来,Storm技术的...

    实时计算Storm核心技术及其在报文系统中的应用.pdf

    Trident是Storm提供的高级API,提供了一种更抽象的处理模型,保证每个数据元精确处理一次,简化了开发复杂实时应用的难度。 3.3 DRPC(Distributed Remote Procedure Call) DRPC允许进行分布式远程过程调用,使得...

    Storm实战构建大数据实时计算( 带书签目录 高清完整版)

    Trident是Storm提供的高级API,它提供了更强大的状态管理和事务性保证。Trident将一个复杂的实时处理任务分解为一系列简单的操作,这些操作在数据流上以原子方式执行。 5. **容错机制** Storm通过检查点和故障...

    实时计算平台STORM流式数据核心技术与报文系统.pdf

    - **事务处理**:支持事务性消息处理,保证数据的一致性和完整性。 - **Trident API**:提供了一种更高级别的抽象,使得构建复杂、可靠的实时应用程序更加简单。 - **DRPC(Distributed Remote Procedure Call)*...

    spark Streaming和storm的对比

    Storm还支持事务性topology,确保每个元组“被且仅被处理一次”。此外,Storm允许在运行时水平扩展,即在不中断服务的情况下增加或减少节点来处理更多的负载。 在对比两者的性能时,Spark Streaming具有以下优势: ...

    从零开始学习storm最新版

    Trident是Storm的一个高级接口,它提供了状态管理和事务性保证,使得实时处理更加稳定可靠。 书中还会涉及Storm的监控与运维,包括如何使用Storm UI查看拓扑运行状态、如何调整拓扑参数优化性能,以及如何处理常见...

    trident-gcd:Storm Trident API 的 Google Cloud Datastore 状态实现

    Trident-GCD是一个开源项目,它为Apache Storm的Trident API提供了一种集成Google Cloud Datastore的状态管理实现。Trident是Storm的一个高级接口,用于构建复杂的数据处理管道,而Google Cloud Datastore则是一个...

    hadoop、storm、spark的区别对比

    Storm通过消息的ack机制保证了消息至少处理一次,支持事务拓扑。Spark提供了一种弹性分布式数据集(RDD)的概念,可以记录数据的转换过程,实现高效的容错。 总体来说,这三个框架各自有所长。Hadoop适合于需要处理...

    storm资源分享

    在实际操作中,你将学习到如何使用Clojure或Java API来编写Storm应用程序,并通过Zookeeper进行集群协调。 其次,"storm原理介绍.doc"文件则深入探讨了Storm的工作原理。它可能涵盖了Storm的容错机制,如ackers如何...

    开源项目-asdine-storm.zip

    3. **事务处理**:ORM通常会包含对数据库事务的支持,Storm可能也提供了事务管理,确保数据操作的一致性和完整性。 4. **数据类型转换**:Storm可能自动处理Go语言数据类型与BoltDB内部数据表示之间的转换,使得...

    apache-storm-1.2.3.rar

    - **API更新**:可能引入了新的API或改进了现有API,使开发更加简便和灵活。 - **多语言支持**:除了Java,Storm还支持其他编程语言,如Python和Clojure,允许开发者选择最合适的工具进行开发。 在解压“apache-...

Global site tag (gtag.js) - Google Analytics